EasySwoole Tcp服务

tcp 服务以及tcp客户端 demo:https://github.com/easy-swoole/demo/tree/3.x-subtcp

创建tcp服务

通过EasySwooleEvent.php文件的mainServerCreate 事件,进行添加子服务监听,例如:

  1. <?php
  2. public static function mainServerCreate(EventRegister $register)
  3. {
  4. $server = ServerManager::getInstance()->getSwooleServer();
  5. ################# tcp 服务器1 没有处理粘包 #####################
  6. $subPort1 = $server->addlistener('0.0.0.0', 9502, SWOOLE_TCP);
  7. $subPort1->set(
  8. [
  9. 'open_length_check' => false,//不验证数据包
  10. ]
  11. );
  12. $subPort1->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
  13. echo "tcp服务1 fd:{$fd} 已连接\n";
  14. $str = '恭喜你连接成功服务器1';
  15. $server->send($fd, $str);
  16. });
  17. $subPort1->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
  18. echo "tcp服务1 fd:{$fd} 已关闭\n";
  19. });
  20. $subPort1->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) {
  21. echo "tcp服务1 fd:{$fd} 发送消息:{$data}\n";
  22. });
  23. }

粘包问题

由于tcp的特性,可能会出现数据粘包情况,例如

  • A连接Server
  • A发送 hello
  • A又发送了一条 hello
  • Server可能会一次性收到一条”hellohello”的数据
  • Server也可能收到”he” ,”llohello”类似这样的中断数据

粘包解决

  • 通过标识EOF,例如http协议,通过\r\n\r\n 的方式去表示该数据已经完结,我们可以自定义一个协议,例如当接收到 “结尾666” 字符串时,代表该字符串已经结束,如果没有获取到,则存入缓冲区,等待结尾字符串,或者如果获取到多条,则通过该字符串剪切出其他数据
  • 定义消息头,通过特定长度的消息头进行获取,例如我们定义一个协议,前面10位字符串都代表着之后数据主体的长度,那么我们传输数据时,只需要000000000512346(前10位为协议头,表示了这条数据的大小,后面的为数据),每次我们读取只先读取10位,获取到消息长度,再读取消息长度那么多的数据,这样就可以保证数据的完整性了.(但是为了不被混淆,协议头也得像EOF一样标识)
  • 通过pack二进制处理,相当于于方法2,将数据通过二进制封装拼接进消息中,通过验证二进制数据去读取信息,sw采用的就是这种方式

可查看swoole官方文档:https://wiki.swoole.com/wiki/page/287.html

实现粘包处理

服务端:

  1. <?php
  2. ################# tcp 服务器2 没有处理粘包 #####################
  3. $subPort2 = $server->addlistener('0.0.0.0', 9503, SWOOLE_TCP);
  4. $subPort2->set(
  5. [
  6. 'open_length_check' => true,
  7. 'package_max_length' => 81920,
  8. 'package_length_type' => 'N',
  9. 'package_length_offset' => 0,
  10. 'package_body_offset' => 4,
  11. ]
  12. );
  13. $subPort2->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
  14. echo "tcp服务2 fd:{$fd} 已连接\n";
  15. $str = '恭喜你连接成功服务器2';
  16. $server->send($fd, pack('N', strlen($str)) . $str);
  17. });
  18. $subPort2->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
  19. echo "tcp服务2 fd:{$fd} 已关闭\n";
  20. });
  21. $subPort2->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) {
  22. echo "tcp服务2 fd:{$fd} 发送原始消息:{$data}\n";
  23. echo "tcp服务2 fd:{$fd} 发送消息:" . substr($data, '4') . "\n";
  24. });

客户端:

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Tioncico
  5. * Date: 2019/3/6 0006
  6. * Time: 16:22
  7. */
  8. include "../vendor/autoload.php";
  9. define('EASYSWOOLE_ROOT', realpath(dirname(getcwd())));
  10. \EasySwoole\EasySwoole\Core::getInstance()->initialize();
  11. /**
  12. * tcp 客户端2,验证数据包,并处理粘包
  13. */
  14. go(function () {
  15. $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
  16. $client->set(
  17. [
  18. 'open_length_check' => true,
  19. 'package_max_length' => 81920,
  20. 'package_length_type' => 'N',
  21. 'package_length_offset' => 0,
  22. 'package_body_offset' => 4,
  23. ]
  24. );
  25. if (!$client->connect('127.0.0.1', 9503, 0.5)) {
  26. exit("connect failed. Error: {$client->errCode}\n");
  27. }
  28. $str = 'hello world';
  29. $client->send(encode($str));
  30. $data = $client->recv();//服务器已经做了pack处理
  31. var_dump($data);//未处理数据,前面有4 (因为pack 类型为N)个字节的pack
  32. $data = decode($data);//需要自己剪切解析数据
  33. var_dump($data);
  34. // $client->close();
  35. });
  36. /**
  37. * 数据包 pack处理
  38. * encode
  39. * @param $str
  40. * @return string
  41. * @author Tioncico
  42. * Time: 9:50
  43. */
  44. function encode($str)
  45. {
  46. return pack('N', strlen($str)) . $str;
  47. }
  48. function decode($str)
  49. {
  50. $data = substr($str, '4');
  51. return $data;
  52. }

