下载安装
RabbitMQ官网:https://www.rabbitmq.com/
下载地址
RabbitMQ下载链接在Github中,可以从官网处https://www.rabbitmq.com/#getstarted点击进入,也可以直接输入地址进入:https://github.com/rabbitmq/rabbitmq-server/releases。选择版本
Linux CentOS版本:rabbitmq-server-3.11.15-1.el8.noarch.rpm
Windows版本:rabbitmq-server-3.11.15.exe安装ErLang
由于RabbitMQ是采用ErLang语言开发的,所以需要ErLang支持,下载地址
https://www.erlang.org/downloads,选择对应的版本即可。
Windows安装ErLang相对简单,就是一直下一步即可,Linux安装较麻烦,以下介绍CentOS中安装ErLang步骤。
安装Erlang环境
检查Linux是否有erlang环境
1erl
安装GCC、GCC-C++、Openssl等模块,安装过就不需要安装了
1yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
安装驱动UnixODBC
1yum install unixODBC unixODBC-devel
解压ErLang安装包,并进入文件夹
1tar -zxvf otp_src_25.3.2.tar.gz
2cd otp_src_25.3.2
进行编译配置
5.1. 如果没有JDK,或者就是新的Linux环境用如下命令:
1./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
5.2. 如果有JDK环境用上面或者下面命令(建议直接用上面命令):
1./configure --prefix=/usr/local/erlang
其中APPLICATIONS DISABLED 标示是必须要安装的,另外两个项目可以忽略,我们上上面--without-javac忽略了java编译,出现这个就算配置成功了。
编译安装
1./configure --prefix=/usr/local/erlang
将erlang的启动软链到/usr/local/bin/下
7.1. 配置快捷方式(比较偷懒的方法)
1ln -s /usr/local/erlang/bin/erl /usr/local/bin/
7.2. 配置erlang环境
1vim /etc/profile
追加如下配置
1export ERLANG_HOME=/usr/local/erlang
2export PATH=$PATH:$ERLANG_HOME/bin
重新加载profile
1source /etc/profile
测试
1erl
安装RabbitMQ
1yum install rabbitmq-server-3.11.15-1.el8.noarch.rpm
常用命令
1# 启动
2service rabbitmq-server start
3# 查看状态
4rabbitmqctl status
5# 启用插件 (图像化界面)
6rabbitmq-plugins enable rabbitmq_management
7# 重启
8service rabbitmq-server restart
其它操作在图形化界面中均可完成,这里不做详细解释
Springboot 链接 RabbitMQ
Maven依赖
1<!--rabbitmq-->
2<dependency>
3 <groupId>org.springframework.boot</groupId>
4 <artifactId>spring-boot-starter-amqp</artifactId>
5</dependency>
application.yml
1spring:
2 # 配置RabbitMq 服务器
3 rabbitmq:
4 host: 127.0.0.1
5 port: 5672
6 username: guest
7 password: guest
8 # 虚拟host可以不设置,默认/
9 virtual-host: /
注意:里面的虚拟host配置项不是必须的,我自己在rabbitmq服务上创建了自己的虚拟host,所以我配置了;你们不创建,就不用加这个配置项(默认写 :virtual-host: /)。
接着我们先使用下direct exchange(直连型交换机),创建RabbitDirectProducerConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
1package cn.xuexiluxian.crm.websocket.config;
2
3import cn.xuexiluxian.open.common.constant.Constants;
4import org.springframework.amqp.core.Binding;
5import org.springframework.amqp.core.BindingBuilder;
6import org.springframework.amqp.core.DirectExchange;
7import org.springframework.amqp.core.Queue;
8import org.springframework.context.annotation.Bean;
9import org.springframework.context.annotation.Configuration;
10
11
12/**
13 * RabbitMQ直连配置
14 */
15@Configuration
16public class RabbitDirectProducerConfig {
17 //队列 起名:TestDirectQueue
18 @Bean
19 public Queue crmProducerDirectQueue() {
20 // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
21 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
22 // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
23 // return new Queue("TestDirectQueue",true,true,false);
24 //一般设置一下队列的持久化就好,其余两个就是默认false
25 return new Queue(Constants.RABBITMQ_DEFAULT_QUEUE,true);
26 }
27
28 //Direct交换机 起名:TestDirectExchange
29 @Bean
30 DirectExchange crmProducerDirectExchange() {
31 // return new DirectExchange("TestDirectExchange",true,true);
32 return new DirectExchange(Constants.RABBITMQ_DEFAULT_DIRECT_EXCHANGE,true,false);
33 }
34
35 //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
36 @Bean
37 Binding bindingProducerDirect() {
38 return BindingBuilder.bind(crmProducerDirectQueue()).to(crmProducerDirectExchange()).with(Constants.RABBITMQ_DEFAULT_DIRECT_ROUTING);
39 }
40
41 @Bean
42 DirectExchange lonelyDirectExchange() {
43 return new DirectExchange("lonelyDirectExchange");
44 }
45}
测试生产者
4.1. 注入
1@Autowired
2private RabbitTemplate rabbitTemplate;
4.1. 发送消息
1rabbitTemplate.convertAndSend(Constants.RABBITMQ_DEFAULT_DIRECT_EXCHANGE, Constants.RABBITMQ_DEFAULT_DIRECT_ROUTING, "Hello RabbitMQ");
配置消费者
5.1. Maven依赖参考上面第一步
5.2. 消费者配置类RabbitDirectConsumerConfig.java
1package cn.xuexiluxian.crm.websocket.config;
2
3import cn.xuexiluxian.open.common.constant.Constants;
4import org.springframework.amqp.core.Binding;
5import org.springframework.amqp.core.BindingBuilder;
6import org.springframework.amqp.core.DirectExchange;
7import org.springframework.amqp.core.Queue;
8import org.springframework.context.annotation.Bean;
9import org.springframework.context.annotation.Configuration;
10
11/**
12 * @Author : JCccc
13 * @CreateTime : 2019/9/3
14 * @Description :
15 **/
16@Configuration
17public class RabbitDirectConsumerConfig {
18 @Bean
19 public Queue crmConsumerDirectQueue() {
20 return new Queue(Constants.RABBITMQ_DEFAULT_QUEUE,true);
21 }
22
23 //Direct交换机
24 @Bean
25 DirectExchange crmConsumerDirectExchange() {
26 return new DirectExchange(Constants.RABBITMQ_DEFAULT_DIRECT_EXCHANGE);
27 }
28
29 //绑定:将队列和交换机绑定,并设置用于匹配键
30 @Bean
31 Binding bindingConsumerDirect() {
32 return BindingBuilder.bind(crmConsumerDirectQueue()).to(crmConsumerDirectExchange()).with(Constants.RABBITMQ_DEFAULT_DIRECT_ROUTING);
33 }
34}
监听器
1@Component
2@RabbitListener(queues = Constants.RABBITMQ_DEFAULT_QUEUE) //监听的队列名称
3public class CrmRabbitMQListener {
4 @RabbitHandler
5 public void callback(Map testMessage) {
6 System.out.println("消费者收到消息 : " + testMessage.toString());
7 }
8}
常量类
1package cn.xuexiluxian.open.common.constant;
2
3public class Constants {
4 public static final String RABBITMQ_DEFAULT_QUEUE = "xiaoluxian-crm-queue";
5 public static final String RABBITMQ_DEFAULT_DIRECT_EXCHANGE = "xiaoluxian-crm-direct_exchange";
6 public static final String RABBITMQ_DEFAULT_DIRECT_ROUTING = "xiaoluxian-crm-direct-routing";
7}