This article takes a look at spring-cloud-kubernetes-client-loadbalancer.

ServiceInstanceListSupplier

org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
    String getServiceId();
    default Flux<List<ServiceInstance>> get(Request request) {
        return get();
    }
    static ServiceInstanceListSupplierBuilder builder() {
        return new ServiceInstanceListSupplierBuilder();
    }
}
spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier with the type argument Flux<List<ServiceInstance>>. It declares getServiceId, adds a default get(Request) method that simply delegates to get(), and provides a static builder method.

Request

org/springframework/cloud/client/loadbalancer/Request.java

public interface Request<C> {
    // Avoid breaking backward compatibility
    default C getContext() {
        return null;
    }
    // TODO: define contents
}
Request declares a getContext method, which returns null by default.

DefaultRequest

org/springframework/cloud/client/loadbalancer/DefaultRequest.java

public class DefaultRequest<T> implements Request<T> {
    private T context;
    public DefaultRequest() {
        new DefaultRequestContext();
    }
    public DefaultRequest(T context) {
        this.context = context;
    }
    @Override
    public T getContext() {
        return context;
    }
    public void setContext(T context) {
        this.context = context;
    }
    @Override
    public String toString() {
        ToStringCreator to = new ToStringCreator(this);
        to.append("context", context);
        return to.toString();
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DefaultRequest<?> that)) {
            return false;
        }
        return Objects.equals(context, that.context);
    }
    @Override
    public int hashCode() {
        return Objects.hash(context);
    }
}
DefaultRequest implements Request; its type parameter is the type of the context it carries.

KubernetesServicesListSupplier

spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServicesListSupplier.java

public abstract class KubernetesServicesListSupplier implements ServiceInstanceListSupplier {
    protected final Environment environment;
    protected final KubernetesDiscoveryProperties discoveryProperties;
    protected final KubernetesServiceInstanceMapper mapper;
    public KubernetesServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.environment = environment;
        this.discoveryProperties = discoveryProperties;
        this.mapper = mapper;
    }
    @Override
    public String getServiceId() {
        return environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    }
    @Override
    public abstract Flux<List<ServiceInstance>> get();
}
KubernetesServicesListSupplier is an abstract class that declares that it implements ServiceInstanceListSupplier. It implements getServiceId by reading the LoadBalancerClientFactory.PROPERTY_NAME property from the Environment and leaves get() abstract. It does not override get(Request), so the Request is never passed down to subclasses.
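The effect of not overriding get(Request) can be seen in a small JDK-only sketch. It is a simplification under stated assumptions: List<String> stands in for Flux<List<ServiceInstance>>, and SimpleRequest, SimpleListSupplier, and FixedSupplier are hypothetical names invented for illustration, not Spring Cloud types.

```java
import java.util.List;
import java.util.function.Supplier;

// Simplified stand-ins for illustration only; these are NOT the Spring Cloud types.
class RequestDelegationSketch {

    // Hypothetical replacement for Request<C>: carries an arbitrary context string.
    record SimpleRequest(String context) {}

    // Hypothetical replacement for ServiceInstanceListSupplier.
    interface SimpleListSupplier extends Supplier<List<String>> {
        String getServiceId();

        // Mirrors the default method quoted earlier: the request is dropped.
        default List<String> get(SimpleRequest request) {
            return get();
        }
    }

    // Like KubernetesServicesListSupplier, this implements only get().
    static class FixedSupplier implements SimpleListSupplier {
        @Override
        public String getServiceId() {
            return "service-a";
        }

        @Override
        public List<String> get() {
            return List.of("service-a.default.svc.cluster.local");
        }
    }

    public static void main(String[] args) {
        SimpleListSupplier supplier = new FixedSupplier();
        // Both calls return the same list; the request context has no effect.
        System.out.println(supplier.get());
        System.out.println(supplier.get(new SimpleRequest("zone=eu")));
    }
}
```

Because the default method is inherited unchanged, any per-request hint in the context never reaches the lookup logic in this sketch.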

KubernetesClientServicesListSupplier

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServicesListSupplier.java

public class KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier {
    private static final Log LOG = LogFactory.getLog(KubernetesClientServicesListSupplier.class);
    private CoreV1Api coreV1Api;
    private KubernetesClientProperties kubernetesClientProperties;
    private KubernetesNamespaceProvider kubernetesNamespaceProvider;
    public KubernetesClientServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties, CoreV1Api coreV1Api,
            KubernetesNamespaceProvider kubernetesNamespaceProvider) {
        super(environment, mapper, discoveryProperties);
        this.coreV1Api = coreV1Api;
        this.kubernetesNamespaceProvider = kubernetesNamespaceProvider;
    }
    private String getNamespace() {
        return kubernetesNamespaceProvider != null ? kubernetesNamespaceProvider.getNamespace()
                : kubernetesClientProperties.namespace();
    }
    @Override
    public Flux<List<ServiceInstance>> get() {
        LOG.info("Getting services with id " + this.getServiceId());
        List<ServiceInstance> result = new ArrayList<>();
        List<V1Service> services;
        try {
            if (discoveryProperties.allNamespaces()) {
                services = coreV1Api.listServiceForAllNamespaces(null, null, "metadata.name=" + this.getServiceId(),
                        null, null, null, null, null, null, null, null).getItems();
            }
            else {
                services = coreV1Api.listNamespacedService(getNamespace(), null, null, null,
                        "metadata.name=" + this.getServiceId(), null, null, null, null, null, null, null).getItems();
            }
            services.forEach(service -> result.add(mapper.map(service)));
        }
        catch (ApiException e) {
            LOG.warn("Error retrieving service with name " + this.getServiceId(), e);
        }
        LOG.info("Returning services: " + result);
        return Flux.defer(() -> Flux.just(result));
    }
}
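KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier. Its constructor takes an Environment, a KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, a CoreV1Api, and a KubernetesNamespaceProvider. Its get method calls either coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService, using the field selector metadata.name=<serviceId> to fetch the V1Service objects for the given serviceId, and then converts each one to a ServiceInstance through the mapper. If the API call throws an ApiException, the error is logged and an empty list is returned.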
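The control flow of get() can be sketched without the Kubernetes client on the classpath. This is only an approximation under assumptions: ServiceLister is a hypothetical interface standing in for the two CoreV1Api calls, a null namespace is this sketch's own convention for "all namespaces", and plain strings replace V1Service and ServiceInstance.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; ServiceLister is a hypothetical interface, not CoreV1Api.
class ServiceLookupSketch {

    interface ServiceLister {
        // In this sketch, namespace == null means "search all namespaces".
        List<String> list(String namespace, String fieldSelector) throws Exception;
    }

    // Builds the same field selector string as the quoted code.
    static String fieldSelector(String serviceId) {
        return "metadata.name=" + serviceId;
    }

    static List<String> lookup(ServiceLister lister, String serviceId,
            boolean allNamespaces, String namespace) {
        List<String> result = new ArrayList<>();
        try {
            String ns = allNamespaces ? null : namespace;
            result.addAll(lister.list(ns, fieldSelector(serviceId)));
        }
        catch (Exception e) {
            // Same shape as the quoted code: report the failure and return what we have (nothing).
            System.err.println("Error retrieving service with name " + serviceId + ": " + e.getMessage());
        }
        return result;
    }

    public static void main(String[] args) {
        ServiceLister ok = (ns, selector) -> List.of(ns + "/" + selector);
        ServiceLister failing = (ns, selector) -> {
            throw new IllegalStateException("API unavailable");
        };

        System.out.println(lookup(ok, "service-a", false, "default"));      // [default/metadata.name=service-a]
        System.out.println(lookup(failing, "service-a", false, "default")); // []
    }
}
```

One consequence worth noting in this sketch is that a failed lookup is indistinguishable from "no such service" for the caller, since both produce an empty list.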

V1Service

io/kubernetes/client/openapi/models/V1Service.java

public class V1Service implements io.kubernetes.client.common.KubernetesObject {
  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
  @SerializedName(SERIALIZED_NAME_API_VERSION)
  private String apiVersion;
  public static final String SERIALIZED_NAME_KIND = "kind";
  @SerializedName(SERIALIZED_NAME_KIND)
  private String kind;
  public static final String SERIALIZED_NAME_METADATA = "metadata";
  @SerializedName(SERIALIZED_NAME_METADATA)
  private V1ObjectMeta metadata;
  public static final String SERIALIZED_NAME_SPEC = "spec";
  @SerializedName(SERIALIZED_NAME_SPEC)
  private V1ServiceSpec spec;
  public static final String SERIALIZED_NAME_STATUS = "status";
  @SerializedName(SERIALIZED_NAME_STATUS)
  private V1ServiceStatus status;
  //......
}
V1Service defines the apiVersion, kind, metadata, spec, and status properties.

KubernetesServiceInstanceMapper

spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServiceInstanceMapper.java

public interface KubernetesServiceInstanceMapper<T> {
    KubernetesServiceInstance map(T service);
    static String createHost(String serviceName, String namespace, String clusterDomain) {
        return String.format("%s.%s.svc.%s", serviceName, StringUtils.hasText(namespace) ? namespace : "default",
                clusterDomain);
    }
    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations, String servicePortName,
            Integer servicePort) {
        if (labels != null) {
            final String securedLabelValue = labels.getOrDefault("secured", "false");
            if (securedLabelValue.equals("true")) {
                return true;
            }
        }
        if (annotations != null) {
            final String securedAnnotationValue = annotations.getOrDefault("secured", "false");
            if (securedAnnotationValue.equals("true")) {
                return true;
            }
        }
        return (servicePortName != null && servicePortName.endsWith("https")) || servicePort.toString().endsWith("443");
    }
    static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
        if (map == null) {
            return new HashMap<>();
        }
        if (!StringUtils.hasText(prefix)) {
            return map;
        }
        final Map<String, String> result = new HashMap<>();
        map.forEach((k, v) -> result.put(prefix + k, v));
        return result;
    }
}
The KubernetesServiceInstanceMapper interface declares the map method and provides three static helpers: createHost, isSecure, and getMapWithPrefixedKeys.
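To get a feel for what createHost and isSecure return, the two helpers can be restated using only the JDK. The sketch below copies their logic from the quoted source; the class name MapperHelpersSketch is made up, and String.isBlank() is used as an assumed stand-in for Spring's StringUtils.hasText.

```java
import java.util.Map;

// A JDK-only restatement of two static helpers from the quoted interface, for experimentation.
class MapperHelpersSketch {

    static String createHost(String serviceName, String namespace, String clusterDomain) {
        // Assumption: a null/blank check approximates StringUtils.hasText.
        String ns = (namespace == null || namespace.isBlank()) ? "default" : namespace;
        return String.format("%s.%s.svc.%s", serviceName, ns, clusterDomain);
    }

    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations,
            String servicePortName, Integer servicePort) {
        // A "secured=true" label or annotation wins outright.
        if (labels != null && "true".equals(labels.getOrDefault("secured", "false"))) {
            return true;
        }
        if (annotations != null && "true".equals(annotations.getOrDefault("secured", "false"))) {
            return true;
        }
        // Otherwise fall back to naming conventions on the port.
        return (servicePortName != null && servicePortName.endsWith("https"))
                || servicePort.toString().endsWith("443");
    }

    public static void main(String[] args) {
        System.out.println(createHost("service-a", null, "cluster.local"));    // service-a.default.svc.cluster.local
        System.out.println(createHost("service-a", "prod", "cluster.local"));  // service-a.prod.svc.cluster.local
        System.out.println(isSecure(null, null, "http", 8080));                // false
        System.out.println(isSecure(null, null, "http", 8443));                // true (port ends with 443)
        System.out.println(isSecure(Map.of("secured", "true"), null, "http", 8080)); // true
    }
}
```

The port-suffix rule is a heuristic: any port number ending in 443, such as 8443 or 10443, is treated as secure even when the port is named http.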

KubernetesClientServiceInstanceMapper

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServiceInstanceMapper.java

public class KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper<V1Service> {
    private KubernetesLoadBalancerProperties properties;
    private KubernetesDiscoveryProperties discoveryProperties;
    public KubernetesClientServiceInstanceMapper(KubernetesLoadBalancerProperties properties,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.properties = properties;
        this.discoveryProperties = discoveryProperties;
    }
    @Override
    public KubernetesServiceInstance map(V1Service service) {
        final V1ObjectMeta meta = service.getMetadata();
        final List<V1ServicePort> ports = service.getSpec().getPorts();
        V1ServicePort port = null;
        if (ports.size() == 1) {
            port = ports.get(0);
        }
        else if (ports.size() > 1 && StringUtils.hasText(this.properties.getPortName())) {
            Optional<V1ServicePort> optPort = ports.stream()
                    .filter(it -> properties.getPortName().endsWith(it.getName())).findAny();
            if (optPort.isPresent()) {
                port = optPort.get();
            }
        }
        if (port == null) {
            return null;
        }
        final String host = KubernetesServiceInstanceMapper.createHost(service.getMetadata().getName(),
                service.getMetadata().getNamespace(), properties.getClusterDomain());
        final boolean secure = KubernetesServiceInstanceMapper.isSecure(service.getMetadata().getLabels(),
                service.getMetadata().getAnnotations(), port.getName(), port.getPort());
        return new DefaultKubernetesServiceInstance(meta.getUid(), meta.getName(), host, port.getPort(),
                getServiceMetadata(service), secure);
    }
    private Map<String, String> getServiceMetadata(V1Service service) {
        final Map<String, String> serviceMetadata = new HashMap<>();
        KubernetesDiscoveryProperties.Metadata metadataProps = this.discoveryProperties.metadata();
        if (metadataProps.addLabels()) {
            Map<String, String> labelMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getLabels(), metadataProps.labelsPrefix());
            serviceMetadata.putAll(labelMetadata);
        }
        if (metadataProps.addAnnotations()) {
            Map<String, String> annotationMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getAnnotations(), metadataProps.annotationsPrefix());
            serviceMetadata.putAll(annotationMetadata);
        }
        return serviceMetadata;
    }
}
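KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper with V1Service as the type argument. Its map method first reads the ports from service.getSpec().getPorts(): a single port is used directly, and when there are several, the one matching properties.getPortName() is chosen; if no port can be selected, map returns null. It then calls createHost to build the service's cluster DNS name in the form <servicename>.<namespace>.svc.<clusterdomain>, for example service-a.default.svc.cluster.local, and finally creates a DefaultKubernetesServiceInstance.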
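The port selection rule in map can be isolated into a short JDK-only sketch. Port is a hypothetical record standing in for V1ServicePort, configuredPortName stands in for properties.getPortName(), and an empty Optional replaces the null that the quoted code returns when nothing matches.

```java
import java.util.List;
import java.util.Optional;

// Isolates the port-selection branch of map(); the types here are illustrative stand-ins.
class PortSelectionSketch {

    // Hypothetical replacement for V1ServicePort.
    record Port(String name, int number) {}

    // Returns Optional.empty() in the cases where the quoted map() would return null.
    static Optional<Port> selectPort(List<Port> ports, String configuredPortName) {
        if (ports.size() == 1) {
            // A single port is used regardless of its name.
            return Optional.of(ports.get(0));
        }
        if (ports.size() > 1 && configuredPortName != null && !configuredPortName.isBlank()) {
            // Same comparison as the quoted code: the configured name must end with the port's name.
            return ports.stream()
                    .filter(p -> configuredPortName.endsWith(p.name()))
                    .findAny();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Port> two = List.of(new Port("http", 8080), new Port("https", 8443));

        System.out.println(selectPort(List.of(new Port("http", 8080)), null)); // single port is picked
        System.out.println(selectPort(two, "https"));                          // picks the port named https
        System.out.println(selectPort(two, null));                             // Optional.empty
    }
}
```

In this sketch a multi-port service with no configured port name yields no instance at all, which matches the null return in the quoted source.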

KubernetesClientLoadBalancerClientConfiguration

spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientLoadBalancerClientConfiguration.java

public class KubernetesClientLoadBalancerClientConfiguration {
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.kubernetes.loadbalancer.mode", havingValue = "SERVICE")
    ServiceInstanceListSupplier kubernetesServicesListSupplier(Environment environment, CoreV1Api coreV1Api,
            KubernetesClientServiceInstanceMapper mapper, KubernetesDiscoveryProperties discoveryProperties,
            KubernetesNamespaceProvider kubernetesNamespaceProvider, ConfigurableApplicationContext context) {
        return ServiceInstanceListSupplier.builder().withBase(new KubernetesClientServicesListSupplier(environment,
                mapper, discoveryProperties, coreV1Api, kubernetesNamespaceProvider)).withCaching().build(context);
    }
}
When spring.cloud.kubernetes.loadbalancer.mode is set to SERVICE, KubernetesClientLoadBalancerClientConfiguration registers the kubernetesServicesListSupplier bean, which wraps a KubernetesClientServicesListSupplier with caching.

Summary

spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier with the type argument Flux<List<ServiceInstance>>; it declares getServiceId and get(Request) and provides a static builder method. KubernetesServicesListSupplier is an abstract class that declares that it implements ServiceInstanceListSupplier and leaves get() abstract. KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier; its constructor depends on KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, CoreV1Api, and KubernetesNamespaceProvider, and its get method uses coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService to fetch the V1Service objects for the given serviceId, which the mapper then converts to ServiceInstance.

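Overall, spring-cloud-kubernetes-client-loadbalancer currently supports only the SERVICE value of spring.cloud.kubernetes.loadbalancer.mode. Under the hood it still relies on Kubernetes Service DNS resolution and the Service's own load balancing, so it cannot provide fine-grained, customized load-balancing strategies.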