tcp控制器实现

协议规则与解析

在本文档中,传输json数据 使用pack N进行二进制处理,json数据有3个参数,例如:

  1. {"controller":"Index","action":"index","param":{"name":"\u4ed9\u58eb\u53ef"}}

实现解析器Parser.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Tioncico
  5. * Date: 2018/10/17 0017
  6. * Time: 9:10
  7. */
  8. namespace App\TcpController;
  9. use EasySwoole\Socket\Bean\Caller;
  10. use EasySwoole\Socket\Bean\Response;
  11. use EasySwoole\Socket\AbstractInterface\ParserInterface;
  12. use EasySwoole\Utility\CommandLine;
  13. class Parser implements ParserInterface
  14. {
  15. public function decode($raw, $client): ?Caller
  16. {
  17. $data = substr($raw, '4');
  18. //为了方便,我们将json字符串作为协议标准
  19. $data = json_decode($data, true);
  20. $bean = new Caller();
  21. $controller = !empty($data['controller']) ? $data['controller'] : 'Index';
  22. $action = !empty($data['action']) ? $data['action'] : 'index';
  23. $param = !empty($data['param']) ? $data['param'] : [];
  24. $controller = "App\\TcpController\\{$controller}";
  25. $bean->setControllerClass($controller);
  26. $bean->setAction($action);
  27. $bean->setArgs($param);
  28. return $bean;
  29. }
  30. /**
  31. * 只处理pack,json交给控制器
  32. * encode
  33. * @param Response $response
  34. * @param $client
  35. * @return string|null
  36. * @author Tioncico
  37. * Time: 10:33
  38. */
  39. public function encode(Response $response, $client): ?string
  40. {
  41. return pack('N', strlen($response->getMessage())) . $response->getMessage();
  42. }
  43. }

实现控制器Index.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Tioncico
  5. * Date: 2018/10/17 0017
  6. * Time: 9:15
  7. */
  8. namespace App\TcpController;
  9. use App\Rpc\RpcServer;
  10. use EasySwoole\EasySwoole\ServerManager;
  11. use EasySwoole\EasySwoole\Swoole\Task\TaskManager;
  12. use EasySwoole\Socket\AbstractInterface\Controller;
  13. use http\Env\Response;
  14. class Index extends Controller{
  15. function actionNotFound(?string $actionName)
  16. {
  17. $this->response()->setMessage("{$actionName} not found \n");
  18. }
  19. public function index(){
  20. $this->response()->setMessage(time());
  21. }
  22. public function args()
  23. {
  24. $this->response()->setMessage('your args is:'.json_encode($this->caller()->getArgs()).PHP_EOL);
  25. }
  26. public function delay()
  27. {
  28. $client = $this->caller()->getClient();
  29. TaskManager::async(function ()use($client){
  30. sleep(1);
  31. ServerManager::getInstance()->getSwooleServer()->send($client->getFd(),'this is delay message at '.time());
  32. });
  33. }
  34. public function close()
  35. {
  36. $this->response()->setMessage('you are goging to close');
  37. $client = $this->caller()->getClient();
  38. TaskManager::async(function ()use($client){
  39. sleep(2);
  40. ServerManager::getInstance()->getSwooleServer()->send($client->getFd(),'this is delay message at '.time());
  41. });
  42. }
  43. public function who()
  44. {
  45. $this->response()->setMessage('you fd is '.$this->caller()->getClient()->getFd());
  46. }
  47. }

开启子服务

