ActiveMQ (二)
                 今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。  上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解...
            
            
今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。
上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?
这就需要使用ActiveMQ监听器来监听队列,持续消费消息。
一、ActiveMQ监听器 1.1 配置步骤说明- 创建一个监听器对象。
- 修改消费者代码,加载监听器
package com.xkt.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * @author lzx
 *
 */
public class MyListener implements MessageListener {
	@Override
	public void onMessage(Message message) {
		if (null != message) {
			if (message instanceof TextMessage) {
				try {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("监听到的消息是 " + content);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
- 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.xkt.listener.MyListener;
/**
 * @author lzx
 *
 */
public class Myconsumer {
	private ConnectionFactory factory;
	private Connection connection;
	private Session session;
	private Destination destination;
	private MessageConsumer consumer;
	public void receiveFromMq() {
		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp0.7187782661775515://0.14187442802194083192.168.109.3:61616");
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
			destination = session.createQueue("queue");
			// 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
			consumer = session.createConsumer(destination);
			// 7.加载监听器
			consumer.setMessageListener(new MyListener());
			// 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息
			System.in.read();
			// 在java项目中,可以通过IO阻塞程序,持续加载监听器
			// 在web项目中,可以通过配置文件,直接加载监听器。
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("读取失败");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}
- 多次运行生产者,发送多条消息到队列中
| 图示 | 
|---|
|  | 
- 运行消费者。观察结果
| 图示 | 
|---|
|  | 
- 查看ActiveMQ管理控制界面,所有消息都被消费了!
| 图示 | 
|---|
|  | 
在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?
二、Topic模式实现 2.1 配置步骤说明- 搭建ActiveMQ消息服务器。(略)
- 创建主题订阅者。
- 创建主题发布者。
- 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * @author lzx
 *
 */
public class MySubscriber implements Runnable {
	/**
	 * 多线程的线程安全问题 解决方案:
	 * 
	 * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐
	 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis
	 */
	private TopicConnectionFactory factory;
	private TopicConnection connection;
	private TopicSession session;
	private Topic topic;
	private TopicSubscriber subscriber;
	private Message message;
	@Override
	public void run() {
		try {
			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp0.7187782661775515://0.14187442802194083192.168.109.3:61616");
			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();
			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");
			// 5、创建订阅者
			subscriber = session.createSubscriber(topic);
			// 6、订阅
			while (true) {
				message = subscriber.receive();
				if (null != message) {
					if (message instanceof TextMessage) {
						TextMessage tMsg = (TextMessage) message;
						String content = tMsg.getText();
						System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
					}
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
package com.xkt.test;
import com.xkt.subscriber.MySubscriber;
/**
 * @author lzx
 *
 */
public class TestMQ {
	public static void main(String[] args) {
		MySubscriber sub = new MySubscriber();
		Thread t1 = new Thread(sub);
		Thread t2 = new Thread(sub);
		t1.start();
		t2.start();
	}
}
- 查看AcitveMQ管理界面
|图示   |
| :------------: |
|   | |
package com.xkt.publish;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * @author lzx
 *
 */
public class MyPublisher {
	private TopicConnectionFactory factory;
	private TopicConnection connection;
	private TopicSession session;
	private Topic topic;
	private TopicPublisher publisher;
	private Message message;
	public void publish(String msg) {
		try {
			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp0.7187782661775515://0.14187442802194083192.168.109.3:61616");
			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();
			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");
			// 5、创建发布者
			publisher = session.createPublisher(topic);
			// 6、创建消息对象
			message = session.createTextMessage(msg);
			// 7、发布消息
			publisher.publish(message);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (null != publisher) {
				try {
					publisher.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != connection) {
				try {
					connection.stop();
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}
package com.xkt.test;
import org.junit.Test;
import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;
/**
 * @author lzx
 *
 */
public class TestMQ {
	public static void main(String[] args) {
		MySubscriber sub = new MySubscriber();
		Thread t1 = new Thread(sub);
		Thread t2 = new Thread(sub);
		t1.start();
		t2.start();
	}
	@Test
	public void publish() {
		MyPublisher publisher = new MyPublisher();
		publisher.publish("hello,欢迎收听FM 89.9频道-交通频道");
	}
}
2.2.6 查看测试结果
2.3 Topic小结- Topic模式能够实现多个订阅者同时消费消息。
- Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
- 通常可以用来解决公共消息推送的相关业务。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!
 很赞哦! (1046)
            
            
            
            
         爱学记
                爱学记
                
 微信收款码
微信收款码 支付宝收款码
支付宝收款码