java - JEE7 + WildFly (HornetQ) - Pause queue from application -
We are using WildFly + HornetQ as our application server and JMS message queue, and we have a requirement to be able to pause/resume queues from the application. Is this possible?
This can be done using JMX or using the HornetQ core management API.
For the purposes of this example, WildFly 8.1.0.Final was used, running the standalone-full-ha profile.
Required Maven dependencies:
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-client</artifactId>
    <version>2.4.1.Final</version>
</dependency>
<dependency>
    <groupId>org.wildfly</groupId>
    <artifactId>wildfly-jmx</artifactId>
    <version>8.1.0.Final</version>
</dependency>
Here is a test class demonstrating the use of JMSQueueControl via JMX:
package test.jmx.hornetq; import org.hornetq.api.jms.management.jmsqueuecontrol; import javax.management.*; import javax.management.remote.jmxconnector; import javax.management.remote.jmxconnectorfactory; import javax.management.remote.jmxserviceurl; public class wildflyjmscontrol { public static void main(string[] args) { try { //get connection wildfly 8 mbean server on localhost string host = "localhost"; int port = 9990; // management-web port string urlstring = system.getproperty("jmx.service.url","service:jmx:http-remoting-jmx://" + host + ":" + port); jmxserviceurl serviceurl = new jmxserviceurl(urlstring); jmxconnector jmxconnector = jmxconnectorfactory.connect(serviceurl, null); mbeanserverconnection connection = jmxconnector.getmbeanserverconnection(); string queuename = "testqueue"; // use queue name here string mbeanobjectname = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=" + queuename; objectname objectname = objectname.getinstance(mbeanobjectname); jmsqueuecontrol jmsqueuecontrol = (jmsqueuecontrol) mbeanserverinvocationhandler.newproxyinstance(connection, objectname, jmsqueuecontrol.class, false); assert jmsqueuecontrol != null; long msgcount = jmsqueuecontrol.countmessages(null); system.out.println(mbeanobjectname + " message count: " + msgcount); jmsqueuecontrol.pause(); system.out.println("queue paused"); jmsqueuecontrol.resume(); system.out.println("queue resumed"); jmxconnector.close(); } catch (exception e) { e.printstacktrace(); } } }
To access HornetQ management via JMS, use:
package test.jms.hornetq; import org.hornetq.api.core.transportconfiguration; import org.hornetq.api.core.client.*; import org.hornetq.api.core.management.managementhelper; import org.hornetq.core.remoting.impl.invm.invmconnectorfactory; public class hornetqservice { public void testpauseresumequeue() { // class needs run in same jvm wildfly server (i.e. not remote jvm) try { serverlocator locator = hornetqclient.createserverlocatorwithoutha(new transportconfiguration( invmconnectorfactory.class.getname())); clientsession session = locator.createsessionfactory().createsession(); session.start(); clientrequestor requester = new clientrequestor(session, "jms.queue.hornetq.management"); string queuename = "testqueue"; // use queue name here // queue message count clientmessage message = session.createmessage(false); managementhelper.putattribute(message, queuename, "messagecount"); clientmessage reply = requester.request(message); int count = (integer) managementhelper.getresult(reply); system.out.println("there " + count + " messages in examplequeue"); // pause queue message = session.createmessage(false); managementhelper.putoperationinvocation(message, queuename, "pause"); requester.request(message); // queue paused message = session.createmessage(false); managementhelper.putattribute(message, queuename, "paused"); reply = requester.request(message); object result = managementhelper.getresult(reply); system.out.println("result: " + result.getclass().getname() + " : " + result.tostring()); // resume queue message = session.createmessage(false); managementhelper.putoperationinvocation(message, queuename, "resume"); requester.request(message); // queue paused message = session.createmessage(false); managementhelper.putattribute(message, queuename, "paused"); reply = requester.request(message); object result2 = managementhelper.getresult(reply); system.out.println("result2: " + result2.getclass().getname() + " : " + result2.tostring()); requester.close(); 
session.close(); }catch (exception e){ system.out.println("error pausing queue" + e.getmessage()); } } }
Comments
Post a Comment