JeroMQ 简介 | Baeldung
JeroMQ 简介 | Baeldung
在这篇文章中,我们将深入了解JeroMQ,这是ZeroMQ的一个纯Java实现。我们将看看它是什么,以及它在我们的应用程序中能为我们做些什么。
2. ZeroMQ 是什么?
ZeroMQ 是一个消息基础设施,它不需要任何实际的基础设施服务来设置。我们不需要像使用ActiveMQ或Kafka这样的实现中的单独消息代理。相反,我们应用程序中的ZeroMQ依赖项有能力为我们完成所有这些工作。
那么,我们能用这个做什么呢?我们可以实现我们通常想要的所有标准消息模式:
- 请求/响应
- 发布/订阅
- 同步与异步
- 等等
2.1. 套接字
ZeroMQ 使用套接字的概念。这些在概念上非常类似于我们在低级网络编程中使用的套接字。
所有套接字都有一个类型,我们将在本文中看到一些。然后它们要么监听来自其他套接字的连接,要么打开到其他套接字的连接。一旦一对套接字连接上,我们就准备好在它们之间发送消息了。注意,只有某些套接字组合可以一起使用,这取决于我们想要实现的确切目标。
JeroMQ 还支持套接字之间的几种不同的传输机制。例如,常见的包括:
- tcp://
<host>:<port>– 这使用TCP/IP网络在套接字之间发送消息。这可以允许套接字在不同的过程和不同的主机上,但也带来了网络所具有的一些可靠性问题。 - ipc://
<endpoint>– 这使用系统依赖机制在套接字之间发送消息。这允许套接字在不同的过程上,但它们必须在同一主机上,可能还有其他系统限制,定哪些进程可以通信。 - inproc://
<name>– 这允许在同一进程中的套接字之间进行通信。具体来说,它们必须在同一个JeroMQ上下文中。
传输的确切选择将取决于我们的需求。根据传输和套接字类型的确切情况,我们还可以使用它与其他ZeroMQ实现进行通信,包括使用其他语言的实现。
3. 开始使用
JeroMQ 是 ZeroMQ 的纯 Java 实现。让我们快速看看如何在应用程序中使用它。
3.1. 依赖项
首先,我们需要添加依赖项:
`<dependency>`
`<groupId>`org.zeromq`</groupId>`
`<artifactId>`jeromq`</artifactId>`
`<version>`0.5.3`</version>`
`</dependency>`
我们可以在 Maven 中央仓库中找到最新版本。
3.2. JeroMQ 上下文
在我们可以使用 JeroMQ 之前,我们需要设置一个上下文。这是一个 ZContext 类的实例,负责管理一切。
创建我们的上下文没有什么特别的——我们可以使用 new ZContext()。我们还必须确保正确关闭它——使用 close() 方法。这确保我们正确释放任何网络资源。
我们使用的实例必须至少在我们做的任何事情的生命周期内,所以我们需要确保它在应用程序开始时创建,直到结束时才关闭。
如果我们正在编写标准的Java应用程序,我们可以简单地使用 try-with-resources 模式。如果我们使用像Spring这样的框架,那么我们可以将其设置为具有配置销毁方法的bean。根据我们使用的框架需要的其他模式。
3.3. 创建套接字
一旦我们有了上下文,我们就可以使用它来创建套接字。这些套接字然后是我们所有消息传递的基础。
我们使用 ZContext.createSocket() 方法创建套接字,提供我们想要使用的套接字类型。完成此操作后,我们通常需要调用 ZMQ.Socket.bind() 来监听连接,或者调用 ZMQ.Socket.connect() 打开到另一个套接字的连接。
在这一点上,我们现在可以使用我们的套接字了。使用 send() 等方法发送消息,使用 recv() 等方法接收消息。
我们可以关闭我们的套接字以断开连接,当我们完成时。我们可以通过显式调用 Socket.close() 来做到这一点,或者通过关闭 ZContext 然后自动关闭所有从中创建的套接字。
注意,套接字不是线程安全的。我们可以在线程之间传递它们,但重要的是,一次只有一个线程访问它们。
4. 请求/响应消息传递
让我们从一个简单的请求/响应设置开始。我们首先需要一个服务器。这是监听传入连接的部分:
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.REP);
socket.bind("tcp://*:5555");
byte[] reply = socket.recv();
// 在这里做一些事情。
String response = "world";
socket.send(response.getBytes(ZMQ.CHARSET), 0);
}
在这里,我们创建了一个新的套接字,类型为 REP——代表回复。我们可以指示它开始在给定地址上监听,然后进入一个循环,在循环中我们从套接字接收下一条消息,对其进行一些处理,然后发送一个响应。
接下来,我们需要一个客户端。这是打开到服务器的连接的一方。这也是必须发送初始请求的一方——我们的服务器只能回复它收到的请求:
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.REQ);
socket.connect("tcp://localhost:5555");
String request = "Hello";
socket.send(request.getBytes(ZMQ.CHARSET), 0);
byte[] reply = socket.recv();
}
和之前一样,我们创建了一个新的套接字。只是这次,它是 REQ 类型——代表请求。然后我们指示它连接到另一个套接字的某个地方,然后发送消息并接收响应。
REQ 和 REP 之间的主要区别是它们被允许发送消息的时间。REQ 端可以随时发送消息,而 REP 端只能在收到消息后回复消息——因此是请求和响应。
4.1. 多个客户端
我们已经看到了如何让一个客户端向一个服务器发送消息。但如果我们想要有多个客户端呢?
好消息是,它就是可以工作。JeroMQ 将允许任意数量的客户端连接到同一个服务器地址,并且它会为我们处理所有的网络需求。
但是,这是如何工作的呢?在我们的服务器中没有说明要向哪个客户端发送响应。这是因为我们不需要它。JeroMQ 为我们跟踪了所有这些。当服务器调用 send() 时,消息将发送给我们最后收到消息的那个客户端。这使我们的代码不需要关心任何这些。
缺点是,我们的处理必须是完全单线程的。由于这种方式,我们在接收下一条消息之前必须完成一条消息的所有处理并发送回复。对于一些场景来说,这是可以的,但通常这将是一个很大的瓶颈。
4.2. 异步处理
如果我们想能够异步处理传入的请求并以无序的方式发送响应怎么办? 我们不能很容易地用 REQ/REP 设置做到这一点,因为每个响应都直接发送到最后收到的请求。
相反,我们可以使用一种不同类型的套接字——ROUTER。它的工作方式非常类似于 REP,只是它成为我们的责任来指示谁是消息的接收者。
让我们看看服务器组件:
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
String identity = broker.recvStr();
broker.recv(); // 信封分隔符
String message = broker.recvStr(0);
// 在这里做一些事情。
broker.sendMore(identity);
broker.sendMore("");
broker.send("Hello back");
}
这看起来非常相似,但并不完全相同。我们将套接字类型设置为 ROUTER 而不是 REP。这种套接字类型允许服务器通过知道他们的身份来将消息路由到特定的客户端。
当我们在这里接收消息时,我们实际上接收了三种不同的数据。首先,我们接收客户端的身份,然后是一个信封分隔符,然后是实际的消息。
同样地,当我们发送消息时,我们需要做同样的事情。我们发送消息的客户端的身份,然后是一个信封分隔符——可以是任何字符串——然后是实际的消息。
让我们看看客户端:
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.REQ);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
worker.send("Hello ");
String workload = worker.recvStr();
// 对响应做一些事情。
}
这几乎和我们之前的客户端一样。我们现在给客户端一个身份,这样服务器就知道哪个客户端是哪个。如果没有这个,服务器将无法将响应定向到正确的客户端。 除此之外,这和我们之前看到的一样因为篇幅限制,我将从上次停止的地方继续翻译:
由于我们的服务器现在可以指明消息是给哪个客户端的,我们可以突然同时处理多个请求——例如,使用一个执行器服务。唯一的要求是我们永远不要让多个线程同时访问套接字。
5. 发布/订阅消息传递
到目前为止,我们看到的情况是客户端发送了一个初始请求,然后服务器发送回一个响应。如果我们想让服务器只是广播事件,客户端可以消费怎么办?
我们可以使用发布/订阅模式来实现这一点。服务器将发布消息,订阅者将消费这些消息。那么这是什么样子的呢?
首先我们需要我们的发布者:
try (ZContext context = new ZContext()) {
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:5555");
// 等待某些事情发生。
pub.send("Hello");
}
这看起来非常简单,但这是因为JeroMQ为我们管理了大部分复杂性。我们所做的只是创建一个类型为 PUB 的套接字——发布的意思,监听连接,然后通过它发送一条消息。
接下来,我们需要一个订阅者:
try (ZContext context = new ZContext()) {
ZMQ.Socket sub = context.createSocket(SocketType.SUB);
sub.connect("tcp://localhost:5555");
sub.subscribe("".getBytes());
String message = sub.recvStr();
}
这稍微复杂一些,但仍然不是很多。在这里我们创建了一个类型为 SUB 的套接字——订阅的意思,并将其连接到我们的发布者。然后我们需要订阅消息。这需要一组字节,作为所有传入消息的前缀——或者为空集合的字节,以订阅所有消息。
一旦我们这样做了,我们就可以接收消息。我们接收到任何适当的由发布者发送的消息。注意,我们只能接收到我们在订阅后发送的消息——在那之前发送的任何内容都将丢失。
5.1. 多个客户端
和之前一样,如果我们想要有多个客户端,我们可以这样做。每个连接的订阅者都会收到所有适当的由发布者发送的消息,这意味着这充当了多播——例如,类似于JMS主题,而不是JMS队列。
我们也可以有不同的客户端有不同的订阅。这意味着他们每个人都只得到广播消息的一个适当的子集。所有这些工作都完全符合我们的预期,不需要我们额外的努力。
5.2. 异步处理
我们在这里遇到的一个问题是 recv() 方法会阻塞直到有消息可用。 如果我们的订阅者只是永远等待这个套接字的消息,然后对它们做出反应,那没问题。但是,如果我们想要我们的订阅者在做其他事情——例如,等待多个套接字——那么这就不行了。
我们使用的 recv() 或 recvStr() 方法有一个替代签名,允许提供一些标志。如果提供了标志 ZMQ.DONTWAIT,这将导致方法立即返回而不是阻塞。如果没有消息准备好读取,那么它将返回 null。
这将允许我们轮询套接字,看看是否有消息在等待,如果有,就处理它,如果没有,那么就在间歇期间做其他事情。
6. 结论
在这里,我们对使用JeroMQ可以实现的内容进行了非常简短的介绍。 然而,我们可以利用它做更多的事情,而不仅仅是我们在这里所涵盖的。下次你需要在应用程序中进行任何形式的消息传递时,为什么不尝试一下呢?
和往常一样,我们可以在 GitHub 上找到本文的所有代码。
OK