apache storm, load balance, json -


i using kafka storm, kafka sends/emits json string storm, in storm, want distribute load couple of workers based on key/field in json. how that? in case, groupid field in json string.

for example, have json that:

{groupid: 1234, userid: 145, comments:"i want distribute group 1234  1 worker", size:50,type:"group json"} {groupid: 1235, userid: 134, comments:"i want distribute group 1234 worker", size:90,type:"group json"} {groupid: 1234, userid: 158, comments:"i want sent same worker group 1234", size:50,type:"group json"}    

i try use following codes:

      1.  topologybuilder builder = new topologybuilder();       2.  builder.setspout(spoutname, kafkaspout, 1);       3.  builder.setbolt(mydistributedworker, new distributedbolt()).setfieldsgroup(spoutname,new fields("groupid"));  <---??? 

i wondering how put arguments in setfieldsgroup method in line 3. give me hint?

juhani

==testing using storm 0.9.4 ============

=============source codes==============

import java.util.list; import java.util.map; import java.util.concurrent.atomic.atomicinteger;  import storm.kafka.kafkaspout; import storm.kafka.spoutconfig; import storm.kafka.stringscheme; import storm.kafka.zkhosts; import backtype.storm.config; import backtype.storm.localcluster; import backtype.storm.spout.schemeasmultischeme; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.topology.topologybuilder; import backtype.storm.topology.base.baserichbolt; import backtype.storm.tuple.fields; import backtype.storm.tuple.tuple; import backtype.storm.tuple.values;   public class kafkaboltmain {    private static final string spoutname="topicspout";     private static final string analysisbolt = "analysisworker";    private static final string clientid = "storm";    private static final string topologyname = "localtopology";      private static class appanalysisbolt extends baserichbolt {        private static final long serialversionuid = -6885792881303198646l;         private outputcollector _collector;        private long groupid=-1l;        private string log="test";         public void prepare(map conf, topologycontext context, outputcollector collector) {            _collector = collector;        }         public void execute(tuple tuple) {            list<object> objs = tuple.getvalues();            int i=0;            for(object obj:objs){                system.out.println(""+i+"th object's value is:"+obj.tostring());                i++;            }  //         _collector.emit(new values(groupid,log));            _collector.ack(tuple);        }          public void declareoutputfields(outputfieldsdeclarer declarer) {             declarer.declare(new fields("groupid","log"));         }    }      public static void main(string[] args){        string zookeepers = null;        string topicname = null;        if(args.length == 2 ){            zookeepers = args[0];            topicname = args[1];         }else if(args.length == 1 && args[0].equalsignorecase("help")){             system.out.println("xxxx");            system.exit(0);         }        else{            system.out.println("you need have 2 arguments: kafka zookeeper:port , topic name");            system.out.println("xxxx");            system.exit(-1);         }                 spoutconfig spoutconfig = new spoutconfig(new zkhosts(zookeepers),                 topicname,                 "",// zookeeper root path offset storing                 clientid);         spoutconfig.scheme = new schemeasmultischeme(new stringscheme());         kafkaspout kafkaspout = new kafkaspout(spoutconfig);          topologybuilder builder = new topologybuilder();         builder.setspout(spoutname, kafkaspout, 1);         builder.setbolt(analysisbolt, new appanalysisbolt(),2)             .fieldsgrouping(spoutname,new fields("groupid"));          //configuration         config conf = new config();         conf.setdebug(false);         //topology run         conf.put(config.topology_max_spout_pending, 1);         localcluster cluster = new localcluster();         cluster.submittopology(topologyname, conf, builder.createtopology());     } } 

================================================== when start submit topology(local cluster), gives following error:

11658 [syncthread:0] info  org.apache.storm.zookeeper.server.zookeeperserver - established session 0x14d097d338c0009 negotiated timeout 20000 client /127.0.0.1:34656 11658 [main-sendthread(localhost:2000)] info  org.apache.storm.zookeeper.clientcnxn - session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x14d097d338c0009, negotiated timeout = 20000 11659 [main-eventthread] info  org.apache.storm.curator.framework.state.connectionstatemanager - state change: connected 12670 [main] info  backtype.storm.daemon.supervisor - starting supervisor id ccc57de0-29ff-4cb4-89de-fea1ea9b6e28 @ host storm-virtualbox 12794 [main] warn  backtype.storm.daemon.nimbus - topology submission exception. (topology name='localtopology') #<invalidtopologyexception invalidtopologyexception(msg:component: [analysisworker] subscribes stream: [default] of component [topicspout] non-existent fields: #{"groupid"})> 12800 [main] error org.apache.storm.zookeeper.server.nioservercnxnfactory - thread thread[main,5,main] died backtype.storm.generated.invalidtopologyexception: null         @ backtype.storm.daemon.common$validate_structure_bang_.invoke(common.clj:178) ~[storm-core-0.9.4.jar:0.9.4]         @ backtype.storm.daemon.common$system_topology_bang_.invoke(common.clj:307) ~[storm-core-0.9.4.jar:0.9.4]         @ backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submittopologywithopts(nimbus.clj:948) ~[storm-core-0.9.4.jar:0.9.4]         @ backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submittopology(nimbus.clj:966) ~[storm-core-0.9.4.jar:0.9.4]         @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) ~[na:1.7.0_80]         @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) ~[na:1.7.0_80]         @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) ~[na:1.7.0_80]         @ java.lang.reflect.method.invoke(method.java:606) ~[na:1.7.0_80]         @ clojure.lang.reflector.invokematchingmethod(reflector.java:93) ~[clojure-1.5.1.jar:na]         @ clojure.lang.reflector.invokeinstancemethod(reflector.java:28) ~[clojure-1.5.1.jar:na]         @ backtype.storm.testing$submit_local_topology.invoke(testing.clj:264) ~[storm-core-0.9.4.jar:0.9.4]         @ backtype.storm.localcluster$_submittopology.invoke(localcluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]         @ backtype.storm.localcluster.submittopology(unknown source) ~[storm-core-0.9.4.jar:0.9.4]         @ com.callstats.stream.analyzer.kafkaboltmain.main(kafkaboltmain.java:94) ~[streamanalyzer-1.0-snapshot-jar-with-dependencies.jar:na] 

i'm not sure version of storm using, of 0.9.4, requirement can implemented follows.

builder.setbolt(mydistributedworker, new distributedbolt()).fieldsgrouping(spoutname, new fields("groupid")); 

in prepare method of distributedbolt,

public void declareoutputfields(outputfieldsdeclarer declarer) {     declarer.declare(new fields("groupid", "log")); } 

somewhere in execute method of it, call

collector.emit(new values(groupid, log)); 

then tuples have same groupid delivered same instance of next bolt.


Comments

Popular posts from this blog

php - failed to open stream: HTTP request failed! HTTP/1.0 400 Bad Request -

java - How to filter a backspace keyboard input -

java - Show Soft Keyboard when EditText Appears -