NSQ

NSQ is an open-source, lightweight, high-performance distributed messaging middleware, implemented in Go.

Usage

Configuration

    return [
        'default' => [
            'host' => '127.0.0.1',
            'port' => 4150,
            'pool' => [
                'min_connections' => 1,
                'max_connections' => 10,
                'connect_timeout' => 10.0,
                'wait_timeout' => 3.0,
                'heartbeat' => -1,
                'max_idle_time' => 60.0,
            ],
        ],
    ];
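As a sketch of how more than one connection could be configured, the fragment below adds a second entry next to `default`. It assumes that each top-level key names a connection pool that the `$pool` property can refer to; the pool name `secondary` and its host are placeholders, not values from the original configuration.

```php
<?php
return [
    'default' => [
        'host' => '127.0.0.1',
        'port' => 4150,
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
    // Hypothetical second pool: the name and the address are placeholders.
    'secondary' => [
        'host' => '10.0.0.2',
        'port' => 4150,
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
];
```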

Creating a consumer

    $ php bin/hyperf.php gen:nsq-consumer DemoConsumer

Use the \Hyperf\Nsq\Annotation\Consumer annotation to set topic / channel / name / nums; use the $pool property to switch to a different connection.

    <?php

    declare(strict_types=1);

    namespace App\Nsq\Consumer;

    use Hyperf\Nsq\AbstractConsumer;
    use Hyperf\Nsq\Annotation\Consumer;
    use Hyperf\Nsq\Message;
    use Hyperf\Nsq\Result;

    /**
     * @Consumer(
     *     topic="hyperf",
     *     channel="hyperf",
     *     name="TestNsqConsumer",
     *     nums=1
     * )
     */
    class TestNsqConsumer extends AbstractConsumer
    {
        public function consume(Message $payload): string
        {
            // Dump the raw message body, then acknowledge the message.
            var_dump($payload->getBody());

            return Result::ACK;
        }
    }

Publishing messages

Use \Hyperf\Nsq\Nsq::publish() to publish a message; here too, the $pool property can be used to switch to a different connection.

    <?php

    declare(strict_types=1);

    namespace App\Command;

    use Hyperf\Command\Command as HyperfCommand;
    use Hyperf\Command\Annotation\Command;
    use Hyperf\Nsq\Nsq;

    /**
     * @Command
     */
    class NsqCommand extends HyperfCommand
    {
        protected $name = 'nsq:pub';

        public function handle()
        {
            /** @var Nsq $nsq */
            $nsq = make(Nsq::class); // the `$pool` property can be set here
            // Publish one message to the "hyperf" topic.
            $nsq->publish('hyperf', 'test' . time());
            $this->line('nsq pub success', 'info');
        }
    }

NSQ protocol

https://nsq.io/clients/tcp_protocol_spec.html

  • Socket basics
@startuml

autonumber
hide footbox
title **Socket basics**

participant "Client" as client
participant "Server" as server #orange

activate client
activate server

note right of server: establish the connection
client -> server: socket->connect(ip, port)

...
note right of server: any number of send/recv exchanges
client -> server: socket->send()
server -> client: socket->recv()
...

note right of server: close the connection
client -> server: socket->close()

deactivate client
deactivate server

@enduml
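The connect / send / recv / close cycle in the diagram can be tried out with a minimal loopback sketch. It uses PHP's built-in stream functions rather than the `socket->...` calls shown in the diagram, and it starts a throwaway listener in the same process to stand in for the server; the `ping` / `pong` payloads are made up for illustration.

```php
<?php
declare(strict_types=1);

// Throwaway listener on an OS-assigned port, standing in for the "server".
$server = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
if ($server === false) {
    throw new RuntimeException("listen failed: $errstr ($errno)");
}
$address = stream_socket_get_name($server, false); // local address, host:port

// 1. Establish the connection.
$client = stream_socket_client("tcp://$address", $errno, $errstr, 3.0);
if ($client === false) {
    throw new RuntimeException("connect failed: $errstr ($errno)");
}
$peer = stream_socket_accept($server, 3.0); // server side of the connection

// 2. Exchange data; this step can repeat as often as needed.
fwrite($client, "ping\n");   // client sends
$request = fgets($peer);     // server receives
fwrite($peer, "pong\n");     // server replies
$reply = fgets($client);     // client receives

// 3. Close the connection.
fclose($client);
fclose($peer);
fclose($server);

echo trim($request), ' -> ', trim($reply), PHP_EOL; // prints "ping -> pong"
```

Every NSQ command in the protocol flow is one more send/recv exchange on a connection opened this way.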
  • NSQ protocol flow
@startuml

autonumber
hide footbox
title **NSQ protocol**

participant "Client" as client
participant "Server" as server #orange

activate client
activate server

== connect ==
note left of client: after connect, everything is socket->send/recv
client -> server: socket->connect(ip, port)
note left of client: protocol version
client -> server: magic: "  V2" (two spaces + V2)

== auth ==
note left of client: client metadata
client -> server: IDENTIFY
note right of server: if auth is required
server -> client: auth_required=true
client -> server: AUTH
...

== pub ==
note left of client: publish one message
client -> server: PUB <topic_name>
note left of client: publish multiple messages
client -> server: MPUB
note left of client: publish one deferred message
client -> server: DPUB
...

== sub ==
note left of client: client subscribes to a topic on a channel
note right of server: after a successful SUB, the client is in the RDY 0 state
client -> server: SUB <topic_name> <channel_name>
note left of client: RDY tells the server the client is ready to consume <count> messages
client -> server: RDY <count>
note right of server: server sends the client up to <count> messages
server -> client: <count> msg
note left of client: mark the message as finished (consumed successfully)
client -> server: FIN <message_id>
note left of client: requeue the message (consumption failed)
client -> server: REQ <message_id> <timeout>
note left of client: reset the timeout of an in-flight message
client -> server: TOUCH <message_id>
...

== heartbeat ==
server -> client: _heartbeat_
note right of server: after 2 heartbeats left unanswered (no NOP), the server closes the connection
client -> server: NOP
...

== close ==
note left of client: clean close: no more messages wanted, close the connection
client -> server: CLS
note right of server: server acknowledges
server -> client: CLOSE_WAIT

deactivate client
deactivate server

@enduml
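To make the flow above more concrete, here is a minimal sketch of how the commands and frames could be encoded and decoded, following the layout in the protocol spec linked above (newline-terminated command lines, 4-byte big-endian size prefixes). The function names are hypothetical helpers written for this illustration and are not part of Hyperf or any NSQ client library.

```php
<?php
declare(strict_types=1);

// Magic identifier sent once right after connect: two spaces followed by "V2".
const NSQ_MAGIC = '  V2';

/** Command line only, e.g. SUB / RDY / FIN / REQ / TOUCH / NOP / CLS. */
function nsqCommand(string $name, string ...$params): string
{
    return implode(' ', [$name, ...$params]) . "\n";
}

/** Command line followed by a size-prefixed body, e.g. PUB / DPUB / IDENTIFY / AUTH. */
function nsqCommandWithBody(string $body, string $name, string ...$params): string
{
    return nsqCommand($name, ...$params) . pack('N', strlen($body)) . $body;
}

/** MPUB body: message count, then each message with its own size prefix. */
function nsqMpub(string $topic, array $bodies): string
{
    $payload = pack('N', count($bodies));
    foreach ($bodies as $body) {
        $payload .= pack('N', strlen($body)) . $body;
    }
    return nsqCommandWithBody($payload, 'MPUB', $topic);
}

/** Split one frame, [4-byte size][4-byte frame type][data], into type and data. */
function nsqDecodeFrame(string $frame): array
{
    ['size' => $size, 'type' => $type] = unpack('Nsize/Ntype', $frame);
    // The size field counts the frame type plus the data, hence "- 4".
    return ['type' => $type, 'data' => substr($frame, 8, $size - 4)];
}

/** Split message data: [8-byte timestamp][2-byte attempts][16-byte ID][body]. */
function nsqDecodeMessage(string $data): array
{
    $head = unpack('Jtimestamp/nattempts', $data);
    return $head + ['id' => substr($data, 10, 16), 'body' => substr($data, 26)];
}

// Usage: bytes a client would send for a few of the steps in the diagram.
$pub = nsqCommandWithBody('hello', 'PUB', 'hyperf'); // "PUB hyperf\n" + size + "hello"
$sub = nsqCommand('SUB', 'hyperf', 'hyperf');        // "SUB hyperf hyperf\n"
$rdy = nsqCommand('RDY', '1');                       // "RDY 1\n"

// A heartbeat arrives as a response frame (type 0) carrying "_heartbeat_".
$heartbeat = nsqDecodeFrame(pack('NN', 4 + 11, 0) . '_heartbeat_');
```

Frame type 0 is a response, 1 an error and 2 a message; only type 2 data would be passed on to `nsqDecodeMessage()`, whose `id` field is what FIN, REQ and TOUCH refer to.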