08 JMS.txt

UP 返回
对应视频文件夹:13消息中间件解决方案JMS

系统结构变动:
	!!@@D:\MyJAR_Project\WordsCollection\refer\2022030513.png_865_493_1@@!!

1. 定义	(以下对应视频文件夹1)
	JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,JMS本身只定义了一系列的接口规范,用来访问消息收发系统,类似于 JDBC
	JMS 定义了五种不同的消息正文格式:
		· TextMessage--一个字符串对象
		· MapMessage--一套名称-值对	和上面的用的最多
		· ObjectMessage--一个序列化的 Java 对象
		· BytesMessage--一个字节的数据流
		· StreamMessage -- Java 原始值的数据流		类似于MapMessage,不过他的存取是有序的
	消息的传递有两种类型:
		!!@@D:\MyJAR_Project\WordsCollection\refer\202203021.png_577_442_1@@!!

2. ActiveMQ安装
	官方网站下载:http://activemq.apache.org/
	将apache-activemq-5.12.0-bin.tar.gz 上传至服务器(参照以前的方法)
		进入ftp,put D:\EnvironmentDevs\apache-activemq-5.12.0-bin.tar.gz
	解压	tar  zxvf  apache-activemq-5.12.0-bin.tar.gz
	为apache-activemq-5.12.0目录赋权	chmod 777 apache-activemq-5.12.0
	进入	apache-activemq-5.12.0\bin
	赋与执行权限	chmod 755 activemq 
		linux 命令chmod 755的意思:
			chmod是Linux下设置文件权限的命令,后面的数字表示不同用户或用户组的权限。一般是三个数字:
				第一个数字表示文件所有者的权限
				第二个数字表示与文件所有者同属一个用户组的其他用户的权限
				第三个数字表示其它用户组的权限。
		      权限分为三种:读(r=4),写(w=2),执行(x=1) 。 综合起来还有可读可执行(rx=5=4+1)、可读可写(rw=6=4+2)、可读可写可执行(rwx=7=4+2+1)。
		      所以,chmod 755 设置用户的权限为: 
				1.文件所有者可读可写可执行                                    --7
				2.与文件所有者同属一个用户组的其他用户可读可执行 --5 
				3.其它用户组可读可执行                                          --5
	启动	./activemq start
		!!@@D:\MyJAR_Project\WordsCollection\refer\202203051.png_826_79_1@@!!
	访问服务器对应的地址		http://192.168.25.130:8161/
		!!@@D:\MyJAR_Project\WordsCollection\refer\202203052.png_860_491_0.5@@!!
	点击Manage ActiveMQ broker,输入admin admin即可进入主页面。常用的就是Queues(点对点)和Topics(发布订阅),如果有消息是可以在下面看到列表的
		!!@@D:\MyJAR_Project\WordsCollection\refer\2022030512.png_1037_380_0.5@@!!
		列表各列信息含义如下:
			Number Of Pending Messages  :等待消费的消息 这个是当前未出队列的数量。
			Number Of Consumers  :消费者 这个是消费者端的消费者数量
			Messages Enqueued  :进入队列的消息  进入队列的总数量,包括出队列的。
			Messages Dequeued  :出了队列的消息  可以理解为是消费这消费掉的数量。

