confd的源码参考:https://github.com/kelseyhightower/confd

本文分析的confd的版本是v0.16.0,代码参考:https://github.com/kelseyhightower/confd/tree/v0.16.0。

1. Main

confd的入口是main函数:先解析命令行参数,如果指定了打印版本信息的参数,则打印版本信息后直接退出。

  1. func main() {
  2. flag.Parse()
  3. if config.PrintVersion {
  4. fmt.Printf("confd %s (Git SHA: %s, Go Version: %s)\n", Version, GitSHA, runtime.Version())
  5. os.Exit(0)
  6. }
  7. ...
  8. }

其中版本信息记录在https://github.com/kelseyhightower/confd/blob/v0.16.0/version.go#L3

  1. const Version = "0.16.0"

1.1. initConfig

调用initConfig初始化配置,出错时直接退出。

  1. if err := initConfig(); err != nil {
  2. log.Fatal(err.Error())
  3. }

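initConfig函数负责初始化基本配置:依次从配置文件和环境变量中加载配置,读取PGP私钥,设置日志级别,并在指定了SRV记录时从SRV记录中解析后端节点地址。当没有指定后端节点地址(BackendNodes)时,它会根据后端类型设置默认的节点地址。注意这里设置的是默认节点地址,而不是默认的后端类型,默认后端类型在backends.New中设置。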

  1. // initConfig initializes the confd configuration by first setting defaults,
  2. // then overriding settings from the confd config file, then overriding
  3. // settings from environment variables, and finally overriding
  4. // settings from flags set on the command line.
  5. // It returns an error if any.
  6. func initConfig() error {
  7. _, err := os.Stat(config.ConfigFile)
  8. if os.IsNotExist(err) {
  9. log.Debug("Skipping confd config file.")
  10. } else {
  11. log.Debug("Loading " + config.ConfigFile)
  12. configBytes, err := ioutil.ReadFile(config.ConfigFile)
  13. if err != nil {
  14. return err
  15. }
  16. _, err = toml.Decode(string(configBytes), &config)
  17. if err != nil {
  18. return err
  19. }
  20. }
  21. // Update config from environment variables.
  22. processEnv()
  23. if config.SecretKeyring != "" {
  24. kr, err := os.Open(config.SecretKeyring)
  25. if err != nil {
  26. log.Fatal(err.Error())
  27. }
  28. defer kr.Close()
  29. config.PGPPrivateKey, err = ioutil.ReadAll(kr)
  30. if err != nil {
  31. log.Fatal(err.Error())
  32. }
  33. }
  34. if config.LogLevel != "" {
  35. log.SetLevel(config.LogLevel)
  36. }
  37. if config.SRVDomain != "" && config.SRVRecord == "" {
  38. config.SRVRecord = fmt.Sprintf("_%s._tcp.%s.", config.Backend, config.SRVDomain)
  39. }
  40. // Update BackendNodes from SRV records.
  41. if config.Backend != "env" && config.SRVRecord != "" {
  42. log.Info("SRV record set to " + config.SRVRecord)
  43. srvNodes, err := getBackendNodesFromSRV(config.SRVRecord)
  44. if err != nil {
  45. return errors.New("Cannot get nodes from SRV records " + err.Error())
  46. }
  47. switch config.Backend {
  48. case "etcd":
  49. vsm := make([]string, len(srvNodes))
  50. for i, v := range srvNodes {
  51. vsm[i] = config.Scheme + "://" + v
  52. }
  53. srvNodes = vsm
  54. }
  55. config.BackendNodes = srvNodes
  56. }
  57. if len(config.BackendNodes) == 0 {
  58. switch config.Backend {
  59. case "consul":
  60. config.BackendNodes = []string{"127.0.0.1:8500"}
  61. case "etcd":
  62. peerstr := os.Getenv("ETCDCTL_PEERS")
  63. if len(peerstr) > 0 {
  64. config.BackendNodes = strings.Split(peerstr, ",")
  65. } else {
  66. config.BackendNodes = []string{"http://127.0.0.1:4001"}
  67. }
  68. case "etcdv3":
  69. config.BackendNodes = []string{"127.0.0.1:2379"}
  70. case "redis":
  71. config.BackendNodes = []string{"127.0.0.1:6379"}
  72. case "vault":
  73. config.BackendNodes = []string{"http://127.0.0.1:8200"}
  74. case "zookeeper":
  75. config.BackendNodes = []string{"127.0.0.1:2181"}
  76. }
  77. }
  78. // Initialize the storage client
  79. log.Info("Backend set to " + config.Backend)
  80. if config.Watch {
  81. unsupportedBackends := map[string]bool{
  82. "dynamodb": true,
  83. "ssm": true,
  84. }
  85. if unsupportedBackends[config.Backend] {
  86. log.Info(fmt.Sprintf("Watch is not supported for backend %s. Exiting...", config.Backend))
  87. os.Exit(1)
  88. }
  89. }
  90. if config.Backend == "dynamodb" && config.Table == "" {
  91. return errors.New("No DynamoDB table configured")
  92. }
  93. config.ConfigDir = filepath.Join(config.ConfDir, "conf.d")
  94. config.TemplateDir = filepath.Join(config.ConfDir, "templates")
  95. return nil
  96. }

1.2. storeClient

  1. log.Info("Starting confd")
  2. storeClient, err := backends.New(config.BackendsConfig)
  3. if err != nil {
  4. log.Fatal(err.Error())
  5. }

根据配置中指定的存储后端类型构造一个存储后端的client,主要调用的函数为backends.New(config.BackendsConfig)。

当没有设置存储后端时,默认使用etcd。

  1. if config.Backend == "" {
  2. config.Backend = "etcd"
  3. }
  4. backendNodes := config.BackendNodes

打印数据源信息:存储后端为file类型时,打印的是YAML文件列表;其他类型则打印后端节点地址。

  1. if config.Backend == "file" {
  2. log.Info("Backend source(s) set to " + strings.Join(config.YAMLFile, ", "))
  3. } else {
  4. log.Info("Backend source(s) set to " + strings.Join(backendNodes, ", "))
  5. }

最后再根据不同类型的存储后端,调用不同的存储后端构建函数,本文只分析redis类型的存储后端。

  1. switch config.Backend {
  2. case "consul":
  3. return consul.New(config.BackendNodes, config.Scheme,
  4. config.ClientCert, config.ClientKey,
  5. config.ClientCaKeys,
  6. config.BasicAuth,
  7. config.Username,
  8. config.Password,
  9. )
  10. case "etcd":
  11. // Create the etcd client upfront and use it for the life of the process.
  12. // The etcdClient is an http.Client and designed to be reused.
  13. return etcd.NewEtcdClient(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
  14. case "etcdv3":
  15. return etcdv3.NewEtcdClient(backendNodes, config.ClientCert, config.ClientKey, config.ClientCaKeys, config.BasicAuth, config.Username, config.Password)
  16. case "zookeeper":
  17. return zookeeper.NewZookeeperClient(backendNodes)
  18. case "rancher":
  19. return rancher.NewRancherClient(backendNodes)
  20. case "redis":
  21. return redis.NewRedisClient(backendNodes, config.ClientKey, config.Separator)
  22. case "env":
  23. return env.NewEnvClient()
  24. case "file":
  25. return file.NewFileClient(config.YAMLFile, config.Filter)
  26. case "vault":
  27. vaultConfig := map[string]string{
  28. "app-id": config.AppID,
  29. "user-id": config.UserID,
  30. "role-id": config.RoleID,
  31. "secret-id": config.SecretID,
  32. "username": config.Username,
  33. "password": config.Password,
  34. "token": config.AuthToken,
  35. "cert": config.ClientCert,
  36. "key": config.ClientKey,
  37. "caCert": config.ClientCaKeys,
  38. "path": config.Path,
  39. }
  40. return vault.New(backendNodes[0], config.AuthType, vaultConfig)
  41. case "dynamodb":
  42. table := config.Table
  43. log.Info("DynamoDB table set to " + table)
  44. return dynamodb.NewDynamoDBClient(table)
  45. case "ssm":
  46. return ssm.New()
  47. }
  48. return nil, errors.New("Invalid backend")

其中redis类型的存储后端调用了NewRedisClient方法来构造redis的client。

  1. case "redis":
  2. return redis.NewRedisClient(backendNodes, config.ClientKey, config.Separator)

其中涉及三个参数:

  • backendNodes:redis的节点地址。
  • ClientKey:这里被用作redis的密码(对应NewRedisClient的password参数)。
  • Separator:查找redis键时使用的分隔符,为空时默认为"/",该参数只用于redis类型的后端。

NewRedisClient函数如下:

  1. // NewRedisClient returns an *redis.Client with a connection to named machines.
  2. // It returns an error if a connection to the cluster cannot be made.
  3. func NewRedisClient(machines []string, password string, separator string) (*Client, error) {
  4. if separator == "" {
  5. separator = "/"
  6. }
  7. log.Debug(fmt.Sprintf("Redis Separator: %#v", separator))
  8. var err error
  9. clientWrapper := &Client{machines: machines, password: password, separator: separator, client: nil, pscChan: make(chan watchResponse), psc: redis.PubSubConn{Conn: nil} }
  10. clientWrapper.client, _, err = tryConnect(machines, password, true)
  11. return clientWrapper, err
  12. }

1.3. processor

  1. stopChan := make(chan bool)
  2. doneChan := make(chan bool)
  3. errChan := make(chan error, 10)
  4. var processor template.Processor
  5. switch {
  6. case config.Watch:
  7. processor = template.WatchProcessor(config.TemplateConfig, stopChan, doneChan, errChan)
  8. default:
  9. processor = template.IntervalProcessor(config.TemplateConfig, stopChan, doneChan, errChan, config.Interval)
  10. }
  11. go processor.Process()

当开启watch参数时构造WatchProcessor,否则构造IntervalProcessor,最后启动一个goroutine执行processor.Process()。

  1. go processor.Process()

这部分逻辑在本文第二部分分析。

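1.4. signalChan

最后,main函数在for循环中通过select同时监听三个channel:
  • 从errChan收到错误时,打印错误日志。
  • 从signalChan收到SIGINT或SIGTERM信号时,关闭doneChan。
  • doneChan被关闭后(也可能是processor结束时通过defer关闭的),以状态码0退出进程。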

  1. signalChan := make(chan os.Signal, 1)
  2. signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
  3. for {
  4. select {
  5. case err := <-errChan:
  6. log.Error(err.Error())
  7. case s := <-signalChan:
  8. log.Info(fmt.Sprintf("Captured %v. Exiting...", s))
  9. close(doneChan)
  10. case <-doneChan:
  11. os.Exit(0)
  12. }
  13. }

2. Process

  1. type Processor interface {
  2. Process()
  3. }

Processor是一个接口类型,主要的实现体有:

  • intervalProcessor:默认实现,在没有指定watch参数时使用,每隔interval秒轮询一次后端存储。
  • watchProcessor:指定了watch参数时使用的实现,通过后端存储的watch机制感知数据变化。

2.1. intervalProcessor

  1. type intervalProcessor struct {
  2. config Config
  3. stopChan chan bool
  4. doneChan chan bool
  5. errChan chan error
  6. interval int
  7. }

IntervalProcessor函数根据config内容、几个channel和轮询间隔interval构造一个intervalProcessor。

  1. func IntervalProcessor(config Config, stopChan, doneChan chan bool, errChan chan error, interval int) Processor {
  2. return &intervalProcessor{config, stopChan, doneChan, errChan, interval}
  3. }

2.1.1. intervalProcessor.Process

  1. func (p *intervalProcessor) Process() {
  2. defer close(p.doneChan)
  3. for {
  4. ts, err := getTemplateResources(p.config)
  5. if err != nil {
  6. log.Fatal(err.Error())
  7. break
  8. }
  9. process(ts)
  10. select {
  11. case <-p.stopChan:
  12. break
  13. case <-time.After(time.Duration(p.interval) * time.Second):
  14. continue
  15. }
  16. }
  17. }

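Process的每一轮循环先解析config内容,获取TemplateResource列表,然后调用process(ts)。process(ts)对其中每个TemplateResource执行t.process(),t.process()中又会调用t.sync()。t.process()的具体逻辑在后文分析。每轮处理结束后,等待interval秒再进入下一轮。需要注意两点:其一,在Go中,select内的break只会跳出select,而不会跳出外层的for循环,因此收到stopChan后循环实际上并不会退出;其二,log.Fatal会直接退出进程,其后的break永远不会执行。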

  1. func process(ts []*TemplateResource) error {
  2. var lastErr error
  3. for _, t := range ts {
  4. if err := t.process(); err != nil {
  5. log.Error(err.Error())
  6. lastErr = err
  7. }
  8. }
  9. return lastErr
  10. }

2.2. watchProcessor

  1. type watchProcessor struct {
  2. config Config
  3. stopChan chan bool
  4. doneChan chan bool
  5. errChan chan error
  6. wg sync.WaitGroup
  7. }

WatchProcessor函数根据config内容和几个channel构造一个watchProcessor。

  1. func WatchProcessor(config Config, stopChan, doneChan chan bool, errChan chan error) Processor {
  2. var wg sync.WaitGroup
  3. return &watchProcessor{config, stopChan, doneChan, errChan, wg}
  4. }

2.2.1. watchProcessor.Process

  1. func (p *watchProcessor) Process() {
  2. defer close(p.doneChan)
  3. ts, err := getTemplateResources(p.config)
  4. if err != nil {
  5. log.Fatal(err.Error())
  6. return
  7. }
  8. for _, t := range ts {
  9. t := t
  10. p.wg.Add(1)
  11. go p.monitorPrefix(t)
  12. }
  13. p.wg.Wait()
  14. }

watchProcessor.Process方法实现了Processor接口中定义的方法。它先解析config内容,获取TemplateResource列表,再为每个TemplateResource启动一个执行monitorPrefix的goroutine,最后通过WaitGroup等待所有goroutine结束。

2.2.2. monitorPrefix

  1. func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
  2. defer p.wg.Done()
  3. keys := util.AppendPrefix(t.Prefix, t.Keys)
  4. for {
  5. index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
  6. if err != nil {
  7. p.errChan <- err
  8. // Prevent backend errors from consuming all resources.
  9. time.Sleep(time.Second * 2)
  10. continue
  11. }
  12. t.lastIndex = index
  13. if err := t.process(); err != nil {
  14. p.errChan <- err
  15. }
  16. }
  17. }

先将配置文件中的prefix参数与keys中的每个key拼接(通过path.Join),得到完整的key路径。

  1. keys := util.AppendPrefix(t.Prefix, t.Keys)

AppendPrefix函数如下:

  1. func AppendPrefix(prefix string, keys []string) []string {
  2. s := make([]string, len(keys))
  3. for i, k := range keys {
  4. s[i] = path.Join(prefix, k)
  5. }
  6. return s
  7. }

接着执行storeClient的WatchPrefix方法。storeClient是一个接口,对应不同类型的存储后端,WatchPrefix的实现逻辑也各不相同。本文分析的存储类型为redis。

  1. index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
  2. if err != nil {
  3. p.errChan <- err
  4. // Prevent backend errors from consuming all resources.
  5. time.Sleep(time.Second * 2)
  6. continue
  7. }

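storeClient.WatchPrefix返回一个index,它被保存到t.lastIndex中,并在下一轮循环中作为waitIndex参数传回WatchPrefix。WatchPrefix返回后(例如监听到数据变化时),调用t.process()重新生成配置。注意t.process()本身并不使用lastIndex。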

  1. t.lastIndex = index
  2. if err := t.process(); err != nil {
  3. p.errChan <- err
  4. }

2.3. TemplateResource.process

无论是否指定watch参数,即无论是intervalProcessor还是watchProcessor,最终都会调用TemplateResource.process这个函数,而该函数的核心是t.sync()。

  1. // process is a convenience function that wraps calls to the three main tasks
  2. // required to keep local configuration files in sync. First we gather vars
  3. // from the store, then we stage a candidate configuration file, and finally sync
  4. // things up.
  5. // It returns an error if any.
  6. func (t *TemplateResource) process() error {
  7. if err := t.setFileMode(); err != nil {
  8. return err
  9. }
  10. if err := t.setVars(); err != nil {
  11. return err
  12. }
  13. if err := t.createStageFile(); err != nil {
  14. return err
  15. }
  16. if err := t.sync(); err != nil {
  17. return err
  18. }
  19. return nil
  20. }

2.3.1. setFileMode

setFileMode设置目标文件的权限。如果配置文件中没有指定mode参数:目标文件(Dest)已存在时沿用其当前权限,不存在时默认为0644。如果指定了mode参数,则按该值(通过strconv.ParseUint解析)设置文件权限。

  1. // setFileMode sets the FileMode.
  2. func (t *TemplateResource) setFileMode() error {
  3. if t.Mode == "" {
  4. if !util.IsFileExist(t.Dest) {
  5. t.FileMode = 0644
  6. } else {
  7. fi, err := os.Stat(t.Dest)
  8. if err != nil {
  9. return err
  10. }
  11. t.FileMode = fi.Mode()
  12. }
  13. } else {
  14. mode, err := strconv.ParseUint(t.Mode, 0, 32)
  15. if err != nil {
  16. return err
  17. }
  18. t.FileMode = os.FileMode(mode)
  19. }
  20. return nil
  21. }

2.3.2. setVars

setVars从后端存储中取出prefix与keys拼接后对应的最新值。它先清空内存中的store,再将去掉prefix后的key及其对应的值存入store,供后续步骤使用。不同后端的storeClient.GetValues逻辑可能不同,但通过接口,不同的存储后端可以各自实现获取值的方法。

  1. // setVars sets the Vars for template resource.
  2. func (t *TemplateResource) setVars() error {
  3. var err error
  4. log.Debug("Retrieving keys from store")
  5. log.Debug("Key prefix set to " + t.Prefix)
  6. result, err := t.storeClient.GetValues(util.AppendPrefix(t.Prefix, t.Keys))
  7. if err != nil {
  8. return err
  9. }
  10. log.Debug("Got the following map from store: %v", result)
  11. t.store.Purge()
  12. for k, v := range result {
  13. t.store.Set(path.Join("/", strings.TrimPrefix(k, t.Prefix)), v)
  14. }
  15. return nil
  16. }

2.3.3. createStageFile

createStageFile使用src指定的模板文件和内存中最新的变量数据,在目标文件所在目录下生成StageFile(暂存文件)。该文件会在sync中与目标文件进行比较,以判断是否有修改。也就是说,StageFile实际上是根据后端存储数据生成的最新配置文件。如果它与当前的配置文件不同,说明后端存储的数据已被更新,需要用它生成新的配置文件。

  1. // createStageFile stages the src configuration file by processing the src
  2. // template and setting the desired owner, group, and mode. It also sets the
  3. // StageFile for the template resource.
  4. // It returns an error if any.
  5. func (t *TemplateResource) createStageFile() error {
  6. log.Debug("Using source template " + t.Src)
  7. if !util.IsFileExist(t.Src) {
  8. return errors.New("Missing template: " + t.Src)
  9. }
  10. log.Debug("Compiling source template " + t.Src)
  11. tmpl, err := template.New(filepath.Base(t.Src)).Funcs(t.funcMap).ParseFiles(t.Src)
  12. if err != nil {
  13. return fmt.Errorf("Unable to process template %s, %s", t.Src, err)
  14. }
  15. // create TempFile in Dest directory to avoid cross-filesystem issues
  16. temp, err := ioutil.TempFile(filepath.Dir(t.Dest), "."+filepath.Base(t.Dest))
  17. if err != nil {
  18. return err
  19. }
  20. if err = tmpl.Execute(temp, nil); err != nil {
  21. temp.Close()
  22. os.Remove(temp.Name())
  23. return err
  24. }
  25. defer temp.Close()
  26. // Set the owner, group, and mode on the stage file now to make it easier to
  27. // compare against the destination configuration file later.
  28. os.Chmod(temp.Name(), t.FileMode)
  29. os.Chown(temp.Name(), t.Uid, t.Gid)
  30. t.StageFile = temp
  31. return nil
  32. }

2.3.4. sync

  1. if err := t.sync(); err != nil {
  2. return err
  3. }

t.sync()是执行confd核心功能的函数,将配置文件通过模板的方式自动生成,并执行检查命令和reload命令。该部分逻辑在本文第三部分分析。

3. sync

sync比较暂存文件和目标文件的差别,如果不同则用暂存文件覆盖目标配置文件。在没有指定syncOnly的前提下:如果设置了check_cmd,会先执行check_cmd指定的检查命令;检查通过后覆盖配置文件;如果还设置了reload_cmd,最后执行reload_cmd中指定的reload命令。

3.1. IsConfigChanged

IsConfigChanged比较源文件(src)和目标文件(dest)是否相同,比较的内容包括Uid、Gid、Mode和Md5。只要其中任意一项不同,就认为配置发生了变化。此外,如果目标文件不存在,也直接认为配置发生了变化。

  1. // IsConfigChanged reports whether src and dest config files are equal.
  2. // Two config files are equal when they have the same file contents and
  3. // Unix permissions. The owner, group, and mode must match.
  4. // It return false in other cases.
  5. func IsConfigChanged(src, dest string) (bool, error) {
  6. if !IsFileExist(dest) {
  7. return true, nil
  8. }
  9. d, err := FileStat(dest)
  10. if err != nil {
  11. return true, err
  12. }
  13. s, err := FileStat(src)
  14. if err != nil {
  15. return true, err
  16. }
  17. if d.Uid != s.Uid {
  18. log.Info(fmt.Sprintf("%s has UID %d should be %d", dest, d.Uid, s.Uid))
  19. }
  20. if d.Gid != s.Gid {
  21. log.Info(fmt.Sprintf("%s has GID %d should be %d", dest, d.Gid, s.Gid))
  22. }
  23. if d.Mode != s.Mode {
  24. log.Info(fmt.Sprintf("%s has mode %s should be %s", dest, os.FileMode(d.Mode), os.FileMode(s.Mode)))
  25. }
  26. if d.Md5 != s.Md5 {
  27. log.Info(fmt.Sprintf("%s has md5sum %s should be %s", dest, d.Md5, s.Md5))
  28. }
  29. if d.Uid != s.Uid || d.Gid != s.Gid || d.Mode != s.Mode || d.Md5 != s.Md5 {
  30. return true, nil
  31. }
  32. return false, nil
  33. }

如果文件发生了改变,则先执行check_cmd命令(如果配置了),再用暂存文件覆盖目标配置文件,最后执行reload_cmd命令(如果配置了)。

  1. if ok {
  2. log.Info("Target config " + t.Dest + " out of sync")
  3. if !t.syncOnly && t.CheckCmd != "" {
  4. if err := t.check(); err != nil {
  5. return errors.New("Config check failed: " + err.Error())
  6. }
  7. }
  8. log.Debug("Overwriting target config " + t.Dest)
  9. err := os.Rename(staged, t.Dest)
  10. if err != nil {
  11. if strings.Contains(err.Error(), "device or resource busy") {
  12. log.Debug("Rename failed - target is likely a mount. Trying to write instead")
  13. // try to open the file and write to it
  14. var contents []byte
  15. var rerr error
  16. contents, rerr = ioutil.ReadFile(staged)
  17. if rerr != nil {
  18. return rerr
  19. }
  20. err := ioutil.WriteFile(t.Dest, contents, t.FileMode)
  21. // make sure owner and group match the temp file, in case the file was created with WriteFile
  22. os.Chown(t.Dest, t.Uid, t.Gid)
  23. if err != nil {
  24. return err
  25. }
  26. } else {
  27. return err
  28. }
  29. }
  30. if !t.syncOnly && t.ReloadCmd != "" {
  31. if err := t.reload(); err != nil {
  32. return err
  33. }
  34. }
  35. log.Info("Target config " + t.Dest + " has been updated")
  36. } else {
  37. log.Debug("Target config " + t.Dest + " in sync")
  38. }

3.2. check

check用于检查暂存的配置文件(即StageFile),该文件是根据后端存储中的最新数据生成的。

  1. if !t.syncOnly && t.CheckCmd != "" {
  2. if err := t.check(); err != nil {
  3. return errors.New("Config check failed: " + err.Error())
  4. }
  5. }

t.check()只是执行配置文件中check_cmd参数指定的命令,并根据命令是否执行成功来返回错误。当check命令执行失败时,直接返回错误,不再执行覆盖配置文件和reload的操作。

  1. // check executes the check command to validate the staged config file. The
  2. // command is modified so that any references to src template are substituted
  3. // with a string representing the full path of the staged file. This allows the
  4. // check to be run on the staged file before overwriting the destination config
  5. // file.
  6. // It returns nil if the check command returns 0 and there are no other errors.
  7. func (t *TemplateResource) check() error {
  8. var cmdBuffer bytes.Buffer
  9. data := make(map[string]string)
  10. data["src"] = t.StageFile.Name()
  11. tmpl, err := template.New("checkcmd").Parse(t.CheckCmd)
  12. if err != nil {
  13. return err
  14. }
  15. if err := tmpl.Execute(&cmdBuffer, data); err != nil {
  16. return err
  17. }
  18. return runCommand(cmdBuffer.String())
  19. }

check会以模板的方式解析check_cmd,将其中的{{.src}}替换为StageFile的路径。也就是说,check先用后端存储的最新数据生成临时配置文件(StageFile),再通过check_cmd检查这个临时配置文件是否合法:合法则用它替换为新的配置文件,否则返回错误。

3.3. Overwriting

首先通过os.Rename将staged文件重命名为Dest文件。如果重命名失败,并且错误信息中包含"device or resource busy"(目标文件很可能是一个挂载点),则改为读取staged文件的内容并写入Dest文件,再设置其属主和属组;其他错误则直接返回。该过程实际上就是重新生成一份新的配置文件。staged文件的生成逻辑在函数createStageFile中。

  1. log.Debug("Overwriting target config " + t.Dest)
  2. err := os.Rename(staged, t.Dest)
  3. if err != nil {
  4. if strings.Contains(err.Error(), "device or resource busy") {
  5. log.Debug("Rename failed - target is likely a mount. Trying to write instead")
  6. // try to open the file and write to it
  7. var contents []byte
  8. var rerr error
  9. contents, rerr = ioutil.ReadFile(staged)
  10. if rerr != nil {
  11. return rerr
  12. }
  13. err := ioutil.WriteFile(t.Dest, contents, t.FileMode)
  14. // make sure owner and group match the temp file, in case the file was created with WriteFile
  15. os.Chown(t.Dest, t.Uid, t.Gid)
  16. if err != nil {
  17. return err
  18. }
  19. } else {
  20. return err
  21. }
  22. }

3.4. reload

如果没有指定syncOnly参数并且指定了ReloadCmd则执行reload操作。

  1. if !t.syncOnly && t.ReloadCmd != "" {
  2. if err := t.reload(); err != nil {
  3. return err
  4. }
  5. }

其中t.reload()实现如下:

  1. // reload executes the reload command.
  2. // It returns nil if the reload command returns 0.
  3. func (t *TemplateResource) reload() error {
  4. return runCommand(t.ReloadCmd)
  5. }

t.reload()和t.check()都调用了runCommand函数。该函数在Windows上通过cmd /C执行给定命令,在其他系统上通过/bin/sh -c执行,并记录命令的输出:

  1. // runCommand is a shared function used by check and reload
  2. // to run the given command and log its output.
  3. // It returns nil if the given cmd returns 0.
  4. // The command can be run on unix and windows.
  5. func runCommand(cmd string) error {
  6. log.Debug("Running " + cmd)
  7. var c *exec.Cmd
  8. if runtime.GOOS == "windows" {
  9. c = exec.Command("cmd", "/C", cmd)
  10. } else {
  11. c = exec.Command("/bin/sh", "-c", cmd)
  12. }
  13. output, err := c.CombinedOutput()
  14. if err != nil {
  15. log.Error(fmt.Sprintf("%q", string(output)))
  16. return err
  17. }
  18. log.Debug(fmt.Sprintf("%q", string(output)))
  19. return nil
  20. }

4. redisClient.WatchPrefix

当用户设置了watch参数并且存储后端为redis时,monitorPrefix会调用redisClient.WatchPrefix,即使用redis的watch机制。redisClient.WatchPrefix是redis存储类型对StoreClient接口中WatchPrefix方法的实现。

  1. // The StoreClient interface is implemented by objects that can retrieve
  2. // key/value pairs from a backend store.
  3. type StoreClient interface {
  4. GetValues(keys []string) (map[string]string, error)
  5. WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error)
  6. }

StoreClient是对后端存储类型的抽象,常用的后端存储类型有Etcd、Redis等。不同后端存储类型的GetValues和WatchPrefix具体实现不同,本文主要分析Redis类型的watch机制。

4.1. WatchPrefix

WatchPrefix在monitorPrefix中被调用,具体参考:

  1. func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
  2. defer p.wg.Done()
  3. keys := util.AppendPrefix(t.Prefix, t.Keys)
  4. for {
  5. index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
  6. if err != nil {
  7. p.errChan <- err
  8. // Prevent backend errors from consuming all resources.
  9. time.Sleep(time.Second * 2)
  10. continue
  11. }
  12. t.lastIndex = index
  13. if err := t.process(); err != nil {
  14. p.errChan <- err
  15. }
  16. }
  17. }

redis的watch主要通过pub-sub机制实现:WatchPrefix会根据传入的prefix,通过PSUBSCRIBE订阅对应的keyspace channel。写入redis数据的同时,需要向符合该prefix的channel发布(publish)消息,消息内容为下列给定命令之一(具体是哪个命令并没有关系)。这样就会触发watch机制,从而自动更新配置。给定的命令列表如下:

  1. "del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"

订阅的channel模式如下。其中0代表当前连接的redis数据库编号(代码中为db);代码中实际拼接的是c.transform(prefix) + "*"。

  1. __keyspace@0__:{prefix}/*

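如果只是写入redis数据,既没有执行publish操作,redis也没有开启keyspace通知(notify-keyspace-events,默认关闭),就不会触发watch机制来自动更新配置。上面的命令列表正是redis keyspace通知的事件名,开启该通知后,redis在执行相应写命令时会自动向对应channel发布消息,无需手动publish。而如果使用etcd,借助etcd自身的watch机制,用户只需写入或更新数据即可自动触发配置更新。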

WatchPrefix源码如下。注意当waitIndex为0(即第一次调用)时,它会直接返回1,因此monitorPrefix启动后会先立即生成一次配置:

  1. func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
  2. if waitIndex == 0 {
  3. return 1, nil
  4. }
  5. if len(c.pscChan) > 0 {
  6. var respChan watchResponse
  7. for len(c.pscChan) > 0 {
  8. respChan = <-c.pscChan
  9. }
  10. return respChan.waitIndex, respChan.err
  11. }
  12. go func() {
  13. if c.psc.Conn == nil {
  14. rClient, db, err := tryConnect(c.machines, c.password, false);
  15. if err != nil {
  16. c.psc = redis.PubSubConn{Conn: nil}
  17. c.pscChan <- watchResponse{0, err}
  18. return
  19. }
  20. c.psc = redis.PubSubConn{Conn: rClient}
  21. go func() {
  22. defer func() {
  23. c.psc.Close()
  24. c.psc = redis.PubSubConn{Conn: nil}
  25. }()
  26. for {
  27. switch n := c.psc.Receive().(type) {
  28. case redis.PMessage:
  29. log.Debug(fmt.Sprintf("Redis Message: %s %s\n", n.Channel, n.Data))
  30. data := string(n.Data)
  31. commands := [12]string{"del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"}
  32. for _, command := range commands {
  33. if command == data {
  34. c.pscChan <- watchResponse{1, nil}
  35. break
  36. }
  37. }
  38. case redis.Subscription:
  39. log.Debug(fmt.Sprintf("Redis Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count))
  40. if n.Count == 0 {
  41. c.pscChan <- watchResponse{0, nil}
  42. return
  43. }
  44. case error:
  45. log.Debug(fmt.Sprintf("Redis error: %v\n", n))
  46. c.pscChan <- watchResponse{0, n}
  47. return
  48. }
  49. }
  50. }()
  51. c.psc.PSubscribe("__keyspace@" + strconv.Itoa(db) + "__:" + c.transform(prefix) + "*")
  52. }
  53. }()
  54. select {
  55. case <-stopChan:
  56. c.psc.PUnsubscribe()
  57. return waitIndex, nil
  58. case r := <- c.pscChan:
  59. return r.waitIndex, r.err
  60. }
  61. }

5. 总结

  1. confd的作用是将配置数据存放在存储后端中,并根据后端数据自动生成和更新本地配置文件。常用的后端有Etcd、Redis等。
  2. 不同存储后端的watch机制不同。例如Etcd只需要更新key,便可触发自动更新配置的操作;而redis除了更新key,还需要执行publish操作(除非redis开启了keyspace通知notify-keyspace-events)。
  3. 可以通过配置check_cmd来校验配置文件是否正确,如果配置文件非法,则不会执行更新配置和reload的操作。但是当存储后端中存入了非法数据时,每次校验都会失败,即使后来新增的配置是合法的也无法生效。所以需要有机制保证存入存储后端的数据始终是合法的。

参考: