博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ P2P模型 观察者消费
阅读量:2384 次
发布时间:2019-05-10

本文共 5241 字,大约阅读时间需要 17 分钟。

生餐者:

package clc.active.listener;import org.apache.activemq.ActiveMQConnectionFactory;import org.testng.annotations.Test;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import java.util.Random;/** * ClassName: ObjectProducer
* Description:
* date: 2019/1/15 3:25 PM
* * @author chengluchao * @since JDK 1.8 */public class ObjectProducer { @Test public void sendMessage() { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616"); connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-listener"); producer = session.createProducer(destination); connection.start(); Random r = new Random(); for (int i = 0; i < 100; i++) { Integer data = i; message = session.createObjectMessage(data); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { // 回收资源 //消息发送者 if (producer != null) { try { producer.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //会话对象 if (session != null) { try { session.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //连接对象 if (connection != null) { try { connection.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } } }}

消费者:

package clc.active.listener;import org.apache.activemq.ActiveMQConnectionFactory;import org.testng.annotations.Test;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;import java.util.Random;/** * ClassName: ConsumerListener
* Description:
* date: 2019/1/15 3:25 PM
* * @author chengluchao * @since JDK 1.8 */public class ConsumerListener { @Test public void consumMessage() { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//客户端确认 destination = session.createQueue("test-listener"); consumer = session.createConsumer(destination); //注册监听器,注册成功后,队列中的消息变化,会自动触发监听器代码 consumer.setMessageListener(new MessageListener() { /* 监听器一旦注册,永久有效 永久 - consumer线程不关闭 处理消息的方式:只要有消息未处理,自动调用onMessage方法,处理消息 监听器可以注册若干。注册多个监听器,相当于集群 ActiveMQ自动的循环调用多个监听器,处理队列中的消息,并实现处理 处理消息的方法,就是监听方法 */ @Override public void onMessage(Message message) { try { //acknowledge方法,就是确认方法,代表consumer已经收到消息,确认后,MQ可以删除对应的消息 message.acknowledge(); ObjectMessage om = (ObjectMessage) message; Object data = om.getObject(); System.out.println(data); } catch (JMSException e) { e.getErrorCode(); } } }); //阻塞当前代码,保证listener代码结束,如果代码结束了,监听器自动关闭 System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { // 回收资源 if (consumer != null) { try { consumer.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //会话对象 if (session != null) { try { session.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //连接对象 if (connection != null) { try { connection.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } } }}

 

posted @
2019-01-15 17:00 阅读(
...) 评论(
...)

转载地址:http://jgcab.baihongyu.com/

你可能感兴趣的文章
缺前端是假的,缺优秀前端是真的
查看>>
前端入门那么容易,工作很难找吗?
查看>>
Web前端很难学?html、css t、JavaScrip知识架构图分享
查看>>
常见的前端开发:Javascript 面试题及回答策略
查看>>
web前端开发学习推荐这5本书
查看>>
Windows资源管理器相关信息获取
查看>>
windows资源管理器及ie监听
查看>>
No module named 'Crypto'
查看>>
常用openstack的镜像下载及密码
查看>>
详解python中的浅拷贝和深拷贝
查看>>
详解python中闭包和装饰器
查看>>
修改openstack云主机的IP地址
查看>>
ubuntu系统的定制裁剪(适用于嵌入式瘦客户端)
查看>>
嵌入式之系统移植详解(linux)
查看>>
openstack之 glance_image和instances存储目录解析
查看>>
centos7(三节点)搭建ceph环境
查看>>
将linux(ubuntu)安装到U盘下面--便携式ubuntu和使用dd制作U盘安装工具
查看>>
linux之强大的find命令
查看>>
python使用变量操作mysql语句
查看>>
linux bridge 网桥详解
查看>>