About Kubernetes: a look at spring-cloud-kubernetes-client-discovery

This article takes a look at spring-cloud-kubernetes-client-discovery.

DiscoveryClient

org/springframework/cloud/client/discovery/DiscoveryClient.java

public interface DiscoveryClient extends Ordered {

    /**
     * Default order of the discovery client.
     */
    int DEFAULT_ORDER = 0;

    /**
     * A human-readable description of the implementation, used in HealthIndicator.
     * @return The description.
     */
    String description();

    /**
     * Gets all ServiceInstances associated with a particular serviceId.
     * @param serviceId The serviceId to query.
     * @return A List of ServiceInstance.
     */
    List<ServiceInstance> getInstances(String serviceId);

    /**
     * @return All known service IDs.
     */
    List<String> getServices();

    /**
     * Can be used to verify the client is valid and able to make calls.
     * <p>
     * A successful invocation with no exception thrown implies the client is able to make
     * calls.
     * <p>
     * The default implementation simply calls {@link #getServices()} - client
     * implementations can override with a lighter weight operation if they choose to.
     */
    default void probe() {
        getServices();
    }

    /**
     * Default implementation for getting order of discovery clients.
     * @return order
     */
    @Override
    default int getOrder() {
        return DEFAULT_ORDER;
    }

}

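spring-cloud-commons provides the DiscoveryClient interface, which defines the description, getInstances, getServices, probe, and getOrder methods. probe and getOrder have default implementations: probe simply calls getServices, and getOrder returns DEFAULT_ORDER (0).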
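To make the two default methods concrete, here is a minimal sketch that does not depend on Spring. MiniDiscoveryClient, DiscoveryContractSketch, fixed, broken, and isHealthy are hypothetical names introduced only for illustration; the stand-in interface copies just the parts of the contract needed to show that the default probe() succeeds or fails together with getServices().

```java
import java.util.List;

// Hypothetical, trimmed-down stand-in for DiscoveryClient (not the Spring type).
interface MiniDiscoveryClient {

    int DEFAULT_ORDER = 0;

    String description();

    List<String> getServices();

    // Same default as the real interface: a probe is just a getServices() call.
    default void probe() {
        getServices();
    }

    default int getOrder() {
        return DEFAULT_ORDER;
    }
}

final class DiscoveryContractSketch {

    // A client backed by a fixed list; it inherits probe() and getOrder().
    static MiniDiscoveryClient fixed(List<String> services) {
        return new MiniDiscoveryClient() {
            @Override
            public String description() {
                return "fixed list";
            }

            @Override
            public List<String> getServices() {
                return services;
            }
        };
    }

    // A client whose backend is down: getServices() throws, so probe() throws too.
    static MiniDiscoveryClient broken() {
        return new MiniDiscoveryClient() {
            @Override
            public String description() {
                return "broken";
            }

            @Override
            public List<String> getServices() {
                throw new IllegalStateException("backend unreachable");
            }
        };
    }

    // Returns true when probe() completes without throwing.
    static boolean isHealthy(MiniDiscoveryClient client) {
        try {
            client.probe();
            return true;
        }
        catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isHealthy(fixed(List.of("orders"))));
        System.out.println(isHealthy(broken()));
        System.out.println(fixed(List.of()).getOrder());
    }
}
```

An implementation that finds getServices() too expensive for a health check can override probe() with something lighter, as the Javadoc of the real interface suggests.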

KubernetesInformerDiscoveryClient

spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesInformerDiscoveryClient.java

public class KubernetesInformerDiscoveryClient implements DiscoveryClient {

    private static final LogAccessor LOG = new LogAccessor(LogFactory.getLog(KubernetesInformerDiscoveryClient.class));

    private final List<SharedInformerFactory> sharedInformerFactories;

    private final List<Lister<V1Service>> serviceListers;

    private final List<Lister<V1Endpoints>> endpointsListers;

    private final Supplier<Boolean> informersReadyFunc;

    private final KubernetesDiscoveryProperties properties;

    private final Predicate<V1Service> filter;

    private final ServicePortSecureResolver servicePortSecureResolver;

    // visible only for testing and
    // must be constructor injected in a future release
    @Autowired
    CoreV1Api coreV1Api;

    @Override
    public String description() {
        return "Kubernetes Client Discovery";
    }

    @Override
    public List<String> getServices() {
        List<String> services = serviceListers.stream().flatMap(serviceLister -> serviceLister.list().stream())
                .filter(service -> matchesServiceLabels(service, properties)).filter(filter)
                .map(s -> s.getMetadata().getName()).distinct().toList();
        LOG.debug(() -> "will return services :" + services);
        return services;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers);
    }

    @Override
    public int getOrder() {
        return properties.order();
    }

    //......
}    

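KubernetesInformerDiscoveryClient in spring-cloud-kubernetes-client-discovery implements the DiscoveryClient interface. Its description method returns "Kubernetes Client Discovery". Its getServices method calls list on each of the serviceListers, keeps the services whose service.metadata labels match the serviceLabels of KubernetesDiscoveryProperties (matchesServiceLabels) and that also pass the filter predicate, and returns their distinct names. getOrder returns the order configured in the properties.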
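The getServices pipeline (list, match labels, apply the extra predicate, map to names, de-duplicate) can be sketched with plain collections. Everything here is a stand-in: Svc replaces V1Service, each inner list plays the role of one Lister, and matchesLabels is an assumption about what label matching means (the service must carry every configured label entry), not the library's matchesServiceLabels itself.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class GetServicesSketch {

    // Hypothetical stand-in for V1Service: only the fields the pipeline reads.
    record Svc(String name, String namespace, Map<String, String> labels) {}

    // Assumption: a service matches when its labels contain every required entry;
    // an empty requirement therefore matches every service.
    static boolean matchesLabels(Svc svc, Map<String, String> required) {
        return svc.labels().entrySet().containsAll(required.entrySet());
    }

    // Each inner list stands in for what one Lister would return from list().
    static List<String> serviceNames(List<List<Svc>> listers, Map<String, String> required,
            Predicate<Svc> filter) {
        return listers.stream()
                .flatMap(List::stream)
                .filter(svc -> matchesLabels(svc, required))
                .filter(filter)
                .map(Svc::name)
                .distinct()
                .toList();
    }

    // Two listers (for example, two namespaces) that both contain an "orders" service.
    static List<List<Svc>> demoListers() {
        return List.of(
                List.of(new Svc("orders", "ns1", Map.of("team", "a")),
                        new Svc("billing", "ns1", Map.of())),
                List.of(new Svc("orders", "ns2", Map.of("team", "a", "tier", "web"))));
    }

    public static void main(String[] args) {
        System.out.println(serviceNames(demoListers(), Map.of("team", "a"), svc -> true));
        System.out.println(serviceNames(demoListers(), Map.of(), svc -> true));
    }
}
```

The distinct() step matters when several listers are in play: the same service name can exist in more than one namespace, and getServices reports it once.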

getInstances

    public List<ServiceInstance> getInstances(String serviceId) {
        Objects.requireNonNull(serviceId, "serviceId must be provided");

        List<V1Service> allServices = serviceListers.stream().flatMap(x -> x.list().stream())
                .filter(scv -> scv.getMetadata() != null).filter(svc -> serviceId.equals(svc.getMetadata().getName()))
                .filter(scv -> matchesServiceLabels(scv, properties)).toList();

        List<ServiceInstance> serviceInstances = allServices.stream().filter(filter)
                .flatMap(service -> serviceInstances(service, serviceId).stream())
                .collect(Collectors.toCollection(ArrayList::new));

        if (properties.includeExternalNameServices()) {
            LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId);
            List<V1Service> externalNameServices = allServices.stream().filter(s -> s.getSpec() != null)
                    .filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType())).toList();
            for (V1Service service : externalNameServices) {
                ServiceMetadata serviceMetadata = serviceMetadata(service);
                Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata,
                        properties);

                K8sInstanceIdHostPodNameSupplier supplierOne = externalName(service);
                K8sPodLabelsAndAnnotationsSupplier supplierTwo = externalName();

                ServiceInstance externalNameServiceInstance = serviceInstance(null, serviceMetadata, supplierOne,
                        supplierTwo, new ServicePortNameAndNumber(-1, null), serviceInstanceMetadata, properties);
                serviceInstances.add(externalNameServiceInstance);
            }
        }

        return serviceInstances;
    }

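getInstances uses the serviceListers to find the services whose service.metadata.name equals serviceId (and whose labels match), then calls serviceInstances on each of them to build the list of ServiceInstance objects. If includeExternalNameServices is enabled, it finally appends one instance for each service of type ExternalName, built without a port resolver and with port number -1.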
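The two-phase shape of getInstances (endpoint-backed instances first, ExternalName services appended afterwards) can be sketched as follows. Svc, Instance, demoServices, and demoEndpoints are hypothetical stand-ins, and using the service's externalName as the instance host is an assumption made for the sketch; the port value -1 is taken from the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class GetInstancesSketch {

    static final String EXTERNAL_NAME = "ExternalName";

    // Hypothetical stand-ins for V1Service and ServiceInstance.
    record Svc(String name, String type, String externalName) {}

    record Instance(String serviceId, String host, int port) {}

    static List<Instance> getInstances(List<Svc> services, String serviceId,
            boolean includeExternalName, Function<Svc, List<Instance>> endpointLookup) {
        List<Svc> matching = services.stream()
                .filter(svc -> serviceId.equals(svc.name()))
                .toList();

        // Mutable list, because ExternalName instances may be appended below.
        List<Instance> instances = new ArrayList<>();
        for (Svc svc : matching) {
            instances.addAll(endpointLookup.apply(svc));
        }

        if (includeExternalName) {
            for (Svc svc : matching) {
                if (EXTERNAL_NAME.equals(svc.type())) {
                    // Assumption: host is the externalName; port -1 as in the listing.
                    instances.add(new Instance(serviceId, svc.externalName(), -1));
                }
            }
        }
        return instances;
    }

    static List<Svc> demoServices() {
        return List.of(new Svc("db", EXTERNAL_NAME, "db.example.com"),
                new Svc("web", "ClusterIP", null));
    }

    // Hypothetical endpoint lookup: only the ClusterIP service has a backing address.
    static List<Instance> demoEndpoints(Svc svc) {
        if ("ClusterIP".equals(svc.type())) {
            return List.of(new Instance(svc.name(), "10.0.0.1", 8080));
        }
        return List.of();
    }

    public static void main(String[] args) {
        System.out.println(getInstances(demoServices(), "web", true, GetInstancesSketch::demoEndpoints));
        System.out.println(getInstances(demoServices(), "db", true, GetInstancesSketch::demoEndpoints));
        System.out.println(getInstances(demoServices(), "db", false, GetInstancesSketch::demoEndpoints));
    }
}
```

The real method collects into an ArrayList (Collectors.toCollection(ArrayList::new)) for the same reason as the sketch: the list returned by Stream.toList() is unmodifiable, so the ExternalName instances could not be added to it.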

serviceInstances

    private List<ServiceInstance> serviceInstances(V1Service service, String serviceId) {
        List<ServiceInstance> instances = new ArrayList<>();

        List<V1Endpoints> allEndpoints = endpointsListers.stream()
                .map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId))
                .filter(Objects::nonNull).toList();

        for (V1Endpoints endpoints : allEndpoints) {
            List<V1EndpointSubset> subsets = endpoints.getSubsets();
            if (subsets == null || subsets.isEmpty()) {
                LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets");
            }
            else {
                ServiceMetadata serviceMetadata = serviceMetadata(service);
                Map<String, Integer> portsData = endpointSubsetsPortData(subsets);
                Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata,
                        properties);

                for (V1EndpointSubset endpointSubset : subsets) {
                    Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset));
                    ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties);

                    List<V1EndpointAddress> addresses = addresses(endpointSubset, properties);
                    for (V1EndpointAddress endpointAddress : addresses) {
                        K8sInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress, service);
                        K8sPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api,
                                service.getMetadata().getNamespace());

                        ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata,
                                supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties);
                        instances.add(serviceInstance);
                    }
                }

            }
        }

        return instances;
    }

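serviceInstances uses the endpointsListers to look up the Endpoints object for the service's namespace and serviceId. For each subset it resolves the port data and the addresses, and for every address it assembles a DefaultKubernetesServiceInstance from a K8sInstanceIdHostPodNameSupplier and a K8sPodLabelsAndAnnotationsSupplier. The collected instances are returned at the end; an Endpoints object without subsets contributes nothing.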
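The nested loops above amount to flattening "subsets x addresses" into one instance per address. The sketch below shows only that flattening. Subset and Instance are hypothetical stand-ins for V1EndpointSubset and ServiceInstance, and giving each subset a single port is a simplification: the real code resolves the port through endpointsPort(...) from the subset's port data and the discovery properties.

```java
import java.util.ArrayList;
import java.util.List;

final class ServiceInstancesSketch {

    // Hypothetical stand-in for V1EndpointSubset, simplified to one port per subset.
    record Subset(List<String> addresses, int port) {}

    record Instance(String serviceId, String host, int port) {}

    // One instance per address of every subset; missing or empty subsets yield nothing.
    static List<Instance> flatten(String serviceId, List<Subset> subsets) {
        List<Instance> instances = new ArrayList<>();
        if (subsets == null || subsets.isEmpty()) {
            return instances;
        }
        for (Subset subset : subsets) {
            for (String address : subset.addresses()) {
                instances.add(new Instance(serviceId, address, subset.port()));
            }
        }
        return instances;
    }

    static List<Subset> demoSubsets() {
        return List.of(new Subset(List.of("10.0.0.1", "10.0.0.2"), 8080),
                new Subset(List.of("10.0.0.3"), 9090));
    }

    public static void main(String[] args) {
        System.out.println(flatten("web", demoSubsets()));
        System.out.println(flatten("web", null));
    }
}
```

Because the port is resolved once per subset and reused for all of its addresses, every address in a subset ends up with the same port, which mirrors how portData is computed outside the inner address loop in the listing.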

Summary

spring-cloud-commons provides the DiscoveryClient interface, which defines the description, getInstances, getServices, probe, and getOrder methods. KubernetesInformerDiscoveryClient in spring-cloud-kubernetes-client-discovery implements it: description returns "Kubernetes Client Discovery", and getServices lists the services through the serviceListers and keeps those whose service.metadata labels match the serviceLabels of KubernetesDiscoveryProperties.