EasySwooleEvent中注册。

  1. <?php
  2. public static function mainServerCreate(EventRegister $register)
  3. {
  4. ############# tcp 服务器3 tcp控制器实现+处理粘包############
  5. $subPort3 = $server->addListener(Config::getInstance()->getConf('MAIN_SERVER.LISTEN_ADDRESS'), 9504, SWOOLE_TCP);
  6. $socketConfig = new \EasySwoole\Socket\Config();
  7. $socketConfig->setType($socketConfig::TCP);
  8. $socketConfig->setParser(new \App\TcpController\Parser());
  9. //设置解析异常时的回调,默认将抛出异常到服务器
  10. $socketConfig->setOnExceptionHandler(function ($server, $throwable, $raw, $client, $response) {
  11. echo "tcp服务3 fd:{$client->getFd()} 发送数据异常 \n";
  12. $server->close($client->getFd());
  13. });
  14. $dispatch = new \EasySwoole\Socket\Dispatcher($socketConfig);
  15. $subPort3->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) use ($dispatch) {
  16. echo "tcp服务3 fd:{$fd} 发送消息:{$data}\n";
  17. $dispatch->dispatch($server, $data, $fd, $reactor_id);
  18. });
  19. $subPort3->set(
  20. [
  21. 'open_length_check' => true,
  22. 'package_max_length' => 81920,
  23. 'package_length_type' => 'N',
  24. 'package_length_offset' => 0,
  25. 'package_body_offset' => 4,
  26. ]
  27. );
  28. $subPort3->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
  29. echo "tcp服务3 fd:{$fd} 已连接\n";
  30. });
  31. $subPort3->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
  32. echo "tcp服务3 fd:{$fd} 已关闭\n";
  33. });
  34. }

测试客户端

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Tioncico
  5. * Date: 2019/3/6 0006
  6. * Time: 16:22
  7. */
  8. include "../vendor/autoload.php";
  9. define('EASYSWOOLE_ROOT', realpath(dirname(getcwd())));
  10. \EasySwoole\EasySwoole\Core::getInstance()->initialize();
  11. /**
  12. * tcp 客户端3,验证数据包处理粘包 以及转发到控制器写法
  13. */
  14. go(function () {
  15. $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
  16. $client->set(
  17. [
  18. 'open_length_check' => true,
  19. 'package_max_length' => 81920,
  20. 'package_length_type' => 'N',
  21. 'package_length_offset' => 0,
  22. 'package_body_offset' => 4,
  23. ]
  24. );
  25. if (!$client->connect('127.0.0.1', 9504, 0.5)) {
  26. exit("connect failed. Error: {$client->errCode}\n");
  27. }
  28. $data = [
  29. 'controller' => 'Index',
  30. 'action' => 'index',
  31. 'param' => [
  32. 'name' => '仙士可'
  33. ],
  34. ];
  35. $str = json_encode($data);
  36. var_dump($str);
  37. $client->send(encode($str));
  38. $data = $client->recv();//服务器已经做了pack处理
  39. $data = decode($data);//需要自己剪切解析数据
  40. echo "服务端回复: $data \n";
  41. $data = [
  42. 'controller' => 'Index',
  43. 'action' => 'args',
  44. 'param' => [
  45. 'name' => '仙士可'
  46. ],
  47. ];
  48. $str = json_encode($data);
  49. $client->send(encode($str));
  50. $data = $client->recv();//服务器已经做了pack处理
  51. $data = decode($data);//需要自己剪切解析数据
  52. echo "服务端回复: $data \n";
  53. // $client->close();
  54. });
  55. /**
  56. * 数据包 pack处理
  57. * encode
  58. * @param $str
  59. * @return string
  60. * @author Tioncico
  61. * Time: 9:50
  62. */
  63. function encode($str)
  64. {
  65. return pack('N', strlen($str)) . $str;
  66. }
  67. function decode($str)
  68. {
  69. $data = substr($str, '4');
  70. return $data;
  71. }

HTTP往TCP推送

HTTP控制器

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Apple
  5. * Date: 2018/11/1 0001
  6. * Time: 11:10
  7. */
  8. namespace App\HttpController;
  9. use EasySwoole\EasySwoole\ServerManager;
  10. use EasySwoole\Http\AbstractInterface\Controller;
  11. class Index extends Controller
  12. {
  13. function index()
  14. {
  15. // TODO: Implement index() method.
  16. }
  17. function push(){
  18. $fd = intval($this->request()->getRequestParam('fd'));
  19. $info = ServerManager::getInstance()->getSwooleServer()->connection_info($fd);
  20. if(is_array($info)){
  21. ServerManager::getInstance()->getSwooleServer()->send($fd,'push in http at '.time());
  22. }else{
  23. $this->response()->write("fd {$fd} not exist");
  24. }
  25. }
  26. }

实际生产中,一般是用户TCP连接上来后,做验证,然后以userName=>fd的格式,存在redis中,需要http,或者是其他地方,比如定时器往某个连接推送的时候,就是以userName去redis中取得对应的fd,再send。注意,通过addServer形式创建的子服务器,以再完全注册自己的网络事件,你可以注册onclose事件,然后在连接断开的时候,删除userName=>fd对应。