Apache Storm, load balancing, JSON
I am using Kafka with Storm: Kafka sends/emits a JSON string to Storm, and in Storm I want to distribute the load across a couple of workers based on a key/field in the JSON. How can I do that? In my case, it is the groupid field in the JSON string.
For example, I have JSON like this:
{groupid: 1234, userid: 145, comments: "I want to distribute group 1234 to 1 worker", size: 50, type: "group json"}
{groupid: 1235, userid: 134, comments: "I want to distribute group 1234 to a worker", size: 90, type: "group json"}
{groupid: 1234, userid: 158, comments: "I want this sent to the same worker as group 1234", size: 50, type: "group json"}
I tried to use the following code:
1. TopologyBuilder builder = new TopologyBuilder();
2. builder.setSpout(spoutName, kafkaSpout, 1);
3. builder.setBolt(myDistributedWorker, new DistributedBolt()).setFieldsGroup(spoutName, new Fields("groupid"));   <---???
I am wondering what arguments to put in the setFieldsGroup method on line 3. Could you give me a hint?
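Concretely, this is the shape of the wiring I am aiming for; a rough sketch only, where DistributedBolt is just a stand-in for my bolt and the component names are placeholders:

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WiringSketch {
    public static TopologyBuilder wire(IRichSpout kafkaSpout, IRichBolt distributedBolt) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("topicspout", kafkaSpout, 1);
        // Is fieldsGrouping the right call here, and what must "groupid" refer to?
        builder.setBolt("mydistributedworker", distributedBolt, 2)
               .fieldsGrouping("topicspout", new Fields("groupid"));
        return builder;
    }
}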
Juhani
============ Testing using Storm 0.9.4 ============
============= Source code =============
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class KafkaBoltMain {
    private static final String SPOUTNAME = "topicspout";
    private static final String ANALYSISBOLT = "analysisworker";
    private static final String CLIENTID = "storm";
    private static final String TOPOLOGYNAME = "localtopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private OutputCollector _collector;
        private long groupId = -1L;
        private String log = "test";

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            List<Object> objs = tuple.getValues();
            int i = 0;
            for (Object obj : objs) {
                System.out.println("" + i + "th object's value is: " + obj.toString());
                i++;
            }
            // _collector.emit(new Values(groupId, log));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("groupid", "log"));
        }
    }

    public static void main(String[] args) {
        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else if (args.length == 1 && args[0].equalsIgnoreCase("help")) {
            System.out.println("xxxx");
            System.exit(0);
        } else {
            System.out.println("You need to have 2 arguments: kafka zookeeper:port and topic name");
            System.out.println("xxxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "", // zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 2)
               .fieldsGrouping(SPOUTNAME, new Fields("groupid"));

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
    }
}
==================================================
When I submit the topology (local cluster), it gives the following error:
11658 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x14d097d338c0009 with negotiated timeout 20000 for client /127.0.0.1:34656
11658 [main-SendThread(localhost:2000)] INFO  org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x14d097d338c0009, negotiated timeout = 20000
11659 [main-EventThread] INFO  org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
12670 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor with id ccc57de0-29ff-4cb4-89de-fea1ea9b6e28 at host storm-virtualbox
12794 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='localtopology') #<InvalidTopologyException InvalidTopologyException(msg:Component: [analysisworker] subscribes from stream: [default] of component [topicspout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
    at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:178) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:307) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopologyWithOpts(nimbus.clj:948) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopology(nimbus.clj:966) ~[storm-core-0.9.4.jar:0.9.4]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_80]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_80]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_80]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_80]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]
    at backtype.storm.testing$submit_local_topology.invoke(testing.clj:264) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]
    at com.callstats.stream.analyzer.KafkaBoltMain.main(KafkaBoltMain.java:94) ~[StreamAnalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
I'm not sure which version of Storm you are using, but as of 0.9.4 your requirement can be implemented as follows.
builder.setBolt(myDistributedWorker, new DistributedBolt()).fieldsGrouping(spoutName, new Fields("groupid"));
In the declareOutputFields method of DistributedBolt, declare the output fields:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("groupid", "log"));
}
Somewhere in its execute method, call:
collector.emit(new Values(groupid, log));
Then tuples that have the same groupid will be delivered to the same instance of the next bolt.
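To tie the pieces together, here is a minimal sketch under a few assumptions: Storm 0.9.4 with the storm-kafka KafkaSpout from your code, whose StringScheme (as far as I know) declares only a single string field rather than a "groupid" field, which is why the InvalidTopologyException complains about non-existent fields when you fields-group directly on the spout. The idea is to insert a small bolt that parses the JSON and emits "groupid"; the analysis bolt then subscribes to that bolt with fieldsGrouping. JsonParserBolt, the component names, and the regex-based extraction are all illustrative placeholders, not part of your code.

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical intermediate bolt: extracts "groupid" from the raw JSON string
// so that the downstream bolt can be fields-grouped on it.
public class JsonParserBolt extends BaseRichBolt {
    // Naive extraction of the numeric groupid value; a real JSON library
    // (Jackson, json-simple, ...) would be the safer choice in practice.
    private static final Pattern GROUP_ID = Pattern.compile("groupid\"?\\s*:\\s*(\\d+)");
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // Assumption: the KafkaSpout's StringScheme puts the raw message in the first field.
        String json = tuple.getString(0);
        Matcher m = GROUP_ID.matcher(json);
        String groupId = m.find() ? m.group(1) : "unknown";
        // Emit groupid plus the original payload, anchored to the input tuple.
        collector.emit(tuple, new Values(groupId, json));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("groupid", "log"));
    }

    // Example wiring: spout -> parser (shuffle) -> analysis bolt (fields-grouped on "groupid").
    public static TopologyBuilder buildTopology(IRichSpout kafkaSpout, IRichBolt analysisBolt) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("topicspout", kafkaSpout, 1);
        builder.setBolt("jsonparser", new JsonParserBolt(), 2)
               .shuffleGrouping("topicspout");
        builder.setBolt("analysisworker", analysisBolt, 2)
               .fieldsGrouping("jsonparser", new Fields("groupid"));
        return builder;
    }
}

Fields grouping hashes on the value of "groupid", so every tuple with groupid 1234 consistently goes to the same task of the analysis bolt, while different groups are spread across the available tasks.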