1. 概述
软件组件的解耦是软件设计中最重要的部分之一,实现此目的的一种方法是使用消息传递系统,它提供组件(服务)之间的异步通信方式。在本文中,我们将介绍其中一个系统:RabbitMQ。
RabbitMQ是一个实现高级消息队列协议(AMQP)的消息代理,它提供主要编程语言的客户端库。
除了用于解耦软件组件外,RabbitMQ还可用于:
- 执行后台操作
- 执行异步操作
2. 消息传递模型
首先,让我们快速、高层次地了解一下消息传递的工作原理。
简单地说,有两种类型的应用程序与消息传递系统交互:生产者和消费者。生产者是那些向代理发送(发布)消息的服务,而消费者则是从代理接收消息的服务。通常,这些程序(软件组件)运行在不同的机器上,RabbitMQ充当它们之间的通信中间件。
在本文中,我们将演示一个简单示例,包含两个使用RabbitMQ进行通信的服务。其中一个服务将向RabbitMQ发布消息,另一个服务消费该消息。
3. 设置
首先,我们使用此处的官方设置指南运行RabbitMQ。
我们自然会使用Java客户端与RabbitMQ服务器进行交互;此客户端的Maven依赖项是:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
使用官方指南运行RabbitMQ代理后,我们需要使用Java客户端连接到它:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
我们使用ConnectionFactory来设置与服务器的连接,它还负责协议(AMQP)和身份验证。这里我们连接到localhost上的服务器,我们可以使用setHost函数修改主机名。
如果RabbitMQ服务器没有使用默认端口,我们可以使用setPort来设置端口;RabbitMQ的默认端口是15672:
factory.setPort(15678);
我们还可以设置用户名和密码:
factory.setUsername("user1");
factory.setPassword("MyPassword");
此外,我们将使用此连接来发布和消费消息。
4. 生产者
考虑一个简单的场景,其中Web应用程序允许用户向网站添加新产品。每当添加新产品时,我们都需要向客户发送电子邮件。
首先,我们定义一个队列:
channel.queueDeclare("products_queue", false, false, false, null);
每次用户添加新产品时,我们都会向队列发布一条消息:
String message = "product details";
channel.basicPublish("", "products_queue", null, message.getBytes());
最后,我们关闭通道和连接:
channel.close();
connection.close();
此消息将由另一个负责向客户发送电子邮件的服务消费。
5. 消费者
在消费者端的实现中,我们将声明相同的队列:
channel.queueDeclare("products_queue", false, false, false, null);
以下是我们如何定义异步处理队列消息的消费者:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
// process the message
}
};
channel.basicConsume("products_queue", true, consumer);
6. 总结
这篇简单的文章介绍了RabbitMQ的基本概念,并讨论了使用它的一个简单示例。
与往常一样,本教程的完整源代码可在GitHub上获得。