Iterator与LIstIterator接口在java中的区别有哪些
318
2022-08-29
python之Kafka(pYthon)
1.安装kafka环境
# 看这个地址# 你还需要装Java环境# 测试启动如果启动成功,那么证明kafka启动成功.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties#启动kafka.\bin\windows\kafka-server-start.bat .\config\server.properties# 创建top.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 生产者.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test# 消费者.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
2.pykafka
2,1安装
pip install pykafka
2.2生产者
from pykafka import KafkaClientfrom conf import KAFKA_HOSTS_LOCALHOST#连接kafka客户端kafka_client = KafkaClient(hosts=KAFKA_HOSTS_LOCALHOST)#获取topictopic = kafka_client.topics["test"]#获取生产者对象produce = topic.get_producer()#传数据必须是字节produce.produce("now_time_bytes".encode("utf8"))#手动关闭该生产者produce.stop()
2.3消费者
# 导入安装包from pykafka import KafkaClient# 设置客户端的连接信息client = KafkaClient(hosts="127.0.0.1:9092")topic = client.topics['test']# print(client.topics)# print(topic.latest_available_offsets())#consumer_group 与consumer_id值不能一样,不同group相互独立consumer = topic.get_simple_consumer( consumer_group='18', auto_commit_enable=True, auto_commit_interval_ms=1, reset_offset_on_start=True # consumer_id =1,)for message in consumer: if message is not None: print(message.offset, message.value)
-----------------------------------------------------------------------------------------------------------------------------------------
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~