共计 6367 个字符,预计需要花费 16 分钟才能阅读完成。
一、Kerbeos 简介
Kerberos 能够将认证的密钥在集群部署时当时放到牢靠的节点上。集群运行时,集群内的节点应用密钥失去认证,认证通过后的节点能力提供服务。希图假冒的节点因为没有当时失去的密钥信息,无奈与集群外部的节点通信。这样就避免了歹意地应用或篡改 Hadoop 集群的问题,确保了 Hadoop 集群的可靠性、安全性。
名词解释
AS(Authentication Server): 认证服务器
KDC(Key Distribution Center): 密钥散发核心
TGT(Ticket Granting Ticket): 票据受权票据,票据的票据
TGS(Ticket Granting Server): 票据受权服务器
SS(Service Server): 特定服务提供端
Principal: 被认证的个体
Ticket: 票据, 客户端用来证实身份真实性。蕴含: 用户名,IP,工夫戳,有效期,会话秘钥。
二、装置 zookeeper(以下操作以 centos8 为例)
# 下载二进制压缩包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/zookeeper-3.6.3.tar.gz
#解压缩
tar -zxvf zookeeper-3.6.3.tar.gz
#进入 zookeeper 的根目录,而后把 conf 下的 zoo_sample.cfg 这个文件重命名为 zoo.cfg
#进去 bin 目录上一级目录 启动
./bin/zkServer.sh start
#查看是否启动胜利
ps -aux | grep 'zookeeper-3.6.3' 或 ps -ef | grep 'zookeeper-3.6.3'
三、装置启动 kafka
# 下载
wget http://mirrors.hust.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
#解压
tar -zxvf kafka_2.12-2.8.0.tgz
#重命名
mv kafka_2.12-2.8.0 kafka-2.8.0
#批改 kafka 根目录下 config/server.properties 配置文件
#新增如下配置
port=9092
#内网 (示意本机 ip)
host.name=10.206.0.17
#外网 (云服务器 ip, 如果有须要配置,没有这项去掉)
advertised.host.name=119.45.174.249
#启动 kafka(-daemon 示意后盾启动,在 bin 上一层目录启动)
./bin/kafka-server-start.sh -daemon config/server.properties
#敞开 kafka
./bin/kafka-server-stop.sh
#创立 topic
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic test --zookeeper localhost:2181/kafka
#创立生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#创立消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
#如想外界连贯须要敞开防火墙
#查看防火墙状态
systemctl status firewalld.service
#敞开防火墙
systemctl stop firewalld.service
#启动防火墙
systemctl start firewalld.service
#防火墙主动重启
systemctl enable firewalld.service
四、配置装置 Kerberos
1. 装置 Kerberos 相干软件
yum install krb5-server krb5-libs krb5-workstation -y
2、配置 kdc.conf
vim /var/kerberos/krb5kdc/kdc.conf
内容如下:[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
HADOOP.COM = {
#master_key_type = aes256-cts
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
max_renewable_life = 7d
supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
}
HADOOP.COM: 是设定的 realms。名字随便。Kerberos 能够反对多个 realms,个别全用大写
masterkeytype,supported_enctypes 默认应用 aes256-cts。因为,JAVA 应用 aes256-######cts 验证形式须要装置额定的 jar 包,这里暂不应用
acl_file: 标注了 admin 的用户权限。
admin_keytab:KDC 进行校验的 keytab
supported_enctypes: 反对的校验形式。留神把 aes256-cts 去掉
3、配置 krb5.conf
vim /etc/krb5.conf
内容如下:# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = HADOOP.COM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
clockskew = 120
udp_preference_limit = 1
[realms]
HADOOP.COM = {
kdc = es1 // 改为服务器主机名或 ip 地址
admin_server = es1 // 改为服务器主机名或 ip 地址
}
[domain_realm]
.hadoop.com = HADOOP.COM
hadoop.com = HADOOP.COM
4、初始化 kerberos database
kdb5_util create -s -r HADOOP.COM
5、批改 database administrator 的 ACL 权限
vim /var/kerberos/krb5kdc/kadm5.acl
#批改如下
*/admin@HADOOP.COM
6、启动 kerberos daemons
# 启动
systemctl start kadmin krb5kdc
#设置开机主动启动
systemctl enable kadmin krb5kdc
7、kerberos 简略操作
#1. 首先以超管身份进入 kadmin
kadmin.local
#2. 查看用户
listprincs
#新增用户 kafka/es1
addprinc kafka/es1
#退出 kadmin.local
exit
8、Kafka 集成 Kerberos
1. 生成用户 keytab(该文件后续 kafka 配置和客户端认证需用到)
kadmin.local -q "xst -k /var/kerberos/krb5kdc/kadm5.keytab kafka/es1@HADOOP.COM"
2. 在 kafka 装置目录下 config 创立 kafka-jaas.conf
# 内容如下
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
principal="kafka/es1@HADOOP.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=true
keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
principal="kafka/es1@HADOOP.COM";
};
3. 在 config/server.properties 增加如下配置
advertised.listeners=SASL_PLAINTEXT://es1:9092 #对应主机名称
listeners=SASL_PLAINTEXT://es1:9092 #对应主机名称
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
4. 在 bin/kafka-run-class.sh 脚本增加 kafka jvm 参数 (“<“ 外面的内容, 留神 “>” 要去掉,如原参数外面内容和待增加雷同,只保留一份即可)
#jvm performance options
if [-z "$KAFKAJVMPERFORMANCEOPTS"]; then KAFKAJVMPERFORMANCEOPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent < -Djava.awt.headless=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/xingtongping/kafka_2.12-2.2.0/config/kafka-jaas.conf >" fi
5. 配置 config/producer.properties,kafka 生产者 kerberos 配置
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name =kafka
6. 配置 config/consumer.properties,kafka 消费者 kerberos 配置
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name=kafka
五、linux 测试命令
1. 启动 kafka 服务
./bin/kafka-server-start.sh config/server.properties
2. 创立生产者
./bin/kafka-console-producer.sh --broker-list es1:9092 --topic test --producer.config config/producer.properties
3. 创立消费者
./bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test --consumer.config config/consumer.properties
六、本地环境 java 客户端生产测试代码
1. 把 krb5.conf 和 kafka-jaas.conf 下载到本地,并且批改 kafka-jaas.conf 文件的 keytab 门路: 改为本地 keytab 门路
2. 应用域名记得批改 hosts 文件,增加内容:172.16.110.173 es1
3. 增加如下代码测试
public class Consumer {public static void main(String[] args) throws IOException {System.setProperty("java.security.krb5.conf","D:\zookeeper\kerberos\bd\krb5.conf"); // 认证代码
System.setProperty("java.security.auth.login.config","D:\zookeeper\kerberos\bd\kafka-jaas.conf");// 认证代码
Properties props = new Properties();
// 集群地址,多个地址用 "," 分隔
props.put("bootstrap.servers", "es1:9092");// 主机名称或 Ip
props.put("sasl.kerberos.service.name", "kafka"); // 认证代码
props.put("sasl.mechanism", "GSSAPI"); // 认证代码
props.put("security.protocol", "SASL_PLAINTEXT"); // 认证代码
props.put("group.id", "1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创立消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
// 订阅 topic,能够为多个用, 隔开,此处订阅了 "test" 这个主题
consumer.subscribe(Arrays.asList("test"));
// 继续监听
while(true){
//poll 频率
ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println("在 test 中读到:" + consumerRecord.value());
}
}
}
}
正文完