Tars的push功能

环境和背景

但是在实际的应用场景中,需要在TARS服务框架中支持其他服务端到客户端的push模式

具体程序示例,参见examples/pushDemo/.

push模式的流程图

下面是push模式的示意图

Tars的push功能 - 图1

  • 黑色线代表了数据流向:数据(客户端)-〉请求包的编码器(客户端)-〉协议解析器(服务端)-〉doRequest协议处理器(服务端)-〉生成返回数据(服务端)-〉响应包的解码器(客户端)-〉响应数据(客户端)
  • 黄色线代表客户端访问服务端
  • 蓝色线代表服务端向客户端push消息
  • 其中请求包的编码器(客户端)负责对客户端发送的数据进行打包编码,协议解析器(服务端)负责对收到的数据进行解包并交给*
  • 协议处理器(服务端)去处理并生成返回数据,而响应包的解码器(客户端)负责对返回的数据进行解码。

Tars中实现服务端到客户端的push模式:

  • 对于服务端,首先服务端需要按照开发第三方协议的模式(即非tars协议),实现协议包的解析器,并将其加载到服务中,同时需要建立一个非TARS框架的服务对象,该类继续继承TARS框架的Servant类,通过重载Servant类中的doRequest方法建立客户端和服务端之间的协议处理器,同时在该方法中保存连接到服务端的客户信息,以便服务端向客户端push消息,另外需要重载Servant类中的doClose方法,在服务器得知客户关闭连接后,释放doRequest方法中保存的客户信息,这样就可以不需要对该客户进行push消息。另外,服务端需要建立一个专门用于向客户端push消息的线程。
  • 对应客户端,首先要按照第三方协议的模式,实现协议包的编解码函数,并将其设置到相应的ServantProxy代理的协议解析器中,通过ServantProxy类tars_set_protocol方法实现;然后需要自定义一个回调类,该类继承ServantProxyCallback类,(因为服务端push消息给客户端时,客户端收到消息是异步的,所以客户端对消息的处理以异步方法进行),同时需要重载其中的onDispatch方法,在该方法中,对客户端和服务端之间定义的协议进行解析处理;最后需要new一个上面自定义的回调类,然后将其作为参数传入到ServantProxy的tars_set_push_callback方法中。另外,客户端需要定期的发送消息给服务端(相当于心跳包),以便告诉服务端,客户端是存活的(因为服务端在一定时间内没收到来自客户端的消息,会自动关闭其之间的连接)。 另外,在服务端与客户端push模式交互之前,客户端要访问服务,需要通过调用ServantProxy类的rpc相关函数。

服务端功能的实现

服务端实现概述

首先我们按照第三方协议代码部署一个TestPushServant 服务 如下图所示在管理平台部署一个服务端

Tars的push功能 - 图2

参考tars 支持第三方协议

  • 其中TestPushServer类的initialize( ) 加载服务对象TestPushServantImp,并设置第三方协议解析器parse,这里解析器不做任何处理,把接收到的数据包原封不动的传给服务对象去处理(但通常情况下,要对数据进行解析后才交给服务对象去处理),
  • 而TestPushServantImp重载继承自Servant类的doRequest方法,该方法为第三方服务的协议处理器,该处理器负责处理协议解析器传送给其的数据,并负责生成返回给客户端的response(本服务为echo服务,因此直接让response等于收到的数据包),同时保存客户的信息状态,以便让pushThread线程对客户进行push消息;
  • 另外TestPushServantImp重载继承自Servant类的doClose方法,用于客户关闭连接或者连接超时后清除保存相关的客户信息。

服务端代码实现

TestPushServantImp.h

  1. #ifndef _TestPushServantImp_H_
  2. #define _TestPushServantImp_H_
  3. #include "servant/Application.h"
  4. //#include "TestPushServant.h"
  5. /**
  6. *
  7. *
  8. */
  9. class TestPushServantImp : public tars::Servant
  10. {
  11. public:
  12. /**
  13. *
  14. */
  15. virtual ~TestPushServantImp() {}
  16. /**
  17. *
  18. */
  19. virtual void initialize();
  20. /**
  21. *
  22. */
  23. virtual void destroy();
  24. /**
  25. *
  26. */
  27. virtual int test(tars::TarsCurrentPtr current) { return 0;};
  28. //重载Servant的doRequest方法
  29. int doRequest(tars::TarsCurrentPtr current, vector<char>& response);
  30. //重载Servant的doClose方法
  31. int doClose(tars::TarsCurrentPtr current);
  32. };
  33. /////////////////////////////////////////////////////
  34. #endif

