python如何实现对kafka的基本操作
admin
2023-07-23 07:42:17
0

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生产者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一个消费者消费一个topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,从第1个偏移量消费
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一个消费者订阅多个topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消费者(手动拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #从kafka获取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

相关内容

热门资讯

沈伯洋称“台北要向台南学”,蓝... 海峡导报综合报道 民进党台北市长参选人沈伯洋23日参加台北市大台南同乡会核心干部座谈会,提出台北要向...
云南夫妻吃菌中毒擅自离开医院后... 近日,昆明市公安局特巡警支队四大队接到一位市民报警称,自己妹妹、妹夫吃菌中毒行为异常,怕120急救人...
梅洛尼与特朗普突然“翻脸”后,... 【环球网报道】美国总统特朗普日前在接受采访时称意大利总理梅洛尼在七国集团(G7)峰会期间“央求合影”...
上财校长寄语毕业生:避免活成一... 澎湃新闻资深记者 邹桥“AI时代最危险的,不是机器越来越聪明,而是人越来越习惯于把自己的判断拱手相让...
欧洲遭遇“更早、更久、更难解”... 澎湃新闻首席记者 刘栋当地时间2026年6月23日,英国伦敦,一名民众坐在公园的阴凉处。视觉中国 图...
江苏省最新平均工资出炉 【大河财立方消息】6月24日,江苏省统计局发布2025年全省城镇单位就业人员年平均工资数据。统计显示...
活力中国调研行|雄安“村”里故... 【大河财立方 记者 陈诗昂 文 王子阳 摄影 雄安新区报道】 到8月底,雄安新区中关村科技园开园就将...
涉及零食、饮料等 37批次食品... 【大河财立方消息】6月23日,市场监管总局办公厅通报37批次食品抽检不合格情况。近期,市场监管总局组...
古巴外长痛批鲁比奥虚伪狡诈 据凤凰卫视报道,6月23日,美国国务院官网发布消息称,鲁比奥宣布将5个古巴实体以及古巴革命领袖劳尔·...
水压低了热水器风压故障 当水压低了热水器风压故障是一种常见问题,这通常是因为在热水器正常运行过程中产生的压力(风压)与水压不...