Bidirectional commnunication

Example:bidirectional

In normal case, clients send requests to services and services returns reponses to clients. It is the request-response rpc model.

But for some users, they want services can send commands or notifications to clients initiatively. It can be implemented by installing a service on the prior client and a client on the prior service but it is redundant and complicated.

Rpcx implements a simple notification model.

You should cache the connection and business user ID in order that yu know you want to send notifications to which client.

Server

Server can use SendMessage to send messages to clients and data is []byte. You should use servicePath and serviceMethod to indicate which notification the data is.

You can get the net.Conn by ctx.Value(server.RemoteConnContextKey) in service.

  1. func (s *Server) SendMessage(conn net.Conn, servicePath, serviceMethod string, metadata map[string]string, data []byte) error

```go server.gofunc main() { flag.Parse()

  1. ln, _ := net.Listen("tcp", ":9981")
  2. go http.Serve(ln, nil)
  3. s := server.NewServer()
  4. //s.RegisterName("Arith", new(example.Arith), "")
  5. s.Register(new(Arith), "")
  6. go s.Serve("tcp", *addr)
  7. for !connected {
  8. time.Sleep(time.Second)
  9. }
  10. fmt.Printf("start to send messages to %s\n", clientConn.RemoteAddr().String())
  11. for {
  12. if clientConn != nil {
  13. err := s.SendMessage(clientConn, "test_service_path", "test_service_method", nil, []byte("abcde"))
  14. if err != nil {
  15. fmt.Printf("failed to send messsage to %s: %v\n", clientConn.RemoteAddr().String(), err)
  16. clientConn = nil
  17. }
  18. }
  19. time.Sleep(time.Second)
  20. }

}

  1. ## Client
  2. You must use `NewBidirectionalXClient` to create XClient by passing a channel. Then yuou can range the channel to get the message.
  3. ```go client.go
  4. func main() {
  5. flag.Parse()
  6. ch := make(chan *protocol.Message)
  7. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  8. xclient := client.NewBidirectionalXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption, ch)
  9. defer xclient.Close()
  10. args := &example.Args{
  11. A: 10,
  12. B: 20,
  13. }
  14. reply := &example.Reply{}
  15. err := xclient.Call(context.Background(), "Mul", args, reply)
  16. if err != nil {
  17. log.Fatalf("failed to call: %v", err)
  18. }
  19. log.Printf("%d * %d = %d", args.A, args.B, reply.C)
  20. for msg := range ch {
  21. fmt.Printf("receive msg from server: %s\n", msg.Payload)
  22. }
  23. }

By smallnest updated 2018-12-04 11:47:26