TestPushServantImp.cpp

  1. #include "TestPushServantImp.h"
  2. #include "servant/Application.h"
  3. #include "TestPushThread.h"
  4. using namespace std;
  5. //////////////////////////////////////////////////////
  6. void TestPushServantImp::initialize()
  7. {
  8. //initialize servant here:
  9. //...
  10. }
  11. //////////////////////////////////////////////////////
  12. void TestPushServantImp::destroy()
  13. {
  14. //destroy servant here:
  15. //...
  16. }
  17. int TestPushServantImp::doRequest(tars::TarsCurrentPtr current, vector<char>& response)
  18. {
  19. //保存客户端的信息,以便对客户端进行push消息
  20. (PushUser::mapMutex).lock();
  21. map<string, TarsCurrentPtr>::iterator it = PushUser::pushUser.find(current->getIp());
  22. if(it == PushUser::pushUser.end())
  23. {
  24. PushUser::pushUser.insert(map<string, TarsCurrentPtr>::value_type(current->getIp(), current));
  25. LOG->debug() << "connect ip: " << current->getIp() << endl;
  26. }
  27. (PushUser::mapMutex).unlock();
  28. //返回给客户端它自己请求的数据包,即原包返回
  29. const vector<char>& request = current->getRequestBuffer();
  30. response = request;
  31. return 0;
  32. }
  33. //客户端关闭到服务端的连接,或者服务端发现客户端长时间未发送包过来,然后超过60s就关闭连接
  34. //调用的方法
  35. int TestPushServantImp::doClose(TarsCurrentPtr current)
  36. {
  37. (PushUser::mapMutex).lock();
  38. map<string, TarsCurrentPtr>::iterator it = PushUser::pushUser.find(current->getIp());
  39. if(it != PushUser::pushUser.end())
  40. {
  41. PushUser::pushUser.erase(it);
  42. LOG->debug() << "close ip: " << current->getIp() << endl;
  43. }
  44. (PushUser::mapMutex).unlock();
  45. return 0;
  46. }

TestPushThread.h

  1. #ifndef __TEST_PUSH_THREAD_H
  2. #define __TEST_PUSH_THREAD_H
  3. #include "servant/Application.h"
  4. class PushUser
  5. {
  6. public:
  7. static map<string, TarsCurrentPtr> pushUser;
  8. static TC_ThreadMutex mapMutex;
  9. };
  10. class PushInfoThread : public TC_Thread, public TC_ThreadLock
  11. {
  12. public:
  13. PushInfoThread():_bTerminate(false),_tLastPushTime(0),_tInterval(10),_iId(0){}
  14. virtual void run();
  15. void terminate();
  16. void setPushInfo(const string &sInfo);
  17. private:
  18. bool _bTerminate;
  19. time_t _tLastPushTime;
  20. time_t _tInterval;
  21. unsigned int _iId;
  22. string _sPushInfo;
  23. };
  24. #endif

