Iptables-mode Proxier

Overview

kube-proxy provides three proxier modes (userspace/iptables/ipvs). userspace is the early proxy mode, while the ipvs mode was still at an experimental stage at the time of writing. This article starts with the default, kernel-level iptables proxier, analyzing its implementation and logic; the other modes will be covered in dedicated articles.

Both the service configuration and the code of the iptables-mode proxier involve basic concepts such as clusterIP, nodePort, loadBalancer, Ingress, ClusterCIDR, onlyLocal, and ExternalIP. Before reading the source, familiarize yourself with their purposes, use cases, and the differences between the types; it will make the proxy code much easier to follow. You should also know the underlying tools the proxy depends on: netfilter, iptables, and conntrack. These basics are not covered in depth here; consult the relevant references as needed.

Looking at the kube-proxy component at the framework level, ProxyServer.Run() ends up in s.Proxier.SyncLoop(), which loops forever. The default ProxyServer is configured with the iptables Proxier (if proxyMode == proxyModeIPTables), so the iptables-mode Proxier.SyncLoop() is invoked. SyncLoop() periodically calls syncProxyRules() to synchronize services and endpoints with the iptables rules.

The iptables-mode proxier implements its load-balancing mechanism through kernel netfilter/iptables rules. Service and endpoint change events, watched via the Informer mechanism, trigger synchronization of the iptables rules, as the following code-logic diagram shows:

(Figure: iptables-proxier code logic diagram)

The source analysis below starts from the proxier interface, its implementing type, and the type's method list, to get a structural overview of the Proxier framework. We then examine the fields defined when the proxier object is created: their values, types, and purposes. With those two parts covered, we analyze the implementation of the proxier's methods, i.e. the proxy logic itself (the key part is the analysis of the syncProxyRules() method). Finally, we look at the proxier's underlying iptables runner, where the upper-level proxy logic ultimately invokes the iptables commands that apply the rules.

Proxier Data Structures and Type Definitions

ProxyProvider is the proxy provider interface; an implementation must provide the two key proxy methods, Sync() and SyncLoop().

!FILENAME pkg/proxy/types.go:27

    type ProxyProvider interface {
        // Sync immediately synchronizes the ProxyProvider's current state to proxy rules.
        Sync()
        // SyncLoop runs periodically.
        // It is expected to run as a goroutine or as the application's main loop. It does not return.
        SyncLoop()
    }
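
To make the contract concrete, here is a minimal, hypothetical ProxyProvider implementation (not from the kubernetes tree) that only illustrates the expected call pattern: Sync() performs one immediate synchronization, while SyncLoop() blocks and re-syncs on a timer.

    package main

    import (
        "fmt"
        "time"
    )

    // noopProxier is a toy ProxyProvider used only to illustrate the contract.
    type noopProxier struct {
        period time.Duration
    }

    // Sync performs a single, immediate state-to-rules synchronization.
    func (p *noopProxier) Sync() { fmt.Println("sync rules once") }

    // SyncLoop blocks forever, invoking Sync on every tick.
    func (p *noopProxier) SyncLoop() {
        t := time.NewTicker(p.period)
        defer t.Stop()
        for range t.C {
            p.Sync()
        }
    }

    func main() {
        p := &noopProxier{period: time.Second}
        p.Sync()     // callers may force a sync at any time
        p.SyncLoop() // the proxy server parks its main goroutine here
    }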

The iptables-mode Proxier implements the ProxyProvider interface. The Proxier type has quite a few fields; let's first look at the structure definition with a comment on each field's purpose, and revisit the fields when the proxier object is instantiated.

!FILENAME pkg/proxy/iptables/proxier.go:205

    type Proxier struct {
        endpointsChanges *proxy.EndpointChangeTracker // endpoints change tracker
        serviceChanges   *proxy.ServiceChangeTracker  // service change tracker
        mu               sync.Mutex                   // protects the following fields
        serviceMap       proxy.ServiceMap             // map of known services ①
        endpointsMap     proxy.EndpointsMap           // map of known endpoints ②
        portsMap         map[utilproxy.LocalPort]utilproxy.Closeable // held local ports (Closeable)
        endpointsSynced  bool  // endpoints sync state
        servicesSynced   bool  // services sync state
        initialized      int32 // initialization state
        syncRunner       *async.BoundedFrequencyRunner // rate-bounded runner, used here to govern
                                                       // calls to syncProxyRules
        iptables         utiliptables.Interface // iptables command execution interface
        masqueradeAll    bool
        masqueradeMark   string             // SNAT masquerade mark
        exec             utilexec.Interface // exec command execution interface
        clusterCIDR      string             // cluster CIDR
        hostname         string             // host name
        nodeIP           net.IP             // node IP address
        portMapper       utilproxy.PortOpener // opens and listens on TCP/UDP ports
        recorder         record.EventRecorder // event recorder
        healthChecker    healthcheck.Server   // healthcheck server object
        healthzServer    healthcheck.HealthzUpdater // healthz updater
        precomputedProbabilities []string // precomputed probability strings
        // iptables rule and chain data (filter/NAT)
        iptablesData             *bytes.Buffer
        existingFilterChainsData *bytes.Buffer
        filterChains             *bytes.Buffer
        filterRules              *bytes.Buffer
        natChains                *bytes.Buffer
        natRules                 *bytes.Buffer
        endpointChainsNumber     int
        // node IP and port information
        nodePortAddresses []string
        networkInterfacer utilproxy.NetworkInterfacer // network interface abstraction
    }

① ServiceMap and ServicePort definitions

!FILENAME pkg/proxy/service.go:229

    type ServiceMap map[ServicePortName]ServicePort
    // ServicePortName.String() => "NS/SvcName:PortName"
    ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
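
A quick sketch of how a ServicePortName key renders; the types below are local stand-ins mirroring the upstream definitions, used only to show the "NS/SvcName:PortName" format:

    package main

    import "fmt"

    // Local mirrors of the upstream types, for illustration only.
    type NamespacedName struct{ Namespace, Name string }

    func (n NamespacedName) String() string { return n.Namespace + "/" + n.Name }

    type ServicePortName struct {
        NamespacedName
        Port string
    }

    func (s ServicePortName) String() string { return fmt.Sprintf("%s:%s", s.NamespacedName, s.Port) }

    func main() {
        key := ServicePortName{NamespacedName{"default", "web"}, "http"}
        fmt.Println(key) // "default/web:http" -- the ServiceMap key format
    }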

ServiceSpec is the Service.spec definition; these are the service spec fields a user can configure.

!FILENAME vendor/k8s.io/api/core/v1/types.go:3606

    type ServiceSpec struct {
        Ports []ServicePort             // service port list
        Selector map[string]string      // selector
        ClusterIP string                // VIP / portal
        Type ServiceType                // service type
        ExternalIPs []string            // external IP list, e.g. an external load balancer
        SessionAffinity ServiceAffinity // session affinity
        LoadBalancerIP string           // LB IP configured when the service type is "LoadBalancer"
        LoadBalancerSourceRanges []string // client IP ranges the cloud-provider restricts to
        ExternalName string
        ExternalTrafficPolicy ServiceExternalTrafficPolicyType
        HealthCheckNodePort int32
        PublishNotReadyAddresses bool
        SessionAffinityConfig *SessionAffinityConfig // session affinity configuration
    }

The v1.ServicePort struct definition and the proxy ServicePort interface

!FILENAME vendor/k8s.io/api/core/v1/types.go:3563

    type ServicePort struct {
        Name string
        Protocol Protocol
        Port int32
        TargetPort intstr.IntOrString
        NodePort int32
    }
    // ServicePort interface
    type ServicePort interface {
        // returns the service string, e.g. `IP:Port/Protocol`.
        String() string
        // returns the cluster IP string.
        ClusterIPString() string
        // returns the protocol.
        GetProtocol() v1.Protocol
        // returns the health check node port.
        GetHealthCheckNodePort() int
    }

② EndpointsMap definition and the Endpoint interface

!FILENAME pkg/proxy/endpoints.go:181

    type EndpointsMap map[ServicePortName][]Endpoint
    type Endpoint interface {
        // returns the endpoint string, e.g. `IP:Port`.
        String() string
        // whether the endpoint is local.
        GetIsLocal() bool
        // returns the IP part.
        IP() string
        // returns the Port part.
        Port() (int, error)
        // checks whether two endpoints are equal.
        Equal(Endpoint) bool
    }

The Endpoints structure and related definitions

!FILENAME vendor/k8s.io/api/core/v1/types.go:3710

    type Endpoints struct {
        metav1.TypeMeta
        metav1.ObjectMeta
        Subsets []EndpointSubset
    }
    type EndpointSubset struct {
        Addresses []EndpointAddress // EndpointAddress list
        NotReadyAddresses []EndpointAddress
        Ports []EndpointPort // EndpointPort list
    }
    type EndpointAddress struct {
        IP string
        Hostname string
        NodeName *string
        TargetRef *ObjectReference
    }
    type EndpointPort struct {
        Name string
        Port int32
        Protocol Protocol
    }

The method list of the iptables-mode Proxier. Get a rough idea of each method's purpose from its name; the main ones are analyzed in depth in the logic sections below.

    func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {/*...*/}
    func (proxier *Proxier) probability(n int) string {/*...*/}
    func (proxier *Proxier) Sync() {/*...*/}
    func (proxier *Proxier) SyncLoop() {/*...*/}
    func (proxier *Proxier) setInitialized(value bool) {/*...*/}
    func (proxier *Proxier) isInitialized() bool {/*...*/}
    func (proxier *Proxier) OnServiceAdd(service *v1.Service) {/*...*/}
    func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {/*...*/}
    func (proxier *Proxier) OnServiceDelete(service *v1.Service) {/*...*/}
    func (proxier *Proxier) OnServiceSynced() {/*...*/}
    func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {/*...*/}
    func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {/*...*/}
    func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {/*...*/}
    func (proxier *Proxier) OnEndpointsSynced() {/*...*/}
    func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {/*...*/}
    func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) {/*...*/}
    func (proxier *Proxier) syncProxyRules() {/*...*/}

Proxier Construction and Execution

NewProxier constructs the iptables Proxier. Part of the validation code is omitted below; we focus on the key construction steps.

!FILENAME pkg/proxy/iptables/proxier.go:281

    func NewProxier(ipt utiliptables.Interface,
        sysctl utilsysctl.Interface,
        exec utilexec.Interface,
        syncPeriod time.Duration,
        minSyncPeriod time.Duration,
        masqueradeAll bool,
        masqueradeBit int,
        clusterCIDR string,
        hostname string,
        nodeIP net.IP,
        recorder record.EventRecorder,
        healthzServer healthcheck.HealthzUpdater,
        nodePortAddresses []string,
    ) (*Proxier, error) {
        // ...the omitted part does the following...
        // sysctl set "net/ipv4/conf/all/route_localnet" for kernel iptables support
        // sysctl set "net/bridge/bridge-nf-call-iptables" for kernel iptables support
        // generate the SNAT masquerade mark
        // if the node IP is empty, kube-proxy initializes nodeIP to 127.0.0.1
        // validate that the cluster CIDR is non-empty; IPv6 validation
        // ...
        // healthcheck server object
        healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil)
        // proxier object
        proxier := &Proxier{
            portsMap:         make(map[utilproxy.LocalPort]utilproxy.Closeable),
            serviceMap:       make(proxy.ServiceMap), // service map
            serviceChanges:   proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), // service change tracker
            endpointsMap:     make(proxy.EndpointsMap), // endpoints map
            endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder), // endpoints change tracker
            iptables:         ipt,
            masqueradeAll:    masqueradeAll,
            masqueradeMark:   masqueradeMark,
            exec:             exec,
            clusterCIDR:      clusterCIDR,
            hostname:         hostname,
            nodeIP:           nodeIP,
            portMapper:       &listenPortOpener{}, // service port opener/listener
            recorder:         recorder,
            healthChecker:    healthChecker,
            healthzServer:    healthzServer,
            precomputedProbabilities: make([]string, 0, 1001),
            iptablesData:             bytes.NewBuffer(nil), // iptables rule data
            existingFilterChainsData: bytes.NewBuffer(nil),
            filterChains:             bytes.NewBuffer(nil),
            filterRules:              bytes.NewBuffer(nil),
            natChains:                bytes.NewBuffer(nil),
            natRules:                 bytes.NewBuffer(nil),
            nodePortAddresses:        nodePortAddresses,
            networkInterfacer:        utilproxy.RealNetwork{}, // network interface
        }
        burstSyncs := 2
        klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
        // the runner calls syncProxyRules at a bounded frequency to synchronize the rules;
        // the key logic lives in syncProxyRules(), analyzed in detail below
        proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
        return proxier, nil
    }

proxier.SyncLoop(): we saw that the last call in the proxy server's run path is s.Proxier.SyncLoop(); here we look at the proxier's SyncLoop implementation in detail.

!FILENAME pkg/proxy/iptables/proxier.go:487

    func (proxier *Proxier) SyncLoop() {
        if proxier.healthzServer != nil {
            proxier.healthzServer.UpdateTimestamp()
        }
        // proxier.syncRunner was set to async.NewBoundedFrequencyRunner(...) when the proxier was created ①
        proxier.syncRunner.Loop(wait.NeverStop)
    }

async.BoundedFrequencyRunner is a timer-driven loop executor for a func.

The proxier.syncRunner field defined in the Proxier struct has type async.BoundedFrequencyRunner:

!FILENAME pkg/util/async/bounded_frequency_runner.go:31

    type BoundedFrequencyRunner struct {
        name        string
        minInterval time.Duration // minimum interval between two runs
        maxInterval time.Duration // maximum interval between two runs
        run         chan struct{} // triggers one run
        mu          sync.Mutex
        fn          func()        // the func to run
        lastRun     time.Time     // time of the most recent run
        timer       timer         // timer
        limiter     rateLimiter   // limits the QPS of runs as needed
    }

Constructing a BoundedFrequencyRunner instance; the parameters control how the func gets invoked.

!FILENAME pkg/util/async/bounded_frequency_runner.go:134

    func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
        timer := realTimer{Timer: time.NewTimer(0)} // ticks immediately
        <-timer.C() // consume the first tick
        return construct(name, fn, minInterval, maxInterval, burstRuns, timer) // ->
    }
    // instance construction
    func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
        if maxInterval < minInterval {
            panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval))
        }
        if timer == nil {
            panic(fmt.Sprintf("%s: timer must be non-nil", name))
        }
        bfr := &BoundedFrequencyRunner{
            name:        name,
            fn:          fn, // the func to be invoked
            minInterval: minInterval,
            maxInterval: maxInterval,
            run:         make(chan struct{}, 1),
            timer:       timer,
        }
        if minInterval == 0 { // with no minimum interval, runs are unthrottled
            bfr.limiter = nullLimiter{}
        } else {
            // flow control via TokenBucketRateLimiter; the mechanism is not expanded here
            qps := float32(time.Second) / float32(minInterval)
            bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
        }
        return bfr
    }
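
A small usage sketch of the runner (toy fn and intervals; the import path assumes the kubernetes tree analyzed here): Run() requests an immediate run, which the token-bucket limiter may defer to honor minInterval, while the timer guarantees a run at least every maxInterval.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/kubernetes/pkg/util/async"
    )

    func main() {
        // fn runs at most once per second (minInterval) and
        // at least once every 10 seconds (maxInterval), burst of 2.
        runner := async.NewBoundedFrequencyRunner("demo",
            func() { fmt.Println("synced at", time.Now().Format("15:04:05")) },
            time.Second, 10*time.Second, 2)

        go runner.Loop(make(chan struct{})) // loops until the stop channel closes

        for i := 0; i < 5; i++ {
            runner.Run() // request a run; throttled to minInterval
            time.Sleep(300 * time.Millisecond)
        }
        time.Sleep(3 * time.Second)
    }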

proxier.syncRunner.Loop(): the timer-driven loop implementation

    func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
        klog.V(3).Infof("%s Loop running", bfr.name)
        bfr.timer.Reset(bfr.maxInterval)
        for {
            select {
            case <-stop: // the stop channel closes the loop
                bfr.stop()
                klog.V(3).Infof("%s Loop stopping", bfr.name)
                return
            case <-bfr.timer.C(): // timer tick
                bfr.tryRun()
            case <-bfr.run: // a single requested run
                bfr.tryRun()
            }
        }
    }

BoundedFrequencyRunner.tryRun() runs the func at the bounded frequency

!FILENAME pkg/util/async/bounded_frequency_runner.go:211

    func (bfr *BoundedFrequencyRunner) tryRun() {
        bfr.mu.Lock()
        defer bfr.mu.Unlock()
        // the limiter allows the func to run
        if bfr.limiter.TryAccept() {
            bfr.fn()                      // invoke the func
            bfr.lastRun = bfr.timer.Now() // record the run time
            bfr.timer.Stop()
            bfr.timer.Reset(bfr.maxInterval) // schedule the next run
            klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
            return
        }
        // the limiter disallowed the run; compute the next run time
        elapsed := bfr.timer.Since(bfr.lastRun)    // elapsed: how long since the last run
        nextPossible := bfr.minInterval - elapsed  // nextPossible: earliest possible next run (min interval)
        nextScheduled := bfr.maxInterval - elapsed // nextScheduled: latest scheduled next run (max interval)
        klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
        if nextPossible < nextScheduled {
            // Set the timer for ASAP, but don't drain here. Assuming Loop is running,
            // it might get a delivery in the mean time, but that is OK.
            bfr.timer.Stop()
            bfr.timer.Reset(nextPossible)
            klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
        }
    }

Proxier Service and Endpoints Change Trackers

kube-proxy must keep up with changes to services and endpoints. As seen above, the Proxier struct has two fields, serviceChanges and endpointsChanges, which track Service and Endpoints updates. Let's analyze the two related types, ServiceChangeTracker and EndpointChangeTracker.

ServiceChangeTracker: the service change tracker

!FILENAME pkg/proxy/service.go:143

    type ServiceChangeTracker struct {
        // lock protects items.
        lock sync.Mutex
        // items maps a service to its recorded change.
        items map[types.NamespacedName]*serviceChange
        // makeServiceInfo lets the proxier inject customized information when processing a service.
        makeServiceInfo makeServicePortFunc
        isIPv6Mode *bool // IPv6
        recorder record.EventRecorder // event recorder
    }
    // serviceChange holds a <previous, current> pair of service snapshots.
    type serviceChange struct {
        previous ServiceMap
        current  ServiceMap
    }
    // ServiceMap type definition
    type ServiceMap map[ServicePortName]ServicePort
    // ServicePortName type definition
    type ServicePortName struct {
        types.NamespacedName
        Port string
    }
    // NamespacedName type definition
    type NamespacedName struct {
        Namespace string
        Name      string
    }
    // NewServiceChangeTracker instantiates a ServiceChangeTracker.
    func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker {
        return &ServiceChangeTracker{
            items:           make(map[types.NamespacedName]*serviceChange),
            makeServiceInfo: makeServiceInfo,
            isIPv6Mode:      isIPv6Mode,
            recorder:        recorder,
        }
    }

ServiceChangeTracker.Update()

!FILENAME pkg/proxy/service.go:173

    func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
        svc := current
        if svc == nil {
            svc = previous
        }
        // previous == nil && current == nil is unexpected, we should return false directly.
        if svc == nil {
            return false
        }
        namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
        sct.lock.Lock()
        defer sct.lock.Unlock()
        change, exists := sct.items[namespacedName]
        if !exists {
            change = &serviceChange{}
            change.previous = sct.serviceToServiceMap(previous)
            sct.items[namespacedName] = change
        }
        change.current = sct.serviceToServiceMap(current)
        // if change.previous equals change.current, it means no change
        if reflect.DeepEqual(change.previous, change.current) {
            delete(sct.items, namespacedName)
        }
        return len(sct.items) > 0
    }
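
The calling convention used by the On* handlers below encodes the event type in which argument is nil. A runnable sketch (hypothetical Service; nil makeServiceInfo/isIPv6Mode/recorder are tolerated by the constructor in this release) showing that an add followed by an equivalent delete cancels out:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/kubernetes/pkg/proxy"
    )

    func main() {
        tracker := proxy.NewServiceChangeTracker(nil, nil, nil)

        svc := &v1.Service{
            ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web"},
            Spec: v1.ServiceSpec{
                ClusterIP: "10.0.0.10",
                Ports:     []v1.ServicePort{{Name: "http", Port: 80, Protocol: v1.ProtocolTCP}},
            },
        }

        fmt.Println(tracker.Update(nil, svc)) // Add:    previous == nil -> true (a change is pending)
        fmt.Println(tracker.Update(svc, nil)) // Delete: current == nil -> false (add+delete cancel out)
    }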

EndpointChangeTracker: the endpoints change tracker

!FILENAME pkg/proxy/endpoints.go:83

    type EndpointChangeTracker struct {
        lock sync.Mutex
        // hostname is the host kube-proxy runs on.
        hostname string
        // items maps an Endpoints object to its recorded change.
        items map[types.NamespacedName]*endpointsChange
        makeEndpointInfo makeEndpointFunc
        isIPv6Mode *bool
        recorder record.EventRecorder
    }
    // endpointsChange holds a <previous, current> pair of endpoints snapshots.
    type endpointsChange struct {
        previous EndpointsMap
        current  EndpointsMap
    }
    // EndpointsMap type definition
    type EndpointsMap map[ServicePortName][]Endpoint
    // Endpoint interface
    type Endpoint interface {
        // returns the endpoint string, e.g. `IP:Port`.
        String() string
        // whether the endpoint is on the local host.
        GetIsLocal() bool
        // returns the IP part of the endpoint.
        IP() string
        // returns the Port part of the endpoint.
        Port() (int, error)
        // checks whether two endpoints are equal.
        Equal(Endpoint) bool
    }
    // NewEndpointChangeTracker instantiates an EndpointChangeTracker.
    func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
        return &EndpointChangeTracker{
            hostname:         hostname,
            items:            make(map[types.NamespacedName]*endpointsChange),
            makeEndpointInfo: makeEndpointInfo,
            isIPv6Mode:       isIPv6Mode,
            recorder:         recorder,
        }
    }

EndpointChangeTracker.Update()

!FILENAME pkg/proxy/endpoints.go:116

    func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
        endpoints := current
        if endpoints == nil {
            endpoints = previous
        }
        // previous == nil && current == nil is unexpected, we should return false directly.
        if endpoints == nil {
            return false
        }
        namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
        ect.lock.Lock()
        defer ect.lock.Unlock()
        change, exists := ect.items[namespacedName]
        if !exists {
            change = &endpointsChange{}
            change.previous = ect.endpointsToEndpointsMap(previous)
            ect.items[namespacedName] = change
        }
        change.current = ect.endpointsToEndpointsMap(current)
        // if change.previous equal to change.current, it means no change
        if reflect.DeepEqual(change.previous, change.current) {
            delete(ect.items, namespacedName)
        }
        return len(ect.items) > 0
    }

All proxier service and endpoints updates (add/delete/update) call the corresponding ChangeTracker's Update(); when a real change is pending, syncProxyRules is triggered.

Proxier.OnServiceSynced(): initial services sync

!FILENAME pkg/proxy/iptables/proxier.go:528

    func (proxier *Proxier) OnServiceSynced() {
        proxier.mu.Lock()
        proxier.servicesSynced = true
        proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
        proxier.mu.Unlock()
        // not a periodic sync: run a single synchronization
        proxier.syncProxyRules()
    }

Service updates (add/delete/update) all go through Proxier.OnServiceUpdate():

!FILENAME pkg/proxy/iptables/proxier.go:513

    func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
        proxier.OnServiceUpdate(nil, service) // first argument: oldService is nil
    }
    // updating a Service covers Add / Delete as well
    func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
        if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
            proxier.syncRunner.Run() // request a single run
        }
    }
    func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
        proxier.OnServiceUpdate(service, nil) // second argument: newService is nil
    }

Proxier.OnEndpointsSynced(): initial endpoints sync

!FILENAME pkg/proxy/iptables/proxier.go:552

    func (proxier *Proxier) OnEndpointsSynced() {
        proxier.mu.Lock()
        proxier.endpointsSynced = true
        proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
        proxier.mu.Unlock()
        // Sync unconditionally - this is called once per lifetime.
        proxier.syncProxyRules()
    }

As with services, endpoints updates (add/delete/update) all go through Proxier.OnEndpointsUpdate():

!FILENAME pkg/proxy/iptables/proxier.go:538

    func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
        proxier.OnEndpointsUpdate(nil, endpoints) // first argument: oldEndpoints is nil
    }
    // updating endpoints covers Add / Delete as well
    func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
        if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
            proxier.syncRunner.Run()
        }
    }
    func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
        proxier.OnEndpointsUpdate(endpoints, nil) // second argument: endpoints is nil
    }
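
For context, these On* handlers are driven by the config layer (pkg/proxy/config), which bridges informer events to the proxier. A simplified, hypothetical sketch of that wiring using client-go's shared informers (the real ServiceConfig adds buffering and an OnServiceSynced callback):

    package proxywiring

    import (
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
    )

    // serviceHandler is the subset of the proxier's event interface used here.
    type serviceHandler interface {
        OnServiceAdd(*v1.Service)
        OnServiceUpdate(old, cur *v1.Service)
        OnServiceDelete(*v1.Service)
    }

    // WireServiceEvents connects informer events to the proxier's On* methods,
    // a simplified stand-in for what pkg/proxy/config.ServiceConfig does.
    func WireServiceEvents(client kubernetes.Interface, h serviceHandler, stop <-chan struct{}) {
        factory := informers.NewSharedInformerFactory(client, 15*time.Minute)
        factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { h.OnServiceAdd(obj.(*v1.Service)) },
            UpdateFunc: func(o, n interface{}) { h.OnServiceUpdate(o.(*v1.Service), n.(*v1.Service)) },
            DeleteFunc: func(obj interface{}) { h.OnServiceDelete(obj.(*v1.Service)) },
        })
        factory.Start(stop)
    }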

syncProxyRules: Synchronizing Configuration and Rules

proxier.syncProxyRules() implements the consistent synchronization from watched service/endpoints updates to iptables rules; it is the core logic of the iptables proxier. The implementation leverages the iptables-save/iptables-restore mechanism: from the existing iptables configuration plus the synchronized service and endpoints information, it generates the corresponding iptables chain and rule data, writes it out in restore-compatible format on each sync, and then resets the iptables rules in one shot via iptables-restore.
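
For reference, the payload handed to iptables-restore is plain iptables-save format text. A minimal hand-written illustration (real chain names; rules abridged to the masquerade pair shown later in this article):

    *filter
    :KUBE-SERVICES - [0:0]
    :KUBE-EXTERNAL-SERVICES - [0:0]
    :KUBE-FORWARD - [0:0]
    COMMIT
    *nat
    :KUBE-SERVICES - [0:0]
    :KUBE-NODEPORTS - [0:0]
    :KUBE-POSTROUTING - [0:0]
    :KUBE-MARK-MASQ - [0:0]
    -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
    -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
    COMMIT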

The rule-sync code is long, so we break it into pieces. Below is the skeleton of syncProxyRules (parts replaced by explanatory comments); each commented block is analyzed separately afterwards.

!FILENAME pkg/proxy/iptables/proxier.go:634

    func (proxier *Proxier) syncProxyRules() {
        proxier.mu.Lock()
        defer proxier.mu.Unlock()
        start := time.Now()
        defer func() {
            metrics.SyncProxyRulesLatency.Observe(metrics.SinceInMicroseconds(start))
            klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
        }()
        // don't sync rules till we've received services and endpoints
        if !proxier.endpointsSynced || !proxier.servicesSynced {
            klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
            return
        }
        // detect and apply service/endpoints changes
        //...
        // create and link the kube chains
        //...
        // fetch the existing filter/NAT chain data
        //...
        // build iptables-save/restore format data (table headers, chains)
        //...
        // write the kubernetes-specific SNAT masquerade rules
        //...
        // Accumulate NAT chains to keep.
        activeNATChains := map[utiliptables.Chain]bool{}
        // Accumulate the set of local ports that we will be holding open once this update is complete
        replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}
        endpoints := make([]*endpointsInfo, 0)
        endpointChains := make([]utiliptables.Chain, 0)
        args := make([]string, 64)
        // Compute total number of endpoint chains across all services.
        proxier.endpointChainsNumber = 0
        for svcName := range proxier.serviceMap {
            proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
        }
        // build rules for each service (the service portal rules)
        for svcName, svc := range proxier.serviceMap {
            //...
        }
        // delete chains that are no longer used
        //...
        // nodeports chain
        //...
        // FORWARD policy
        //...
        // clusterCIDR rules
        //...
        // write the trailing COMMIT markers
        //...
        // assemble the data and load it via iptables-restore
        //...
        // close stale local ports and update the port map
        for k, v := range proxier.portsMap {
            if replacementPortsMap[k] == nil {
                v.Close()
            }
        }
        proxier.portsMap = replacementPortsMap
        // update healthz timestamp.
        if proxier.healthzServer != nil {
            proxier.healthzServer.UpdateTimestamp()
        }
        // update healthchecks.
        if err := proxier.healthChecker.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
            klog.Errorf("Error syncing healthcheck services: %v", err)
        }
        if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
            klog.Errorf("Error syncing healthcheck endpoints: %v", err)
        }
        // finish cleanup
        for _, svcIP := range staleServices.UnsortedList() {
            if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
                klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
            }
        }
        proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
    }

Each block of logic is detailed below:

Updating services and endpoints; collecting the update results

!FILENAME pkg/proxy/iptables/proxier.go:652

    // update services/endpoints
    serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
    endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
    staleServices := serviceUpdateResult.UDPStaleClusterIP
    // merge in the stale UDP services reported by the EndpointsMap update result
    for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
        if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
            klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
            staleServices.Insert(svcInfo.ClusterIPString())
        }
    }

UpdateServiceMap(): the service update implementation

!FILENAME pkg/proxy/service.go:212

    func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
        result.UDPStaleClusterIP = sets.NewString()         // stale UDP cluster IPs
        serviceMap.apply(changes, result.UDPStaleClusterIP) // apply the pending changes ->
        result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
        for svcPortName, info := range serviceMap {
            if info.GetHealthCheckNodePort() != 0 {
                result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort()) // health check node port
            }
        }
        return result
    }

serviceMap.apply() applies the changed service entries (merge -> filter -> unmerge)

!FILENAME pkg/proxy/service.go:268

    func (serviceMap *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
        changes.lock.Lock()
        defer changes.lock.Unlock()
        for _, change := range changes.items {
            serviceMap.merge(change.current)                       // merge changed/added entries into serviceMap ①
            change.previous.filter(change.current)                 // filter out already-handled entries so unmerge skips them ②
            serviceMap.unmerge(change.previous, UDPStaleClusterIP) // remove obsolete entries ③
        }
        // clear changes after applying them to ServiceMap.
        changes.items = make(map[types.NamespacedName]*serviceChange)
        return
    }

① serviceMap.merge() merges the contents of "other" into the current serviceMap, i.e. the changed service entries are merged in.

!FILENAME pkg/proxy/service.go:301

    func (sm *ServiceMap) merge(other ServiceMap) sets.String {
        existingPorts := sets.NewString()
        for svcPortName, info := range other {
            existingPorts.Insert(svcPortName.String())
            _, exists := (*sm)[svcPortName]
            //...
            (*sm)[svcPortName] = info
        }
        return existingPorts
    }

② serviceMap.filter() removes from the receiver map the entries whose keys (service port names) also appear in "other", so entries that were just merged are skipped by unmerge.

!FILENAME pkg/proxy/service.go:319

    func (sm *ServiceMap) filter(other ServiceMap) {
        for svcPortName := range *sm {
            if _, ok := other[svcPortName]; ok {
                delete(*sm, svcPortName)
            }
        }
    }

③ serviceMap.unmerge() removes from the current map the entries present in "other", i.e. deletes the obsolete entries.

!FILENAME pkg/proxy/service.go:330

    func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
        for svcPortName := range other {
            info, exists := (*sm)[svcPortName]
            if exists {
                if info.GetProtocol() == v1.ProtocolUDP {
                    UDPStaleClusterIP.Insert(info.ClusterIPString()) // record cluster IPs of stale UDP services
                }
                delete(*sm, svcPortName)
            } //...
        }
    }
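
A toy walkthrough of the apply() pattern using plain string maps standing in for ServiceMap (hypothetical keys and values), showing why filter() must run before unmerge(): without it, unmerge would delete the entry just merged.

    package main

    import "fmt"

    func main() {
        // current proxier state, plus the <previous, current> snapshots of one change.
        state := map[string]string{"default/web:http": "10.0.0.10:80"}
        previous := map[string]string{"default/web:http": "10.0.0.10:80", "default/old:http": "10.0.0.11:80"}
        current := map[string]string{"default/web:http": "10.0.0.10:80", "default/new:http": "10.0.0.12:80"}

        // merge: bring every entry of `current` into the state.
        for k, v := range current {
            state[k] = v
        }
        // filter: drop from `previous` the keys that survive in `current`,
        // so unmerge will not delete entries we just merged.
        for k := range previous {
            if _, ok := current[k]; ok {
                delete(previous, k)
            }
        }
        // unmerge: whatever remains in `previous` is obsolete; remove it.
        for k := range previous {
            delete(state, k)
        }
        fmt.Println(state) // map[default/new:http:10.0.0.12:80 default/web:http:10.0.0.10:80]
    }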

UpdateEndpointsMap(): the endpoints update implementation

!FILENAME pkg/proxy/endpoints.go:163

    func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
        result.StaleEndpoints = make([]ServiceEndpoint, 0)
        result.StaleServiceNames = make([]ServicePortName, 0)
        endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
        // TODO: If this will appear to be computationally expensive, consider
        // computing this incrementally similarly to endpointsMap.
        result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
        localIPs := GetLocalEndpointIPs(endpointsMap)
        for nsn, ips := range localIPs {
            result.HCEndpointsLocalIPSize[nsn] = len(ips)
        }
        return result
    }

EndpointsMap.apply() applies the changed endpoints entries (unmerge -> merge -> stale detection)

!FILENAME pkg/proxy/endpoints.go:242

    func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
        if changes == nil {
            return
        }
        changes.lock.Lock()
        defer changes.lock.Unlock()
        for _, change := range changes.items {
            endpointsMap.Unmerge(change.previous) // remove the previous entries ①
            endpointsMap.Merge(change.current)    // apply the current entries ②
            detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) // stale detection ③
        }
        changes.items = make(map[types.NamespacedName]*endpointsChange)
    }

① EndpointsMap.Unmerge() deletes the entries listed in "other"

!FILENAME pkg/proxy/endpoints.go:266

    func (em EndpointsMap) Unmerge(other EndpointsMap) {
        for svcPortName := range other {
            delete(em, svcPortName)
        }
    }

② EndpointsMap.Merge() updates the EndpointsMap with the values (endpoint lists) of the entries in "other"

!FILENAME pkg/proxy/endpoints.go:259

    func (em EndpointsMap) Merge(other EndpointsMap) {
        for svcPortName := range other {
            em[svcPortName] = other[svcPortName]
        }
    }

③ detectStaleConnections() finds stale backend connection entries

!FILENAME pkg/proxy/endpoints.go:291

    func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
        for svcPortName, epList := range oldEndpointsMap {
            for _, ep := range epList {
                stale := true
                for i := range newEndpointsMap[svcPortName] {
                    if newEndpointsMap[svcPortName][i].Equal(ep) { // still present: not stale
                        stale = false
                        break
                    }
                }
                if stale {
                    klog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.String())
                    *staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) // record the stale endpoints
                }
            }
        }
        for svcPortName, epList := range newEndpointsMap {
            // For UDP services, if the backends change from 0 to non-0, a lingering
            // conntrack entry may blackhole traffic destined to the service.
            if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
                *staleServiceNames = append(*staleServiceNames, svcPortName)
                // record the stale service names
            }
        }
    }
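
A toy run of the same detection logic (string slices instead of Endpoint values; hypothetical addresses): an endpoint dropped from the new map is reported stale, and a service whose backends go from 0 to non-0 is flagged so its stale UDP conntrack entries can be cleared.

    package main

    import "fmt"

    func main() {
        oldMap := map[string][]string{
            "default/dns:53":  {"10.244.1.5:53", "10.244.2.7:53"},
            "default/ntp:123": {},
        }
        newMap := map[string][]string{
            "default/dns:53":  {"10.244.1.5:53"},  // 10.244.2.7:53 removed
            "default/ntp:123": {"10.244.3.9:123"}, // backends: 0 -> 1
        }

        // stale endpoints: present in oldMap but absent from newMap.
        for name, eps := range oldMap {
            for _, ep := range eps {
                stale := true
                for _, cur := range newMap[name] {
                    if cur == ep {
                        stale = false
                        break
                    }
                }
                if stale {
                    fmt.Println("stale endpoint:", name, ep) // default/dns:53 10.244.2.7:53
                }
            }
        }
        // stale service names: 0 -> non-0 backends (UDP blackhole risk).
        for name, eps := range newMap {
            if len(eps) > 0 && len(oldMap[name]) == 0 {
                fmt.Println("stale service:", name) // default/ntp:123
            }
        }
    }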

Creating and linking the kube chains

  • In the filter table, insert at the head of the INPUT chain a jump to KUBE-EXTERNAL-SERVICES:
    iptables -I INPUT -t filter -m conntrack --ctstate NEW -m comment --comment "kubernetes externally-visible service portals" -j KUBE-EXTERNAL-SERVICES

  • In the filter table, insert at the head of the OUTPUT chain a jump to KUBE-SERVICES:
    iptables -I OUTPUT -t filter -m conntrack --ctstate NEW -m comment --comment "kubernetes service portals" -j KUBE-SERVICES

  • In the nat table, insert at the head of the OUTPUT chain a jump to KUBE-SERVICES:
    iptables -I OUTPUT -t nat -m comment --comment "kubernetes service portals" -j KUBE-SERVICES

  • In the nat table, insert at the head of the PREROUTING chain a jump to KUBE-SERVICES:
    iptables -I PREROUTING -t nat -m comment --comment "kubernetes service portals" -j KUBE-SERVICES

  • In the nat table, insert at the head of the POSTROUTING chain a jump to KUBE-POSTROUTING:
    iptables -I POSTROUTING -t nat -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING

  • In the filter table, insert at the head of the FORWARD chain a jump to KUBE-FORWARD:
    iptables -I FORWARD -t filter -m comment --comment "kubernetes forwarding rules" -j KUBE-FORWARD

!FILENAME pkg/proxy/iptables/proxier.go:667

    // iterate over the iptablesJumpChains definitions
    for _, chain := range iptablesJumpChains {
        // underlying command: iptables -t $tableName -N $chainName
        if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
            return
        }
        args := append(chain.extraArgs,
            "-m", "comment", "--comment", chain.comment,
            "-j", string(chain.chain),
        )
        // underlying command: iptables -I $sourceChain -t $tableName -m comment --comment $comment -j $chain
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
            return
        }
    }

Building the base iptables data

  • Fetch the existing filter/NAT table chain data
  • Build the iptables-save/restore format data (table headers, chains)
  • Build the SNAT masquerade rules

!FILENAME pkg/proxy/iptables/proxier.go:688

    // fetch the existing filter table chains
    existingFilterChains := make(map[utiliptables.Chain][]byte)
    proxier.existingFilterChainsData.Reset()
    err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData) // obtained via iptables-save
    if err != nil {
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else {
        existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes()) // parse the output
    }
    // likewise, fetch the existing nat table chains
    existingNATChains := make(map[utiliptables.Chain][]byte)
    proxier.iptablesData.Reset()
    err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
    if err != nil {
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else {
        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
    }
    // Reset all buffers used later.
    // This is to avoid memory reallocations and thus improve performance.
    proxier.filterChains.Reset()
    proxier.filterRules.Reset()
    proxier.natChains.Reset()
    proxier.natRules.Reset()
    // write the table headers
    writeLine(proxier.filterChains, "*filter")
    writeLine(proxier.natChains, "*nat")

Writing the chain data

filter: "KUBE-SERVICES" / "KUBE-EXTERNAL-SERVICES" / "KUBE-FORWARD"; nat: "KUBE-SERVICES" / "KUBE-NODEPORTS" / "KUBE-POSTROUTING" / "KUBE-MARK-MASQ"

!FILENAME pkg/proxy/iptables/proxier.go:720

    // write the chain data: format the filter and nat chains into the buffers
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
        if chain, ok := existingFilterChains[chainName]; ok {
            writeBytesLine(proxier.filterChains, chain)
        } else {
            // iptables-save/restore format chain line: ":$chainName - [0:0]"
            writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
        }
    }
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
        if chain, ok := existingNATChains[chainName]; ok {
            writeBytesLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
        }
    }
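
MakeChainLine simply renders the restore-format declaration line for a chain (name plus zeroed packet/byte counters); a quick sketch, assuming the util/iptables package of this tree:

    package main

    import (
        "fmt"

        utiliptables "k8s.io/kubernetes/pkg/util/iptables"
    )

    func main() {
        fmt.Println(utiliptables.MakeChainLine(utiliptables.Chain("KUBE-SERVICES")))
        // Output: :KUBE-SERVICES - [0:0]
    }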

Write the masquerade rules: at the POSTROUTING stage, matched traffic is MASQUERADEd (SNAT based on the outgoing interface's dynamic IP). The original client source IP is lost; the application in the target Pod sees the node IP or the CNI device IP (bridge/vxlan device).

!FILENAME pkg/proxy/iptables/proxier.go:738

    // write the kubernetes-specific SNAT masquerade rules
    // -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
    writeLine(proxier.natRules, []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE",
    }...)
    // -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
    writeLine(proxier.natRules, []string{
        "-A", string(KubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
    }...)

Creating rules for each service

First, the full serviceInfo definition:

!FILENAME pkg/proxy/iptables/proxier.go:141

    type serviceInfo struct {
        *proxy.BaseServiceInfo
        // The following fields are computed and stored for performance reasons.
        serviceNameString        string
        servicePortChainName     utiliptables.Chain // KUBE-SVC-XXXX16BitXXXX service chain
        serviceFirewallChainName utiliptables.Chain // KUBE-FW-XXXX16BitXXXX firewall chain
        serviceLBChainName       utiliptables.Chain // KUBE-XLB-XXXX16BitXXXX external LB chain
    }
    type BaseServiceInfo struct {
        ClusterIP net.IP          // portal IP (VIP)
        Port int                  // portal port
        Protocol v1.Protocol      // protocol
        NodePort int              // node port
        LoadBalancerStatus v1.LoadBalancerStatus // LB ingress
        SessionAffinityType v1.ServiceAffinity   // session affinity
        StickyMaxAgeSeconds int                  // maximum affinity duration
        ExternalIPs []string      // external IPs (ports listened on designated nodes)
        LoadBalancerSourceRanges []string // source ranges allowed through
        HealthCheckNodePort int   // health check port
        OnlyNodeLocalEndpoints bool
    }

For each service, create the service chain "KUBE-SVC-XXX..." and, when needed, the external load-balancing chain "KUBE-XLB-XXX...":

!FILENAME pkg/proxy/iptables/proxier.go:791

    svcChain := svcInfo.servicePortChainName // "KUBE-SVC-XXX..."
    if hasEndpoints {
        // Create the per-service chain, retaining counters if possible.
        if chain, ok := existingNATChains[svcChain]; ok {
            writeBytesLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
        }
        activeNATChains[svcChain] = true
    }
    svcXlbChain := svcInfo.serviceLBChainName // "KUBE-XLB-XXX..."
    if svcInfo.OnlyNodeLocalEndpoints {
        // Only for services requesting OnlyLocal traffic:
        // create the per-service LB chain, retaining counters if possible.
        if lbChain, ok := existingNATChains[svcXlbChain]; ok {
            writeBytesLine(proxier.natChains, lbChain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
        }
        activeNATChains[svcXlbChain] = true
    }

Matching clusterIP traffic. clusterIP is the default service type and is only reachable from within the cluster.

!FILENAME pkg/proxy/iptables/proxier.go:815

    // endpoints exist: write the rules
    if hasEndpoints {
        args = append(args[:0],
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
            "-m", protocol, "-p", protocol,
            "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
            "--dport", strconv.Itoa(svcInfo.Port),
        )
        // proxier configured with masqueradeAll:
        // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
        // --dport $port -j KUBE-MARK-MASQ
        if proxier.masqueradeAll {
            writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
        } else if len(proxier.clusterCIDR) > 0 {
            // proxier configured with clusterCIDR:
            // -A KUBE-SERVICES ! -s $clusterCIDR -m comment --comment "..." -m $prot \
            // -p $prot -d $clusterIP --dport $port -j KUBE-MARK-MASQ
            writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
        }
        // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d $clusterIP \
        // --dport $port -j KUBE-SVC-XXXX16bitXXXX
        writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
    } else {
        // no endpoints: create a REJECT rule instead
        // -A KUBE-SERVICES -m comment --comment $svcName -m $prot -p $prot -d $clusterIP \
        // --dport $port -j REJECT
        writeLine(proxier.filterRules,
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
            "-m", protocol, "-p", protocol,
            "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
            "--dport", strconv.Itoa(svcInfo.Port),
            "-j", "REJECT",
        )
    }
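
As a concrete illustration, for a hypothetical service default/web:http (TCP port 80, ClusterIP 10.0.0.10) with clusterCIDR 10.244.0.0/16 configured, the block above would emit nat rules along these lines (the KUBE-SVC suffix is an illustrative hash):

    -A KUBE-SERVICES ! -s 10.244.0.0/16 -m comment --comment "default/web:http cluster IP" -m tcp -p tcp -d 10.0.0.10/32 --dport 80 -j KUBE-MARK-MASQ
    -A KUBE-SERVICES -m comment --comment "default/web:http cluster IP" -m tcp -p tcp -d 10.0.0.10/32 --dport 80 -j KUBE-SVC-BI3HGWYLJ4QJR4FX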

Does the service use externalIPs (a listening port opened on designated nodes)?

!FILENAME pkg/proxy/iptables/proxier.go:846

    for _, externalIP := range svcInfo.ExternalIPs {
        // check whether the externalIP is local to this node (and the protocol is not SCTP)
        // and whether the port is already held open; if not, open and hold the local port
        if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
            klog.Errorf("can't determine if IP is local, assuming not: %v", err)
        } else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
            lp := utilproxy.LocalPort{
                Description: "externalIP for " + svcNameString,
                IP:          externalIP,
                Port:        svcInfo.Port,
                Protocol:    protocol,
            }
            if proxier.portsMap[lp] != nil {
                klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                replacementPortsMap[lp] = proxier.portsMap[lp]
            } else {
                // open and listen on the local port
                socket, err := proxier.portMapper.OpenLocalPort(&lp)
                if err != nil {
                    msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
                    proxier.recorder.Eventf(
                        &v1.ObjectReference{
                            Kind:      "Node",
                            Name:      proxier.hostname,
                            UID:       types.UID(proxier.hostname),
                            Namespace: "",
                        }, v1.EventTypeWarning, err.Error(), msg)
                    klog.Error(msg)
                    continue
                }
                replacementPortsMap[lp] = socket
            }
        }
        // endpoints exist: write the rules
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                "--dport", strconv.Itoa(svcInfo.Port),
            )
            // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
            // $externalIP --dport $port -j KUBE-MARK-MASQ
            writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
            // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
            // $externalIP --dport $port -m physdev ! --physdev-is-in \
            // -m addrtype ! --src-type LOCAL -j KUBE-SVC-XXXX16bitXXXX
            externalTrafficOnlyArgs := append(args,
                "-m", "physdev", "!", "--physdev-is-in",
                "-m", "addrtype", "!", "--src-type", "LOCAL")
            writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
            dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
            // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
            // $externalIP --dport $port -m addrtype --dst-type LOCAL \
            // -j KUBE-SVC-XXXX16bitXXXX
            writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
        } else {
            // no endpoints: reject
            // -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
            // $externalIP --dport $port -j REJECT
            writeLine(proxier.filterRules,
                "-A", string(kubeExternalServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                "--dport", strconv.Itoa(svcInfo.Port),
                "-j", "REJECT",
            )
        }
    }

Does the service use an external load balancer (load-balancer ingress)?

!FILENAME pkg/proxy/iptables/proxier.go:917

    // endpoints exist: write the rules
    if hasEndpoints {
        fwChain := svcInfo.serviceFirewallChainName // "KUBE-FW-XXXX16bitXXXX"
        for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
            if ingress.IP != "" {
                // create the per-service KUBE-FW-X chain
                if chain, ok := existingNATChains[fwChain]; ok {
                    writeBytesLine(proxier.natChains, chain)
                } else { // create it if it did not exist
                    writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
                }
                activeNATChains[fwChain] = true
                // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                // This currently works for loadbalancers that preserves source ips.
                // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
                    "--dport", strconv.Itoa(svcInfo.Port),
                )
                // -A KUBE-SERVICES -m comment --comment "..." -m $prot -p $prot -d \
                // $ingressIP --dport $port -j KUBE-FW-XXXX16bitXXXX
                writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
                args = append(args[:0],
                    "-A", string(fwChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                )
                // In the KUBE-FW chain, each source-match rule may jump to either the SVC or the XLB chain
                chosenChain := svcXlbChain
                // If we are proxying globally, we need to masquerade in case we cross nodes.
                // If we are proxying only locally, we can retain the source IP.
                if !svcInfo.OnlyNodeLocalEndpoints {
                    // -j "KUBE-MARK-MASQ": masquerade so cross-node access works
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                    chosenChain = svcChain // choose the SVC chain
                }
                if len(svcInfo.LoadBalancerSourceRanges) == 0 {
                    // all sources allowed: jump directly
                    writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
                } else {
                    // filter on the configured source ranges: "-s $srcRange"
                    allowFromNode := false
                    for _, src := range svcInfo.LoadBalancerSourceRanges {
                        writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                        _, cidr, _ := net.ParseCIDR(src)
                        if cidr.Contains(proxier.nodeIP) {
                            allowFromNode = true // a configured CIDR contains the node IP: allow requests from the node
                        }
                    }
                    // add "-s $ingressIP" to allow requests from the LB backend hosts
                    if allowFromNode {
                        writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
                    }
                }
                // anything reaching the end of the firewall chain was not matched above: "-j KUBE-MARK-DROP"
                writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
            }
        }
    }

Does the service use a nodePort (a port opened on every node)?

!FILENAME pkg/proxy/iptables/proxier.go:989

    if svcInfo.NodePort != 0 {
        // get the node addresses
        addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
        if err != nil {
            klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
            continue
        }
        lps := make([]utilproxy.LocalPort, 0)
        for address := range addresses {
            lp := utilproxy.LocalPort{
                Description: "nodePort for " + svcNameString,
                IP:          address,
                Port:        svcInfo.NodePort,
                Protocol:    protocol,
            }
            if utilproxy.IsZeroCIDR(address) {
                // Empty IP address means all
                lp.IP = ""
                lps = append(lps, lp)
                // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
                break
            }
            lps = append(lps, lp) // IP list
        }
        // open the ports on the node IPs and hold the socket handles
        for _, lp := range lps {
            if proxier.portsMap[lp] != nil {
                klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                replacementPortsMap[lp] = proxier.portsMap[lp]
            } else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
                // open and listen on the port
                socket, err := proxier.portMapper.OpenLocalPort(&lp)
                if err != nil {
                    klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                    continue
                }
                if lp.Protocol == "udp" {
                    // clear the udp conntrack entries
                    err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
                    if err != nil {
                        klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
                    }
                }
                replacementPortsMap[lp] = socket // hold the socket
            }
        }
        // endpoints exist: write the rules
        if hasEndpoints {
            // -A KUBE-NODEPORTS -m comment --comment "..." -m $prot -p $prot --dport $nodePort
            args = append(args[:0],
                "-A", string(kubeNodePortsChain),
                "-m", "comment", "--comment", svcNameString,
                "-m", protocol, "-p", protocol,
                "--dport", strconv.Itoa(svcInfo.NodePort),
            )
            if !svcInfo.OnlyNodeLocalEndpoints {
                // non-local nodeports need an SNAT rule:
                // -j KUBE-MARK-MASQ, then jump to KUBE-SVC-XXXX16bitXXXX
                writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                // Jump to the service chain.
                writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
            } else {
                loopback := "127.0.0.0/8"
                if isIPv6 {
                    loopback = "::1/128"
                }
                // local-only nodeports:
                // -s $loopback -j KUBE-MARK-MASQ, then jump to KUBE-XLB-XXXX16bitXXXX
                writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
                writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
            }
        } else {
            // no endpoints: add a -j REJECT rule
            // -A KUBE-EXTERNAL-SERVICES -m comment --comment "..." -m addrtype \
            // --dst-type LOCAL -m $prot -p $prot --dport $nodePort -j REJECT
            writeLine(proxier.filterRules,
                "-A", string(kubeExternalServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", "addrtype", "--dst-type", "LOCAL",
                "-m", protocol, "-p", protocol,
                "--dport", strconv.Itoa(svcInfo.NodePort),
                "-j", "REJECT",
            )
        }
    }

Generating the per-endpoint chains from the service name and protocol

!FILENAME pkg/proxy/iptables/proxier.go:1087

    for _, ep := range proxier.endpointsMap[svcName] {
        epInfo, ok := ep.(*endpointsInfo)
        if !ok {
            klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
            continue
        }
        endpoints = append(endpoints, epInfo)
        // derive the endpoint chain name from the service name and protocol: "KUBE-SEP-XXXX16bitXXXX"
        endpointChain = epInfo.endpointChain(svcNameString, protocol)
        endpointChains = append(endpointChains, endpointChain)
        // create the endpoint chain
        if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
            writeBytesLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
        }
        activeNATChains[endpointChain] = true
    }

Write the session-affinity rules. Session affinity is kept for the service's configured StickyMaxAgeSeconds by adding iptables rules of the form "-m recent --rcheck --seconds <StickyMaxAgeSeconds> --reap".

!FILENAME pkg/proxy/iptables/proxier.go:1107

    // if SessionAffinityType is set to "ClientIP", write the affinity rules:
    // -A KUBE-SVC-XXXX16bitXXXX -m recent -m comment --comment "..." \
    // --name KUBE-SEP-XXXX16bitXXXX --rcheck --seconds $StickyMaxAge --reap \
    // -j KUBE-SEP-XXXX16bitXXXX
    if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
        for _, endpointChain := range endpointChains {
            args = append(args[:0],
                "-A", string(svcChain),
            )
            proxier.appendServiceCommentLocked(args, svcNameString)
            args = append(args,
                "-m", "recent", "--name", string(endpointChain),
                "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
                "-j", string(endpointChain),
            )
            writeLine(proxier.natRules, args...)
        }
    }

Write the load-balancing and DNAT rules. "-m statistic --mode random --probability <p>" iptables rules combine the backend Pods into a probabilistically selected group, producing load-balanced access to the service.

  • For each endpoint of the service, a rule is added to the service's per-service chain "KUBE-SVC-XXXX16bitXXXX" in the nat table. If the service has two or more endpoints, load-balancing (statistic) rules are added.
  • For Pods selected on another node, DNAT rewrites the request's destination address to the chosen Pod's IP before routing; KUBE-MARK-MASQ re-marks (masquerades) the source address. The rule shapes are:

    -A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..." -m statistic --mode random --probability $prob -j KUBE-SEP-XXXX16bitXXXX

    -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -s $epIP -j KUBE-MARK-MASQ

    -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx

!FILENAME pkg/proxy/iptables/proxier.go:1123

    // write the load-balancing and DNAT rules
    n := len(endpointChains)
    for i, endpointChain := range endpointChains {
        epIP := endpoints[i].IP()
        if epIP == "" {
            // Error parsing this endpoint has been logged. Skip to next endpoint.
            continue
        }
        // per-service load-balancing rules: the backend Pods form a
        // probabilistically selected group
        // -A KUBE-SVC-XXXX16bitXXXX -m comment --comment "..." \
        // -m statistic --mode random --probability $prob \
        // -j KUBE-SEP-XXXX16bitXXXX
        args = append(args[:0], "-A", string(svcChain))
        proxier.appendServiceCommentLocked(args, svcNameString)
        if i < (n - 1) { // with two or more endpoints
            args = append(args,
                "-m", "statistic",
                "--mode", "random",
                "--probability", proxier.probability(n-i))
        }
        // The final (or only if n == 1) rule is a guaranteed match.
        args = append(args, "-j", string(endpointChain))
        writeLine(proxier.natRules, args...)
        // per-endpoint chain rules
        // -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." -s $epIP -j KUBE-MARK-MASQ
        args = append(args[:0], "-A", string(endpointChain))
        proxier.appendServiceCommentLocked(args, svcNameString)
        // Handle traffic that loops back to the originator with SNAT.
        writeLine(proxier.natRules, append(args,
            "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
            "-j", string(KubeMarkMasqChain))...)
        // if "ClientIP" session affinity is configured:
        // -m recent --name KUBE-SEP-XXXX16bitXXXX --set
        if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
            args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
        }
        // DNAT to the final endpoint
        // -A KUBE-SEP-XXXX16bitXXXX -m comment --comment "..." \
        // -m $prot -p $prot -j DNAT --to-destination X.X.X.X:xxx
        args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
        writeLine(proxier.natRules, args...)
    }
    // only-local traffic for the service
    localEndpoints := make([]*endpointsInfo, 0)
    localEndpointChains := make([]utiliptables.Chain, 0)
    for i := range endpointChains {
        if endpoints[i].IsLocal {
            // These slices parallel each other; must be kept in sync
            localEndpoints = append(localEndpoints, endpoints[i])
            localEndpointChains = append(localEndpointChains, endpointChains[i])
        }
    }
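
The probability values make each endpoint equally likely: the i-th of n rules only sees traffic the earlier rules passed over, so it matches with probability 1/(n-i). A sketch of the 1/n rendering (the fixed-precision format is an assumption mirroring the proxier's precomputed probability strings) for three endpoints:

    package main

    import "fmt"

    // computeProbability renders 1/n with fixed precision for the
    // iptables --probability flag (assumption: mirrors the proxier's
    // precomputed probability values).
    func computeProbability(n int) string {
        return fmt.Sprintf("%0.10f", 1.0/float64(n))
    }

    func main() {
        n := 3 // endpoints
        for i := 0; i < n; i++ {
            if i < n-1 {
                fmt.Printf("rule %d: --probability %s\n", i, computeProbability(n-i))
            } else {
                fmt.Printf("rule %d: unconditional (guaranteed match)\n", i)
            }
        }
        // rule 0: --probability 0.3333333333
        // rule 1: --probability 0.5000000000
        // rule 2: unconditional (guaranteed match)
        // Net effect: each endpoint receives 1/3 of new connections.
    }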

With clusterCIDR enabled (kube-proxy's --cluster-cidr specifies the Pod network range of the cluster; the Pod range and the service cluster IP/VIP range configured on the apiserver are different networks):

!FILENAME pkg/proxy/iptables/proxier.go:1179

    // redirect pod -> external VIP traffic to the service VIP (service chain)
    // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -s $clusterCIDR \
    // -j KUBE-SVC-XXXX16bitXXXX
    if len(proxier.clusterCIDR) > 0 {
        args = append(args[:0],
            "-A", string(svcXlbChain),
            "-m", "comment", "--comment",
            `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
            "-s", proxier.clusterCIDR,
            "-j", string(svcChain),
        )
        writeLine(proxier.natRules, args...)
    }

Generate the local-endpoint chain rules, preserving the client source IP. When only local Pods may serve a request, no SNAT rule is involved, so the source IP survives. For nodePort or XLB traffic, "externalTrafficPolicy": "Local" steers requests to the service's Pods on the local node; if no local Pod can serve the request, it is dropped and the client sees a timeout with no response.

!FILENAME pkg/proxy/iptables/proxier.go:1190

    numLocalEndpoints := len(localEndpointChains)
    if numLocalEndpoints == 0 {
        // no local endpoints: drop the traffic (blackhole)
        // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -j KUBE-MARK-DROP
        args = append(args[:0],
            "-A", string(svcXlbChain),
            "-m", "comment", "--comment",
            fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
            "-j",
            string(KubeMarkDropChain),
        )
        writeLine(proxier.natRules, args...)
    } else {
        // session affinity for local endpoints
        // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." -m recent \
        // --name KUBE-SEP-XXXX16bitXXXX \
        // --rcheck --seconds $StickyMaxAge --reap -j KUBE-SEP-XXXX16bitXXXX
        if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
            for _, endpointChain := range localEndpointChains {
                writeLine(proxier.natRules,
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment", svcNameString,
                    "-m", "recent", "--name", string(endpointChain),
                    "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap",
                    "-j", string(endpointChain))
            }
        }
        // load balancing across local endpoints: "-m statistic --mode random --probability"
        // the local backend Pods form a probabilistically selected group
        for i, endpointChain := range localEndpointChains {
            // Balancing rules in the per-service chain.
            args = append(args[:0],
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
            )
            if i < (numLocalEndpoints - 1) {
                // Each rule is a probabilistic match.
                args = append(args,
                    "-m", "statistic",
                    "--mode", "random",
                    "--probability", proxier.probability(numLocalEndpoints-i))
            }
            args = append(args, "-j", string(endpointChain))
            // -A KUBE-XLB-XXXX16bitXXXX -m comment --comment "..." \
            // -m statistic --mode random \
            // --probability 0.50000000000 -j KUBE-SEP-XXXX16bitXXXX
            writeLine(proxier.natRules, args...)
        }
    }

Finalizing the rule data

Delete the service-owned custom kube chains that are no longer used: "KUBE-SVC-*", "KUBE-SEP-*", "KUBE-FW-*", "KUBE-XLB-*".

!FILENAME pkg/proxy/iptables/proxier.go:1237

    for chain := range existingNATChains {
        if !activeNATChains[chain] {
            chainString := string(chain)
            if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                // Ignore chains that aren't ours.
                continue
            }
            // We must (as per iptables) write a chain-line for it, which has
            // the nice effect of flushing the chain. Then we can remove the
            // chain.
            writeBytesLine(proxier.natChains, existingNATChains[chain])
            writeLine(proxier.natRules, "-X", chainString)
        }
    }

Adding the service nodeports rules (nat table, "KUBE-SERVICES" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1254

    // -A KUBE-SERVICES -m comment --comment "..." -m addrtype --dst-type LOCAL \
    //    -j KUBE-NODEPORTS
    //
    // -A KUBE-SERVICES -m comment --comment "..." -d $NODEIP -j KUBE-NODEPORTS
    //
    addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
    if err != nil {
        klog.Errorf("Failed to get node ip address matching nodeport cidr")
    } else {
        isIPv6 := proxier.iptables.IsIpv6()
        for address := range addresses {
            // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
            if utilproxy.IsZeroCIDR(address) {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-j", string(kubeNodePortsChain))
                writeLine(proxier.natRules, args...)
                // Nothing else matters after the zero CIDR.
                break
            }
            // Ignore IP addresses with incorrect version
            if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
                klog.Errorf("IP address %s has incorrect IP version", address)
                continue
            }
            // create nodeport rules for each IP one by one
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                "-d", address,
                "-j", string(kubeNodePortsChain))
            writeLine(proxier.natRules, args...)
        }
    }
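
For illustration, with a hypothetical --nodeport-addresses=192.168.0.2/32 the loop emits a per-IP rule; an empty or zero-CIDR configuration instead collapses to the single addrtype match (rules abbreviated):

    -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; ..." -d 192.168.0.2 -j KUBE-NODEPORTS
    -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; ..." -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS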

Add the forward accept rule (filter table, "KUBE-FORWARD" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1289

    // -A KUBE-FORWARD -m comment --comment "..." -m mark --mark 0xFFFF/0xFFFF -j ACCEPT
    writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "ACCEPT",
    )

Add clusterCIDR-scoped (source/destination) conntrack rules (filter table, "KUBE-FORWARD" chain)

!FILENAME pkg/proxy/iptables/proxier.go:1297

    // kube-proxy's clusterCIDR specifies the CIDR used by the cluster's pods;
    // the pod CIDR and the service cluster-IP CIDR are different ranges
    //
    // -A KUBE-FORWARD -s $clusterCIDR -m comment --comment "..." -m conntrack --ctstate \
    //    RELATED,ESTABLISHED -j ACCEPT
    // -A KUBE-FORWARD -m comment --comment "..." -d $clusterCIDR -m conntrack --ctstate \
    //    RELATED,ESTABLISHED -j ACCEPT
    //
    if len(proxier.clusterCIDR) != 0 {
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-s", proxier.clusterCIDR, // match on source
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
        writeLine(proxier.filterRules,
            "-A", string(kubeForwardChain),
            "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
            "-d", proxier.clusterCIDR, // match on destination
            "-m", "conntrack",
            "--ctstate", "RELATED,ESTABLISHED",
            "-j", "ACCEPT",
        )
    }
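
Assuming, for illustration, a clusterCIDR of 10.244.0.0/16 and the typical default masquerade mark 0x4000/0x4000, the KUBE-FORWARD chain ends up looking like:

    -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
    -A KUBE-FORWARD -s 10.244.0.0/16 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
    -A KUBE-FORWARD -d 10.244.0.0/16 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT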

Write the closing COMMIT markers

    writeLine(proxier.filterRules, "COMMIT")
    writeLine(proxier.natRules, "COMMIT")

Assemble and load the iptables rule data

!FILENAME pkg/proxy/iptables/proxier.go:1326

    // Gather the filter and nat table data produced above into iptablesData
    proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())
    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    // Load the new configuration (iptablesData) via iptables-restore
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }
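
The assembled iptablesData is a single iptables-restore stream covering both tables; an abbreviated, illustrative example of its shape (chain hash and addresses made up):

    *filter
    :KUBE-SERVICES - [0:0]
    :KUBE-FORWARD - [0:0]
    -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
    COMMIT
    *nat
    :KUBE-SERVICES - [0:0]
    :KUBE-NODEPORTS - [0:0]
    :KUBE-SVC-ABCDEFGHIJKLMNOP - [0:0]
    -A KUBE-SERVICES -d 10.96.0.1/32 -p tcp -m tcp --dport 443 -m comment --comment "default/kubernetes:https cluster IP" -j KUBE-SVC-ABCDEFGHIJKLMNOP
    COMMIT

Because RestoreAll is invoked with NoFlushTables, only the chains declared in this stream are flushed and rewritten; other chains in the same tables are left untouched.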

The underlying iptables runner implementation

We have now walked through the proxier's entire execution flow. The last piece is how the iptables proxier uses the system-level iptables commands to perform the low-level CRUD operations on rules (put plainly, the iptables proxier implementation ultimately just drives the iptables command to generate the corresponding rules). Below we look at kube-proxy's wrapper around the underlying iptables executor.

The iptables executor

The **Interface** interface defines the operations available for running iptables commands.

!FILENAME pkg/util/iptables/iptables.go:45

    // the interface and its method definitions
    type Interface interface {
        GetVersion() (string, error)
        EnsureChain(table Table, chain Chain) (bool, error)
        FlushChain(table Table, chain Chain) error
        DeleteChain(table Table, chain Chain) error
        EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
        DeleteRule(table Table, chain Chain, args ...string) error
        IsIpv6() bool
        SaveInto(table Table, buffer *bytes.Buffer) error
        Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
        RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
        AddReloadFunc(reloadFunc func())
        Destroy()
    }

runner is the implementation of the iptables Interface and defines the iptables command executor.

!FILENAME pkg/util/iptables/iptables.go:135

    // runner struct definition
    type runner struct {
        mu              sync.Mutex          // mutex guarding command execution
        exec            utilexec.Interface  // os/exec based command execution
        dbus            utildbus.Interface  // D-Bus API interface
        protocol        Protocol            // protocol, IPv4 or IPv6
        hasCheck        bool                // whether the "-C" check flag is supported
        hasListener     bool                // whether the D-Bus signal listener is active (FirewallD start/restart)
        waitFlag        []string            // iptables "wait" flag (waits for the xtables lock)
        restoreWaitFlag []string            // iptables-restore "wait" flag
        lockfilePath    string              // xtables lock file location
        reloadFuncs     []func()            // registered reload handlers
        signal          chan *godbus.Signal // D-Bus signal channel
    }

    // runner's implementations of the Interface methods; the key ones are
    // analyzed in detail below
    func (runner *runner) GetVersion() (string, error)
    func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error)
    func (runner *runner) FlushChain(table Table, chain Chain) error
    func (runner *runner) DeleteChain(table Table, chain Chain) error
    func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
    func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error
    func (runner *runner) IsIpv6() bool
    func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error
    func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    func (runner *runner) AddReloadFunc(reloadFunc func())
    func (runner *runner) Destroy()

    // runner's internal methods
    func (runner *runner) connectToFirewallD()
    func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
    func (runner *runner) run(op operation, args []string) ([]byte, error)
    func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error)
    func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error)
    func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error)
    func (runner *runner) checkRuleUsingCheck(args []string) (bool, error)
    func (runner *runner) dbusSignalHandler(bus utildbus.Connection)
    func (runner *runner) reload()

The iptables runner is constructed via New() -> newInternal(), which returns an instantiated runner{…} object (as the Interface type), yielding a ready-to-use iptables command executor.

!FILENAME pkg/util/iptables/iptables.go:152

    func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface {
        vstring, err := getIPTablesVersionString(exec, protocol) // detect the iptables version
        if err != nil {
            klog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
            vstring = MinCheckVersion
        }
        if lockfilePath == "" {
            lockfilePath = LockfilePath16x // default lock file location "/run/xtables.lock"
        }
        runner := &runner{
            exec:            exec,     // utilexec, a wrapper around os/exec
            dbus:            dbus,     // utildbus
            protocol:        protocol, // IPv4 or IPv6
            hasCheck:        getIPTablesHasCheckCommand(vstring), // whether the "-C" flag is available
            hasListener:     false,
            waitFlag:        getIPTablesWaitFlag(vstring),               // iptables --wait
            restoreWaitFlag: getIPTablesRestoreWaitFlag(exec, protocol), // iptables-restore --wait
            lockfilePath:    lockfilePath, // xtables lock file location
        }
        return runner
    }

    // New returns an iptables exec-based command executor (runner)
    func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
        return newInternal(exec, dbus, protocol, "")
    }
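
A minimal usage sketch of the constructor (assuming the vendored package paths and names used by kube-proxy; error handling elided):

    package main

    import (
        "fmt"

        utildbus "k8s.io/kubernetes/pkg/util/dbus"
        utiliptables "k8s.io/kubernetes/pkg/util/iptables"
        utilexec "k8s.io/utils/exec"
    )

    func main() {
        // Build an IPv4 iptables executor, much as kube-proxy's ProxyServer does.
        ipt := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)

        // Ensure a demo chain exists in the nat table; under the hood this runs
        // roughly "iptables -w -N KUBE-DEMO -t nat" (the wait flag depends on
        // the detected iptables version).
        existed, err := ipt.EnsureChain(utiliptables.TableNAT, utiliptables.Chain("KUBE-DEMO"))
        fmt.Println(existed, err)
    }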

Executor methods

runner.run() is the most fundamental internal method, shared by all of the runner's operations; it is where the iptables command is actually executed via os/exec. run() takes two arguments: the iptables operation (command) to perform, and its argument list; together they form one complete iptables invocation. runContext() covers both execution styles, with and without a context.

!FILENAME pkg/util/iptables/iptables.go:218

    func (runner *runner) run(op operation, args []string) ([]byte, error) {
        return runner.runContext(nil, op, args)
    }

    func (runner *runner) runContext(ctx context.Context, op operation, args []string) ([]byte, error) {
        iptablesCmd := iptablesCommand(runner.protocol) // "iptables" or "ip6tables"
        fullArgs := append(runner.waitFlag, string(op))
        fullArgs = append(fullArgs, args...)
        klog.V(5).Infof("running iptables %s %v", string(op), args)
        // Depending on whether a context was passed in, call Command or CommandContext
        if ctx == nil {
            return runner.exec.Command(iptablesCmd, fullArgs...).CombinedOutput()
        }
        return runner.exec.CommandContext(ctx, iptablesCmd, fullArgs...).CombinedOutput()
    }

    // the supported iptables operation commands
    type operation string

    // runner.exec is implemented by an os/exec based executor
    func New() Interface {
        return &executor{}
    }

    // Command is part of the Interface interface.
    func (executor *executor) Command(cmd string, args ...string) Cmd {
        return (*cmdWrapper)(osexec.Command(cmd, args...))
    }

    // CommandContext is part of the Interface interface.
    func (executor *executor) CommandContext(ctx context.Context, cmd string, args ...string) Cmd {
        return (*cmdWrapper)(osexec.CommandContext(ctx, cmd, args...))
    }
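
As a worked example, the EnsureChain call from the earlier sketch flows through run() roughly like this (assuming a waitFlag of ["-w"]):

    op       = "-N"                                   // opCreateChain
    args     = ["KUBE-DEMO", "-t", "nat"]             // from makeFullArgs(table, chain)
    fullArgs = ["-w", "-N", "KUBE-DEMO", "-t", "nat"]
    exec     : iptables -w -N KUBE-DEMO -t nat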

runner.GetVersion() returns the version of iptables installed on the system, in "X.Y.Z" format

!FILENAME pkg/util/iptables/iptables.go:218

    func (runner *runner) GetVersion() (string, error) {
        return getIPTablesVersionString(runner.exec, runner.protocol)
    }

    func getIPTablesVersionString(exec utilexec.Interface, protocol Protocol) (string, error) {
        // run "iptables --version" (or "ip6tables --version")
        iptablesCmd := iptablesCommand(protocol)
        bytes, err := exec.Command(iptablesCmd, "--version").CombinedOutput()
        if err != nil {
            return "", err
        }
        // regex match for the version string, in "X.Y.Z" format
        versionMatcher := regexp.MustCompile("v([0-9]+(\\.[0-9]+)+)")
        match := versionMatcher.FindStringSubmatch(string(bytes))
        if match == nil {
            return "", fmt.Errorf("no iptables version found in string: %s", bytes)
        }
        return match[1], nil
    }

runner.EnsureChain() ("-N") checks whether the given chain exists, creating it if it does not; it returns true if the chain already existed

!FILENAME pkg/util/iptables/iptables.go:223

    func (runner *runner) EnsureChain(table Table, chain Chain) (bool, error) {
        fullArgs := makeFullArgs(table, chain)
        runner.mu.Lock()
        defer runner.mu.Unlock()
        // run "iptables -t $tableName -N $chainName"
        out, err := runner.run(opCreateChain, fullArgs)
        if err != nil {
            if ee, ok := err.(utilexec.ExitError); ok {
                // exit status 1 means the chain already exists
                if ee.Exited() && ee.ExitStatus() == 1 {
                    return true, nil
                }
            }
            return false, fmt.Errorf("error creating chain %q: %v: %s", chain, err, out)
        }
        return false, nil
    }

runner.FlushChain() ("-F") flushes the specified chain

!FILENAME pkg/util/iptables/iptables.go:242

    func (runner *runner) FlushChain(table Table, chain Chain) error {
        fullArgs := makeFullArgs(table, chain)
        //...
        // run "iptables -t $tableName -F $chainName"
        out, err := runner.run(opFlushChain, fullArgs)
        //...
    }

runner.DeleteChain() ("-X") deletes the specified chain

!FILENAME pkg/util/iptables/iptables.go:256

    func (runner *runner) DeleteChain(table Table, chain Chain) error {
        fullArgs := makeFullArgs(table, chain)
        //...
        // run "iptables -t $tableName -X $chainName"
        out, err := runner.run(opDeleteChain, fullArgs)
        //...
    }

runner.EnsureRule() checks whether a rule exists; if it does not, the rule is added to the given table/chain at the specified position

!FILENAME pkg/util/iptables/iptables.go:271

    func (runner *runner) EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error) {
        fullArgs := makeFullArgs(table, chain, args...)
        runner.mu.Lock()
        defer runner.mu.Unlock()
        // check whether the rule already exists
        exists, err := runner.checkRule(table, chain, args...)
        if err != nil {
            return false, err
        }
        if exists {
            return true, nil
        }
        // RulePosition is "-I" or "-A":
        //   insert at a position: "iptables -I $chainName -t $tableName ..."
        //   append to the chain:  "iptables -A $chainName -t $tableName ..."
        out, err := runner.run(operation(position), fullArgs)
        if err != nil {
            return false, fmt.Errorf("error appending rule: %v: %s", err, out)
        }
        return false, nil
    }

    // checkRule() first checks whether this iptables supports the "-C" flag,
    // then dispatches to the matching rule-check implementation
    func (runner *runner) checkRule(table Table, chain Chain, args ...string) (bool, error) {
        if runner.hasCheck {
            return runner.checkRuleUsingCheck(makeFullArgs(table, chain, args...))
        }
        return runner.checkRuleWithoutCheck(table, chain, args...)
    }

    // the "-C" flag is supported
    func (runner *runner) checkRuleUsingCheck(args []string) (bool, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()
        //...
        // run "iptables -wait -C $chainName -t $tableName ..."
        out, err := runner.runContext(ctx, opCheckRule, args)
        //...
    }

    // fallback for iptables < 1.4.11, which lacks the "-C" flag
    func (runner *runner) checkRuleWithoutCheck(table Table, chain Chain, args ...string) (bool, error) {
        // "iptables-save -t $tableName"
        iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
        klog.V(1).Infof("running %s -t %s", iptablesSaveCmd, string(table))
        out, err := runner.exec.Command(iptablesSaveCmd, "-t", string(table)).CombinedOutput()
        if err != nil {
            return false, fmt.Errorf("error checking rule: %v", err)
        }
        // strip quotes from the requested args
        var argsCopy []string
        for i := range args {
            tmpField := strings.Trim(args[i], "\"")
            tmpField = trimhex(tmpField)
            argsCopy = append(argsCopy, strings.Fields(tmpField)...)
        }
        argset := sets.NewString(argsCopy...)
        for _, line := range strings.Split(string(out), "\n") {
            var fields = strings.Fields(line)
            // the saved rule must belong to the requested chain
            if !strings.HasPrefix(line, fmt.Sprintf("-A %s", string(chain))) || len(fields) != len(argsCopy)+2 {
                continue
            }
            // strip all quotes from the saved fields
            for i := range fields {
                fields[i] = strings.Trim(fields[i], "\"")
                fields[i] = trimhex(fields[i])
            }
            // match by set inclusion: the saved rule must contain all requested args
            if sets.NewString(fields...).IsSuperset(argset) {
                return true, nil
            }
            klog.V(5).Infof("DBG: fields is not a superset of args: fields=%v args=%v", fields, args)
        }
        return false, nil
    }
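
The core of the fallback is a field-set inclusion test: the rule exists if some saved "-A $chain ..." line contains every requested argument. A self-contained sketch of that idea (a plain map instead of the sets utility; quote and hex trimming omitted):

    package main

    import (
        "fmt"
        "strings"
    )

    // isSuperset reports whether every element of want appears in have.
    func isSuperset(have, want []string) bool {
        set := make(map[string]bool, len(have))
        for _, f := range have {
            set[f] = true
        }
        for _, f := range want {
            if !set[f] {
                return false
            }
        }
        return true
    }

    func main() {
        saved := "-A KUBE-SERVICES -d 10.96.0.1/32 -p tcp -m tcp --dport 443 -j KUBE-SVC-X"
        rule := []string{"-d", "10.96.0.1/32", "-p", "tcp", "-m", "tcp", "--dport", "443", "-j", "KUBE-SVC-X"}
        fmt.Println(isSuperset(strings.Fields(saved), rule)) // true
    }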

runner.DeleteRule() ("-D") deletes a rule from the specified table/chain

!FILENAME pkg/util/iptables/iptables.go:292

    func (runner *runner) DeleteRule(table Table, chain Chain, args ...string) error {
        fullArgs := makeFullArgs(table, chain, args...)
        //...
        // check whether the rule exists
        exists, err := runner.checkRule(table, chain, args...)
        //...
        // run "iptables -D $chainName -t $tableName ..."
        out, err := runner.run(opDeleteRule, fullArgs)
        //...
    }

runner.SaveInto() saves the rule set of the specified table into the supplied buffer

!FILENAME pkg/util/iptables/iptables.go:317

    func (runner *runner) SaveInto(table Table, buffer *bytes.Buffer) error {
        //...
        // run "iptables-save -t $tableName", writing stdout/stderr into the buffer
        iptablesSaveCmd := iptablesSaveCommand(runner.protocol)
        args := []string{"-t", string(table)}
        cmd := runner.exec.Command(iptablesSaveCmd, args...)
        cmd.SetStdout(buffer)
        cmd.SetStderr(buffer)
        return cmd.Run()
    }
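
This is how a caller, such as the proxier when it snapshots existing chains before a sync, might use it; a hedged fragment continuing the earlier construction sketch:

    // ipt is the utiliptables.Interface built in the earlier sketch.
    buf := bytes.NewBuffer(nil)
    if err := ipt.SaveInto(utiliptables.TableNAT, buf); err != nil {
        fmt.Printf("iptables-save failed: %v\n", err)
    }
    fmt.Print(buf.String()) // the output of "iptables-save -t nat"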

runner.Restore() loads a rule set for the specified table, as previously saved by iptables-save (the data is fed in via standard input)

!FILENAME pkg/util/iptables/iptables.go:340

    func (runner *runner) Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
        // "iptables-restore -T $tableName"
        args := []string{"-T", string(table)}
        return runner.restoreInternal(args, data, flush, counters) // call and return
    }

    // restoreInternal() assembles the arguments and restores the rule set
    // data via the iptables-restore command
    func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
        runner.mu.Lock()
        defer runner.mu.Unlock()
        trace := utiltrace.New("iptables restore")
        defer trace.LogIfLong(2 * time.Second)
        // assemble the "--noflush" and "--counters" flags
        if !flush {
            args = append(args, "--noflush")
        }
        if counters {
            args = append(args, "--counters")
        }
        // if iptables-restore has no "--wait" support, grab the xtables lock ourselves
        if len(runner.restoreWaitFlag) == 0 {
            locker, err := grabIptablesLocks(runner.lockfilePath)
            if err != nil {
                return err
            }
            trace.Step("Locks grabbed")
            defer func(locker iptablesLocker) {
                if err := locker.Close(); err != nil {
                    klog.Errorf("Failed to close iptables locks: %v", err)
                }
            }(locker)
        }
        fullArgs := append(runner.restoreWaitFlag, args...)
        iptablesRestoreCmd := iptablesRestoreCommand(runner.protocol)
        klog.V(4).Infof("running %s %v", iptablesRestoreCmd, fullArgs)
        // "iptables-restore -T $tableName --wait --noflush --counters < data"
        cmd := runner.exec.Command(iptablesRestoreCmd, fullArgs...)
        // feed the rule set data in via standard input
        cmd.SetStdin(bytes.NewBuffer(data))
        // run the command and collect its combined output
        b, err := cmd.CombinedOutput()
        if err != nil {
            return fmt.Errorf("%v (%s)", err, b)
        }
        return nil
    }

runner.RestoreAll() is like Restore() above, but invokes iptables-restore to load the rule set for all tables

!FILENAME pkg/util/iptables/iptables.go:347

    func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
        args := make([]string, 0)
        // same as Restore() above, but without restricting to a single table
        return runner.restoreInternal(args, data, flush, counters)
    }

runner.AddReloadFunc() registers a reload callback, used to re-apply the iptables rules after a reload (e.g. one triggered by FirewallD)

!FILENAME pkg/util/iptables/iptables.go:679

    func (runner *runner) AddReloadFunc(reloadFunc func()) {
        runner.mu.Lock()
        defer runner.mu.Unlock()
        // start the D-Bus listener if it is not running yet
        if !runner.hasListener {
            runner.connectToFirewallD()
        }
        runner.reloadFuncs = append(runner.reloadFuncs, reloadFunc) // register the reload callback
    }

    // Listen over D-Bus for signals from the FirewallD process and handle
    // them (re-applying the iptables rules on reload)
    func (runner *runner) connectToFirewallD() {
        bus, err := runner.dbus.SystemBus()
        if err != nil {
            klog.V(1).Infof("Could not connect to D-Bus system bus: %s", err)
            return
        }
        runner.hasListener = true
        // add match rules for firewalld signals on the system bus
        rule := fmt.Sprintf("type='signal',sender='%s',path='%s',interface='%s',member='Reloaded'", firewalldName, firewalldPath, firewalldInterface)
        bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
        rule = fmt.Sprintf("type='signal',interface='org.freedesktop.DBus',member='NameOwnerChanged',path='/org/freedesktop/DBus',sender='org.freedesktop.DBus',arg0='%s'", firewalldName)
        bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
        runner.signal = make(chan *godbus.Signal, 10)
        bus.Signal(runner.signal)
        go runner.dbusSignalHandler(bus) // handler goroutine for D-Bus signals
    }

    // goroutine that watches D-Bus signals; when FirewallD changes owner or
    // reloads, re-apply the rule set
    func (runner *runner) dbusSignalHandler(bus utildbus.Connection) {
        firewalld := bus.Object(firewalldName, firewalldPath)
        for s := range runner.signal {
            if s == nil {
                // deregister from D-Bus and exit
                bus.Signal(runner.signal)
                return
            }
            switch s.Name {
            case "org.freedesktop.DBus.NameOwnerChanged": // signal: the owner of a bus name changed
                name := s.Body[0].(string)
                new_owner := s.Body[2].(string)
                // only react to the "org.fedoraproject.FirewallD1" name
                if name != firewalldName || len(new_owner) == 0 {
                    continue
                }
                firewalld.Call(firewalldInterface+".getDefaultZone", 0)
                runner.reload() // reload and re-sync rules (invokes each func in runner.reloadFuncs)
            case firewalldInterface + ".Reloaded":
                runner.reload()
            }
        }
    }

runner.Destroy() unregisters the D-Bus listener

!FILENAME pkg/util/iptables/iptables.go:218

    func (runner *runner) Destroy() {
        if runner.signal != nil {
            runner.signal <- nil // sending nil on the signal channel deregisters the D-Bus listener
        }
    }

That concludes the code analysis of kube-proxy's iptables-mode Proxier. The analysis of the other two modes, ipvs and userspace, can be found in the userspace-mode proxier and ipvs-mode proxier articles.

~ END ~