scala - Spark streaming DStream RDD to get file name -


Spark Streaming's `textFileStream` and `fileStream` can monitor a directory and process the new files in a DStream RDD.

How can I get the names of the files being processed by the DStream RDD at a particular interval?

`fileStream` produces a `UnionRDD` of `NewHadoopRDD`s. The good part is that `NewHadoopRDD`s created by `sc.newAPIHadoopFile` have their names set to their paths.

Here's an example of what you can do with that knowledge:

// Names that must be in scope for this snippet:
//   org.apache.hadoop.io.{LongWritable, Text}
//   org.apache.hadoop.mapreduce.lib.input.TextInputFormat
//   org.apache.spark.SparkConf
//   org.apache.spark.rdd.{RDD, UnionRDD}
//   org.apache.spark.streaming.{Seconds, StreamingContext}
//   org.apache.spark.streaming.dstream.DStream
//   scala.reflect.ClassTag

/** Monitors `directory` for new text files and streams their lines, while keeping
  * one child RDD per upstream dependency, each carrying that dependency's name.
  *
  * NOTE(review): this assumes every dependency of the batch RDD corresponds to a
  * single input file and that its `name` is the file path. That relies on how
  * `fileStream` builds its batches -- confirm against the Spark version in use.
  *
  * @param ssc       streaming context used to create the file stream
  * @param directory directory to monitor for new files
  * @return a stream of lines whose batch RDDs are unions of per-dependency named RDDs
  */
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform { rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map { dep =>
          // Drop the key, keep the line text, and copy the parent's name onto
          // the mapped RDD so it can be read back later.
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
            .map(_._2.toString)
            .setName(dep.rdd.name)
        }
      )
    }

/** Applies a name-aware transformation to each dependency of `unionrdd` separately
  * and unions the results.
  *
  * Empty dependencies are skipped. Note that `isEmpty` is evaluated once per
  * dependency while the result is being built.
  *
  * @param unionrdd      RDD whose dependencies are named `RDD[String]`s, such as the
  *                      batches produced by `namedTextFileStream`
  * @param transformFunc given a dependency's name, returns the transformation to
  *                      apply to that dependency's lines
  * @tparam U element type of the transformed RDD
  * @return the union of all transformed, non-empty dependencies, each renamed to
  *         the name of the dependency it came from
  */
def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.flatMap { dep =>
      if (dep.rdd.isEmpty) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }
  )
}

/** Example driver: pairs every line with the name of the RDD it was read from. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("process file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  // Fixed: the original called an undefined `namesTextFileStream`.
  val dstream = namedTextFileStream(ssc, "/some/directory")

  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.
    transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do something with `transformed` here (register an output operation).

  ssc.start()
  ssc.awaitTermination()
}

Comments

Popular posts from this blog

php - failed to open stream: HTTP request failed! HTTP/1.0 400 Bad Request -

java - How to filter a backspace keyboard input -

java - Show Soft Keyboard when EditText Appears -