TestPushThread.cpp

  1. #include "TestPushThread.h"
  2. #include <arpa/inet.h>
  3. map<string, TarsCurrentPtr> PushUser::pushUser;
  4. TC_ThreadMutex PushUser::mapMutex;
  5. void PushInfoThread::terminate(void)
  6. {
  7. _bTerminate = true;
  8. {
  9. tars::TC_ThreadLock::Lock sync(*this);
  10. notifyAll();
  11. }
  12. }
  13. void PushInfoThread::setPushInfo(const string &sInfo)
  14. {
  15. unsigned int iBuffLength = htonl(sInfo.size()+8);
  16. unsigned char * pBuff = (unsigned char*)(&iBuffLength);
  17. _sPushInfo = "";
  18. for (int i = 0; i<4; ++i)
  19. {
  20. _sPushInfo += *pBuff++;
  21. }
  22. unsigned int iRequestId = htonl(_iId);
  23. unsigned char * pRequestId = (unsigned char*)(&iRequestId);
  24. for (int i = 0; i<4; ++i)
  25. {
  26. _sPushInfo += *pRequestId++;
  27. }
  28. _sPushInfo += sInfo;
  29. }
  30. //定期向客户push消息
  31. void PushInfoThread::run(void)
  32. {
  33. time_t iNow;
  34. setPushInfo("hello world");
  35. while (!_bTerminate)
  36. {
  37. iNow = TC_TimeProvider::getInstance()->getNow();
  38. if(iNow - _tLastPushTime > _tInterval)
  39. {
  40. _tLastPushTime = iNow;
  41. (PushUser::mapMutex).lock();
  42. for(map<string, TarsCurrentPtr>::iterator it = (PushUser::pushUser).begin(); it != (PushUser::pushUser).end(); ++it)
  43. {
  44. (it->second)->sendResponse(_sPushInfo.c_str(), _sPushInfo.size());
  45. LOG->debug() << "sendResponse: " << _sPushInfo.size() <<endl;
  46. }
  47. (PushUser::mapMutex).unlock();
  48. }
  49. {
  50. TC_ThreadLock::Lock sync(*this);
  51. timedWait(1000);
  52. }
  53. }
  54. }

TestPushServer.h

  1. #ifndef _TestPushServer_H_
  2. #define _TestPushServer_H_
  3. #include <iostream>
  4. #include "servant/Application.h"
  5. #include "TestPushThread.h"
  6. using namespace tars;
  7. /**
  8. *
  9. **/
  10. class TestPushServer : public Application
  11. {
  12. public:
  13. /**
  14. *
  15. **/
  16. virtual ~TestPushServer() {};
  17. /**
  18. *
  19. **/
  20. virtual void initialize();
  21. /**
  22. *
  23. **/
  24. virtual void destroyApp();
  25. private:
  26. //用于push消息的线程
  27. PushInfoThread pushThread;
  28. };
  29. extern TestPushServer g_app;
  30. ////////////////////////////////////////////
  31. #endif

TestPushServer.cpp

  1. #include "TestPushServer.h"
  2. #include "TestPushServantImp.h"
  3. using namespace std;
  4. TestPushServer g_app;
  5. /////////////////////////////////////////////////////////////////
  6. static int parse(string &in, string &out)
  7. {
  8. if(in.length() < sizeof(unsigned int))
  9. {
  10. return TC_EpollServer::PACKET_LESS;
  11. }
  12. unsigned int iHeaderLen;
  13. memcpy(&iHeaderLen, in.c_str(), sizeof(unsigned int));
  14. iHeaderLen = ntohl(iHeaderLen);
  15. if(iHeaderLen < (unsigned int)(sizeof(unsigned int))|| iHeaderLen > 1000000)
  16. {
  17. return TC_EpollServer::PACKET_ERR;
  18. }
  19. if((unsigned int)in.length() < iHeaderLen)
  20. {
  21. return TC_EpollServer::PACKET_LESS;
  22. }
  23. out = in.substr(0, iHeaderLen);
  24. in = in.substr(iHeaderLen);
  25. return TC_EpollServer::PACKET_FULL;
  26. }
  27. void
  28. TestPushServer::initialize()
  29. {
  30. //initialize application here:
  31. //...
  32. addServant<TestPushServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".TestPushServantObj");
  33. addServantProtocol("Test.TestPushServer.TestPushServantObj", parse);
  34. pushThread.start();
  35. }
  36. /////////////////////////////////////////////////////////////////
  37. void
  38. TestPushServer::destroyApp()
  39. {
  40. //destroy application here:
  41. //...
  42. pushThread.terminate();
  43. pushThread.getThreadControl().join();
  44. }
  45. /////////////////////////////////////////////////////////////////
  46. int
  47. main(int argc, char* argv[])
  48. {
  49. try
  50. {
  51. g_app.main(argc, argv);
  52. g_app.waitForShutdown();
  53. }
  54. catch (std::exception& e)
  55. {
  56. cerr << "std::exception:" << e.what() << std::endl;
  57. }
  58. catch (...)
  59. {
  60. cerr << "unknown exception." << std::endl;
  61. }
  62. return -1;
  63. }
  64. /////////////////////////////////////////////////////////////////

客户端实现

客户端实现概述

