简单队列(Simple Queue)
约 1071 字大约 4 分钟
2025-03-11
消息模式
简单队列(Simple Queue)
一个生产者对应一个消费者,消息直接发送到队列。 官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
面板操作
- 创建一个队列
- 在默认交换处模拟生产者发送消息 因为该队列绑定的是默认交换机,所以消息会直接发送到队列中。
注:Routing Key 写队列名
- 队列处查看消息
- 模拟消费者接收消息(查看消息内容)
AckMode : 应答模式
- Nack: 不应答,只查看,消息不会移除队列
- Ack: 应答模式,查看并移除队列
代码操作
- 导入依赖
- java原生依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
- Spring依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.2.5.RELEASE</version> </dependency>
- Spring Boot依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
根据自己的项目环境进行选择即可。 2. 定义生产者
package com.syh.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* @author shan
* @date 2024/5/16 14:39
*/
public class Producer {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("47.120.37.156");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 创建连接
connection = factory.newConnection();
// 创建通道
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("hello", false, false, false, null);
// 6: 发送消息
String message = "Hello World!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange,会有一个默认交换机
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
System.out.println(" [x] Unexpected exception: " + e.getMessage());
} finally {
// 关闭连接和通道
if (connection!= null && channel.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
执行发送,这个时候可以在web控制台查看到这个队列queue的信息。 3. 定义消费者
package com.syh.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author shan
* @date 2024/5/16 14:48
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("47.120.37.156");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "hello";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
执行消费者,这个时候可以看到控制台输出接收到的消息。
工作队列(Work Queue)
多个生产者对应多个消费者,消息被平均分摊到多个队列,每个消费者只能从自己队列中消费消息。
发布/订阅(Publish/Subscribe)
一个生产者可以向多个消费者发送消息,消费者可以订阅多个队列,只接收感兴趣的消息。
路由(Routing)
一个生产者可以向多个消费者发送消息,消息根据路由键(routing key)分发到对应的队列。
主题(Topic)
一个生产者可以向多个消费者发送消息,消息根据主题(topic)分发到对应的队列。