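NSQ Client

NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day. Its topology is distributed and decentralized, with no single point of failure, which gives it fault tolerance, high availability, and reliable message delivery.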

Requirements

  • php: >=5.3.0
  • ext-json: *
  • easyswoole/easyswoole: 3.x
  • easyswoole/http-client: ^1.2.5
  • easyswoole/pool: ^1.0
  • easyswoole/spl: ^1.1
  • monolog/monolog: ~1.0
  • react/react: >=0.2.1

Installation

composer require easyswoole/nsq

Repository

easyswoole/nsq

Basic usage

Registering the NSQ service

Register the producer and consumer as custom processes in mainServerCreate(), so that they start together with the main server.

    <?php
    namespace EasySwoole\EasySwoole;

    use App\Producer\Process as ProducerProcess;
    use App\Consumer\Process as ConsumerProcess;
    use EasySwoole\EasySwoole\Swoole\EventRegister;
    use EasySwoole\EasySwoole\AbstractInterface\Event;
    use EasySwoole\Http\Request;
    use EasySwoole\Http\Response;

    class EasySwooleEvent implements Event
    {
        public static function initialize()
        {
            // TODO: Implement initialize() method.
            date_default_timezone_set('Asia/Shanghai');
        }

        public static function mainServerCreate(EventRegister $register)
        {
            // TODO: Implement mainServerCreate() method.
            // Producer
            \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ProducerProcess());
            // Consumer
            \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess());
        }

        // ......
    }

Producer

    <?php
    namespace App\Producer;

    use EasySwoole\Component\Process\AbstractProcess;

    class Process extends AbstractProcess
    {
        protected function run($arg)
        {
            // Run inside a coroutine.
            go(function () {
                $config = new \EasySwoole\Nsq\Config();
                $topic = "topic.test";

                // Look up the hosts that serve this topic.
                $nsqlookup = new \EasySwoole\Nsq\Lookup\Nsqlookupd($config->getNsqdUrl());
                $hosts = $nsqlookup->lookupHosts($topic);

                foreach ($hosts as $host) {
                    $nsq = new \EasySwoole\Nsq\Nsq();
                    // Publish ten test messages ("test0" ... "test9") to each host.
                    for ($i = 0; $i < 10; $i++) {
                        $msg = new \EasySwoole\Nsq\Message\Message();
                        $msg->setPayload("test$i");
                        $nsq->push(
                            new \EasySwoole\Nsq\Connection\Producer($host, $config),
                            $topic,
                            $msg
                        );
                    }
                }
            });
        }
    }
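
The producer above sends plain strings such as "test0". Since ext-json is already a requirement, one option for structured messages is to JSON-encode them and pass the resulting string to setPayload(). The following is only a sketch under stated assumptions: encodePayload() and the envelope fields (event, data, sent_at) are hypothetical and are not part of easyswoole/nsq, and the type hints assume PHP 7.1 or later.

```php
<?php

/**
 * Hypothetical helper (not part of easyswoole/nsq): wraps an event name and
 * its data in a small envelope and returns the envelope as a JSON string.
 */
function encodePayload(string $event, array $data, ?int $sentAt = null): string
{
    $envelope = [
        'event'   => $event,
        'data'    => $data,
        // Fall back to the current Unix timestamp when none is given.
        'sent_at' => $sentAt ?? time(),
    ];

    $json = json_encode($envelope, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    if ($json === false) {
        throw new InvalidArgumentException(
            'Payload is not JSON-encodable: ' . json_last_error_msg()
        );
    }

    return $json;
}

// The returned string could then be handed to $msg->setPayload(...).
echo encodePayload('user.created', ['id' => 42], 1700000000), PHP_EOL;
```

Keeping the encoding in one helper means every producer emits the same envelope, which makes the consumer side easier to write.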

Consumer

    <?php
    namespace App\Consumer;

    use EasySwoole\Component\Process\AbstractProcess;

    class Process extends AbstractProcess
    {
        protected function run($arg)
        {
            // Run inside a coroutine.
            go(function () {
                $topic = "topic.test";
                $config = new \EasySwoole\Nsq\Config();

                // Look up the hosts that serve this topic.
                $nsqlookup = new \EasySwoole\Nsq\Lookup\Nsqlookupd($config->getNsqdUrl());
                $hosts = $nsqlookup->lookupHosts($topic);

                foreach ($hosts as $host) {
                    $nsq = new \EasySwoole\Nsq\Nsq();
                    // Subscribe to the topic on the 'test.consuming' channel;
                    // the callback receives each incoming item.
                    $nsq->subscribe(
                        new \EasySwoole\Nsq\Connection\Consumer($host, $config, $topic, 'test.consuming'),
                        function ($item) {
                            var_dump($item['message']);
                        }
                    );
                }
            });
        }
    }
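
The callback above only dumps $item['message']. If the messages carry JSON, the callback will usually want to decode them and skip anything malformed rather than crash the consumer process. A minimal sketch, assuming a hypothetical decodePayload() helper that is not part of easyswoole/nsq; how the raw payload string is obtained from $item['message'] depends on the library and is deliberately not shown here.

```php
<?php

/**
 * Hypothetical helper (not part of easyswoole/nsq): decodes a JSON payload
 * string into an array. Returns null when the string is not valid JSON or
 * does not decode to a JSON object/array.
 */
function decodePayload(string $raw): ?array
{
    $decoded = json_decode($raw, true);

    if (json_last_error() !== JSON_ERROR_NONE || !is_array($decoded)) {
        return null;
    }

    return $decoded;
}

// A well-formed JSON payload decodes to an array.
$ok = decodePayload('{"event":"user.created","data":{"id":42}}');
var_dump($ok['event']);

// A plain string such as the producer's "test0" is rejected.
$bad = decodePayload('test0');
var_dump($bad);
```

Returning null instead of throwing lets the callback decide what to do with a bad message, for example logging it and moving on.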

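Bonus

  1. The repository includes a docker-compose.yml for deploying an NSQ cluster. To use it:
    1. Make sure ports 4150, 4151, 4160, 4161, and 4171 are free (if any are taken, change the port numbers in the compose file).
    2. From the repository root, run docker-compose up -d.
    3. Open localhost:4171 to check the cluster status in nsqadmin, the web UI.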

https://github.com/easy-swoole/nsq/blob/master/docker-compose.yml
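
The port check in the first step can be scripted before starting the containers. The sketch below is one possible approach, not something shipped with the repository: findBusyPorts() is a hypothetical helper that tries a TCP connection to each port on 127.0.0.1 and reports the ones that accept it. It only detects services listening on that address, so a port bound exclusively to another interface would be missed.

```php
<?php

/**
 * Hypothetical helper: returns the ports from $ports that already accept
 * TCP connections on $host, i.e. the ones that are occupied.
 */
function findBusyPorts(array $ports, string $host = '127.0.0.1', float $timeout = 0.5): array
{
    $busy = [];
    foreach ($ports as $port) {
        // A successful connect means some process is already listening there.
        $socket = @fsockopen($host, $port, $errno, $errstr, $timeout);
        if ($socket !== false) {
            fclose($socket);
            $busy[] = $port;
        }
    }

    return $busy;
}

// Ports listed in the instructions above.
$busy = findBusyPorts([4150, 4151, 4160, 4161, 4171]);
echo $busy === []
    ? 'All NSQ ports are free.' . PHP_EOL
    : 'Ports in use: ' . implode(', ', $busy) . PHP_EOL;
```

If any port is reported as in use, either stop the process holding it or change the corresponding port mapping in the compose file.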