Kafka提交offset机制_小白是我吖的博客-CSDN博客


本站和网页 https://blog.csdn.net/admin294/article/details/96429965 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Kafka提交offset机制_小白是我吖的博客-CSDN博客
Kafka提交offset机制
小白是我吖
于 2019-07-18 14:22:14 发布
3214
收藏
分类专栏:
Kafka
Kafka
专栏收录该内容
7 篇文章
0 订阅
订阅专栏
转载出处:https://www.cnblogs.com/FG123/p/10091599.html ————————————————————————————————————————————
在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始看(消费)。
Kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
(1) 自动提交偏移量:
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。
如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28
自动提交偏移量示例:
import pickle
import uuid
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['192.168.33.11:9092'],
group_id="test_group_1",
client_id="{}".format(str(uuid.uuid4())),
max_poll_records=500,
enable_auto_commit=True, # 默认为True 表示自动提交偏移量
auto_commit_interval_ms=100, # 控制自动提交偏移量的频率 单位ms 默认是5000ms
key_deserializer=lambda k: pickle.loads(k),
value_deserializer=lambda v: pickle.loads(v)
# 订阅消费round_topic这个主题
consumer.subscribe(topics=('round_topic',))
try:
while True:
consumer_records_dict = consumer.poll(timeout_ms=1000)
# consumer.assignment()可以获取每个分区的offset
for partition in consumer.assignment():
print('主题:{} 分区:{},需要从下面的offset开始消费:{}'.format(
str(partition.topic),
str(partition.partition),
consumer.position(partition)
))
# 处理逻辑.
for k, record_list in consumer_records_dict.items():
print(k)
for record in record_list:
print("topic = {},partition = {},offset = {},key = {},value = {}".format(
record.topic, record.partition, record.offset, record.key, record.value)
finally:
# 调用close方法的时候会触发偏移量的自动提交 close默认autocommit=True
consumer.close()
返回结果: 在上述代码中,最后调用consumer.close()时候也会触发自动提交,因为它默认autocommit=True,源码如下:
def close(self, autocommit=True):
"""Close the consumer, waiting indefinitely for any needed cleanup.
Keyword Arguments:
autocommit (bool): If auto-commit is configured for this consumer,
this optional flag causes the consumer to attempt to commit any
pending consumed offsets prior to close. Default: True
"""
if self._closed:
return
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close(autocommit=autocommit)
self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()
except AttributeError:
pass
try:
self.config['value_deserializer'].close()
except AttributeError:
pass
log.debug("The KafkaConsumer has closed.")
对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者在自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好。
(2) 手动提交偏移量:
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交
1.同步手动提交偏移量
同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
"""
同步的方式10W条消息 4.58s
"""
import pickle
import uuid
import time
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['192.168.33.11:9092'],
group_id="test_group_1",
client_id="{}".format(str(uuid.uuid4())),
enable_auto_commit=False, # 设置为手动提交偏移量.
key_deserializer=lambda k: pickle.loads(k),
value_deserializer=lambda v: pickle.loads(v)
# 订阅消费round_topic这个主题
consumer.subscribe(topics=('round_topic',))
try:
start_time = time.time()
while True:
consumer_records_dict = consumer.poll(timeout_ms=100) # 在轮询中等待的毫秒数
print("获取下一轮")
record_num = 0
for key, record_list in consumer_records_dict.items():
for record in record_list:
record_num += 1
print("---->当前批次获取到的消息个数是:{}<----".format(record_num))
record_num = 0
for k, record_list in consumer_records_dict.items():
for record in record_list:
print("topic = {},partition = {},offset = {},key = {},value = {}".format(
record.topic, record.partition, record.offset, record.key, record.value)
try:
# 轮询一个batch 手动提交一次
consumer.commit() # 提交当前批次最新的偏移量. 会阻塞 执行完后才会下一轮poll
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
except Exception as e:
print('commit failed', str(e))
finally:
consumer.close() # 手动提交中close对偏移量提交没有影响
从上述可以看出,每轮循一个批次,手动提交一次,只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息,经测试10W条消息耗时4.58s
2.异步手动提交偏移量+回调函数
异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。
"""
异步的方式手动提交偏移量(异步+回调函数的模式) 10W条消息 3.09s
"""
import pickle
import uuid
import time
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['192.168.33.11:9092'],
group_id="test_group_1",
client_id="{}".format(str(uuid.uuid4())),
enable_auto_commit=False, # 设置为手动提交偏移量.
key_deserializer=lambda k: pickle.loads(k),
value_deserializer=lambda v: pickle.loads(v)
# 订阅消费round_topic这个主题
consumer.subscribe(topics=('round_topic',))
def _on_send_response(*args, **kwargs):
"""
提交偏移量涉及回调函数
:param args: args[0] --> {TopicPartition:OffsetAndMetadata} args[1] --> Exception
:param kwargs:
:return:
"""
if isinstance(args[1], Exception):
print('偏移量提交异常. {}'.format(args[1]))
else:
print('偏移量提交成功')
try:
start_time = time.time()
while True:
consumer_records_dict = consumer.poll(timeout_ms=10)
record_num = 0
for key, record_list in consumer_records_dict.items():
for record in record_list:
record_num += 1
print("当前批次获取到的消息个数是:{}".format(record_num))
for record_list in consumer_records_dict.values():
for record in record_list:
print("topic = {},partition = {},offset = {},key = {},value = {}".format(
record.topic, record.partition, record.offset, record.key, record.value))
# 避免频繁提交
if record_num != 0:
try:
consumer.commit_async(callback=_on_send_response)
except Exception as e:
print('commit failed', str(e))
record_num = 0
finally:
consumer.close()
对于args参数:args[0]是一个dict,key是TopicPartition,value是OffsetAndMetadata,表示该主题下的partition对应的offset;args[1]在提交成功是True,提交失败时是一个Exception类。
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
3.异步+同步 组合的方式提交偏移量
针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。
"""
同步和异步组合的方式提交偏移量
"""
import pickle
import uuid
import time
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['192.168.33.11:9092'],
group_id="test_group_1",
client_id="{}".format(str(uuid.uuid4())),
enable_auto_commit=False, # 设置为手动提交偏移量.
key_deserializer=lambda k: pickle.loads(k),
value_deserializer=lambda v: pickle.loads(v)
# 订阅消费round_topic这个主题
consumer.subscribe(topics=('round_topic',))
def _on_send_response(*args, **kwargs):
"""
提交偏移量涉及的回调函数
:param args:
:param kwargs:
:return:
"""
if isinstance(args[1], Exception):
print('偏移量提交异常. {}'.format(args[1]))
else:
print('偏移量提交成功')
try:
start_time = time.time()
while True:
consumer_records_dict = consumer.poll(timeout_ms=100)
record_num = 0
for key, record_list in consumer_records_dict.items():
for record in record_list:
record_num += 1
print("---->当前批次获取到的消息个数是:<----".format(record_num))
record_num = 0
for k, record_list in consumer_records_dict.items():
print(k)
for record in record_list:
print("topic = {},partition = {},offset = {},key = {},value = {}".format(
record.topic, record.partition, record.offset, record.key, record.value)
try:
# 轮询一个batch 手动提交一次
consumer.commit_async(callback=_on_send_response)
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
except Exception as e:
print('commit failed', str(e))
except Exception as e:
print(str(e))
finally:
try:
# 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
consumer.commit()
print("同步补救提交成功")
except Exception as e:
consumer.close()
通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失
小白是我吖
关注
关注
点赞
收藏
评论
Kafka提交offset机制
转载出处:https://www.cnblogs.com/FG123/p/10091599.html————————————————————————————————————————————在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比...
复制链接
扫一扫
专栏目录
kafka 自动与手动管理offset
congge_study的博客
03-13
9162
kafka 自动与手动管理offset
Kafka到底有几个Offset?——Kafka核心之偏移量机制
weixin_30416497的博客
08-27
290
​ Kafka是由LinkIn开源的实时数据处理框架,目前已经更新到2.3版本。不同于一般的消息中间件,Kafka通过数据持久化和磁盘读写获得了极高的吞吐量,并可以不依赖Storm,SparkStreaming的流处理平台,自己进行实时的流处理。
​ Kakfa的Offset机制是其最核心机制之一,由于API对于部分功能的实现,我们有时并没有手动去设置Offset,那么Kafka到底有...
参与评论
您还未登录,请先
登录
后发表或查看评论
kafka-offset手动提交和自动提交
最新发布
weixin_57128596的博客
10-18
402
消费者会根据设置的消费时间来决定消费多少消息。
kafka存储机制以及offset
weixin_34101784的博客
01-11
92
1.前言
一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
2.Kafka文件存储机制
Kafka部分名词解释如下:
Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
...
Kafka Offset Storage
weixin_33827965的博客
01-09
94
1.概述
  目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的T...
深入解析Kafka的offset管理
zjjcchina的博客
01-11
2977
Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。
Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种:Current Offset 和 Committed Offset。
Current Offset
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一
kafka中offset使用原理
congge
05-24
1万+
前言
在使用kafka时,从消费端来说,基本上大家在使用的时候,一般是通过一个消息监听器监听具体的topic以及对应的partition,接收消息即可,但有必要深入了解一下关于kafka的offset原理
kafka在设计上和其他的消息中间其中有一个不同点是,kafka中存在一个offset的概念,即偏移量,而这个偏移量是需要消费端进行记录的,即producer将消息发到broker上之后,当某个消费者订阅了这个topic之后,consumer需要自己记录每次的消费位置,以便下次知道从哪个位置开始消费消息,
kafka offset 机制
stuliper的专栏
08-30
4057
kafka消息处理类:MessageAndOffset
case class MessageAndOffset(message: Message, offset: Long) {
/**
* Compute the offset of the next message in the log
*/
def nextOffset: Long = offset + 1
...
kafka的log存储解析——topic的分区partition分段segment以及索引等
jushisi的博客
05-11
227
引言
Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。
partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配
消息队列-kafka提交offset问题
weixin_38312719的博客
05-18
2060
说明:转载本人掘金文章
概述
offset是相对Consumer来说的,offset是用来帮助记录某个主题某个分区的消费情况的。当你每提交一次offset,意味着向kafka汇报一次消费进度,对于提交offset所以又分为同步和异步提交
同步和异步提交offset对比
同步提交:
1.每次处理完一条消息,然后调用consumer.commitSync()提交offset
2.在调用consumer.commitSync()方法时候会堵塞住,严重影响消费者性能
3.每次提交都会向__consumer_offs
Kafka offset维护机制
Easy_Wly
06-28
599
Kafka offset的维护
上一篇介绍过Kafka的生产者相关的机制,这一篇来介绍一下kafka消息的offset。
由于 Consumer 在消费过程中可能会出现断电宕机等故障,Consumer 恢复后,需要从故障前的位置继续消费。所以 Consumer 需要实时记录自己消费到了哪个 Offset,以便故障恢复后继续消费。
Kafka 0.9 版本之前,Consumer 默认将 Offset 保存在 Zookeeper 中,从 0.9 版本开始,Consumer 默认将 Offset 保存在 Kafk
kafka原理详解之各种offset和checkpoint
define_us的专栏
06-01
7154
每一个分区都是一个顺序的、不可变的消息队列,并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
一个分区在文件系统里存储为一个文件夹。文件夹里包含日志文件和索引文件。其文件名是其包含的offset的最小的条目的offset。
每个文件是一个segment。
在broker的log存储文件下,除了存储这各个topic的文...
Kafka connect的offset commit机制
热门推荐
石权的博客
07-13
1万+
“offset.storage.topioc”的含义。
1 . 主要是给SourceTask来记录Kafka connector对源数据的消费状态;2 . SinkTask用不到;
1. 调用task的flush机制记录offset;2. 异步向kafka Server commit offset
一文读懂 kafka 的事务机制
明哥的IT随笔
09-16
862
1 前言
大家好,我是明哥!
KAFKA 作为开源分布式事件流平台,在大数据和微服务领域都有着广泛的应用场景,是实时流处理场景下消息队列事实上的标准。用一句话概括,KAFKA 是实时数仓的基石,是事件驱动架构的灵魂。
但是一些技术小伙伴,尤其是一些很早就开始使用 KAFKA 的技术小伙伴们,对 KAFKA 的发展趋势和一些新特性,并不太熟悉,在使用过程中也踩了不少坑。
有鉴于此,我们会通过一系列 KAFKA 相关博文,专门讲述 KAFKA 的这些新特性。
本文是该系列文章之一,讲述 KAFAK 的事务机制。
kafka原理系列之(一)消息存储和offset提交机制
sheep8521的博客
04-24
4459
kafka之消息存储和offset提交机制
Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说kafka broker上的数据超7天或者1G,
就会被清理掉。这些数据存放在broker服务器上,以log文件的形式存在。
kafka的安装目录下面的/conf/server.propertites文件中中设置:
### 日志保存时间 (hours|minutes),默认为7天(168...
kafka手动提交offset以及offset回滚
码灵的博客
01-10
3381
1 手动提交
当 auto.commit.enable 设置为false时,表示kafak的offset由消费者手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
ackMode有以下7种值:
public enum AckMode
// 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
RECORD,
// 当每一批poll()的数据被消费者监听器(ListenerConsumer...
Kafka自动提交offset设置
鸭梨的博客
02-04
5601
auto.commit.interval.ms
kafka自动提交offset的频率,默认是5000ms,就是5s
如果将enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。
The frequency in milliseconds that the consumer offsets are
auto-committed to Kafka if enable.auto.commit is set to true.
自动提交是调用poll方法的时
kafka —— offset篇
idream
03-08
938
KAFKA
offset提交方式
三种
1、自动提交方式
“enable.auto.commit”, “true”
是否自动提交
“auto.commit.interval.ms”, “5000”
间隔多久ms提交
2、手动提交 —— 同步
consumer.commitSync();
3、手动提交 —— 异步
consumer.commitAsync();
3种提交方式优缺点
1、自动提交
优点:不用自己管理offset
缺点:可能会出现数据重复
原因:
调用poll()方法时将offset提交,所以如果
【Kafka】offset策略 客户端原理
u010900754的专栏
07-14
1037
1.消费者本地offset
Kafka consumer会在本地维护每一个分区的消费offset,然后再发送拉取请求的时候,会把分区对应的本地offset发送给broker,broker按照请求里的offset来返回消息,broker不会维护每一个客户端的消费进度。但是broker会维护partition级别的offset,这样新的consumer第一次消费或者再均衡消费时,就可以根据这个off...
Kafka的offset提交管理
m0_57126456的博客
10-26
826
一:自动提交
//开启offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
缺点:先提交offset后消费,提交完offset后没有消费就挂机后,可能造成丢失数据
二:手动提交
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步
提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,
c...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:技术黑板
设计师:CSDN官方博客
返回首页
小白是我吖
CSDN认证博客专家
CSDN认证企业博客
码龄4年
暂无认证
原创
53万+
周排名
71万+
总排名
2万+
访问
等级
367
积分
粉丝
获赞
评论
30
收藏
私信
关注
热门文章
kafka常见问题如果想消费已经被消费过的数据
8495
Kafka提交offset机制
3214
Logstash读取指定日志并在屏幕输出
3061
Java中CharSequence和String
2627
MySQL8.0的安装、配置、启动服务和登录及配置环境变量
2463
分类专栏
Logstash
1篇
Docker
BurpSuite
Charles
1篇
Tomcat
2篇
Kafka
7篇
Java
6篇
zookeeper
2篇
Mysql
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
Logstash读取指定日志并在屏幕输出
软件测试之抓包工具Charles
tomcat启动错误:Error running tomcat: Address localhost:1099 is already in use
2020年4篇
2019年13篇
目录
目录
分类专栏
Logstash
1篇
Docker
BurpSuite
Charles
1篇
Tomcat
2篇
Kafka
7篇
Java
6篇
zookeeper
2篇
Mysql
目录
评论
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值