1. 简单的分布式server

1.1.1. 简单的分布式server

目前分布式系统已经很流行了,一些开源框架也被广泛应用,如dubbo、Motan等。对于一个分布式服务,最基本的一项功能就是服务的注册和发现,而利用zk的EPHEMERAL节点则可以很方便的实现该功能。EPHEMERAL节点正如其名,是临时性的,其生命周期是和客户端会话绑定的,当会话连接断开时,节点也会被删除。下边我们就来实现一个简单的分布式server:

server:

服务启动时,创建zk连接,并在go_servers节点下创建一个新节点,节点名为"ip:port",完成服务注册;服务结束时,由于连接断开,创建的节点会被删除,这样client就不会再连到该节点。

client:

先从zk获取go_servers节点下所有子节点,这样就拿到了所有注册的server;再从server列表中选中一个节点(这里只是随机选取,实际服务一般会提供多种策略),创建连接进行通信。这里为了演示,我们每次client连接server,获取server发送的时间后就断开。主要代码如下:

server.go

  1. package main
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "time"
  7. "github.com/samuel/go-zookeeper/zk"
  8. )
  9. func main() {
  10. go starServer("127.0.0.1:8897")
  11. go starServer("127.0.0.1:8898")
  12. go starServer("127.0.0.1:8899")
  13. a := make(chan bool, 1)
  14. <-a
  15. }
  // checkError prints err to standard output when it is non-nil and does nothing
  // for a nil error. It only reports: the caller keeps running either way.
  func checkError(err error) {
  	if err == nil {
  		return
  	}
  	fmt.Println(err)
  }
  21. func starServer(port string) {
  22. tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
  23. fmt.Println(tcpAddr)
  24. checkError(err)
  25. listener, err := net.ListenTCP("tcp", tcpAddr)
  26. checkError(err)
  27. //注册zk节点q
  28. // 链接zk
  29. conn, err := GetConnect()
  30. if err != nil {
  31. fmt.Printf(" connect zk error: %s ", err)
  32. }
  33. defer conn.Close()
  34. // zk节点注册
  35. err = RegistServer(conn, port)
  36. if err != nil {
  37. fmt.Printf(" regist node error: %s ", err)
  38. }
  39. for {
  40. conn, err := listener.Accept()
  41. if err != nil {
  42. fmt.Fprintf(os.Stderr, "Error: %s", err)
  43. continue
  44. }
  45. go handleCient(conn, port)
  46. }
  47. fmt.Println("aaaaaa")
  48. }
  // handleCient serves a single client connection: it sends "<port>: <current
  // time>" and then closes the connection.
  //
  // conn is the accepted client connection; port is the server's own address,
  // used as the prefix of the reply so the client can tell which server answered.
  //
  // On common TCP stacks, closing a socket that still holds unread received
  // data aborts the connection with a reset instead of a clean shutdown, and the
  // peer then sees "connection reset by peer". To avoid that for clients that
  // send a request, a TCP connection is half-closed after the reply (so the
  // peer reads EOF) and any incoming bytes are drained until the peer closes or
  // a short deadline expires. Non-TCP connections are simply closed.
  func handleCient(conn net.Conn, port string) {
  	defer conn.Close()
  	// Upper bound on how long we wait for the peer to finish after our reply.
  	const drainTimeout = 2 * time.Second
  	daytime := time.Now().String()
  	if _, err := conn.Write([]byte(port + ": " + daytime)); err != nil {
  		fmt.Fprintf(os.Stderr, "Error: %s", err)
  		return
  	}
  	tcpConn, ok := conn.(*net.TCPConn)
  	if !ok {
  		return
  	}
  	// Signal end-of-reply while keeping the read side open for draining.
  	if err := tcpConn.CloseWrite(); err != nil {
  		return
  	}
  	if err := tcpConn.SetReadDeadline(time.Now().Add(drainTimeout)); err != nil {
  		return
  	}
  	scratch := make([]byte, 256)
  	for {
  		// Request bytes are discarded; any error (EOF, timeout) ends the drain.
  		if _, err := tcpConn.Read(scratch); err != nil {
  			return
  		}
  	}
  }
  54. func GetConnect() (conn *zk.Conn, err error) {
  55. zkList := []string{"localhost:2181"}
  56. conn, _, err = zk.Connect(zkList, 10*time.Second)
  57. if err != nil {
  58. fmt.Println(err)
  59. }
  60. return
  61. }
  62. func RegistServer(conn *zk.Conn, host string) (err error) {
  63. _, err = conn.Create("/go_servers/"+host, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  64. return
  65. }
  66. func GetServerList(conn *zk.Conn) (list []string, err error) {
  67. list, _, err = conn.Children("/go_servers")
  68. return
  69. }

1.1.2. client.go

  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "math/rand"
  7. "net"
  8. "time"
  9. "github.com/samuel/go-zookeeper/zk"
  10. )
  // checkError writes a non-nil err to standard output; nil is silently
  // accepted. The function never aborts, so callers continue after it returns.
  func checkError(err error) {
  	if err == nil {
  		return
  	}
  	fmt.Println(err)
  }
  16. func main() {
  17. for i := 0; i < 100; i++ {
  18. startClient()
  19. time.Sleep(1 * time.Second)
  20. }
  21. }
  22. func startClient() {
  23. // service := "127.0.0.1:8899"
  24. //获取地址
  25. serverHost, err := getServerHost()
  26. if err != nil {
  27. fmt.Printf("get server host fail: %s \n", err)
  28. return
  29. }
  30. fmt.Println("connect host: " + serverHost)
  31. tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
  32. checkError(err)
  33. conn, err := net.DialTCP("tcp", nil, tcpAddr)
  34. checkError(err)
  35. defer conn.Close()
  36. _, err = conn.Write([]byte("timestamp"))
  37. checkError(err)
  38. result, err := ioutil.ReadAll(conn)
  39. checkError(err)
  40. fmt.Println(string(result))
  41. return
  42. }
  43. func getServerHost() (host string, err error) {
  44. conn, err := GetConnect()
  45. if err != nil {
  46. fmt.Printf(" connect zk error: %s \n ", err)
  47. return
  48. }
  49. defer conn.Close()
  50. serverList, err := GetServerList(conn)
  51. if err != nil {
  52. fmt.Printf(" get server list error: %s \n", err)
  53. return
  54. }
  55. count := len(serverList)
  56. if count == 0 {
  57. err = errors.New("server list is empty \n")
  58. return
  59. }
  60. //随机选中一个返回
  61. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  62. host = serverList[r.Intn(3)]
  63. return
  64. }
  65. func GetConnect() (conn *zk.Conn, err error) {
  66. zkList := []string{"localhost:2181"}
  67. conn, _, err = zk.Connect(zkList, 10*time.Second)
  68. if err != nil {
  69. fmt.Println(err)
  70. }
  71. return
  72. }
  73. func GetServerList(conn *zk.Conn) (list []string, err error) {
  74. list, _, err = conn.Children("/go_servers")
  75. return
  76. }

