0%

kafka-python 使用

一:kafka producer

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback

json_list = {"test": 1}
producer = KafkaProducer(
bootstrap_servers=['kafka.aws.bigdata.yidianshihui.com:9092'],
# key和value序列化
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode()
)

future = producer.send(
'custom_msg',
# key='test', # 同一个key值,会被送至同一个分区
value=json_list,
# value=json.dumps(json_list).encode("utf-8"),
partition=random.randint(0, 3) # 向多分区发送数据(注意:分区需存在,如果不存在需要通过标题四的方法去创建分区)
) # 向分区1发送消息

try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()

二:kafka consumer 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['kafka.aws.bigdata.yidianshihui.com:9092'],
# value_deserializer=msgpack.loads,
# key_deserializer=msgpack.loads
key_deserializer=lambda k: json.loads(k),
value_deserializer=lambda v: json.loads(v),
auto_offset_reset='latest',
group_id='test1-group'
)
# 订阅消费的主题的分区
consumer.assign([TopicPartition('test1', 0),TopicPartition('test1', 1),TopicPartition('test1', 2)])


while True:
time.sleep(1)
for message in consumer:
if message is not None:
print(message.offset, message.value)
print(msg)
value = msg.value
key = msg.key
print(type(value))
print(value)
print(type(key))
print(msg.partition)

参数详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。



client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’



group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None



auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。



enable_auto_commit (bool) – 如果为True,将自动定时提交消费者offset。默认为True。



auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。



value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value



subscribe(topics=(), pattern=None, listener=None)

订阅需要的主题

topics (list) – 需要订阅的主题列表

pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。


三:kafka 创建topic并创建分区 

1
2
3
4
5
6
7
from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions
c = KafkaAdminClient(bootstrap_servers="kafka.aws.bigdata.yidianshihui.com:9092")
# 创建topic同时创建分区
topic_list = []
topic_list.append(NewTopic(name="test1", num_partitions=2,replication_factor=1))
c.create_topics(new_topics=topic_list, validate_only=True)

四:kafka 已有的topic并创建分区
有时候,向指定分区发送数据时,会报如下错误,这个是因为topic里不存在此分区,需手动创建

1
2
assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
AssertionError: Unrecognized partition

创建方法

1
2
3
4
5
6
from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions
c = KafkaAdminClient(bootstrap_servers="kafka.aws.bigdata.yidianshihui.com:9092")

# 在已有的topic中创建分区
new_partitions = NewPartitions(3)
c.create_partitions({"custom_msg": new_partitions})

参考文档:
https://zhuanlan.zhihu.com/p/103915834
https://www.cnblogs.com/xiaozengzeng/p/13621045.html
https://kafka-python.readthedocs.io/en/master/usage.html
Python生产者和消费者API使用
kafka 随笔

------------- 本文结束 感谢您的阅读-------------