Spark sortByKey throwing exception


I'm trying to sort a JavaPairRDD by its key.

Configuration

Spark version: 1.3.0
Mode: local

Can anyone tell me what I'm doing wrong in the following code?

    JavaPairRDD<String, HashMap<String, Object>> countAndSum = grupByDate
            .reduceByKey(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() {
                @Override
                public HashMap<String, Object> call(
                        HashMap<String, Object> v1,
                        HashMap<String, Object> v2)
                        throws Exception {
                    long count = Long.parseLong(v1.get(
                            SparkToolConstant.COUNT).toString())
                            + Long.parseLong(v2.get(
                                    SparkToolConstant.COUNT).toString());
                    double sum = Double.parseDouble(v1.get(
                            SparkToolConstant.VALUE).toString())
                            + Double.parseDouble(v2.get(
                                    SparkToolConstant.VALUE).toString());
                    HashMap<String, Object> sumMap = new HashMap<String, Object>();
                    sumMap.put(SparkToolConstant.COUNT, count);
                    sumMap.put(SparkToolConstant.VALUE, sum);
                    return sumMap;
                }
            });

    System.out.println("count before sorting : " + countAndSum.count());

    /**
     * sort by date
     */
    JavaPairRDD<String, HashMap<String, Object>> sortByDate = countAndSum
            .sortByKey(new Comparator<String>() {
                @Override
                public int compare(String dateStr1, String dateStr2) {
                    DateUtil dateUtil = new DateUtil();
                    Date date1 = dateUtil.stringToDate(dateStr1, dateFormat);
                    Date date2 = dateUtil.stringToDate(dateStr2, dateFormat);
                    if (date2 == null && date1 == null) {
                        return 0;
                    } else if (date2 != null && date1 != null) {
                        return date1.compareTo(date2);
                    } else if (date2 == null) {
                        return 1;
                    } else {
                        return -1;
                    }
                }
            });

I am getting the error here:

    System.out.println("count after sorting : " + sortByDate.count());

Stack trace when the task is submitted via spark-submit in local mode:

    TaskSchedulerImpl:59 - Cancelling stage 252
    2015-04-29 14:37:19 INFO DAGScheduler:59 - Job 62 failed: count at DataValidation.java:378, took 0.107696 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    scala.Option.foreach(Option.scala:236)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Spark serializes the functions passed to reduceByKey and sortByKey first, and then ships them to the executors. Therefore, you should guarantee that those functions are serializable.

The SparkToolConstant and DateUtil references in your code appear to be the cause of this error: the anonymous Comparator captures them (and the enclosing class) in its closure, and they are not serializable.
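One common fix is to move the comparator into a standalone class that implements both Comparator<String> and Serializable, holding only serializable fields and creating any parsing helpers inside compare. A minimal sketch, with the hypothetical name DateStringComparator and java.text.SimpleDateFormat standing in for your own DateUtil:

    import java.io.Serializable;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Comparator;
    import java.util.Date;

    // A comparator Spark can ship to executors: it implements Serializable
    // and holds no references to non-serializable outer objects.
    public class DateStringComparator implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;

        private final String dateFormat; // plain String field, safe to serialize

        public DateStringComparator(String dateFormat) {
            this.dateFormat = dateFormat;
        }

        private Date parse(String s) {
            try {
                // SimpleDateFormat is not thread-safe, so create it per call
                return new SimpleDateFormat(dateFormat).parse(s);
            } catch (ParseException e) {
                return null; // mirror the original null handling
            }
        }

        @Override
        public int compare(String dateStr1, String dateStr2) {
            Date date1 = parse(dateStr1);
            Date date2 = parse(dateStr2);
            if (date1 == null && date2 == null) {
                return 0;
            } else if (date1 != null && date2 != null) {
                return date1.compareTo(date2);
            }
            return date1 == null ? -1 : 1; // same ordering of nulls as the original
        }
    }

With this, the sort becomes countAndSum.sortByKey(new DateStringComparator(dateFormat)), and nothing from the enclosing class is captured in the closure. The same treatment applies to the reduceByKey function if SparkToolConstant is not serializable (for example, reference its constants through plain final String fields).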

