一:kafka producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from kafka import KafkaProducer from kafka.errors import kafka_errors import traceback json_list = {"test": 1} producer = KafkaProducer( bootstrap_servers=['kafka.aws.bigdata.yidianshihui.com:9092'], # key和value序列化 key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode() ) future = producer.send( 'custom_msg', # key='test', # 同一个key值,会被送至同一个分区 value=json_list, # value=json.dumps(json_list).encode("utf-8"), partition=random.randint(0, 3) # 向多分区发送数据(注意:分区需存在,如果不存在需要通过标题四的方法去创建分区) ) # 向分区1发送消息 try: future.get(timeout=10) # 监控是否发送成功 except kafka_errors: # 发送失败抛出kafka_errors traceback.format_exc()
二:kafka consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from kafka import KafkaConsumer consumer = KafkaConsumer(bootstrap_servers=['kafka.aws.bigdata.yidianshihui.com:9092'], # value_deserializer=msgpack.loads, # key_deserializer=msgpack.loads key_deserializer=lambda k: json.loads(k), value_deserializer=lambda v: json.loads(v), auto_offset_reset='latest', group_id='test1-group' ) # 订阅消费的主题的分区 consumer.assign([TopicPartition('test1', 0),TopicPartition('test1', 1),TopicPartition('test1', 2)]) while True: time.sleep(1) for message in consumer: if message is not None: print(message.offset, message.value) print(msg) value = msg.value key = msg.key print(type(value)) print(value) print(type(key)) print(msg.partition)
参数详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class kafka.KafkaConsumer(*topics, **configs) *topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。 client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’ group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。 enable_auto_commit (bool) – 如果为True,将自动定时提交消费者offset。默认为True。 auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。 value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value subscribe(topics=(), pattern=None, listener=None) 订阅需要的主题 topics (list) – 需要订阅的主题列表 pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。
三:kafka 创建topic并创建分区
1 2 3 4 5 6 7 from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions c = KafkaAdminClient(bootstrap_servers="kafka.aws.bigdata.yidianshihui.com:9092") # 创建topic同时创建分区 topic_list = [] topic_list.append(NewTopic(name="test1", num_partitions=2,replication_factor=1)) c.create_topics(new_topics=topic_list, validate_only=True)
四:kafka 已有的topic并创建分区 有时候,向指定分区发送数据时,会报如下错误,这个是因为topic里不存在此分区,需手动创建
1 2 assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition' AssertionError: Unrecognized partition
创建方法
1 2 3 4 5 6 from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions c = KafkaAdminClient(bootstrap_servers="kafka.aws.bigdata.yidianshihui.com:9092") # 在已有的topic中创建分区 new_partitions = NewPartitions(3) c.create_partitions({"custom_msg": new_partitions})
参考文档:https://zhuanlan.zhihu.com/p/103915834 https://www.cnblogs.com/xiaozengzeng/p/13621045.html https://kafka-python.readthedocs.io/en/master/usage.html Python生产者和消费者API使用 kafka 随笔