kafka(kraft版本)设置ssl认证

1,设置ssl脚本

#!/bin/sh

################################## 设置环境变量 ##############################
BASE_DIR=/root/ssl
CERT_OUTPUT_PATH="${BASE_DIR}/ssl"
PASSWORD=kafka1234567
ALIAS_NAME="localhost"

D_NAME="CN=Fusionskye, OU=Fusionskye, O=Fusionskye, L=Beijing, ST=Beijing, C=CN"
SUBJ="/C=CN/ST=Beijing/L=Beijing/O=Fusionskye/CN=Fusionskye"
##############################################################################

mkdir -p "${CERT_OUTPUT_PATH}"

keytool -keystore ${CERT_OUTPUT_PATH}/kafka.server.keystore.jks -alias ${ALIAS_NAME} -validity 3650 -keyalg RSA -genkey -keypass ${PASSWORD} -dname "${D_NAME}" -storepass ${PASSWORD}
openssl req -new -x509 -keyout ${CERT_OUTPUT_PATH}/ca-key -out ${CERT_OUTPUT_PATH}/ca-cert -days 3650 -passout pass:${PASSWORD} -subj "${SUBJ}"
keytool -keystore ${CERT_OUTPUT_PATH}/kafka.client.truststore.jks -alias CARoot -import -file ${CERT_OUTPUT_PATH}/ca-cert -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt
keytool -keystore ${CERT_OUTPUT_PATH}/kafka.server.truststore.jks -alias CARoot -import -file ${CERT_OUTPUT_PATH}/ca-cert -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt

keytool -keystore ${CERT_OUTPUT_PATH}/kafka.server.keystore.jks -alias ${ALIAS_NAME} -certreq -file ${CERT_OUTPUT_PATH}/cert-file -storepass ${PASSWORD}
openssl x509 -req -CA ${CERT_OUTPUT_PATH}/ca-cert -CAkey ${CERT_OUTPUT_PATH}/ca-key -in ${CERT_OUTPUT_PATH}/cert-file -out ${CERT_OUTPUT_PATH}/cert-signed -days 3650 -CAcreateserial -passin pass:${PASSWORD}
keytool -keystore ${CERT_OUTPUT_PATH}/kafka.server.keystore.jks -alias CARoot -import -file ${CERT_OUTPUT_PATH}/ca-cert -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt
keytool -keystore ${CERT_OUTPUT_PATH}/kafka.server.keystore.jks -alias ${ALIAS_NAME} -import -file ${CERT_OUTPUT_PATH}/cert-signed -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt

#以下是给go程序使用的,用不到golang的可以删除
echo -e "${PASSWORD}\r" | keytool -importkeystore -srckeystore ${CERT_OUTPUT_PATH}/kafka.server.truststore.jks -destkeystore ${CERT_OUTPUT_PATH}/server.p12 -deststoretype PKCS12 -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt
openssl pkcs12 -in ${CERT_OUTPUT_PATH}/server.p12 -nokeys -out ${CERT_OUTPUT_PATH}/server.cer.pem -passin pass:${PASSWORD}
echo -e "${PASSWORD}\r" | keytool -importkeystore -srckeystore ${CERT_OUTPUT_PATH}/kafka.server.keystore.jks -destkeystore ${CERT_OUTPUT_PATH}/client.p12 -deststoretype PKCS12 -storepass ${PASSWORD} -keypass ${PASSWORD} -noprompt
openssl pkcs12 -in ${CERT_OUTPUT_PATH}/client.p12 -nokeys -out ${CERT_OUTPUT_PATH}/client.cer.pem -passin pass:${PASSWORD}
# client.key.pem没有设置密码,-nodes指不设置密码参数
openssl pkcs12 -in ${CERT_OUTPUT_PATH}/client.p12 -nodes -nocerts -out ${CERT_OUTPUT_PATH}/client.key.pem -passin pass:${PASSWORD}

#为golang程序删除掉client.cer.pem不可用字段,用不到go程序的,可以把以下删除
\cp -r ${CERT_OUTPUT_PATH}/client.cer.pem ${CERT_OUTPUT_PATH}/client.cer.pem.backup
sed -i '/^Bag Attributes/,/Bag Attributes$/d ' ${CERT_OUTPUT_PATH}/client.cer.pem && sed -i '1i Bag Attributes' ${CERT_OUTPUT_PATH}/client.cer.pem