本节介绍客户端通过proxy的方式来访问服务端,具体步骤如下:

  • 客户端首先建立通信器(Communicator _comm),并通过该通信器获取proxy,代码格式如下:
  1. string sObjName = "Test.TestPushServer.TestPushServantObj";
  2. string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099";
  3. _prx = _comm.stringToProxy<ServantPrx>(sObjName+"@"+sObjHost);
  • 编写proxy的请求包的编码器和响应包的解码器并设置,代码格式如下:
  1. 请求包的编码器格式:
  2. static void FUN1(const RequestPacket& request, string& buff)
  3. 响应包的解码器格式:
  4. static size_t FUN2(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
  5. proxy设置代码为:
  6. ProxyProtocol prot;
  7. prot.requestFunc = FUN1;
  8. prot.responseFunc = FUN2 ;
  9. _prx->tars_set_protocol(prot);
  • 同步方法或者异步方法访问服务端

    • 同步方法通过调用proxy的rpc_call方法访问服务
    1. virtual void rpc_call(uint32_t requestId, const string& sFuncName,const char* buff, uint32_t len, ResponsePacket& rsp);

    其中参数requestId需要在一个object内唯一,可以通过proxy的 uint32_t tars_gen_requestid()接口获得一个该object内唯一的id。sFuncName主要用于框架层对接口调用的统计分析,可以缺省为””。buff为要发送的内容,len为buff的长度。rsp则为本次调用得到的ResponsePacket包。

    • 异步方法通过调用proxy的rpc_call_asyc方法访问服务

      1. virtual void rpc_call_async(uint32_t requestId, const string& sFuncName, const char* buff, uint32_t len, const ServantProxyCallbackPtr& callback);

      其中参数requestId需要在一个object内唯一,可以通过proxy的 uint32_t tars_gen_requestid()接口获得一个该object内唯一的id。sFuncName为回调对象响应后调用的函数名。buff为要发送的内容,len为buff的长度。callback则为本次调用返回结果后,即服务端返回处理结果后,此回调对象会被响应。

  • 设置接受服务端的push消息方法:

  1. TestPushCallBackPtr cbPush = new TestPushCallBack();
  2. _prx->tars_set_push_callback(cbPush);

客户端具体实现

main.cpp

  1. #include "servant/Application.h"
  2. #include "TestRecvThread.h"
  3. #include <iostream>
  4. using namespace std;
  5. using namespace tars;
  6. int main(int argc,char**argv)
  7. {
  8. try
  9. {
  10. RecvThread thread;
  11. thread.start();
  12. int c;
  13. while((c = getchar()) != 'q');
  14. thread.terminate();
  15. thread.getThreadControl().join();
  16. }
  17. catch(std::exception&e)
  18. {
  19. cerr<<"std::exception:"<<e.what()<<endl;
  20. }
  21. catch(...)
  22. {
  23. cerr<<"unknown exception"<<endl;
  24. }
  25. return 0;
  26. }

TestRecvThread.h

  1. #ifndef __TEST_RECV_THREAD_H
  2. #define __TEST_RECV_THREAD_H
  3. #include "servant/Application.h"
  4. class TestPushCallBack : public ServantProxyCallback
  5. {
  6. public:
  7. virtual int onDispatch(ReqMessagePtr msg);
  8. };
  9. typedef tars::TC_AutoPtr<TestPushCallBack> TestPushCallBackPtr;
  10. class RecvThread : public TC_Thread, public TC_ThreadLock
  11. {
  12. public:
  13. RecvThread();
  14. virtual void run();
  15. void terminate();
  16. private:
  17. bool _bTerminate;
  18. Communicator _comm;
  19. ServantPrx _prx;
  20. };
  21. #endif

TestRecvThread.cpp

  1. #include "TestRecvThread.h"
  2. #include <iostream>
  3. #include <arpa/inet.h>
  4. /*
  5. 响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket
  6. */
  7. static size_t pushResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
  8. {
  9. size_t pos = 0;
  10. while (pos < length)
  11. {
  12. unsigned int len = length - pos;
  13. if(len < sizeof(unsigned int))
  14. {
  15. break;
  16. }
  17. unsigned int iHeaderLen = ntohl(*(unsigned int*)(recvBuffer + pos));
  18. //做一下保护,长度大于M
  19. if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
  20. {
  21. throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
  22. }
  23. //包没有接收全
  24. if (len < iHeaderLen)
  25. {
  26. break;
  27. }
  28. else
  29. {
  30. ResponsePacket rsp;
  31. rsp.iRequestId = ntohl(*((unsigned int *)(recvBuffer + pos + sizeof(unsigned int))));
  32. rsp.sBuffer.resize(iHeaderLen - 2*sizeof(unsigned int));
  33. ::memcpy(&rsp.sBuffer[0], recvBuffer + pos + 2*sizeof(unsigned int), iHeaderLen - 2*sizeof(unsigned int));
  34. pos += iHeaderLen;
  35. done.push_back(rsp);
  36. }
  37. }
  38. return pos;
  39. }
  40. /*
  41. 请求包编码函数,本函数的打包格式为
  42. 整个包长度(字节)+iRequestId(字节)+包内容
  43. */
  44. static void pushRequest(const RequestPacket& request, string& buff)
  45. {
  46. unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
  47. unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
  48. buff = "";
  49. for (int i = 0; i<4; ++i)
  50. {
  51. buff += *bufflengthptr++;
  52. }
  53. unsigned int netrequestId = htonl(request.iRequestId);
  54. unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
  55. for (int i = 0; i<4; ++i)
  56. {
  57. buff += *netrequestIdptr++;
  58. }
  59. string tmp;
  60. tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
  61. buff+=tmp;
  62. }
  63. static void printResult(int iRequestId, const string &sResponseStr)
  64. {
  65. cout << "request id: " << iRequestId << endl;
  66. cout << "response str: " << sResponseStr << endl;
  67. }
  68. static void printPushInfo(const string &sResponseStr)
  69. {
  70. cout << "push message: " << sResponseStr << endl;
  71. }
  72. int TestPushCallBack::onDispatch(ReqMessagePtr msg)
  73. {
  74. if(msg->request.sFuncName == "printResult")
  75. {
  76. string sRet;
  77. cout << "sBuffer: " << msg->response.sBuffer.size() << endl;
  78. sRet.assign(&(msg->response.sBuffer[0]), msg->response.sBuffer.size());
  79. printResult(msg->request.iRequestId, sRet);
  80. return 0;
  81. }
  82. else if(msg->response.iRequestId == 0)
  83. {
  84. string sRet;
  85. sRet.assign(&(msg->response.sBuffer[0]), msg->response.sBuffer.size());
  86. printPushInfo(sRet);
  87. return 0;
  88. }
  89. else
  90. {
  91. cout << "no match func!" <<endl;
  92. }
  93. return -3;
  94. }
  95. RecvThread::RecvThread():_bTerminate(false)
  96. {
  97. string sObjName = "Test.TestPushServer.TestPushServantObj";
  98. string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099";
  99. _prx = _comm.stringToProxy<ServantPrx>(sObjName+"@"+sObjHost);
  100. ProxyProtocol prot;
  101. prot.requestFunc = pushRequest;
  102. prot.responseFunc = pushResponse;
  103. _prx->tars_set_protocol(prot);
  104. }
  105. void RecvThread::terminate()
  106. {
  107. _bTerminate = true;
  108. {
  109. tars::TC_ThreadLock::Lock sync(*this);
  110. notifyAll();
  111. }
  112. }
  113. void RecvThread::run(void)
  114. {
  115. TestPushCallBackPtr cbPush = new TestPushCallBack();
  116. _prx->tars_set_push_callback(cbPush);
  117. string buf("heartbeat");
  118. while(!_bTerminate)
  119. {
  120. {
  121. try
  122. {
  123. TestPushCallBackPtr cb = new TestPushCallBack();
  124. _prx->rpc_call_async(_prx->tars_gen_requestid(), "printResult", buf.c_str(), buf.length(), cb);
  125. }
  126. catch(TarsException& e)
  127. {
  128. cout << "TarsException: " << e.what() << endl;
  129. }
  130. catch(...)
  131. {
  132. cout << "unknown exception" << endl;
  133. }
  134. }
  135. {
  136. TC_ThreadLock::Lock sync(*this);
  137. timedWait(5000);
  138. }
  139. }
  140. }

客户端测试结果

如果push 成功,结果如下

Tars的push功能 - 图3