一、简介

环境:
1、JDK 1.8
2、SpringBoot
3、华为Kafka集群

Kafka集群用户名明码的连贯集群简略,Kerberos认证集群的网上简直没有,这里我把对接的重要点分享一下。

二、集成

1、能到手上的材料

Kafka集群个别不是咱们装置的,当咱们须要对接到Kafka集群的时候,对方会提供给咱们的货色有:

  • 密钥文件:user.keytab
  • principal:xxxxxx
  • 加密协议security.protocol: SASL_SSL或SASL_PLAINTEXT
  • jaas.conf(如果没有,咱们也能够依据以上内容构建一个)

2、入手

2.1、程序配置

通过官网、网上的材料,咱们最初的配置文件如下:

spring:  ##########################################################################  #############  kafka 配置  ##########################################################################  kafka:    # kafka实例的broker地址和端口     bootstrap-servers: 100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x    # 生产者配置    producer:      # 重试次数,则客户端会将发送失败的记录从新发送      retries: 1      # 16K      batch-size: 16384      # #32M      buffer-memory: 33554432      # 发送确认参数: 0:发送后不论, 1:发送后Partition Leader音讯落盘, all: 所有的正本都ok才返回      acks: 1      # 指定音讯key和音讯体的编解码形式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      # 消费者组       group-id: Consumer-test      # 动提交      enable-auto-commit: true      # 偏移量的形式:      # earliest:当各分区下有已提交的offset时,从提交的offset开始生产;无提交的offset时,从头开始生产      # latest:  当各分区下有已提交的offset时,从提交的offset开始生产;无提交的offset时,从新产生的该分区下的数据生产      auto-offset-reset: latest      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    jaas:      enabled: true      login-module: com.sun.security.auth.module.Krb5LoginModule      control-flag: required      options:        "useKeyTab": true        "debug": true        "useTicketCache": false        "storeKey": true        "keyTab": "/etc/user.keytab"        "principal": xxxxx    properties:      # 加密协议,目前反对 SASL_SSL、SASL_PLAINTEXT 协定      "security.protocol": SASL_PLAINTEXT      # 域名      "kerberos.domain.name": topinfo.com      # 服务名      "sasl.kerberos.service.name": kafka

关注: jaas 属性的配置,"keyTab" 的门路配置。

2.2、其余配置

咱们还须要在指定 java.security.auth.login.config 的配置,

网上说 能够通过 System.setProperty("java.security.auth.login.config", /etc/jaas/root.jaas.conf)

而我是抉择在Tomccat的/bin/catalina.sh中增加一下内容:
JAVA_OPTS=" $JAVA_OPTS -Djava.security.auth.login.config=/etc/jaas/root.jaas.conf"

3、异样排查

错误信息: Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)

谬误详情:

2022-08-30 21:20:52.052 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  (org.apache.kafka.common.network.Selector:?) - [Consumer clientId=collect-Consumer-3, groupId=collect-Consumer] Failed authentication with topinfo/11.11.11.20 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.)

解决办法:

更换kafka-clients,应用华为依赖仓库里的kafka-clients,下载deploy到本人的私仓 针对FusionInsight 6.5.1 下载kafka-clients-2.4.0-hw-ei-311006.jar版本的亲测可用

华为仓库地址: https://repo.huaweicloud.com/...

我这里给出我的maven:

         <!-- spring kafka 排查依赖的kafka而引入华为的kafak包 -->         <dependency>            <groupId>org.springframework.kafka</groupId>            <artifactId>spring-kafka</artifactId>            <version>2.3.4.RELEASE</version>            <exclusions>                <exclusion>                    <groupId>org.apache.kafka</groupId>                    <artifactId>kafka-clients</artifactId>                </exclusion>                <exclusion>                    <groupId>org.apache.kafka</groupId>                    <artifactId>kafka-streams</artifactId>                </exclusion>            </exclusions>        </dependency>        <!-- 华为 组件 kafka  start -->        <dependency>            <groupId>com.huawei</groupId>            <artifactId>kafka-clients</artifactId>            <version>2.4.0</version>            <scope>system</scope>            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-311006.jar</systemPath>        </dependency>        <dependency>            <groupId>com.huawei</groupId>            <artifactId>kafka</artifactId>            <version>2.11</version>            <scope>system</scope>            <systemPath>${project.basedir}/lib/kafka_2.11-1.1.0.jar</systemPath>        </dependency>        <dependency>            <groupId>com.huawei</groupId>            <artifactId>kafka-streams-examples</artifactId>            <version>1.1.0</version>            <scope>system</scope>            <systemPath>${project.basedir}/lib/kafka-streams-examples-1.1.0.jar</systemPath>        </dependency>        <dependency>            <groupId>com.huawei</groupId>            <artifactId>kafka-streams</artifactId>            <version>1.1.0</version>            <scope>system</scope>            <systemPath>${project.basedir}/lib/kafka-streams-1.1.0.jar</systemPath>        </dependency>        <!-- 华为 组件 kafka  end -->

参考资料: https://www.baojieearth.cn/po...