Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python


I want to share this particular Apache Spark with Python solution because the documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a pairwise RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10)  # show a small sample.
[(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.249999999999996),
 (u'2013-10-13', 10.693069306930692)]

The following code sequence is a less than optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible, but, as you'll see in the answer section, there is a more concise and more efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())  # sample output of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]]))  # divide each SUM by its denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
 (u'2013-10-07', 23.39500642456595),
 ... snip ...
]
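For readers without a Spark session at hand, the two-pass logic of the sequence above can be mimicked in plain Python. This is only a sketch under stated assumptions: `average_by_key_two_pass` and the `pairs` list are hypothetical names made up for illustration, an ordinary list stands in for the RDD, and nothing here calls Spark.

```python
from collections import Counter, defaultdict


def average_by_key_two_pass(pairs):
    """Plain-Python stand-in for countByKey + reduceByKey + map (not Spark code)."""
    # Pass 1: count the records per key (the role countByKey() plays above).
    counts = Counter(key for key, _ in pairs)

    # Pass 2: sum the values per key (the role of reduceByKey(operator.add)).
    sums = defaultdict(float)
    for key, value in pairs:
        sums[key] += value

    # Divide each sum by its count, as the final map() does.
    return {key: sums[key] / counts[key] for key in sums}


# Three rows taken from the sample data shown earlier.
pairs = [
    ("2013-10-10", 9.322709163346612),
    ("2013-10-10", 28.264462809917358),
    ("2013-10-09", 7.60117302052786),
]
averages = average_by_key_two_pass(pairs)
```

Note that the data is traversed twice, once for the counts and once for the sums, which is the overhead that makes this approach less efficient.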

A better way to do this is to use the rdd.aggregateByKey() method. Because that method is so poorly documented in the Apache Spark with Python documentation (which is why I'm writing this), until recently I had been using the above code sequence. But again, it's less efficient, so don't do it that way unless you need to.

Here's how to do the same using the rdd.aggregateByKey() method (recommended) ...

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute) and the COUNT (the denominator for the average that we want to compute).

>>> aTuple = (0, 0)  # zero value: (sum, count). Python 3 lambdas cannot unpack tuple arguments, hence a[0] and a[1] below.
>>> rdd1 = rdd1.aggregateByKey(aTuple,
...                            lambda a, b: (a[0] + b, a[1] + 1),
...                            lambda a, b: (a[0] + b[0], a[1] + b[1]))

The following describes the meaning of each 'a' and 'b' pair above (just so you can visualize what's happening):

   First lambda expression, for the within-partition reduction step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next value.

   Second lambda expression, for the cross-partition reduction step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Finally, calculate the average for each KEY, and collect the results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
 (u'2013-09-01', 23.39500642456595),
 (u'2013-09-03', 13.53240060820617),
 (u'2013-09-05', 13.141148418977687),
 ... snip ...
]
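To make the roles of the two lambdas easier to see, here is a plain-Python imitation of the aggregateByKey() steps. It is a sketch, not Spark's implementation: `simulate_aggregate_by_key` is a hypothetical helper, the split of rows into two partitions is made up for illustration, and lists of lists stand in for RDD partitions.

```python
def simulate_aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Plain-Python imitation of aggregateByKey semantics (not Spark code)."""
    per_partition = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            # Within a partition: fold each value into that key's accumulator,
            # starting from zero_value the first time the key is seen.
            acc[key] = seq_op(acc.get(key, zero_value), value)
        per_partition.append(acc)

    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            # Across partitions: merge two accumulators for the same key.
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


# Hypothetical partitioning of four rows from the sample data shown earlier.
partitions = [
    [("2013-10-13", 32.32514177693762), ("2013-10-07", 9.664429530201343)],
    [("2013-10-13", 26.249999999999996), ("2013-10-13", 10.693069306930692)],
]
sum_count = simulate_aggregate_by_key(
    partitions,
    (0, 0),                                   # zero value: (sum, count)
    lambda a, b: (a[0] + b, a[1] + 1),        # within-partition step
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # cross-partition step
)
# Same final step as mapValues(lambda v: v[0]/v[1]).
averages = {key: total / count for key, (total, count) in sum_count.items()}
```

Each value is visited once, and only the small (sum, count) tuples are merged across partitions.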

I hope this question and illustrative answer with aggregateByKey() will help. If it did, don't forget to upvote the question, too. Thank you. (◠﹏◠)

