路由模式(Routing)
约 818 字大约 3 分钟
2025-03-11
2,图形化界面理解
2.1 创建direct交换机
2.2 交换机与队列绑定
:要指定routingkey2.3 模拟生产者发送消息
2.4 查看队列消息
package com.syh.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author shan
* @date 2024/5/16 21:10
*/
public class Product {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
String message = "你好,学相伴!!!";
String exchangeName = "dicrect-exchange";
String routingKey1 = "email";
String routingKey2 = "sms";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
3.2 消费者
package com.syh.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author shan
* @date 2024/5/16 21:14
*/
public class Consumer {
private static Runnable runnable = () -> {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.120.37.156");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去执行
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
}
}