Preface
This article takes a look at spring-cloud-kubernetes-client-loadbalancer.
ServiceInstanceListSupplier
org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java
public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {
    String getServiceId();
    default Flux<List<ServiceInstance>> get(Request request) {
        return get();
    }
    static ServiceInstanceListSupplierBuilder builder() {
        return new ServiceInstanceListSupplierBuilder();
    }
}
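spring-cloud-loadbalancer defines ServiceInstanceListSupplier. It extends Supplier with the type argument Flux<List<ServiceInstance>>, declares getServiceId, adds a default get(Request) method that simply delegates to get(), and provides a static builder method.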
Request
org/springframework/cloud/client/loadbalancer/Request.java
public interface Request<C> {
    // Avoid breaking backward compatibility
    default C getContext() {
        return null;
    }
    // TODO: define contents
}
Request provides a getContext method, which returns null by default.
DefaultRequest
org/springframework/cloud/client/loadbalancer/DefaultRequest.java
public class DefaultRequest<T> implements Request<T> {
    private T context;
    public DefaultRequest() {
        new DefaultRequestContext();
    }
    public DefaultRequest(T context) {
        this.context = context;
    }
    @Override
    public T getContext() {
        return context;
    }
    public void setContext(T context) {
        this.context = context;
    }
    @Override
    public String toString() {
        ToStringCreator to = new ToStringCreator(this);
        to.append("context", context);
        return to.toString();
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DefaultRequest<?> that)) {
            return false;
        }
        return Objects.equals(context, that.context);
    }
    @Override
    public int hashCode() {
        return Objects.hash(context);
    }
}
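DefaultRequest implements Request; its type parameter is the type of the context. Note that, in the code as quoted, the no-arg constructor instantiates a DefaultRequestContext but never assigns it to the field, so getContext() returns null until setContext is called.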
KubernetesServicesListSupplier
spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServicesListSupplier.java
public abstract class KubernetesServicesListSupplier implements ServiceInstanceListSupplier {
    protected final Environment environment;
    protected final KubernetesDiscoveryProperties discoveryProperties;
    protected final KubernetesServiceInstanceMapper mapper;
    public KubernetesServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.environment = environment;
        this.discoveryProperties = discoveryProperties;
        this.mapper = mapper;
    }
    @Override
    public String getServiceId() {
        return environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    }
    @Override
    public abstract Flux<List<ServiceInstance>> get();
}
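KubernetesServicesListSupplier is an abstract class that declares it implements ServiceInstanceListSupplier. It implements getServiceId by reading LoadBalancerClientFactory.PROPERTY_NAME from the Environment and redeclares get as abstract. It does not override get(Request), so the Request is never passed down to subclasses.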
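To make the "Request is not passed down" point concrete, here is a minimal, library-free sketch of the same shape. The names DelegationSketch, ListSupplier and FixedSupplier are hypothetical; they only stand in for ServiceInstanceListSupplier and a subclass, with a plain List<String> in place of Flux<List<ServiceInstance>>.

```java
import java.util.List;
import java.util.function.Supplier;

class DelegationSketch {

    public static void main(String[] args) {
        ListSupplier supplier = new FixedSupplier();
        // Whatever is passed as the request, the result equals get().
        System.out.println(supplier.get("request-with-hints"));
        System.out.println(supplier.get());
    }
}

// Hypothetical stand-in for ServiceInstanceListSupplier (plain lists, no Flux).
interface ListSupplier extends Supplier<List<String>> {

    // Mirrors the default get(Request): the request argument is dropped.
    default List<String> get(Object request) {
        return get();
    }
}

// Hypothetical stand-in for a subclass that only overrides get().
class FixedSupplier implements ListSupplier {

    @Override
    public List<String> get() {
        return List.of("service-a");
    }
}
```

A subclass that wants to react to the request would have to override the one-argument method itself; overriding get() alone is not enough.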
KubernetesClientServicesListSupplier
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServicesListSupplier.java
public class KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier {
    private static final Log LOG = LogFactory.getLog(KubernetesClientServicesListSupplier.class);
    private CoreV1Api coreV1Api;
    private KubernetesClientProperties kubernetesClientProperties;
    private KubernetesNamespaceProvider kubernetesNamespaceProvider;
    public KubernetesClientServicesListSupplier(Environment environment, KubernetesServiceInstanceMapper mapper,
            KubernetesDiscoveryProperties discoveryProperties, CoreV1Api coreV1Api,
            KubernetesNamespaceProvider kubernetesNamespaceProvider) {
        super(environment, mapper, discoveryProperties);
        this.coreV1Api = coreV1Api;
        this.kubernetesNamespaceProvider = kubernetesNamespaceProvider;
    }
    private String getNamespace() {
        return kubernetesNamespaceProvider != null ? kubernetesNamespaceProvider.getNamespace()
                : kubernetesClientProperties.namespace();
    }
    @Override
    public Flux<List<ServiceInstance>> get() {
        LOG.info("Getting services with id" + this.getServiceId());
        List<ServiceInstance> result = new ArrayList<>();
        List<V1Service> services;
        try {
            if (discoveryProperties.allNamespaces()) {
                services = coreV1Api.listServiceForAllNamespaces(null, null, "metadata.name=" + this.getServiceId(),
                        null, null, null, null, null, null, null, null).getItems();
            }
            else {
                services = coreV1Api.listNamespacedService(getNamespace(), null, null, null,
                        "metadata.name=" + this.getServiceId(), null, null, null, null, null, null, null).getItems();
            }
            services.forEach(service -> result.add(mapper.map(service)));
        }
        catch (ApiException e) {
            LOG.warn("Error retrieving service with name" + this.getServiceId(), e);
        }
        LOG.info("Returning services:" + result);
        return Flux.defer(() -> Flux.just(result));
    }
}
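KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier. Its constructor takes an Environment, a KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, a CoreV1Api and a KubernetesNamespaceProvider. Its get method calls coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService, depending on discoveryProperties.allNamespaces(), with the field selector metadata.name=<serviceId> to fetch the matching V1Service objects, and then converts each of them to a ServiceInstance through the mapper. If an ApiException is thrown, it is logged and an empty list is returned.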
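One detail of get() is easy to miss: the Kubernetes API is queried inside get() itself, and only the emission of the already-built list is wrapped in Flux.defer. The sketch below imitates that shape without Reactor or the Kubernetes client. EagerFetchSketch, listServices and the apiCalls counter are hypothetical, and a java.util.function.Supplier is used as a rough analogue of the deferred Flux.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerFetchSketch {

    // Counts how often the simulated Kubernetes API is queried.
    static int apiCalls = 0;

    // Hypothetical stand-in for coreV1Api.listNamespacedService(...) with a field selector.
    static List<String> listServices(String fieldSelector) {
        apiCalls++;
        return List.of(fieldSelector.substring("metadata.name=".length()));
    }

    // Same shape as get(): query first, then wrap the finished list lazily.
    static Supplier<List<String>> get(String serviceId) {
        List<String> result = new ArrayList<>(listServices("metadata.name=" + serviceId));
        // Rough analogue of Flux.defer(() -> Flux.just(result)).
        return () -> result;
    }

    public static void main(String[] args) {
        Supplier<List<String>> deferred = get("service-a");
        System.out.println("calls after get(): " + apiCalls);
        deferred.get();
        deferred.get();
        System.out.println("calls after two reads: " + apiCalls);
    }
}
```

In this sketch, reading the returned value repeatedly does not trigger another query; a fresh lookup only happens when get() is called again.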
V1Service
io/kubernetes/client/openapi/models/V1Service.java
public class V1Service implements io.kubernetes.client.common.KubernetesObject {
    public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
    @SerializedName(SERIALIZED_NAME_API_VERSION)
    private String apiVersion;
    public static final String SERIALIZED_NAME_KIND = "kind";
    @SerializedName(SERIALIZED_NAME_KIND)
    private String kind;
    public static final String SERIALIZED_NAME_METADATA = "metadata";
    @SerializedName(SERIALIZED_NAME_METADATA)
    private V1ObjectMeta metadata;
    public static final String SERIALIZED_NAME_SPEC = "spec";
    @SerializedName(SERIALIZED_NAME_SPEC)
    private V1ServiceSpec spec;
    public static final String SERIALIZED_NAME_STATUS = "status";
    @SerializedName(SERIALIZED_NAME_STATUS)
    private V1ServiceStatus status;
    //......
}
V1Service defines the apiVersion, kind, metadata, spec and status properties.
KubernetesServiceInstanceMapper
spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/loadbalancer/KubernetesServiceInstanceMapper.java
public interface KubernetesServiceInstanceMapper<T> {
    KubernetesServiceInstance map(T service);
    static String createHost(String serviceName, String namespace, String clusterDomain) {
        return String.format("%s.%s.svc.%s", serviceName, StringUtils.hasText(namespace) ? namespace : "default",
                clusterDomain);
    }
    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations, String servicePortName,
            Integer servicePort) {
        if (labels != null) {
            final String securedLabelValue = labels.getOrDefault("secured", "false");
            if (securedLabelValue.equals("true")) {
                return true;
            }
        }
        if (annotations != null) {
            final String securedAnnotationValue = annotations.getOrDefault("secured", "false");
            if (securedAnnotationValue.equals("true")) {
                return true;
            }
        }
        return (servicePortName != null && servicePortName.endsWith("https")) || servicePort.toString().endsWith("443");
    }
    static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
        if (map == null) {
            return new HashMap<>();
        }
        if (!StringUtils.hasText(prefix)) {
            return map;
        }
        final Map<String, String> result = new HashMap<>();
        map.forEach((k, v) -> result.put(prefix + k, v));
        return result;
    }
}
The KubernetesServiceInstanceMapper interface declares the map method and provides the static helper methods createHost, isSecure and getMapWithPrefixedKeys.
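To see what the three static helpers produce for concrete inputs, the following is a slightly simplified, library-free copy of their logic. MapperHelpersSketch is a hypothetical class name, and the local hasText method is an assumed replacement for Spring's StringUtils.hasText; the sample service names and labels are made up.

```java
import java.util.HashMap;
import java.util.Map;

class MapperHelpersSketch {

    // Local replacement for Spring's StringUtils.hasText (assumption for this sketch).
    static boolean hasText(String s) {
        return s != null && !s.isBlank();
    }

    // A missing or blank namespace falls back to "default".
    static String createHost(String serviceName, String namespace, String clusterDomain) {
        return String.format("%s.%s.svc.%s", serviceName, hasText(namespace) ? namespace : "default",
                clusterDomain);
    }

    // Secure if labelled/annotated secured=true, or by port name/number suffix.
    static boolean isSecure(Map<String, String> labels, Map<String, String> annotations,
            String servicePortName, Integer servicePort) {
        if (labels != null && "true".equals(labels.get("secured"))) {
            return true;
        }
        if (annotations != null && "true".equals(annotations.get("secured"))) {
            return true;
        }
        return (servicePortName != null && servicePortName.endsWith("https"))
                || servicePort.toString().endsWith("443");
    }

    // Copies the map with every key prefixed; a blank prefix returns the map unchanged.
    static Map<String, String> getMapWithPrefixedKeys(Map<String, String> map, String prefix) {
        if (map == null) {
            return new HashMap<>();
        }
        if (!hasText(prefix)) {
            return map;
        }
        Map<String, String> result = new HashMap<>();
        map.forEach((k, v) -> result.put(prefix + k, v));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(createHost("service-a", null, "cluster.local")); // service-a.default.svc.cluster.local
        System.out.println(isSecure(null, null, "http", 8443));             // true
        System.out.println(getMapWithPrefixedKeys(Map.of("app", "demo"), "label."));
    }
}
```

Because the port check is a suffix match on the string form of the number, a port such as 8443 is treated as secure just like 443. The helper also calls servicePort.toString() unconditionally, so a null port would fail with a NullPointerException unless a label, annotation or port name had already decided the result.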
KubernetesClientServiceInstanceMapper
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientServiceInstanceMapper.java
public class KubernetesClientServiceInstanceMapper implements KubernetesServiceInstanceMapper<V1Service> {
    private KubernetesLoadBalancerProperties properties;
    private KubernetesDiscoveryProperties discoveryProperties;
    public KubernetesClientServiceInstanceMapper(KubernetesLoadBalancerProperties properties,
            KubernetesDiscoveryProperties discoveryProperties) {
        this.properties = properties;
        this.discoveryProperties = discoveryProperties;
    }
    @Override
    public KubernetesServiceInstance map(V1Service service) {
        final V1ObjectMeta meta = service.getMetadata();
        final List<V1ServicePort> ports = service.getSpec().getPorts();
        V1ServicePort port = null;
        if (ports.size() == 1) {
            port = ports.get(0);
        }
        else if (ports.size() > 1 && StringUtils.hasText(this.properties.getPortName())) {
            Optional<V1ServicePort> optPort = ports.stream()
                    .filter(it -> properties.getPortName().endsWith(it.getName())).findAny();
            if (optPort.isPresent()) {
                port = optPort.get();
            }
        }
        if (port == null) {
            return null;
        }
        final String host = KubernetesServiceInstanceMapper.createHost(service.getMetadata().getName(),
                service.getMetadata().getNamespace(), properties.getClusterDomain());
        final boolean secure = KubernetesServiceInstanceMapper.isSecure(service.getMetadata().getLabels(),
                service.getMetadata().getAnnotations(), port.getName(), port.getPort());
        return new DefaultKubernetesServiceInstance(meta.getUid(), meta.getName(), host, port.getPort(),
                getServiceMetadata(service), secure);
    }
    private Map<String, String> getServiceMetadata(V1Service service) {
        final Map<String, String> serviceMetadata = new HashMap<>();
        KubernetesDiscoveryProperties.Metadata metadataProps = this.discoveryProperties.metadata();
        if (metadataProps.addLabels()) {
            Map<String, String> labelMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getLabels(), metadataProps.labelsPrefix());
            serviceMetadata.putAll(labelMetadata);
        }
        if (metadataProps.addAnnotations()) {
            Map<String, String> annotationMetadata = KubernetesServiceInstanceMapper
                    .getMapWithPrefixedKeys(service.getMetadata().getAnnotations(), metadataProps.annotationsPrefix());
            serviceMetadata.putAll(annotationMetadata);
        }
        return serviceMetadata;
    }
}
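KubernetesClientServiceInstanceMapper implements the KubernetesServiceInstanceMapper interface with V1Service as its type argument. Its map method first reads the port information from service.getSpec().getPorts(): a single port is used as-is, and when there are several, the one matching the configured port name is chosen; if no port can be selected, map returns null. It then uses createHost to build the service's cluster-internal DNS name in the form <servicename>.<namespace>.svc.<clusterdomain>, for example service-a.default.svc.cluster.local, and finally creates a DefaultKubernetesServiceInstance.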
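The port selection at the start of map can be tried out in isolation. The sketch below reproduces only that branch logic with plain Java; PortSelectionSketch and its nested Port class are hypothetical stand-ins for the mapper and V1ServicePort, and configuredPortName plays the role of properties.getPortName().

```java
import java.util.List;

class PortSelectionSketch {

    // Hypothetical stand-in for V1ServicePort: just a name and a port number.
    static final class Port {
        final String name;
        final int port;

        Port(String name, int port) {
            this.name = name;
            this.port = port;
        }
    }

    // Returns the selected port, or null when none can be chosen (map then returns null too).
    static Port selectPort(List<Port> ports, String configuredPortName) {
        if (ports.size() == 1) {
            // A single port is always used, regardless of its name.
            return ports.get(0);
        }
        boolean hasName = configuredPortName != null && !configuredPortName.isBlank();
        if (ports.size() > 1 && hasName) {
            // Same comparison as in map: the configured name must end with the port's name.
            return ports.stream()
                    .filter(p -> configuredPortName.endsWith(p.name))
                    .findAny()
                    .orElse(null);
        }
        return null;
    }

    public static void main(String[] args) {
        List<Port> ports = List.of(new Port("http", 80), new Port("https", 443));
        System.out.println(selectPort(ports, "https").port); // 443
        System.out.println(selectPort(ports, "") == null);   // true
    }
}
```

Note the direction of the comparison: it is the configured name that must end with the port's name, so this is a suffix match and not an exact one. As in the quoted code, a port without a name would make endsWith throw a NullPointerException in the multi-port branch.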
KubernetesClientLoadBalancerClientConfiguration
spring-cloud-kubernetes-client-loadbalancer/src/main/java/org/springframework/cloud/kubernetes/client/loadbalancer/KubernetesClientLoadBalancerClientConfiguration.java
public class KubernetesClientLoadBalancerClientConfiguration {
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.kubernetes.loadbalancer.mode", havingValue = "SERVICE")
    ServiceInstanceListSupplier kubernetesServicesListSupplier(Environment environment, CoreV1Api coreV1Api,
            KubernetesClientServiceInstanceMapper mapper, KubernetesDiscoveryProperties discoveryProperties,
            KubernetesNamespaceProvider kubernetesNamespaceProvider, ConfigurableApplicationContext context) {
        return ServiceInstanceListSupplier.builder()
                .withBase(new KubernetesClientServicesListSupplier(environment, mapper, discoveryProperties,
                        coreV1Api, kubernetesNamespaceProvider))
                .withCaching().build(context);
    }
}
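KubernetesClientLoadBalancerClientConfiguration automatically creates the kubernetesServicesListSupplier bean when spring.cloud.kubernetes.loadbalancer.mode is set to SERVICE. The bean wraps a KubernetesClientServicesListSupplier and adds caching through withCaching().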
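As a configuration sketch, assuming a Spring Boot application that reads its settings from application.properties, the condition above would be satisfied by a line like this:

```properties
# Activates the SERVICE-mode ServiceInstanceListSupplier bean shown above.
spring.cloud.kubernetes.loadbalancer.mode=SERVICE
```

The property name and value are taken from the @ConditionalOnProperty annotation; where the property is set (properties file, YAML, environment variable) is up to the application.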
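Summary
spring-cloud-loadbalancer defines ServiceInstanceListSupplier, which extends Supplier with the type argument Flux<List<ServiceInstance>>, declares getServiceId and get(Request), and provides a static builder method. KubernetesServicesListSupplier is an abstract class that declares it implements ServiceInstanceListSupplier and leaves get abstract. KubernetesClientServicesListSupplier extends KubernetesServicesListSupplier; its constructor takes a KubernetesServiceInstanceMapper, KubernetesDiscoveryProperties, a CoreV1Api and a KubernetesNamespaceProvider, and its get method uses coreV1Api.listServiceForAllNamespaces or coreV1Api.listNamespacedService to fetch the V1Service objects for the given serviceId, which the mapper then converts to ServiceInstance objects.
Overall, in the code examined here, spring-cloud-kubernetes-client-loadbalancer only supports spring.cloud.kubernetes.loadbalancer.mode set to SERVICE. Under the hood it still relies on Kubernetes Service DNS resolution and Kubernetes' own load balancing, so fine-grained, customized load balancing across individual instances is not possible.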