1. 操作ETCD

这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

1.1.1. 安装

  1. go get go.etcd.io/etcd/clientv3

1.1.2. put和get操作

put命令用来设置键值对数据，get命令用来根据key获取对应的值。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go.etcd.io/etcd/clientv3"
  7. )
  8. // etcd client put/get demo
  9. // use etcd/clientv3
  10. func main() {
  11. cli, err := clientv3.New(clientv3.Config{
  12. Endpoints: []string{"127.0.0.1:2379"},
  13. DialTimeout: 5 * time.Second,
  14. })
  15. if err != nil {
  16. // handle error!
  17. fmt.Printf("connect to etcd failed, err:%v\n", err)
  18. return
  19. }
  20. fmt.Println("connect to etcd success")
  21. defer cli.Close()
  22. // put
  23. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  24. _, err = cli.Put(ctx, "lmh", "lmh")
  25. cancel()
  26. if err != nil {
  27. fmt.Printf("put to etcd failed, err:%v\n", err)
  28. return
  29. }
  30. // get
  31. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  32. resp, err := cli.Get(ctx, "lmh")
  33. cancel()
  34. if err != nil {
  35. fmt.Printf("get from etcd failed, err:%v\n", err)
  36. return
  37. }
  38. for _, ev := range resp.Kvs {
  39. fmt.Printf("%s:%s\n", ev.Key, ev.Value)
  40. }
  41. }

1.1.3. watch操作

watch用来获取未来更改的通知。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go.etcd.io/etcd/clientv3"
  7. )
  8. // watch demo
  9. func main() {
  10. cli, err := clientv3.New(clientv3.Config{
  11. Endpoints: []string{"127.0.0.1:2379"},
  12. DialTimeout: 5 * time.Second,
  13. })
  14. if err != nil {
  15. fmt.Printf("connect to etcd failed, err:%v\n", err)
  16. return
  17. }
  18. fmt.Println("connect to etcd success")
  19. defer cli.Close()
  20. // watch key:lmh change
  21. rch := cli.Watch(context.Background(), "lmh") // <-chan WatchResponse
  22. for wresp := range rch {
  23. for _, ev := range wresp.Events {
  24. fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
  25. }
  26. }
  27. }

将上面的代码保存、编译并执行，此时程序就会一直等待etcd中lmh这个key的变化。

例如：我们打开另一个终端，依次执行以下命令来设置、删除、再次设置lmh这个key。

  1. etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh1"
  2. OK
  3. etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 del lmh
  4. 1
  5. etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh2"
  6. OK

每执行一条命令，上面的watch程序都会收到对应的通知，输出如下。

  1. watch>watch.exe
  2. connect to etcd success
  3. Type: PUT Key:lmh Value:lmh1
  4. Type: DELETE Key:lmh Value:
  5. Type: PUT Key:lmh Value:lmh2

1.1.4. lease租约

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // etcd lease
  7. import (
  8. "context"
  9. "log"
  10. "go.etcd.io/etcd/clientv3"
  11. )
  12. func main() {
  13. cli, err := clientv3.New(clientv3.Config{
  14. Endpoints: []string{"127.0.0.1:2379"},
  15. DialTimeout: time.Second * 5,
  16. })
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. fmt.Println("connect to etcd success.")
  21. defer cli.Close()
  22. // 创建一个5秒的租约
  23. resp, err := cli.Grant(context.TODO(), 5)
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. // 5秒钟之后, /lmh/ 这个key就会被移除
  28. _, err = cli.Put(context.TODO(), "/lmh/", "lmh", clientv3.WithLease(resp.ID))
  29. if err != nil {
  30. log.Fatal(err)
  31. }
  32. }

1.1.5. keepAlive

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "go.etcd.io/etcd/clientv3"
  8. )
  9. // etcd keepAlive
  10. func main() {
  11. cli, err := clientv3.New(clientv3.Config{
  12. Endpoints: []string{"127.0.0.1:2379"},
  13. DialTimeout: time.Second * 5,
  14. })
  15. if err != nil {
  16. log.Fatal(err)
  17. }
  18. fmt.Println("connect to etcd success.")
  19. defer cli.Close()
  20. resp, err := cli.Grant(context.TODO(), 5)
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. _, err = cli.Put(context.TODO(), "/lmh/", "lmh", clientv3.WithLease(resp.ID))
  25. if err != nil {
  26. log.Fatal(err)
  27. }
  28. // the key 'foo' will be kept forever
  29. ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
  30. if kaerr != nil {
  31. log.Fatal(kaerr)
  32. }
  33. for {
  34. ka := <-ch
  35. fmt.Println("ttl:", ka.TTL)
  36. }
  37. }

1.1.6. 基于etcd实现分布式锁

go.etcd.io/etcd/clientv3/concurrency包在etcd之上实现了并发操作，如分布式锁、屏障和选举。

导入该包:

  1. import "go.etcd.io/etcd/clientv3/concurrency"

基于etcd实现的分布式锁示例:

  1. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer cli.Close()
  6. // 创建两个单独的会话用来演示锁竞争
  7. s1, err := concurrency.NewSession(cli)
  8. if err != nil {
  9. log.Fatal(err)
  10. }
  11. defer s1.Close()
  12. m1 := concurrency.NewMutex(s1, "/my-lock/")
  13. s2, err := concurrency.NewSession(cli)
  14. if err != nil {
  15. log.Fatal(err)
  16. }
  17. defer s2.Close()
  18. m2 := concurrency.NewMutex(s2, "/my-lock/")
  19. // 会话s1获取锁
  20. if err := m1.Lock(context.TODO()); err != nil {
  21. log.Fatal(err)
  22. }
  23. fmt.Println("acquired lock for s1")
  24. m2Locked := make(chan struct{})
  25. go func() {
  26. defer close(m2Locked)
  27. // 等待直到会话s1释放了/my-lock/的锁
  28. if err := m2.Lock(context.TODO()); err != nil {
  29. log.Fatal(err)
  30. }
  31. }()
  32. if err := m1.Unlock(context.TODO()); err != nil {
  33. log.Fatal(err)
  34. }
  35. fmt.Println("released lock for s1")
  36. <-m2Locked
  37. fmt.Println("acquired lock for s2")

输出:

  1. acquired lock for s1
  2. released lock for s1
  3. acquired lock for s2

更多用法请查看clientv3及concurrency包的官方文档。