3.入门demo	(以下对应视频文件夹2)
	创建maven工程jmsDemo
	创建类和main方法(队列生产者)
		public class QueueProducer {
			public static void main(String[] args) throws JMSException {
				//1.创建连接工厂
				ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
				//2.获取连接
				Connection connection = connectionFactory.createConnection();
				//3.启动连接
				connection.start();
				//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
				//•	AUTO_ACKNOWLEDGE = 1    自动确认
				//•	CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
				//•	DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
				//•	SESSION_TRANSACTED = 0    事务提交并确认
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				//5.创建队列对象
				Queue queue = session.createQueue("test-queue");
				//6.创建消息生产者
				MessageProducer producer = session.createProducer(queue);
				//7.创建消息
				TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
				//8.发送消息
				producer.send(textMessage);
				//9.关闭资源
				producer.close();
				session.close();
				connection.close();
			}
		}
	运行上述代码以后可以看到queue列表中多了一条记录
	创建队列消费者
		public class QueueConsumer {
			public static void main(String[] args) throws JMSException, IOException {
				//1.创建连接工厂
				ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
				//2.获取连接
				Connection connection = connectionFactory.createConnection();
				//3.启动连接
				connection.start();
				//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				//5.创建队列对象 队列名要和生产者保持一致
				Queue queue = session.createQueue("test-queue");
				//6.创建消息消费
				MessageConsumer consumer = session.createConsumer(queue);
		
				//7.监听消息
				consumer.setMessageListener(new MessageListener() {
					public void onMessage(Message message) {
						TextMessage textMessage = (TextMessage) message;
						try {
							System.out.println("接收到消息:" + textMessage.getText());
						} catch (JMSException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				});
				//8.等待键盘输入 因为是控制台程序,这一句是为了等待控制台输入产生等待,不然直接执行到后面,监听还没产生就结束了,无法达到效果
				System.in.read();
				//9.关闭资源
				consumer.close();
				session.close();
				connection.close();
			}
		}
	执行消费者可以看到消息的消费信息。如果停掉消费者,再执行两次生产者产生消息,再执行一次消费者可以看到一下消费两条信息。对应结果可以看列表
	如果启动多个消费者,只会有一个消费到消息
	主题生产者和消费者只需要将第五步改成创建主题就行了:Topic topic = session.createTopic("test-topic");
	运行多个消费者时,一个生产消息会被多个消费到

4.spring整合	(以下对应视频文件夹3)
  4.1 点对点方式
	生产者:创建maven项目springjms_producer,引入相关依赖:
	  <properties>
	  	<spring.version>4.2.4.RELEASE</spring.version>
	  </properties>
	  
	  <dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>
	  	<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.9</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-client</artifactId>
			<version>5.13.4</version>
		 </dependency>
	  </dependencies>
	复制配置文件applicationContext-jms-producer.xml到资源目录下: (E:\2019年4月黑马程序员教程\05-黑马JavaEE49期全套\17品优购电商系统开发\资源\配置文件\JMS)
		<?xml version="1.0" encoding="UTF-8"?>
		<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
			xmlns:jms="http://www.springframework.org/schema/jms"
			xsi:schemaLocation="http://www.springframework.org/schema/beans   
				http://www.springframework.org/schema/beans/spring-beans.xsd
				http://www.springframework.org/schema/context   
				http://www.springframework.org/schema/context/spring-context.xsd">
				
				
			<context:component-scan base-package="cn.itcast.demo"></context:component-scan>     
			
			   
		    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
			<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
			    <property name="brokerURL" value="tcp://192.168.25.130:61616"/>  
			</bean>
			   
		    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
			<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
			<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
			    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
			</bean>  
				   
		    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
			<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
			    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
			    <property name="connectionFactory" ref="connectionFactory"/>  
			</bean>      
		    <!--这个是队列目的地,点对点的  文本信息-->  
			<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
				<!-- 队列名 -->
			    <constructor-arg value="queue_text"/>  
			</bean>    
			
			<!--这个是订阅模式  文本信息-->  
			<!-- <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
			    <constructor-arg value="topic_text"/>  
			</bean>   -->
			
		</beans>
	创建消息生产类:
		@Component
		public class QueueProducer {
		
			@Autowired
			private JmsTemplate jmsTemplate;
		
			@Autowired
			private Destination queueTextDestination;// 和配置文件中的名字保持一致
		
			public void sendTextMessage(final String text) {
				jmsTemplate.send(queueTextDestination, new MessageCreator() {
					public Message createMessage(Session session) throws JMSException {
						return session.createTextMessage(text);
					}
				});
			}
		}
	测试类,执行即可在消息列表看到消息:
		@RunWith(SpringJUnit4ClassRunner.class)
		@ContextConfiguration(locations = "classpath:applicationContext-jms-producer.xml")
		public class TestQueue {
			@Autowired
			private QueueProducer queueProducer;
		
			@Test
			public void testSend() {
				queueProducer.sendTextMessage("测试发送文本消息");
			}
		}

	消费者:创建maven工程springjms_consumer,依赖同上,复制配置文件applicationContext-jms-consumer-queue.xml:
		<?xml version="1.0" encoding="UTF-8"?>
		<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
			xmlns:jms="http://www.springframework.org/schema/jms"
			xsi:schemaLocation="http://www.springframework.org/schema/beans   
				http://www.springframework.org/schema/beans/spring-beans.xsd
				http://www.springframework.org/schema/context   
				http://www.springframework.org/schema/context/spring-context.xsd">
			
		    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
			<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
			    <property name="brokerURL" value="tcp://192.168.25.130:61616"/>  
			</bean>
			   
		    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
			<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
			<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
			    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
			</bean>  
			
		    <!--这个是队列目的地,点对点的  文本信息-->  
			<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
			    <constructor-arg value="queue_text"/>  
			</bean>    
			
			<!-- 我的监听类 -->
			<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>
			<!-- 消息监听容器 -->
			<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
				<property name="connectionFactory" ref="connectionFactory" />
				<property name="destination" ref="queueTextDestination" />
				<property name="messageListener" ref="myMessageListener" />
			</bean>
			
		</beans>
	创建消费者:
		//这里不用写注解,因为配置文件中已经指定了该类。也可以像producer中的配置一样,写一个扫描包配置然后加Component注解
		public class MyMessageListener implements MessageListener {
		
			public void onMessage(Message message) {
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("接收到消息: " + textMessage.getText());
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	测试类:
		@RunWith(SpringJUnit4ClassRunner.class)
		@ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-queue.xml")
		public class TestQueue {
			/**
			 * 测试方法不用些什么语句的,只要加载环境即可消费消息
			 * 不过还是要使程序发生暂停才能监听
			 */
			@Test
			public void testQueue() {
				try {
					System.in.read();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

  4.2 发布订阅方式
	生产者:
		只需将配置文件中订阅模式的注释放开,复制一份QueueProducer.java为TopicProducer.java,将Destination改名和配置文件中topicTextDestination一致,测试类也相应改成Topic即可
	消费者:
		复制queue的配置文件为applicationContext-jms-consumer-topic.xml,修改配置。后面ref的内容也同步修改:
			<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
			    <constructor-arg value="topic_text"/>  
			</bean> 
		复制创建测试类TestTopic.java加载topic的配置文件即可
	测试时可以启动三次消费者,然后启动一次生产者,可以看到三个消费者都有消息打印

5. 项目改造
  涉及的模块:pinyougou-manager-web作为生产者,pinyougou-search-service作为消费者。管理台web审核通过商品时,发送消息,有搜索服务消费消息,对审核通过的商品如solr索引库
  这个操作原先是写在managerweb模块的,web通过对searchservice的远程依赖完成导入;现在这样做就可以完成解耦

  删除商品时需要删除solr索引库的内容,也添加了消息监听类ItemDeleteListener.java

  继续删除managerweb对pageinterface的依赖,详情页生成的方式也改为消息模式,到那时因为详情页需要部署在多个服务器上,所以这里的消息选择主题类型。
  对应pageservice服务不再需要通过dubbo被web使用,所以去掉dubbo的依赖,同时加入activemq的依赖,spring的配置文件也去除对应配置,添加activemq的配置文件

  pageservice添加html删除的消息监听类PageDeleteListener.java



DOWN 返回