Spark sortByKey throwing exception
I'm trying to sort a JavaPairRDD by its key.
Configuration:
Spark version: 1.3.0
Mode: local
Can anyone tell me what I'm doing wrong in the code below?
JavaPairRDD<String, HashMap<String, Object>> countAndSum = grupByDate
        .reduceByKey(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() {
            @Override
            public HashMap<String, Object> call(
                    HashMap<String, Object> v1,
                    HashMap<String, Object> v2) throws Exception {
                long count = Long.parseLong(v1.get(SparkToolConstant.COUNT).toString())
                        + Long.parseLong(v2.get(SparkToolConstant.COUNT).toString());
                double sum = Double.parseDouble(v1.get(SparkToolConstant.VALUE).toString())
                        + Double.parseDouble(v2.get(SparkToolConstant.VALUE).toString());
                HashMap<String, Object> sumMap = new HashMap<String, Object>();
                sumMap.put(SparkToolConstant.COUNT, count);
                sumMap.put(SparkToolConstant.VALUE, sum);
                return sumMap;
            }
        });

System.out.println("count before sorting : " + countAndSum.count());

/** sort by date */
JavaPairRDD<String, HashMap<String, Object>> sortByDate = countAndSum
        .sortByKey(new Comparator<String>() {
            @Override
            public int compare(String dateStr1, String dateStr2) {
                DateUtil dateUtil = new DateUtil();
                Date date1 = dateUtil.stringToDate(dateStr1, dateFormat);
                Date date2 = dateUtil.stringToDate(dateStr2, dateFormat);
                if (date2 == null && date1 == null) {
                    return 0;
                } else if (date2 != null && date1 != null) {
                    return date1.compareTo(date2);
                } else if (date2 == null) {
                    return 1;
                } else {
                    return -1;
                }
            }
        });
I'm getting the error here:

System.out.println("count after sorting : " + sortByDate.count());
Stack trace when the task is submitted via spark-submit in local mode:
TaskSchedulerImpl:59 - Cancelling stage 252
2015-04-29 14:37:19 INFO DAGScheduler:59 - Job 62 failed: count at DataValidation.java:378, took 0.107696 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Answer:
Spark serializes the functions you pass to reduceByKey and sortByKey before shipping them to the executors. Therefore, you should guarantee that those functions (and everything they reference) are serializable.
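As a quick check, you can reproduce this serialization step outside Spark by writing the function to an ObjectOutputStream yourself, which is essentially what Spark's JavaSerializer does when it serializes a task. A minimal sketch; the SerializationCheck helper below is illustrative, not part of any Spark API:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Minimal sketch: write an object through ObjectOutputStream, which is what
// Spark's Java serialization does for tasks, to see whether the function
// (or anything it captures) is serializable.
public final class SerializationCheck {
    public static void check(Object closure) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(closure);
            System.out.println("serializable: " + closure.getClass().getName());
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Running this against your Comparator would surface the same NotSerializableException that Spark hits when the count() action triggers job submission.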
SparkToolConstant and DateUtil referenced in your code seem to be what is causing this error.
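One fix is to pull the comparator out into a named class that implements both Comparator and Serializable and parses the dates itself, so it no longer drags in DateUtil or the enclosing class. A sketch under that assumption; DateKeyComparator and the "yyyy-MM-dd" format are placeholders, not names from your code:

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;

// A standalone comparator that is explicitly Serializable, so Spark can
// ship it to executors without touching any non-serializable helper class.
public class DateKeyComparator implements Comparator<String>, Serializable {
    private static final long serialVersionUID = 1L;
    private final String dateFormat;

    public DateKeyComparator(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    @Override
    public int compare(String dateStr1, String dateStr2) {
        Date date1 = parse(dateStr1);
        Date date2 = parse(dateStr2);
        if (date1 == null && date2 == null) {
            return 0;
        } else if (date1 != null && date2 != null) {
            return date1.compareTo(date2);
        } else if (date2 == null) {
            return 1;
        } else {
            return -1;
        }
    }

    // SimpleDateFormat is not thread-safe, so create a fresh one per call.
    private Date parse(String dateStr) {
        try {
            return new SimpleDateFormat(dateFormat).parse(dateStr);
        } catch (ParseException e) {
            return null;
        }
    }
}

Usage would then be:

JavaPairRDD<String, HashMap<String, Object>> sortByDate =
        countAndSum.sortByKey(new DateKeyComparator("yyyy-MM-dd"));

Note also that anonymous inner classes capture a reference to their enclosing instance, so if the surrounding class is not serializable the task can fail even when the function body looks harmless; named static or top-level classes avoid that.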