1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。
为什么会产生消息队列?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
2.学习五种队列
2.1简单队列
P:消息的生产者
C:消息的消费者
红色:队列
2.1.1 maven导入依赖
<dependencies>
<!-- 引入队列依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
2.1.2获取MQ的连接
/**rabbitMQ ConnectionUtil
* @author by cyf
* @date 2020/5/20.
*/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置账号密码 端口号 主机地址
factory.setUsername("cyf");
factory.setPassword("123");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/mqtest");
Connection connection = factory.newConnection();
return connection;
}
}
linux的rabbitMQ总是自动挂掉,采用本地rabbitMQ服务
2.1.3生产者发送消息到队列
/**
* 生产者
*
* @author by cyf
* @date 2020/5/20.
*/
public class Send {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello queue";
System.out.println("message:" + message);
//消息传入
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
}
}
2.1.4消费者从队列中获取消息
/**消费者
* @author by cyf
* @date 2020/5/20.
*/
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String smg = new String(body,"utf-8");
System.out.println(new Date());
System.out.println("recv:" + smg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,defaultConsumer);
}
2.2 Work模式
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
2.2.1 生产者
/**生产者
* @author by cyf
* @date 2020/5/21.
*/
public class Send {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
boolean durable = false;//消息持久化
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
for (int i = 0; i < 30; i++) {
String msg = ""+i;
//发送消息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("send:msg"+i);
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
2.2.2 消费者1
/**消费者1
* @author by cyf
* @date 2020/5/21.
*/
public class Rec1 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[1] Recv:msg"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};
Boolean autoAsk = true;
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
2.2.3 消费者2
/**消费者2
* @author by cyf
* @date 2020/5/21.
*/
public class Rec2 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
bol
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[2] Recv:msg"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
}
}
};
Boolean autoAsk = true;
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
2.24.测试
测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。
怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。
2个概念
轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
//生产者里面添加
//保证每次发生只发送一条给消费者
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//消费者里面添加
channel.basicQos(1);//保证一次只分发一个
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
2.3 消息应答与持久化
2.3.1消息应答
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
自动模式:
boolean autoAsk = true;
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);//自动确认
手动确认
boolean autoAsk = false;
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);//手动确认
2.3.2消息持久化
//声明队列
boolean durable = false;//消息持久化
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
可以将队列数据持久化,这样rabbitMQ挂了数据也不会丢失
2.4 订阅模式
解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
2.4.1 消息的生产者(看作是后台系统)
向交换机中发送消息。
/**生产者 订阅
* @author by cyf
* @date 2020/5/21.
*/
public class Send {
public static final String EXCHANGE_NAME="exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//消息
String msg = "hello ps!";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
2.4.2 消息消费者
/**消费者1 看作发向邮箱
* @author by cyf
* @date 2020/5/21.
*/
public class Rec1 {
public static final String QUEUE_NAME = "queue_email";
public static final String EXCHANGE_NAME ="exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[1] Recv:msg"+msg);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
/**消费者2 看作发向手机
* @author by cyf
* @date 2020/5/21.
*/
public class Rec2 {
public static final String QUEUE_NAME = "queue_phone";
public static final String EXCHANGE_NAME ="exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[2] Recv:msg"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
2.5 路由模式
2.5.1 生产者
/**路由模式 生产者
* @author by cyf
* @date 2020/5/21.
*/
public class Send {
public static final String EXCHANGE_NAME="exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机, “direct”是交换机的类型,此处为路由模式
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String routingKey = "create";//key
//消息
String msg = "direct!";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}
2.5.2 消费者
/**
* @author by cyf
* @date 2020/5/21.
*/
public class Rec1 {
public static final String QUEUE_NAME = "queue_routing-1";
public static final String EXCHANGE_NAME ="exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定到交换机,绑定两个key,分别为 delete和updatta
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[1] Recv:msg"+msg);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
/**
* @author by cyf
* @date 2020/5/21.
*/
public class Rec2 {
public static final String QUEUE_NAME = "queue_routing-2";
public static final String EXCHANGE_NAME ="exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定到交换机,绑定了三个key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"create");
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[2] Recv:msg"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
2.6 主题模式
类似于路由模式,交换机带有key
匹配时 * 表示匹配一个
#表示匹配所有
2.6.1 生产者
/**主题模式 生产者
* @author by cyf
* @date 2020/5/21.
*/
public class Send {
public static final String EXCHANGE_NAME="exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//String routingKey = "goods.create";//key
String routingKey = "goods.delete";
//消息
String msg = "商品";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}
2.6.2 消者者
/**
* @author by cyf
* @date 2020/5/21.
*/
public class Rec1 {
public static final String QUEUE_NAME = "queue_topic_1";
public static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.delete");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.insert");
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "UTF-8");
System.out.println("[1] Recv:msg:" + msg);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME, autoAsk, consumer);
}
/**
* @author by cyf
* @date 2020/5/21.
*/
public class Rec2 {
public static final String QUEUE_NAME = "queue_topic_2";
public static final String EXCHANGE_NAME ="exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");//#表示匹配所有
channel.basicQos(1);//保证一次只分发一个
//消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body,"UTF-8");
System.out.println("[2] Recv:msg"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
//每次消费完 通知队列继续发送
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//
Boolean autoAsk = false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAsk,consumer);
}
}
3.springboot-rabbitMQ
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=cyf
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/mqtest
配置类
/**
* @author by cyf
* @date 2020/5/22.
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("q_hello");
}
}
1、简单队列
生产者
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
public class Send {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String data = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
String msg = "hello rabbit" +data;
System.out.println("Send:"+msg);
//简单对列的情况下routingKey即为Q名
rabbitTemplate.convertAndSend("q_hell",msg);
}
消费者
@Component
@RabbitListener(queues = "q_hello")
public class Receiver {
@RabbitHandler
public void process(String message){
System.out.println(message);
}
}
test
@SpringBootTest
class DemoApplicationTests {
@Autowired
private Send sender;
@Test
void helloSimple() {
sender.send();
}
输出:
Send:hello rabbit2020-05-22 11:14:53
hello rabbit2020-05-22 11:14:53
2、Work模式
/**生产者
* @author by cyf
* @date 2020/5/22.
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
String data = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
String msg = "hello rabbit " + i +" "+ data;
System.out.println("Send:" + msg);
//简单对列的情况下routingKey即为Q名
rabbitTemplate.convertAndSend("q_hello", msg);
}
}
/**消费者1
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_hello")
public class Receiver {
@RabbitHandler
public void process(String message){
System.out.println("Receiver1:"+message);
}
}
/**消费者2
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_hello")
public class Receiver2 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}
测试类
@Autowired
private Sender workSender;
@Test
void workQueue() throws InterruptedException {
for (int i = 0; i <50 ; i++) {
workSender.send(i);
Thread.sleep(200);
}
}
输出:
Send:hello rabbit 0 2020-05-22 11:38:00
Receiver1:hello rabbit 0 2020-05-22 11:38:00
Send:hello rabbit 1 2020-05-22 11:38:00
Receiver2:hello rabbit 1 2020-05-22 11:38:00
Send:hello rabbit 2 2020-05-22 11:38:00
Receiver1:hello rabbit 2 2020-05-22 11:38:00
Send:hello rabbit 3 2020-05-22 11:38:01
Receiver2:hello rabbit 3 2020-05-22 11:38:01
Send:hello rabbit 4 2020-05-22 11:38:01
Receiver1:hello rabbit 4 2020-05-22 11:38:01
...
3、Topic Exchange(主题模式)
- topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列(消费者)来演示。
1)配置队列,绑定交换机
/**创建两个队列并绑定到同一个交换机,设置对应的routingkey
* @author by cyf
* @date 2020/5/22.
*/
@Configuration
public class TopicRabbitConfig {
public static final String MESSAGE = "q_topic_message";
public static final String MESSAGES = "q_topic_messages";
//创建队列
@Bean
public Queue queueMessage(){
return new Queue(MESSAGE);
}
@Bean
public Queue queueMessages(){
return new Queue(MESSAGES);
}
//声明topic交换机
@Bean
public TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
//将队列绑定到交换机,并指定routingkey
@Bean
public Binding bingExchangeMessage(Queue queueMessage,TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
public Binding bingExchangeMessages(Queue queueMessages,TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
生产者 和两个消费者
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
public class MagSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void Send1(){
String message = "hello,i am message1";
System.out.println("send:"+message);
rabbitTemplate.convertAndSend("topicExchange","topic.message",message);
}
public void Send2(){
String message = "hello,i am message2";
System.out.println("send:"+message);
rabbitTemplate.convertAndSend("topicExchange","topic.messages",message);
}
}
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_topic_message")
public class TopicReceiver1 {
@RabbitHandler
public void receiver(String message){
System.out.println("Receiver1:"+message);
}
}
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_topic_messages")
public class TopicReceiver2 {
@RabbitHandler
public void receiver(String message){
System.out.println("Receiver2:"+message);
}
}
send1方法会匹配到topic.#和topic.message,
两个Receiver都可以收到消息,
发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
测试
@Test
void topicQueue(){
magSender.Send1();
magSender.Send2();
}
输出:
send:hello,i am message1
send:hello,i am message2
Receiver1:hello,i am message1
Receiver2:hello,i am message1
Receiver2:hello,i am message2
4、订阅模式
-
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
1、配置队列,绑定交换机
/**
* @author by cyf
* @date 2020/5/22.
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue aMessage() {
return new Queue("q_fanout_A");
}
@Bean
public Queue bMessage() {
return new Queue("q_fanout_B");
}
@Bean
public Queue cMessage() {
return new Queue("q_fanout_C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("mybootfanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(aMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cMessage).to(fanoutExchange);
}
}
2、生产者 消费者
@Component
public class MySend {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
}
}
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_fanout_A")
public class FanoutRec1 {
@RabbitHandler
public void process(String hello) {
System.out.println("AReceiver : " + hello + "/n");
}
}
/**
* @author by cyf
* @date 2020/5/22.
*/
@Component
@RabbitListener(queues = "q_fanout_B")
public class FanoutRec2 {
@RabbitHandler
public void process(String hello) {
System.out.println("BReceiver : " + hello + "/n");
}
}
@Test
void fanoutQueue(){
mySend.send();
}
输出:
Sender : hi, fanout msg
AReceiver : hi, fanout msg /n
BReceiver : hi, fanout msg /n