OpenYurt 之 Yurthub 数据过滤框架解析

2022年05月11日 阅读数:4
这篇文章主要向大家介绍OpenYurt 之 Yurthub 数据过滤框架解析,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

做者:应健健,新华智云计算中心node

OpenYurt 是业界首个非侵入的边缘计算云原生开源项目,经过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的使用体验。在 Openyurt 里边缘网络可使用数据过滤框架在不一样节点池里实现边缘流量闭环能力。nginx

Yurthub 数据过滤框架解析

Yurthub 本质上是一层 kube-apiserver 的代理,在代理的基础上加了一层 cache,一来保证边缘节点离线的状况下可使用本地 cache 保证业务稳定性,有效的解决了边缘自治的问题。二来能够下降大量的 list & watch 操做对云上 api 产生必定的负载。git

Yurthub 的数据过滤经过节点上的 pod 以及 kubelet 的请求经过 Load Balancer 发送给 kube-apiserver,代理接收到响应消息进行数据过滤处理,以后再将过滤后的数据返回给请求方。若是节点是边缘节点会根据请求类型对响应请求体中的资源进行本地缓存,若是是云端节点考虑到网络状态良好不进行本地缓存。github

Yurthub 的过滤框架实现原理图:api

Yurthub 目前包含四种过滤规则,经过 addons 请求的 user-agent,resource,verb 判断通过那个过滤器进行相应的数据过滤。缓存

四种过滤规则功能及实现

ServiceTopologyFilter 

主要针对 EndpointSlice 资源进行数据过滤, 但 Endpoint Slice 特性须要在 Kubernetes v1.18 或以上版本才能支持,若是在 1.18 版本如下建议使用 endpointsFilter 过滤器。当通过该过滤器首先经过 kubernetes.io/service-name 找到 endpointSlice 资源所对应的 services 资源,以后判断 servces 资源是否存在 openyurt.io/topologyKeys 这个 Annotations,若是存在那么经过这个 Annotations 的值判断数据过滤规则,最后更新 response data 返回给 addons。网络

Annotations 的值分为两大类:app

一、kubernetes.io/hostname:只过滤出相同节点的 endpoint ip框架

二、openyurt.io/nodepool 或者 kubernetes.io/zone: 经过这个 Annotations 获取对应节点池,最后遍历 endpointSlice 资源,经过 endpointSlice 里的 topology 字段中的 kubernetes.io/hostname 字段在 endpointSlice 对象里找到对应的 Endpoints,以后重组 endpointSlice 里的 Endpoints 后返回给 addons。 性能

代码实现:

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
   var serviceTopologyType string
   // get the service Topology type
   if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {
      svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
         return endpointSlice
      }
 
      if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
         klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
         return endpointSlice
      }
   }
 
   var newEps []discovery.Endpoint
   // if type of service Topology is 'kubernetes.io/hostname'
   // filter the endpoint just on the local host
   if serviceTopologyType == AnnotationServiceTopologyValueNode {
      for i := range endpointSlice.Endpoints {
         if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
            newEps = append(newEps, endpointSlice.Endpoints[i])
         }
      }
      endpointSlice.Endpoints = newEps
   } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
      // if type of service Topology is openyurt.io/nodepool
      // filter the endpoint just on the node which is in the same nodepool with current node
      currentNode, err := fh.nodeGetter(fh.nodeName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
         return endpointSlice
      }
      if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
         nodePool, err := fh.nodePoolLister.Get(nodePoolName)
         if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpointSlice
         }
         for i := range endpointSlice.Endpoints {
            if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
               newEps = append(newEps, endpointSlice.Endpoints[i])
            }
         }
         endpointSlice.Endpoints = newEps
      }
   }
   return endpointSlice
}

EndpointsFilter

针对 endpoints 资源进行相应的数据过滤,首先判断 endpoint 是否存在对应的 service,经过 node 的 label: apps.openyurt.io/nodepool 获取节点池,以后获取节点池下的全部节点,遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 以后返回给 addons。

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
   svcName := endpoints.Name
   _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
      return endpoints
   }
   // filter the endpoints on the node which is in the same nodepool with current node
   currentNode, err := fh.nodeGetter(fh.nodeName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
      return endpoints
   }
   if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
      nodePool, err := fh.nodePoolLister.Get(nodePoolName)
      if err != nil {
         klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
         return endpoints
      }
      var newEpSubsets []v1.EndpointSubset
      for i := range endpoints.Subsets {
         endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
         endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
         if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {
            newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
         }
      }
      endpoints.Subsets = newEpSubsets
      if len(endpoints.Subsets) == 0 {
         // this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
         return nil
      }
   }
   return endpoints
}

