kafka

本例子以https://github.com/weiboad/kafka-php作为客户端。使用composer安装时请先看EasySwoole文档中自动加载的章节,
为EasySwoole引入composer。

如何在EasySwoole中添加自定义阻塞进程

EasySwoole支持在beforeWorker事件中添加自定义进程参与swoole底层的事件循环,具体实例代码为:

  1. $server->addProcess(new \swoole_process(function (){
  2. while(1){
  3. }
  4. }));

kafka消费者

  1. $server->addProcess(new \swoole_process(function (){
  2. $config = \Kafka\ConsumerConfig::getInstance();
  3. $config->setMetadataRefreshIntervalMs(10000);
  4. $config->setMetadataBrokerList('0.0.0.0:9092');
  5. $config->setGroupId('test');
  6. $config->setBrokerVersion('0.9.0.1');
  7. $config->setTopics(array('test'));
  8. $consumer = new \Kafka\Consumer();
  9. $consumer->start(function($topic, $part, $message) {
  10. Logger::getInstance()->log($message);
  11. });
  12. }));

我们向该topic发生一个消息,可见

  1. array(3) {
  2. ["offset"]=>
  3. int(0)
  4. ["size"]=>
  5. int(19)
  6. ["message"]=>
  7. array(6) {
  8. ["crc"]=>
  9. int(2275900082)
  10. ["magic"]=>
  11. int(0)
  12. ["attr"]=>
  13. int(0)
  14. ["timestamp"]=>
  15. int(0)
  16. ["key"]=>
  17. string(0) ""
  18. ["value"]=>
  19. string(5) "hello"
  20. }
  21. }

以上例子仅为示例,具体使用请见kafka-php文档