scala - How to compare every element in the RDD with every other element in the RDD?
I'm trying to perform a k-nearest-neighbour search using Spark.
I have an RDD[Seq[Double]] and I'm planning to return an RDD[(Seq[Double], Seq[Seq[Double]])] containing the actual row and its list of neighbours.
val out = data.map(row => {
  val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
    override def compare(a: Seq[Double], b: Seq[Double]) =
      euclideanDistance(a, row).compare(euclideanDistance(b, row)) * (-1)
  })
  (row, neighbours.toSeq)
})
This gives the following error on spark-submit:
15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
I understand that nesting RDDs is not possible, but how can I perform such an operation so that I can compare every element in the RDD with every other element in the RDD?
Something like this should do it.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sco = new SparkContext(conf)

// k is the number of nearest neighbors required
val k = 3

// Generate 5 rows of two-dimensional coordinates
val rows = List.fill(5)(List.fill(2)(math.random))
val dataRDD = sco.parallelize(rows, 1)

// No need for the sqrt since we're only comparing distances
def euclidean(a: List[Double], b: List[Double]) =
  (a zip b).map { case (x, y) => (x - y) * (x - y) }.sum

// All pairs of points
val pairs = dataRDD.cartesian(dataRDD)

// Case class to keep things a bit neater:
// the neighbor, and its distance from the current point
case class Entry(neighbor: List[Double], dist: Double)

// Map the second element of each pair to an Entry holding
// that element and its distance from the first
val pairsWithDist = pairs.map { case (x, y) => (x, Entry(y, euclidean(x, y))) }

// Merge a row of pairsWithDist into the result row for that point
def mergeOne(u: List[Entry], v: Entry) = (v :: u).sortBy { _.dist }.take(k)

// Merge two results from different partitions
def mergeList(u: List[Entry], v: List[Entry]) = (u ::: v).sortBy { _.dist }.take(k)

val nearestNeighbors = pairsWithDist
  .aggregateByKey(List[Entry]())(mergeOne, mergeList)
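If you want the result in the shape the question asked for (each row paired with the coordinates of its neighbours), here is a minimal follow-up sketch, not part of the original answer, that reuses pairsWithDist, Entry, mergeOne and mergeList from above. One caveat: cartesian pairs each point with itself, so the point appears as its own nearest neighbour at distance 0; the filter below drops that self-pairing first.

// Optional: drop each point's pairing with itself before aggregating
val neighboursOnly = pairsWithDist
  .filter { case (point, entry) => point != entry.neighbor }
  .aggregateByKey(List[Entry]())(mergeOne, mergeList)

// Reshape into (point, list of neighbour coordinates), as asked in the question
val result: RDD[(List[Double], List[List[Double]])] =
  neighboursOnly.map { case (point, entries) => (point, entries.map(_.neighbor)) }

// Inspect the output on the driver (fine for this small example)
result.collect().foreach { case (point, ns) => println(s"$point -> $ns") }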