Scala — Spark Streaming: getting the file name from a DStream RDD
Spark Streaming's `textFileStream` and `fileStream` can monitor a directory and process the
new files as a DStream of RDDs. How can we find out the names of the files being processed
by the DStream RDD at a particular interval?
`fileStream` produces a `UnionRDD` of `NewHadoopRDD`s. The nice part about the
`NewHadoopRDD`s created by `sc.newAPIHadoopFile` is that their `name`s are set to their
file paths.

Here's an example of what you can do with that knowledge:
def namedtextfilestream(ssc: streamingcontext, directory: string): dstream[string] = ssc.filestream[longwritable, text, textinputformat](directory) .transform( rdd => new unionrdd(rdd.context, rdd.dependencies.map( dep => dep.rdd.asinstanceof[rdd[(longwritable, text)]].map(_._2.tostring).setname(dep.rdd.name) ) ) ) def transformbyfile[u: classtag](unionrdd: rdd[string], transformfunc: string => rdd[string] => rdd[u]): rdd[u] = { new unionrdd(unionrdd.context, unionrdd.dependencies.map{ dep => if (dep.rdd.isempty) none else { val filename = dep.rdd.name some( transformfunc(filename)(dep.rdd.asinstanceof[rdd[string]]) .setname(filename) ) } }.flatten ) } def main(args: array[string]) = { val conf = new sparkconf() .setappname("process file") .setmaster("local[2]") val ssc = new streamingcontext(conf, seconds(30)) val dstream = namestextfilestream(ssc, "/some/directory") def byfiletransformer(filename: string)(rdd: rdd[string]): rdd[(string, string)] = rdd.map(line => (filename, line)) val transformed = dstream. transform(rdd => transformbyfile(rdd, byfiletransformer)) // stuff transformed ssc.start() ssc.awaittermination() }
Comments
Post a Comment