java - Spark Streaming batch processing causes data drop


From my understanding of Spark Streaming, I create a Spark entry point for continuous UDP data using:

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
    JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process the input stream using:

    JavaDStream hash = lines.flatMap(<my-code>);
    JavaPairDStream tuple = hash.mapToPair(<my-code>);
    JavaPairDStream output = tuple.reduceByKey(<my-code>);

    output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
                // Note: this object is constructed but never used.
                new AsyncRDDActions(arg0.rdd(), null);
                arg0.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
                        @Override
                        public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                            // The embedded database is opened and shut down once per partition.
                            GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                .newEmbeddedDatabaseBuilder("/dev/shm/advertisement/data/")
                                .setConfig("remote_shell_enabled", "true")
                                .newGraphDatabase();

                            // One transaction covers every tuple in the partition.
                            try (Transaction tx = graphDb.beginTx()) {
                                while (arg0.hasNext()) {
                                    Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                    Node hmac = Neo4jOperations.getHmacFromValue(graphDb, tuple._1);
                                    boolean oldHmac = false;
                                    if (hmac != null) {
                                        System.out.println("already in database:" + tuple._1);
                                        oldHmac = true;
                                    } else {
                                        hmac = Neo4jOperations.createHmac(graphDb, tuple._1);
                                    }

                                    ArrayList<String> zipcodes = tuple._2;
                                    for (String zipcode : zipcodes) {
                                        Node zipcodeNode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                        if (zipcodeNode != null) {
                                            System.out.println("already in database:" + zipcode);
                                            if (oldHmac && Neo4jOperations.getRelationshipBetween(hmac, zipcodeNode) != null) {
                                                Neo4jOperations.updateToCurrentTime(hmac, zipcodeNode);
                                            } else {
                                                Neo4jOperations.travelTo(hmac, zipcodeNode);
                                            }
                                        } else {
                                            zipcodeNode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                            Neo4jOperations.travelTo(hmac, zipcodeNode);
                                        }
                                    }
                                }
                                tx.success();
                            }
                            graphDb.shutdown();
                        }
                    });
                return null;
            }
        });

In the output.foreachRDD section, I push the output to a database (Neo4j). I understand that this is a time-consuming process.

My doubt is that, in the whole process, I see that not all of the data is being pushed to the database; data is dropping. I'm unable to understand the exact reason for this. My best guess is that while I'm pushing data to the database, Spark stops receiving data and making batches of it. So the solution would be to push asynchronously. Is there a standard way to achieve this?
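
To make the "push asynchronously" idea concrete, here is a minimal plain-Java sketch of a bounded hand-off between a fast producer and a slow writer. It uses only the JDK, with no Spark or Neo4j calls. The class name `AsyncSink`, its `capacity` parameter and the `slowWrite` callback are hypothetical names chosen for illustration. Whether a hand-off like this addresses the drop described above is an assumption, not something that has been verified.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: buffers records in a bounded queue and writes them
// on a background thread, so the caller never waits on the slow write.
final class AsyncSink<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Thread writer;
    private volatile boolean closed = false;

    AsyncSink(int capacity, Consumer<T> slowWrite) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.writer = new Thread(() -> {
            try {
                // Keep draining until close() was called and the buffer is empty.
                while (!closed || !queue.isEmpty()) {
                    T item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        slowWrite.accept(item); // stands in for the database write
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-sink-writer");
        this.writer.start();
    }

    // Non-blocking: returns false when the sink is closed or the buffer is
    // full, so the caller can count or log the record instead of losing it silently.
    boolean offer(T item) {
        return !closed && queue.offer(item);
    }

    // Stops accepting records and waits for the writer to flush the buffer.
    @Override
    public void close() throws InterruptedException {
        closed = true;
        writer.join();
    }
}
```

A caller would create one sink, call `offer` for each record, check the returned boolean to detect a full buffer, and call `close` to flush. The bounded queue is a deliberate choice here: an unbounded buffer would hide a writer that is permanently slower than the input until memory runs out.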

