分享好友 健康常识首页 频道列表

activemq消息持久化方式(activemq和kafka区别)

运动健身  2023-03-23 02:540

activemq消息持久化方式(activemq和kafka区别)

队列模式(点对点模式,P2P)特点:

1、客户端包括生产者和消费者;

2、队列中的消息只能被一个消费者消费;

3、消费者可以随时消费队列中的消息;

队列模式和主题模式的区别:

1、提前订阅,队列模式:消费者不需要提前订阅也可以消费消息;主题模式:只有提前进行订阅的消费者才能成功消费消息;

2、多个消费者分配消息:队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费;主题模式:每个订阅者都可以消费主题模式中的每一条消息;

案例代码:

生产者:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂获取connection连接 并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建会话session 需要两个参数,第一个事务,第二个签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地(选择是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //通过使用消息生产者messageProducer生产3条消息发送到队列中 for (int i = 1; i <= 7; i++) { //创建消息 一个字符串消息 TextMessage textMessage = session.createTextMessage("msg---->" + i); //通过messageProducer 发布消息 messageProducer.send(textMessage); } //关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送到MQ成功"); }}

消费者1:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME="queue01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂获取connection连接 并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建会话session 需要两个参数,第一个事务,第二个签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地(选择是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while (true){ //从队列中获取消息 receive未设置最大时间 是阻塞的, TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (textMessage !=null){ System.out.println("消费者接受到消息---->"+textMessage.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); }}

输出:

INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->2消费者接受到消息---->msg---->4消费者接受到消息---->msg---->6

消费者2:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class ActiveMQConsumerListener { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂获取connection连接 并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建会话session 需要两个参数,第一个事务,第二个签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地(选择是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //通过监听的机制消费消息 messageConsumer.setMessageListener((message) -> { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者接受到消息---->" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //不关闭控制台 如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题 System.in.read(); messageConsumer.close(); session.close(); connection.close(); }}

输出:

INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->1消费者接受到消息---->msg---->3消费者接受到消息---->msg---->5消费者接受到消息---->msg---->7

Number Of Consumers:表示消费者数量;

Number Of Pending Messages:等待消费的消息,这个是当前未出队列的数量;

Messages Enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);

Messages Dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);

持久化案例代码:

ActiveMQ持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除。

生产者:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue02"; public static void main(String[] args) throws JMSException { //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂获取connection连接 并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建会话session 需要两个参数,第一个事务,第二个签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地(选择是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); // 消息持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //通过使用消息生产者messageProducer生产3条消息发送到队列中 for (int i = 1; i <= 7; i++) { //创建消息 一个字符串消息 TextMessage textMessage = session.createTextMessage("msg---->" + i); //通过messageProducer 发布消息 messageProducer.send(textMessage); } //关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送到MQ成功"); }}

代码:
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

消费者:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME="queue02"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂获取connection连接 并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.setClientID("client-queue02-01"); connection.start(); //创建会话session 需要两个参数,第一个事务,第二个签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地(选择是队列还是主题) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while (true){ //从队列中获取消息 receive未设置最大时间 是阻塞的, TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (textMessage !=null){ System.out.println("消费者接受到消息---->"+textMessage.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); }}

测试:

1、先运行生产者,ActiveMQProducer

2、查看数据库:

3、在运行消费者,ActiveMQConsumer,输出:

INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->1消费者接受到消息---->msg---->2消费者接受到消息---->msg---->3消费者接受到消息---->msg---->4消费者接受到消息---->msg---->5消费者接受到消息---->msg---->6消费者接受到消息---->msg---->7

4、再次查看数据库,消息已删除。

查看更多关于【运动健身】的文章

展开全文
相关推荐
反对 0
举报 0
评论 0
图文资讯
热门推荐
优选好物
更多推荐文章
更多推荐