python推送数据到kafka的kerberos服务验证(ubuntu)
1. 安装必要的依赖包
apt-get install krb5-kdc libkrb5-dev python3-six -y --fix-missing pip3 install gssapi==1.6.6 kafka-python==2.0.1 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
2. 获取必要的配置文件krb5.conf、kafka.keytab、jaas.conf放到/etc目录下
3. 配置/etc/hosts地址映射到kafka数据节点
echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts
4. 生成kerberos认证票据
kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH
5. 初始化kerberos环境变量
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
6. 代码示例
#!/usr/bin/env python # -*- coding:utf-8 -*- import os import time import json from kafka import KafkaProducer def get_producer(): # 配置地址映射,127.0.0.1为示例 with open('/etc/hosts', 'r') as hosts: if 'bigdata' not in hosts.read(): os.system('echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts') # kafka鉴权文件软链接到/etc目录 cur_dir = os.path.dirname(os.path.abspath(__file__)) # 自定义该目录 os.system('ln -fs %s/krb5.conf /etc/krb5.conf' % cur_dir) os.system('ln -fs %s/kafka.keytab /etc/kafka.keytab' % cur_dir) os.system('ln -fs %s/jaas.conf /etc/jaas.conf' % cur_dir) # 生成kafka认证密钥,配置系统环境变量 os.system('kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH') os.environ['KAFKA_OPTS'] = '-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf' producer = KafkaProducer(**{ 'bootstrap_servers': ['127.0.0.1:9092', '127.0.0.1:9092', '127.0.0.1:9092'], 'security_protocol': 'SASL_PLAINTEXT', # 安全协议 'sasl_mechanism': 'GSSAPI', # SASL机制 'compression_type': 'gzip', # 压缩方式,可选配置 'api_version': (0, 10, 2), # API版本 'max_block_ms': 3000, # 发送请求最大阻塞时间 'value_serializer': lambda value: json.dumps(value).encode(), # 数据序列化方法 'sasl_kerberos_service_name': 'kafka' # kerberos服务名称 }) # 等待0.5秒后检测是否连接成功了 time.sleep(0.5) if not producer.bootstrap_connected(): raise Exception('Connect kafka failed') return producer # 因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,需要通过消息队列做生产者消费者模型 producer = get_producer() print(producer.send('kafka_topic', {'test': 'test_data'}, partition=0).get(timeout=1))
7. 注意的点
pip安装的依赖包是kafka-python,不是kafka,这两个都有,不要混淆了。
因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,多线程多进程场景下可以使用redis作为消息中间件实现生产者消费者模型。
使用get_producer获取一次认证后的连接就可以长期使用,票据过期不会导致后续的推送失败。
热门文章
- 「2月10日」最高速度18.9M/S,2025年Hiddify Next每天更新免费节点订阅链接
- 「2月9日」最高速度22.5M/S,2025年Hiddify Next每天更新免费节点订阅链接
- 南京农大动物医院院长(南京农大动科院院长)
- JavaCV的摄像头实战之七:推流(带声音)
- 闲鱼无货源怎么弄(闲鱼无货源怎么卖货)
- 兽医站和宠物医院哪个好(狗看病去兽医站还是宠物医院)
- 「2月7日」最高速度22.3M/S,2025年Hiddify Next每天更新免费节点订阅链接
- 「1月6日」最高速度18.6M/S,2025年Hiddify Next每天更新免费节点订阅链接
- 「1月27日」最高速度23M/S,2025年Hiddify Next每天更新免费节点订阅链接
- 宠物领养网官网app(宠物领养平台app)