MasterServiceFilter

针对 services 下的域名进行 ip 以及端口替换,这个过滤器的场景主要在于边缘端的 pod 无缝使用 InClusterConfig 访问集群资源。

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
      return b, nil
   }
 
   // return data un-mutated if not ServiceList
   serviceList, ok := list.(*v1.ServiceList)
   if !ok {
      return b, nil
   }
 
   // mutate master service
   for i := range serviceList.Items {
      if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {
         serviceList.Items[i].Spec.ClusterIP = fh.host
         for j := range serviceList.Items[i].Spec.Ports {
            if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {
               serviceList.Items[i].Spec.Ports[j].Port = fh.port
               break
            }
         }
         klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
         break
      }
   }
 
   // return the mutated serviceList
   return fh.serializer.Encode(serviceList)
}

DiscardCloudService

该过滤器针对两种 service 其中的一种类型是 LoadBalancer,由于边缘端没法访问 LoadBalancer 类型的资源,因此该过滤器会将这种类型的资源直接过滤掉。另一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc,这个 services 主要存在 cloud 节点用于访问 yurt-tunnel-server,对于 edge 节点会直接过滤掉该 service。

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
      return b, nil
   }
 
   serviceList, ok := list.(*v1.ServiceList)
   if ok {
      var svcNew []v1.Service
      for i := range serviceList.Items {
         nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
         // remove lb service
         if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
            if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
               klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
               continue
            }
         }
 
         // remove cloud clusterIP service
         if _, ok := cloudClusterIPService[nsName]; ok {
            klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
            continue
         }
 
         svcNew = append(svcNew, serviceList.Items[i])
      }
      serviceList.Items = svcNew
      return fh.serializer.Encode(serviceList)
   }
 
   return b, nil
}

过滤框架现状

目前的过滤框架比较僵硬,将资源过滤硬编码至代码中,只能是已注册的资源才能进行相应的过滤,为了解决这个问题,须要对过滤框架进行相应的改造。

解决方案

方案一:

使用参数或者环境变量的形式自定义过滤配置,可是这种方式有如下弊端:

一、配置复杂须要将因此须要自定义的配置写入到启动参数或者读取环境变量 例以下格式:

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

二、没法热更新,每次修改配置都须要重启 Yurthub 生效。

方案二:

一、使用 configmap 的形式自定义过滤配置下降配置复杂度配置格式(user-agent/resource#list,watch) 多个资源经过逗号隔开。以下所示:

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

二、利用 Informer 机制保证配置实时生效

综合以上两点在 OpenYurt 中咱们选择了解决方案二。

开发过程当中遇到的问题

在边缘端 Informer watch 的 api 地址是 Yurthub 的代理地址,那么 Yurthub 在启动代理端口以前都是没法保证 configmap 的数据是正常的。若是在启动完成以后 addons 的请求先于 configmap 数据更新 这个时候会致使数据在没有过滤的状况下就返回给了 addons,这样会致使不少预期之外的问题。

为了解决这个问题 咱们须要在 apporve 中加入 WaitForCacheSync 保证数据同步完成以后才能返回相应的过滤数据,可是在 apporve 中加入 WaitForCacheSync 也直接致使 configmap 进行 watch 的时候也会被阻塞,因此须要在 WaitForCacheSync 以前加入一个白名单机制,当 Yurthub 使用 list & watch 访问 configmap 的时候咱们直接不进行数据过滤,相应的代码逻辑以下:

func (a *approver) Approve(comp, resource, verb string) bool {
   if a.isWhitelistReq(comp, resource, verb) {
      return false
   }
   if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
      panic("wait for configMap cache sync timeout")
   }
   a.Lock()
   defer a.Unlock()
   for _, requests := range a.nameToRequests {
      for _, request := range requests {
         if request.Equal(comp, resource, verb) {
            return true
         }
      }
   }
   return false
}

总结

一、经过上述的扩展能力能够看出,YurtHub 不单单是边缘节点上的带有数据缓存能力的反向代理。而是对 Kubernetes 节点应用生命周期管理加了一层新的封装,提供边缘计算所须要的核心管控能力。

二、YurtHub 不单单适用于边缘计算场景,其实能够做为节点侧的一个常备组件,适用于使用 Kubernetes 的任意场景。相信这也会驱动 YurtHub 向更高性能,更高稳定性发展。

点击​此处​​,当即了解 OpenYurt 项目!​