参数说明:
名称 说明
-alias 别名,可自定义,这里叫localhost
-keystore 指定密钥库的名称(就像数据库一样的证书库,可以有很多个证书,cacerts这个文件是jre自带的, 也可以使用其它文件名字,如果没有这个文件名字,它会创建这样一个)
-storepass 指定密钥库的密码
-keypass 指定别名条目的密码
-list 显示密钥库中的证书信息
-export 将别名指定的证书导出到文件
-file 参数指定导出到文件的文件名
-import 将已签名数字证书导入密钥库
-keypasswd 修改密钥库中指定条目口令
-dname 指定证书拥有者信息。
其中,CN=名字与姓氏/域名,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码
-keyalg 指定密钥的算法
-validity 指定创建的证书有效期多少天
-keysize 指定密钥长度

2,修改kafka配置

#ssl
ssl.keystore.location=/data/ezsonar/ssl/kafka-server.keystore.jks
ssl.keystore.password=1234567890
ssl.key.password=kafka1234567
ssl.truststore.location=/data/ezsonar/ssl/kafka-server.truststore.jks
ssl.truststore.password=1234567890
#重要
ssl.client.auth=none
security.inter.broker.protocol=SSL
#重要
ssl.endpoint.identification.algorithm=
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
node.id=1265979533
process.roles=broker,controller
controller.quorum.voters=1265979533@192.168.1.192:2181,471805583@192.168.1.192:2182
controller.listener.names=CONTROLLER
listeners=SSL://:9092,CONTROLLER://:2181
advertised.listeners=SSL://192.168.1.192:9092
listener.security.protocol.map=SSL:SSL,CONTROLLER:SSL
advertised.host.name=192.168.1.192
advertised.port=9092
log.dirs=/data/data/kafka_0
log.retention.hours=24
num.partitions=8
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
log.retention.bytes=1073741824
log.segment.bytes=262144000
log.cleanup.policy=delete
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=2147483647
socket.request.max.bytes=2147483647

集群也是类似的配置,只不过nodeID不一样而已。

说明:

  • listeners=PLAINTEXT://:9092,SSL://:9093   # 这里为Kafka broker配置了两个listeners,一个是明文传输;另一个使用SSL加密进行数据传输
  • advertised.listeners=PLAINTEXT://公网IP:9092,SSL://公网IP:9093  # 如果clients通过公网(或外网)去连接broker,那么advertiesd.listeners就必须配置成所在机器的公网IP
  • ssl.keystore.location=/var/private/ssl/server.keystore.jks # 提供SSL keystore的文件
  • ssl.keystore.password=1234567890 # 提供keystore密码
  • ssl.truststore.location=/var/private/ssl/server.keystore.jks # 提供SSL truststore的文件
  • ssl.truststore.password=1234567890 # 提供truststore密码
  • ssl.key.password=1234567890 # keystore中的私钥密码
  • ssl.client.auth=none # 设置clients不开启认证

3,客户端连接

3.1 创建kafka-ssl.properties文件,内容填写以下信息:

security.protocol=SSL
ssl.truststore.location=/data/ezsonar/ssl/kafka-server.truststore.jks
ssl.truststore.password=1234567890
#重要重要重要,否则连不上
ssl.endpoint.identification.algorithm=

3.2 生产和消费topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config kafka-ssl.properties

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config kafka-ssl.properties

3.3 代码使用:
代码中添加的参数:

       final String keyStoreConfig = "/Users/xxx/Downloads/ssl/kafka-server.keystore.jks";

       final String trustStoreConfig ="/Users/xxx/Downloads/ssl/kafka-server.truststore.jks";

       kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // security.protocol
       kafkaProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreConfig); // ssl.keystore.location
       kafkaProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "1234567890"); // ssl.keystore.password
       kafkaProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "1234567890"); // ssl.key.password
       kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreConfig); // ssl.truststore.location
       kafkaProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "1234567890"); // ssl.truststore.password
       kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); // ssl.endpoint.identification.algorithm