先启动server,可以看到有三个节点注册到zk:

  1. 127.0.0.1:8897
  2. 127.0.0.1:8899
  3. 127.0.0.1:8898
  4. 2018/08/27 14:04:58 Connected to 127.0.0.1:2181
  5. 2018/08/27 14:04:58 Connected to 127.0.0.1:2181
  6. 2018/08/27 14:04:58 Connected to 127.0.0.1:2181
  7. 2018/08/27 14:04:58 Authenticated: id=100619932030205976, timeout=10000
  8. 2018/08/27 14:04:58 Re-submitting `0` credentials after reconnect
  9. 2018/08/27 14:04:58 Authenticated: id=100619932030205977, timeout=10000
  10. 2018/08/27 14:04:58 Re-submitting `0` credentials after reconnect
  11. 2018/08/27 14:04:58 Authenticated: id=100619932030205978, timeout=10000
  12. 2018/08/27 14:04:58 Re-submitting `0` credentials after reconnect

启动client,可以看到每次client都会随机连接到一个节点进行通信:

  1. 2018/08/27 14:05:21 Connected to 127.0.0.1:2181
  2. 2018/08/27 14:05:21 Authenticated: id=100619932030205979, timeout=10000
  3. 2018/08/27 14:05:21 Re-submitting `0` credentials after reconnect
  4. 2018/08/27 14:05:21 Recv loop terminated: err=EOF
  5. connect host: 127.0.0.1:8899
  6. 2018/08/27 14:05:21 Send loop terminated: err=<nil>
  7. read tcp 127.0.0.1:54062->127.0.0.1:8899: read: connection reset by peer
  8. 127.0.0.1:8899: 2018-08-27 14:05:21.291641 +0800 CST m=+22.480149656
  9. 2018/08/27 14:05:22 Connected to [::1]:2181
  10. 2018/08/27 14:05:22 Authenticated: id=100619932030205980, timeout=10000
  11. 2018/08/27 14:05:22 Re-submitting `0` credentials after reconnect
  12. 2018/08/27 14:05:22 Recv loop terminated: err=EOF
  13. 2018/08/27 14:05:22 Send loop terminated: err=<nil>
  14. connect host: 127.0.0.1:8897
  15. read tcp 127.0.0.1:54064->127.0.0.1:8897: read: connection reset by peer
  16. 127.0.0.1:8897: 2018-08-27 14:05:22.302322 +0800 CST m=+23.490801385
  17. 2018/08/27 14:05:23 Connected to 127.0.0.1:2181
  18. 2018/08/27 14:05:23 Authenticated: id=100619932030205981, timeout=10000
  19. 2018/08/27 14:05:23 Re-submitting `0` credentials after reconnect
  20. 2018/08/27 14:05:23 Recv loop terminated: err=EOF
  21. 2018/08/27 14:05:23 Send loop terminated: err=<nil>
  22. connect host: 127.0.0.1:8897
  23. read tcp 127.0.0.1:54070->127.0.0.1:8897: read: connection reset by peer
  24. 127.0.0.1:8897: 2018-08-27 14:05:23.312873 +0800 CST m=+24.501324228
  25. 2018/08/27 14:05:24 Connected to 127.0.0.1:2181
  26. 2018/08/27 14:05:24 Authenticated: id=100619932030205982, timeout=10000
  27. 2018/08/27 14:05:24 Re-submitting `0` credentials after reconnect
  28. 2018/08/27 14:05:24 Recv loop terminated: err=EOF
  29. connect host: 127.0.0.1:8899
  30. 2018/08/27 14:05:24 Send loop terminated: err=<nil>
  31. read tcp 127.0.0.1:54072->127.0.0.1:8899: read: connection reset by peer
  32. 127.0.0.1:8899: 2018-08-27 14:05:24.323668 +0800 CST m=+25.512090155
  33. 2018/08/27 14:05:25 Connected to 127.0.0.1:2181
  34. 2018/08/27 14:05:25 Authenticated: id=100619932030205983, timeout=10000
  35. 2018/08/27 14:05:25 Re-submitting `0` credentials after reconnect
  36. 2018/08/27 14:05:25 Recv loop terminated: err=EOF
  37. 2018/08/27 14:05:25 Send loop terminated: err=<nil>
  38. connect host: 127.0.0.1:8897
  39. read tcp 127.0.0.1:54074->127.0.0.1:8897: read: connection reset by peer
  40. 127.0.0.1:8897: 2018-08-27 14:05:25.330257 +0800 CST m=+26.518650566
  41. 2018/08/27 14:05:26 Connected to [::1]:2181
  42. 2018/08/27 14:05:26 Authenticated: id=100619932030205984, timeout=10000
  43. 2018/08/27 14:05:26 Re-submitting `0` credentials after reconnect
  44. 2018/08/27 14:05:26 Recv loop terminated: err=EOF
  45. 2018/08/27 14:05:26 Send loop terminated: err=<nil>
  46. connect host: 127.0.0.1:8897
  47. read tcp 127.0.0.1:54080->127.0.0.1:8897: read: connection reset by peer
  48. 127.0.0.1:8897: 2018-08-27 14:05:26.357251 +0800 CST m=+27.545614616
  49. 2018/08/27 14:05:27 Connected to 127.0.0.1:2181
  50. 2018/08/27 14:05:27 Authenticated: id=100619932030205985, timeout=10000
  51. 2018/08/27 14:05:27 Re-submitting `0` credentials after reconnect
  52. connect host: 127.0.0.1:8899
  53. 2018/08/27 14:05:27 Recv loop terminated: err=EOF
  54. 2018/08/27 14:05:27 Send loop terminated: err=<nil>
  55. read tcp 127.0.0.1:54082->127.0.0.1:8899: read: connection reset by peer
  56. 127.0.0.1:8899: 2018-08-27 14:05:27.369096 +0800 CST m=+28.557430764
  57. 2018/08/27 14:05:28 Connected to [::1]:2181
  58. 2018/08/27 14:05:28 Authenticated: id=100619932030205986, timeout=10000
  59. 2018/08/27 14:05:28 Re-submitting `0` credentials after reconnect
  60. 2018/08/27 14:05:28 Recv loop terminated: err=EOF
  61. 2018/08/27 14:05:28 Send loop terminated: err=<nil>
  62. connect host: 127.0.0.1:8898
  63. read tcp 127.0.0.1:54084->127.0.0.1:8898: read: connection reset by peer
  64. 127.0.0.1:8898: 2018-08-27 14:05:28.380455 +0800 CST m=+29.568760988
  65. ......

至此,我们的分布式server就实现了。