什么是MQ
消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
MQ是一个消息中间件,ActiveMQ、RabbitMQ、kafka
ActiveMQ
什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
- 主要特点:
- 1、多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 4、通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 5、支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 6、支持通过JDBC和journal提供高速的消息持久化
- 7、从设计上保证了高性能的集群,客户端-服务器,点对点
- 8、支持Ajax
- 9、支持与Axis的整合
- 10、可以很容易得调用内嵌JMS provider,进行测试
什么情况下使用ActiveMQ?
- 多个项目之间集成
- 跨平台
- 多语言
- 多项目
- 降低系统间模块的耦合度,解耦
- 软件扩展性
- 系统前后端隔离
- 前后端隔离,屏蔽高安全区
ActiveMQ的消息形式
消息的传递有两种类型:
- 一种是点对点的,即一个生产者和一个消费者一一对应;(一对一)
- 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。(广播)
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage – Java原始值的数据流
- MapMessage–一套名称-值对
- TextMessage–一个字符串对象(使用的最多)
- ObjectMessage–一个序列化的 Java对象
- BytesMessage–一个字节的数据流
ActiveMQ的安装
解压压缩文件, 进入/bin
目录,
- 运行:
./activemq start
- 关闭:
./activemq stop
- 查询状态:
./activemq status
如果服务器安装了防火墙请开放8161
端口, 该端口是ActiveMQ默认的管理端口.
然后浏览器打开http://服务器ip地址:8161/admin
, 用户名和密码默认是admin
ActiveMQ的使用
ActiveMQ的Maven依赖:1
2
3
4
5<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
Queue
默认会持久化到服务端
Producer: 消息发送者
- 1、创建ConnectionFactory对象,需要指定服务端ip及端口号, 默认消息服务端口
61616
, 有防火墙请开放此端口。 - 2、使用ConnectionFactory对象创建一个Connection对象。
- 3、开启连接,调用Connection对象的start方法。
- 4、使用Connection对象创建一个Session对象。
- 5、使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
- 6、使用Session对象创建一个Producer对象。
- 7、创建一个Message对象,创建一个TextMessage对象。
- 8、使用Producer对象发送消息。
- 9、关闭资源。
1 | package com.test; |
Consumer: 消息接受者
- 1、创建一个ConnectionFactory对象。
- 2、从ConnectionFactory对象中获得一个Connection对象。
- 3、开启连接。调用Connection对象的start方法。
- 4、使用Connection对象创建一个Session对象。
- 5、使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
- 6、使用Session对象创建一个Consumer对象。
- 7、接收消息。
- 8、打印消息。
- 9、关闭资源
1 | package com.test; |
Topic
默认不会持久化到服务端,但是可以配置持久化到服务器, 需要先运行消息接受者.
Producer: 消息发送者
- 1、创建ConnectionFactory对象,需要指定服务端ip及端口号。
- 2、使用ConnectionFactory对象创建一个Connection对象。
- 3、开启连接,调用Connection对象的start方法。
- 4、使用Connection对象创建一个Session对象。
- 5、使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
- 6、使用Session对象创建一个Producer对象。
- 7、创建一个Message对象,创建一个TextMessage对象。
- 8、使用Producer对象发送消息。
- 9、关闭资源。
1 | package com.test; |
Consumer: 消息接受者
- 1、创建一个ConnectionFactory对象。
- 2、从ConnectionFactory对象中获得一个Connection对象。
- 3、开启连接。调用Connection对象的start方法。
- 4、使用Connection对象创建一个Session对象。
- 5、使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
- 6、使用Session对象创建一个Consumer对象。
- 7、接收消息。
- 8、打印消息。
- 9、关闭资源
1 | package com.test; |
Spring整合ActiveMQ
发送消息
导入相关jar包
1 | <dependency> |
spring配置文件applicationContext-activemq.xml
- 1、配置ConnectionFactory
- 2、配置生产者, 使用JMSTemplate对象
- 3、配置Destination
- 注意构造函数的参数是自定义queue/topic的名称.
1 |
|
测试代码
1 | package com.test; |
接受消息
导入相关jar包
1 | <!--导入ActiveMQ的jar包--> |
创建MessageListener实现类
1 | package com.coppco.search.listener; |
spring配置文件applicationContext-activemq.xml
- 1、配置ConnectionFactory
- 2、配置Destination
- 注意构造函数的参数是自定义queue/topic的名称要和前面设置的一样.
- 3、配置消息监听者
1 |
|
测试代码
1 |