scala - How to compare every element in the RDD with every other element in the RDD?


I'm trying to perform a k-nearest-neighbor search using Spark.

I have an RDD[Seq[Double]], and I'm planning to return an RDD[(Seq[Double], Seq[Seq[Double]])] containing each row together with a list of its neighbors:

    val out = data.map(row => {
      val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
        override def compare(a: Seq[Double], b: Seq[Double]) = {
          euclideanDistance(a, row).compare(euclideanDistance(b, row)) * (-1)
        }
      })
      (row, neighbours.toSeq)
    })

It gives the following error on spark-submit:

15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

I understand that nesting RDDs is not possible, but how do I perform such an operation, where I compare every element in the RDD with every other element in the RDD?

Something like this should do it.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD

    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sco = new SparkContext(conf)

    // k is the number of nearest neighbors required
    val k = 3

    // generate 5 rows of two-dimensional coordinates
    val rows = List.fill(5)(List.fill(2)(Math.random()))
    val dataRDD = sco.parallelize(rows, 1)

    // no need for the sqrt, as we're only comparing distances
    def euclidean(a: List[Double], b: List[Double]) =
      a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

    // get all pairs (note: this also pairs each point with itself, at distance 0)
    val pairs = dataRDD.cartesian(dataRDD)

    // case class to keep things a bit neater:
    // the neighbor, and its distance from the current point
    case class Entry(neighbor: List[Double], dist: Double)

    // map the second element to the element and its distance from the first
    val pairsWithDist = pairs.map { case (x, y) => (x, Entry(y, euclidean(x, y))) }

    // merge one row of pairsWithDist into the running result for this point
    def mergeOne(u: List[Entry], v: Entry) = (v :: u).sortBy(_.dist).take(k)

    // merge two results from different partitions
    def mergeList(u: List[Entry], v: List[Entry]) = (u ::: v).sortBy(_.dist).take(k)

    val nearestNeighbors = pairsWithDist
      .aggregateByKey(List[Entry]())(mergeOne, mergeList)
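
To see what the two merge functions compute for each key, here is a minimal sketch that runs the same logic on plain Scala collections, with no Spark involved. The object name KnnMergeSketch, the fixed points, the helper nearest and the choice k = 2 are all assumptions made for illustration. Splitting the candidates in two only imitates how aggregateByKey folds within a partition and then combines partitions.

```scala
// Plain Scala collections only; this is a local stand-in, not Spark code.
object KnnMergeSketch {
  val k = 2

  case class Entry(neighbor: List[Double], dist: Double)

  // Squared Euclidean distance, as in the answer (no sqrt needed for ranking).
  def euclidean(a: List[Double], b: List[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Fold one candidate into the running list of the k nearest.
  def mergeOne(u: List[Entry], v: Entry): List[Entry] =
    (v :: u).sortBy(_.dist).take(k)

  // Combine two partial lists, e.g. from different partitions.
  def mergeList(u: List[Entry], v: List[Entry]): List[Entry] =
    (u ::: v).sortBy(_.dist).take(k)

  // Hypothetical fixed data so the result can be checked by hand.
  val points: List[List[Double]] =
    List(List(0.0, 0.0), List(1.0, 0.0), List(0.0, 3.0), List(5.0, 5.0))

  // Imitates cartesian + aggregateByKey for a single key: split the
  // candidates into two "partitions", fold each with mergeOne, then
  // combine the partial results with mergeList.
  def nearest(p: List[Double]): List[Entry] = {
    val entries = points.map(q => Entry(q, euclidean(p, q)))
    val (left, right) = entries.splitAt(entries.length / 2)
    mergeList(
      left.foldLeft(List.empty[Entry])(mergeOne),
      right.foldLeft(List.empty[Entry])(mergeOne))
  }

  val result: Map[List[Double], List[Entry]] =
    points.map(p => p -> nearest(p)).toMap
}
```

With these points, KnnMergeSketch.result(List(0.0, 0.0)) holds the point itself at distance 0, followed by List(1.0, 0.0). The same happens in the Spark version, because the cartesian product pairs every point with itself. If a point should not count as its own neighbor, one option is to ask for k + 1 entries and drop the first, or to filter out the self-pair before aggregating.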
