AMQP组件

hyperf/amqp 是实现 AMQP 标准的组件,主要适用于对 RabbitMQ 的使用。

安装

  1. composer require hyperf/amqp

默认配置

  1. <?php
  2. return [
  3. 'default' => [
  4. 'host' => 'localhost',
  5. 'port' => 5672,
  6. 'user' => 'guest',
  7. 'password' => 'guest',
  8. 'vhost' => '/',
  9. 'pool' => [
  10. 'min_connections' => 1,
  11. 'max_connections' => 10,
  12. 'connect_timeout' => 10.0,
  13. 'wait_timeout' => 3.0,
  14. 'heartbeat' => -1,
  15. ],
  16. 'params' => [
  17. 'insist' => false,
  18. 'login_method' => 'AMQPLAIN',
  19. 'login_response' => null,
  20. 'locale' => 'en_US',
  21. 'connection_timeout' => 3.0,
  22. 'read_write_timeout' => 3.0,
  23. 'context' => null,
  24. 'keepalive' => false,
  25. 'heartbeat' => 0,
  26. ],
  27. ],
  28. ];

投递消息

使用 generator 工具新建一个 producer

  1. php bin/hyperf.php gen:amqp-producer DemoProducer

在DemoProducer文件中,我们可以修改Producer注解对应的字段来替换对应的 exchangeroutingKey。其中 payload 就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。示例如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Producers;
  4. use Hyperf\Amqp\Annotation\Producer;
  5. use Hyperf\Amqp\Message\ProducerMessage;
  6. use App\Models\User;
  7. /**
  8. * DemoProducer
  9. * @Producer(exchange="hyperf", routingKey="hyperf")
  10. */
  11. class DemoProducer extends ProducerMessage
  12. {
  13. public function __construct($id)
  14. {
  15. $user = User::where('id', $id)->first();
  16. $this->payload = [
  17. 'id' => $id,
  18. 'data' => $user->toArray()
  19. ];
  20. }
  21. }

通过container获取Producer实例,即可投递消息。以下实例直接使用ApplicationContext获取Producer其实并不合理,container具体使用请到di模块中查看。

  1. <?php
  2. use Hyperf\Amqp\Producer;
  3. use App\Amqp\Producers\DemoProducer;
  4. use Hyperf\Utils\ApplicationContext;
  5. $message = new DemoProducer(1);
  6. $producer = ApplicationContext::getContainer()->get(Producer::class);
  7. $result = $producer->produce($message);

消费消息

使用 generator 工具新建一个 consumer

  1. php bin/hyperf.php gen:amqp-consumer DemoConsumer

在DemoConsumer文件中,我们可以修改Consumer注解对应的字段来替换对应的 exchangeroutingKeyqueue。其中 $data 就是解析后的元数据。示例如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumers;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. /**
  8. * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
  9. */
  10. class DemoConsumer extends ConsumerMessage
  11. {
  12. public function consume($data): string
  13. {
  14. print_r($data);
  15. return Result::ACK;
  16. }
  17. }

框架会根据Consumer注解自动创建Process进程,进程意外退出后会被重新拉起。