08 JMS.txt
UP 返回
对应视频文件夹:13消息中间件解决方案JMS
系统结构变动:
!!@@D:\MyJAR_Project\WordsCollection\refer\2022030513.png_865_493_1@@!!
1. 定义 (以下对应视频文件夹1)
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,JMS本身只定义了一系列的接口规范,用来访问消息收发系统,类似于 JDBC
JMS 定义了五种不同的消息正文格式:
· TextMessage--一个字符串对象
· MapMessage--一套名称-值对 和上面的用的最多
· ObjectMessage--一个序列化的 Java 对象
· BytesMessage--一个字节的数据流
· StreamMessage -- Java 原始值的数据流 类似于MapMessage,不过他的存取是有序的
消息的传递有两种类型:
!!@@D:\MyJAR_Project\WordsCollection\refer\202203021.png_577_442_1@@!!
2. ActiveMQ安装
官方网站下载:http://activemq.apache.org/
将apache-activemq-5.12.0-bin.tar.gz 上传至服务器(参照以前的方法)
进入ftp,put D:\EnvironmentDevs\apache-activemq-5.12.0-bin.tar.gz
解压 tar zxvf apache-activemq-5.12.0-bin.tar.gz
为apache-activemq-5.12.0目录赋权 chmod 777 apache-activemq-5.12.0
进入 apache-activemq-5.12.0\bin
赋与执行权限 chmod 755 activemq
linux 命令chmod 755的意思:
chmod是Linux下设置文件权限的命令,后面的数字表示不同用户或用户组的权限。一般是三个数字:
第一个数字表示文件所有者的权限
第二个数字表示与文件所有者同属一个用户组的其他用户的权限
第三个数字表示其它用户组的权限。
权限分为三种:读(r=4),写(w=2),执行(x=1) 。 综合起来还有可读可执行(rx=5=4+1)、可读可写(rw=6=4+2)、可读可写可执行(rwx=7=4+2+1)。
所以,chmod 755 设置用户的权限为:
1.文件所有者可读可写可执行 --7
2.与文件所有者同属一个用户组的其他用户可读可执行 --5
3.其它用户组可读可执行 --5
启动 ./activemq start
!!@@D:\MyJAR_Project\WordsCollection\refer\202203051.png_826_79_1@@!!
访问服务器对应的地址 http://192.168.25.130:8161/
!!@@D:\MyJAR_Project\WordsCollection\refer\202203052.png_860_491_0.5@@!!
点击Manage ActiveMQ broker,输入admin admin即可进入主页面。常用的就是Queues(点对点)和Topics(发布订阅),如果有消息是可以在下面看到列表的
!!@@D:\MyJAR_Project\WordsCollection\refer\2022030512.png_1037_380_0.5@@!!
列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
3.入门demo (以下对应视频文件夹2)
创建maven工程jmsDemo
创建类和main方法(队列生产者)
public class QueueProducer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
//• AUTO_ACKNOWLEDGE = 1 自动确认
//• CLIENT_ACKNOWLEDGE = 2 客户端手动确认
//• DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
//• SESSION_TRANSACTED = 0 事务提交并确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
运行上述代码以后可以看到queue列表中多了一条记录
创建队列消费者
public class QueueConsumer {
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象 队列名要和生产者保持一致
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到消息:" + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入 因为是控制台程序,这一句是为了等待控制台输入产生等待,不然直接执行到后面,监听还没产生就结束了,无法达到效果
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
}
执行消费者可以看到消息的消费信息。如果停掉消费者,再执行两次生产者产生消息,再执行一次消费者可以看到一下消费两条信息。对应结果可以看列表
如果启动多个消费者,只会有一个消费到消息
主题生产者和消费者只需要将第五步改成创建主题就行了:Topic topic = session.createTopic("test-topic");
运行多个消费者时,一个生产消息会被多个消费到
4.spring整合 (以下对应视频文件夹3)
4.1 点对点方式
生产者:创建maven项目springjms_producer,引入相关依赖:
<properties>
<spring.version>4.2.4.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
复制配置文件applicationContext-jms-producer.xml到资源目录下: (E:\2019年4月黑马程序员教程\05-黑马JavaEE49期全套\17品优购电商系统开发\资源\配置文件\JMS)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="cn.itcast.demo"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.130:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 队列名 -->
<constructor-arg value="queue_text"/>
</bean>
<!--这个是订阅模式 文本信息-->
<!-- <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text"/>
</bean> -->
</beans>
创建消息生产类:
@Component
public class QueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueTextDestination;// 和配置文件中的名字保持一致
public void sendTextMessage(final String text) {
jmsTemplate.send(queueTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
测试类,执行即可在消息列表看到消息:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-jms-producer.xml")
public class TestQueue {
@Autowired
private QueueProducer queueProducer;
@Test
public void testSend() {
queueProducer.sendTextMessage("测试发送文本消息");
}
}
消费者:创建maven工程springjms_consumer,依赖同上,复制配置文件applicationContext-jms-consumer-queue.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.130:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text"/>
</bean>
<!-- 我的监听类 -->
<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueTextDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
创建消费者:
//这里不用写注解,因为配置文件中已经指定了该类。也可以像producer中的配置一样,写一个扫描包配置然后加Component注解
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到消息: " + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
测试类:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-queue.xml")
public class TestQueue {
/**
* 测试方法不用些什么语句的,只要加载环境即可消费消息
* 不过还是要使程序发生暂停才能监听
*/
@Test
public void testQueue() {
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
4.2 发布订阅方式
生产者:
只需将配置文件中订阅模式的注释放开,复制一份QueueProducer.java为TopicProducer.java,将Destination改名和配置文件中topicTextDestination一致,测试类也相应改成Topic即可
消费者:
复制queue的配置文件为applicationContext-jms-consumer-topic.xml,修改配置。后面ref的内容也同步修改:
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text"/>
</bean>
复制创建测试类TestTopic.java加载topic的配置文件即可
测试时可以启动三次消费者,然后启动一次生产者,可以看到三个消费者都有消息打印
5. 项目改造
涉及的模块:pinyougou-manager-web作为生产者,pinyougou-search-service作为消费者。管理台web审核通过商品时,发送消息,有搜索服务消费消息,对审核通过的商品如solr索引库
这个操作原先是写在managerweb模块的,web通过对searchservice的远程依赖完成导入;现在这样做就可以完成解耦
删除商品时需要删除solr索引库的内容,也添加了消息监听类ItemDeleteListener.java
继续删除managerweb对pageinterface的依赖,详情页生成的方式也改为消息模式,到那时因为详情页需要部署在多个服务器上,所以这里的消息选择主题类型。
对应pageservice服务不再需要通过dubbo被web使用,所以去掉dubbo的依赖,同时加入activemq的依赖,spring的配置文件也去除对应配置,添加activemq的配置文件
pageservice添加html删除的消息监听类PageDeleteListener.java
DOWN 返回