8.4 Zinx-V0.8代码实现

好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。

zinx/znet/server.go

  1. //开启网络服务
  2. func (s *Server) Start() {
  3. //...
  4. //开启一个go去做服务端Linster业务
  5. go func() {
  6. //0 启动worker工作池机制
  7. s.msgHandler.StartWorkerPool()
  8. //1 获取一个TCP的Addr
  9. addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
  10. if err != nil {
  11. fmt.Println("resolve tcp addr err: ", err)
  12. return
  13. }
  14. //...
  15. //...
  16. }
  17. }()
  18. }

其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。

所以应该在Connection的StartReader()方法中修改:

zinx/znet/connection.go

  1. /*
  2. 读消息Goroutine,用于从客户端中读取数据
  3. */
  4. func (c *Connection) StartReader() {
  5. fmt.Println("Reader Goroutine is running")
  6. defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
  7. defer c.Stop()
  8. for {
  9. // 创建拆包解包的对象...
  10. //读取客户端的Msg head...
  11. //拆包,得到msgid 和 datalen 放在msg中...
  12. //根据 dataLen 读取 data,放在msg.Data中...
  13. //得到当前客户端请求的Request数据
  14. req := Request{
  15. conn:c,
  16. msg:msg,
  17. }
  18. if utils.GlobalObject.WorkerPoolSize > 0 {
  19. //已经启动工作池机制,将消息交给Worker处理
  20. c.MsgHandler.SendMsgToTaskQueue(&req)
  21. } else {
  22. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  23. go c.MsgHandler.DoMsgHandler(&req)
  24. }
  25. }
  26. }

这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。