I. Introduction to Kerberos

Kerberos distributes authentication keys to trusted nodes ahead of time, when the cluster is deployed. While the cluster is running, nodes use these keys to authenticate, and only nodes that pass authentication may provide services. A node attempting to impersonate a cluster member cannot communicate with nodes inside the cluster, because it lacks the pre-distributed key material. This prevents malicious use of, or tampering with, the Hadoop cluster, and ensures the cluster's reliability and security.

Terminology

AS (Authentication Server): the authentication server
KDC (Key Distribution Center): the key distribution center
TGT (Ticket Granting Ticket): the ticket-granting ticket, i.e. a "ticket for getting tickets"
TGS (Ticket Granting Server): the ticket-granting server
SS (Service Server): the server providing a specific service
Principal: an identity that can be authenticated
Ticket: a ticket, used by the client to prove the authenticity of its identity; it contains the user name, IP address, timestamp, validity period, and session key
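
These terms map directly onto the client-side workflow: kinit asks the AS for a TGT, and the first access to a service causes the TGS to issue a service ticket; both land in the local ticket cache. As a minimal illustration with the standard MIT Kerberos client tools (assuming the HADOOP.COM realm and kafka/es1 principal that are set up later in this article):

# Request a TGT from the AS/KDC (prompts for the principal's password)
kinit kafka/es1
# Inspect the ticket cache: the krbtgt/HADOOP.COM@HADOOP.COM entry is the TGT;
# service tickets issued by the TGS appear here as well
klist
# Discard all cached tickets
kdestroy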

II. Installing ZooKeeper (the steps below use CentOS 8 as an example)

# Download the binary tarball
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/zookeeper-3.6.3.tar.gz
# Extract it
tar -zxvf zookeeper-3.6.3.tar.gz
# Enter the ZooKeeper root directory, then rename conf/zoo_sample.cfg to zoo.cfg
# Start from the directory above bin/
./bin/zkServer.sh start
# Check whether it started successfully
ps -aux | grep 'zookeeper-3.6.3'   # or: ps -ef | grep 'zookeeper-3.6.3'
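
Besides grepping the process list, zkServer.sh can report the server's state itself; a quick check, assuming the default standalone zoo.cfg created above:

./bin/zkServer.sh status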

III. Installing and starting Kafka

# Download
wget http://mirrors.hust.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
# Extract
tar -zxvf kafka_2.12-2.8.0.tgz
# Rename
mv kafka_2.12-2.8.0 kafka-2.8.0
# Edit config/server.properties under the Kafka root directory and add the following:
port=9092
# Internal address (this machine's IP)
host.name=10.206.0.17
# External address (the cloud server's public IP; remove this entry if you don't need it)
advertised.host.name=119.45.174.249
# Start Kafka (-daemon runs it in the background; run from the directory above bin/)
./bin/kafka-server-start.sh -daemon config/server.properties
# Stop Kafka
./bin/kafka-server-stop.sh
# Create a topic
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic test --zookeeper localhost:2181/kafka
# Start a console producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Start a console consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# To allow external connections, the firewall may need to be stopped
# Check the firewall status
systemctl status firewalld.service
# Stop the firewall
systemctl stop firewalld.service
# Start the firewall
systemctl start firewalld.service
# Re-enable the firewall at boot
systemctl enable firewalld.service
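
Stopping firewalld entirely is the blunt approach; if the firewall has to stay up, opening only the ports used here should also work. A sketch with firewall-cmd (ports assumed: 2181 for ZooKeeper, 9092 for Kafka):

# Open the ZooKeeper and Kafka ports permanently, then reload the rules
firewall-cmd --permanent --add-port=2181/tcp
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload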

IV. Installing and configuring Kerberos

1. Install the Kerberos packages

yum install krb5-server krb5-libs krb5-workstation -y
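
To confirm that all three packages were installed, query them with rpm:

rpm -q krb5-server krb5-libs krb5-workstation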

2. Configure kdc.conf

vim /var/kerberos/krb5kdc/kdc.conf

The contents are as follows:

[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88

[realms]
 HADOOP.COM = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  max_renewable_life = 7d
  supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
 }
HADOOP.COM: the realm being defined. The name is arbitrary; Kerberos supports multiple realms, and by convention they are written in uppercase.
master_key_type and supported_enctypes default to aes256-cts. Because Java needs additional JCE jars installed to use aes256-cts, it is not used here.
acl_file: declares the admin users' permissions.
admin_keytab: the keytab the KDC uses for verification.
supported_enctypes: the supported encryption types. Note that aes256-cts has been removed.

3. Configure krb5.conf

vim /etc/krb5.conf

The contents are as follows:

# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 clockskew = 120
 udp_preference_limit = 1

[realms]
 HADOOP.COM = {
  # Change kdc and admin_server to your server's hostname or IP address
  kdc = es1
  admin_server = es1
 }

[domain_realm]
 .hadoop.com = HADOOP.COM
 hadoop.com = HADOOP.COM

4. Initialize the Kerberos database

kdb5_util create -s -r HADOOP.COM
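
The -s flag stashes the master key so krb5kdc can start without prompting for it. Once the command finishes, the database files and the stash file should be visible under the KDC directory (paths assume the CentOS defaults used above):

# Expect principal* database files and a .k5.HADOOP.COM stash file
ls -la /var/kerberos/krb5kdc/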

5. Grant ACL permissions to the database administrator

vim /var/kerberos/krb5kdc/kadm5.acl

# Change the contents to:
*/admin@HADOOP.COM *

6. Start the Kerberos daemons

# Start the services
systemctl start kadmin krb5kdc
# Enable them at boot
systemctl enable kadmin krb5kdc
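
If anything misbehaves later, systemctl and the log files declared in the [logging] section of krb5.conf are the first places to look:

# Check that both daemons came up
systemctl status krb5kdc kadmin
# KDC and kadmind logs, as configured in /etc/krb5.conf
tail /var/log/krb5kdc.log /var/log/kadmind.log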

7. Basic Kerberos operations

# 1. Enter kadmin as the superadmin
kadmin.local
# 2. List existing principals
listprincs
# 3. Add the principal kafka/es1
addprinc kafka/es1
# 4. Leave kadmin.local
exit
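
kadmin.local also accepts one-off commands through -q, which is convenient in scripts; for example, to inspect the principal just created (attributes, key versions, expiry):

kadmin.local -q "getprinc kafka/es1"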

8. Integrating Kafka with Kerberos

1. Generate the principal's keytab (this file is needed later by the Kafka configuration and for client authentication)

kadmin.local -q "xst -k /var/kerberos/krb5kdc/kadm5.keytab kafka/es1@HADOOP.COM"
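
Before wiring this keytab into Kafka it is worth verifying it: klist can list its entries, and kinit -kt confirms that passwordless authentication with it works:

# List the principals and key version numbers stored in the keytab
klist -kt /var/kerberos/krb5kdc/kadm5.keytab
# Authenticate with the keytab instead of a password, then check the ticket cache
kinit -kt /var/kerberos/krb5kdc/kadm5.keytab kafka/es1@HADOOP.COM
klist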

2. Create kafka-jaas.conf under config/ in the Kafka installation directory

The contents are as follows:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
    principal="kafka/es1@HADOOP.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=true
    keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
    principal="kafka/es1@HADOOP.COM";
};

3. Add the following settings to config/server.properties

# For listeners and advertised.listeners, use this broker's hostname
advertised.listeners=SASL_PLAINTEXT://es1:9092
listeners=SASL_PLAINTEXT://es1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

4. Add the Kafka JVM options to the bin/kafka-run-class.sh script (the part between "<" and ">" below; remember to remove the "<" and ">" themselves, and if an option to be added is already present in the original value, keep only one copy)

# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent < -Djava.awt.headless=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/xingtongping/kafka_2.12-2.2.0/config/kafka-jaas.conf >"
fi
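
Alternatively, if you prefer not to edit the script, kafka-run-class.sh also picks up the KAFKA_OPTS environment variable, so the same two properties can be exported before running any Kafka CLI tool (the kafka-jaas.conf path below mirrors the one above and should be adjusted to your installation):

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/xingtongping/kafka_2.12-2.2.0/config/kafka-jaas.conf"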

5. Configure config/producer.properties: the Kerberos settings for the Kafka producer

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

6. Configure config/consumer.properties: the Kerberos settings for the Kafka consumer

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
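
Before moving on to the console clients in the next section, a lightweight connectivity check can confirm that the SASL/GSSAPI handshake succeeds; this assumes the JVM options from step 4 are in place so the CLI tools pick up the JAAS configuration:

./bin/kafka-broker-api-versions.sh --bootstrap-server es1:9092 --command-config config/producer.properties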

V. Testing from the Linux command line

1. Start the Kafka server

./bin/kafka-server-start.sh config/server.properties

2. Start a console producer

./bin/kafka-console-producer.sh --broker-list es1:9092 --topic test  --producer.config config/producer.properties

3. Start a console consumer

./bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test  --consumer.config config/consumer.properties

VI. Java client consumer test code for a local environment

1. Download krb5.conf and kafka-jaas.conf to your local machine, and change the keytab path in kafka-jaas.conf to point to the local keytab.


2. If you connect using the hostname, remember to edit your hosts file and add: 172.16.110.173 es1
3. Use the following code to test

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        // Kerberos authentication: point the JVM at the downloaded config files
        // (note the escaped backslashes in the Windows paths)
        System.setProperty("java.security.krb5.conf", "D:\\zookeeper\\kerberos\\bd\\krb5.conf");
        System.setProperty("java.security.auth.login.config", "D:\\zookeeper\\kerberos\\bd\\kafka-jaas.conf");

        Properties props = new Properties();
        // Cluster addresses; separate multiple addresses with ","
        props.put("bootstrap.servers", "es1:9092"); // hostname or IP
        // Kerberos authentication settings
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("group.id", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to topics (several can be listed, separated by ","); here we subscribe to "test"
        consumer.subscribe(Arrays.asList("test"));
        // Keep polling
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("Read from test: " + consumerRecord.value());
            }
        }
    }
}