kafka自学之路--zookeeper中存储结构_汤高的博客-CSDN博客


本站和网页 https://blog.csdn.net/tanggao1314/article/details/51943151 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

kafka自学之路--zookeeper中存储结构_汤高的博客-CSDN博客
kafka自学之路--zookeeper中存储结构
汤高
于 2016-07-18 16:33:06 发布
3307
收藏
分类专栏:
大数据与云计算
kafka
大数据生态系统技术
大数据与云计算
同时被 3 个专栏收录
50 篇文章
3 订阅
订阅专栏
kafka
5 篇文章
4 订阅
订阅专栏
大数据生态系统技术
60 篇文章
100 订阅
订阅专栏
1.topic注册信息
/brokers/topics/[topic] :
存储某个topic的partitions所有分配信息
Schema:
{     "version": "版本编号目前固定为数字1",     "partitions": {         "partitionId编号": [             同步副本组brokerId列表         ],         "partitionId编号": [             同步副本组brokerId列表         ],         .......     } }
Example:
{ "version": 1, "partitions": { "0": [1, 2], "1": [2, 1], "2": [1, 2], } }
说明:紫红色为patitions编号,蓝色为同步副本组brokerId列表
2.partition状态信息
/brokers/topics/[topic]/partitions/[0...N]  其中[0..N]表示partition索引号
/brokers/topics/[topic]/partitions/[partitionId]/state
Schema:
{ "controller_epoch": 表示kafka集群中的中央控制器选举次数, "leader": 表示该partition选举leader的brokerId, "version": 版本编号默认为1, "leader_epoch": 该partition leader选举次数, "isr": [同步副本组brokerId列表] }
Example:
{ "controller_epoch": 1, "leader": 2, "version": 1, "leader_epoch": 0, "isr": [2, 1] }
3. Broker注册信息 /brokers/ids/[0...N]                 
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)
Schema:
{ "jmx_port": jmx端口号, "timestamp": kafka broker初始启动时的时间戳, "host": 主机名或ip地址, "version": 版本编号默认为1, "port": kafka broker的服务端端口号,由server.properties中参数port确定 }
Example:
{ "jmx_port": 6061,
"timestamp":"1403061899859" "version": 1, "host": "192.168.1.148", "port": 9092 }
4. Controller epoch: 
/controller_epoch -> int (epoch)   
此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1; 
5. Controller注册信息:
/controller -> int (broker id of the controller)  存储center controller中央控制器所在kafka broker的信息
Schema:
"version": 版本编号默认为1, "brokerid": kafka集群中broker唯一编号, "timestamp": kafka broker中央控制器变更时的时间戳
Example:
{ "version": 1, "brokerid": 3, "timestamp": "1403061802981" }
Consumer and Consumer group概念: 
a.每个consumer客户端被创建时,会向zookeeper注册自己的信息;
b.此作用主要是为了"负载均衡".
c.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer。
d.Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer;
e.一个Consumer group的多个consumer的所有线程依次有序地消费一个topic的所有partitions,如果Consumer group中所有consumer总线程大于partitions数量,则会出现空闲情况;
举例说明:
kafka集群中创建一个topic为report-log   4 partitions 索引编号为0,1,2,3
假如有目前有三个消费者node:注意-->一个consumer中一个消费线程可以消费一个或多个partition.
如果每个consumer创建一个consumer thread线程,各个node消费情况如下,node1消费索引编号为0,1分区,node2费索引编号为2,node3费索引编号为3
如果每个consumer创建2个consumer thread线程,各个node消费情况如下(是从consumer node先后启动状态来确定的),node1消费索引编号为0,1分区;node2费索引编号为2,3;node3为空闲状态
总结

从以上可知,Consumer Group中各个consumer是根据先后启动的顺序有序消费一个topic的所有partitions的。
如果Consumer Group中所有consumer的总线程数大于partitions数量,则可能consumer thread或consumer会出现空闲状态。
Consumer均衡算法
当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力.
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3
4) 根据(consumer.id + '-'+ thread序号)排序: C0,C1
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
6. Consumer注册信息:
每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
/consumers/[groupId]/ids/[consumerIdString]
是一个临时的znode,此节点的值为请看consumerIdString产生规则,即表示此consumer目前所消费的topic + partitions列表.
consumerId产生规则:
   StringconsumerUuid = null;     if(config.consumerId!=null && config.consumerId)       consumerUuid = consumerId;     else {       String uuid = UUID.randomUUID()       consumerUuid = "%s-%d-%s".format(         InetAddress.getLocalHost.getHostName, System.currentTimeMillis,         uuid.getMostSignificantBits().toHexString.substring(0,8));
     }      String consumerIdString = config.groupId + "_" + consumerUuid;
Schema:
{ "version": 版本编号默认为1, "subscription": { //订阅topic列表 "topic名称": consumer中topic消费者线程数 }, "pattern": "static", "timestamp": "consumer启动时的时间戳" }
Example:
{ "version": 1, "subscription": { "open_platform_opt_push_plus1": 5 }, "pattern": "static", "timestamp": "1411294187842" }
7. Consumer owner:
/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号
当consumer启动时,所触发的操作:
a) 首先进行"Consumer Id注册";
b) 然后在"Consumer id 注册"节点下注册一个watch用来监听当前group中其他consumer的"退出"和"加入";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
c) 在"Broker id 注册"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
8. Consumer offset:
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
用来跟踪每个consumer目前所消费的partition中最大的offset
此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(consumer group)中一个消费者失效,
重新触发balance,其他consumer可以继续消费.
9. Re-assign partitions
/admin/reassign_partitions
   
"fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition to be reassigned"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition to be reassigned"
                  },
                  {
                     "name":"replicas",
                     "type":"array",
                     "items":"int",
                     "doc":"a list of replica ids"
                  }
               ],
            }
            "doc":"an array of partitions to be reassigned to new replicas"
         }
      }
   ]
Example:
  "version": 1,
  "partitions":
     [
        {
            "topic": "Foo",
            "partition": 1,
            "replicas": [0, 1, 3]
        }
     ]            
10. Preferred replication election
/admin/preferred_replica_election
   "fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition for which preferred replica election should be triggered"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition for which preferred replica election should be triggered"
                  }
               ],
            }
            "doc":"an array of partitions for which preferred replica election should be triggered"
         }
      }
   ]
例子:
  "version": 1,
  "partitions":
     [
        {
            "topic": "Foo",
            "partition": 1         
        },
        {
            "topic": "Bar",
            "partition": 0         
        }
     ]            
11. 删除topics /admin/delete_topics
Schema:

"fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "topics",
       "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
      } ]
例子:
  "version": 1,
  "topics": ["foo", "bar"]
Topic配置
/config/topics/[topic_name]
例子
  "version": 1,
  "config": {
    "config.a": "x",
    "config.b": "y",
    ...
  
原文地址:http://blog.csdn.net/lizhitao/article/details/23744675
汤高
关注
关注
点赞
收藏
评论
kafka自学之路--zookeeper中存储结构
1.topic注册信息/brokers/topics/[topic] :存储某个topic的partitions所有分配信息Schema:{    "version": "版本编号目前固定为数字1",    "partitions": {        "partitionId编号": [            同步副本组brok
复制链接
扫一扫
专栏目录
Zookeeper和Kafka的关系,为啥Kafka依赖Zookeeper
u011311291的博客
12-26
5万+
zookeeper和Kafka的关系
1.在Kafka的设计中,选择了使用Zookeeper来进行所有Broker的管理,体现在zookeeper上会有一个专门用来进行Broker服务器列表记录的点,节点路径为/brokers/ids
每个Broker服务器在启动时,都会到Zookeeper上进行注册,即创建/brokers/ids/[0-N]的节点,然后写入IP,端口等信息,Broker创建的...
参与评论
您还未登录,请先
登录
后发表或查看评论
ZooKeeper与Kafka介绍
变成习惯
06-01
2万+
ZooKeeper介绍
官网:http://zookeeper.apache.org/
ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、命名服务、分布式同步、组服务等。Kafka的运行依赖ZooKeeper。
ZooKeeper最早起源于雅虎研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型系统基本都需要依赖一个类似的系统来...
Kafka机制分析
weixin_38380811的博客
10-31
254
Kafka机制分析
Zookeeper在kafka中的应用
Saltwater_leo的专栏
04-14
944
版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]
Zookeeper在kafka中的应用
@20150606
简介
Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者
apache kafka系列之在zookeeper中存储结构
李志涛的专栏
04-15
3万+
1.topic注册信息
/brokers/topics/[topic] :
Schema:
{ "fields" :
[ {"name": "version", "type": "int", "doc": "version
id"},
{"name": "partitions",
"type": {"type": "ma
kafka在zookeeper上的节点信息和查看方式
lkforce
09-06
4万+
kafka在Zookeeper上的节点如下图:
该图片盗自大牛的博客http://blog.csdn.net/lizhitao/article/details/23744675
服务端开启的情况下,进入客户端的命令:{zookeeper目录}/bin/zkCli.sh
以下是几个zookeeper客户端用的命令,不只kafka,其他任何注册到zookeeper的服务都可以使用
kafka-节点说明
猫咪的一生,
12-06
916
kafka单台单节点说明:
kafka单台多节点说明:(伪集群)
kafka多台多节点说明:
注意:
kafka1.2.2版本以后启动消费者的命令变为:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
节点参考:https://blog.csdn.net/hg_harve...
大白话带你认识Kafka
weixin_43167418的博客
12-05
109
来自:掘金(作者:说出你的愿望吧丷)原文链接:https://juejin.im/post/5dcf6b6e51882510a23314f3一、Kafka基础消息系统的作用应该大部分小伙...
查看kafka的主节点
xiaohuangren_123的博客
11-04
511
kafka
kafka环境搭建演示
qq_41449717的博客
08-20
117
kafka环境搭建演示
简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来
查看kafka的主从状态_Kafka 集群部署
weixin_30272849的博客
02-01
3271
ip地址 主机名 安装软件192.168.20.40 k8s-master zookeeper、kafka192.168.20.43 k8s-node2 zookeeper、kafka192.168.20.39 k8s-node3 zookeeper、kafka三个节点安装zoo...
kafka zk节点
u012693823的博客
01-19
923
前言、Zookeeper 在 Kafka 中的作用
1、Broker注册
2、Topic注册
3、生产者负载均衡
4、消费者负载均衡
5、分区 与 消费者 的关系
6、消息 消费进度Offset 记录
7、消费者注册
一、zk节点结构
二、admin
2.1preferred_replica_election
2.1.1 结构
2.1.3 描述
2.2reassign_partitions
2.2.1结构
2.2.2 描述
...
Kafka查询topic以及消息内容
热门推荐
MieMieFly的专栏
08-06
8万+
基于kafka_2.11-0.10.0.1版本:
查看kafka的安装目录
find / -name kafka* -type d
查看redis安装目录:
ps -ef|grep redis
查询所有group信息:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
创建topic:paymentmgt_download_asyn_deal
./bin/kafka-topics.sh --create --zookeep
kafka查看topic和消息内容命令
最新发布
Rocky
12-20
62
【代码】kafka查看topic和消息内容命令。
KafKa在ZooKeeper上的存储结构
夏天小厨的博客
10-25
1499
KafKa在ZooKeeper上的存储结构
首先 我们先了解一下kafka在zookeeper上的文件结构,进入一台zookeeper主机,输入命令 bin/zkCli.sh 进入交互模式,ls出zk的根目录,如下图
KafKa在ZooKeeper上的文件结构
这篇文章我们分析一下上图中除了zookeeper的其他文件夹对于kafka来说都是什么含义。
zk-root根目录包含6个k...
Kafka入门第四课:Kafka节点数、分区数、分区副本数设置及Kafka压力测试
曹利荣的博客
11-04
3848
一、分区副本数设置
由于分区副本仅提供数据冗余的功能,且分区副本数量与集群吐吞量负相关,故冗余度在满足安全要求基础上设置为最小即可。
故我们不妨将分区副本数设置为2.
二、kafka分区数设置
通过对单个分区的topic进行消费者和生产者的压力测试,得出单个分区所能提供的消费和生产的最大峰值吐吞量。
1、创建只有一个分区的topic。
kafka-topics.sh --create \
--zookeeper Linux001:2181 \
--partitions 1 \
--repl.
【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)
chinaherolts2008的博客
07-13
724
这篇文章源自于,一位群友的问题,然后sql教程就写下了这篇文章
进群加V :jjdlmn_
Kafka专栏整理地址 请戳这里0.0
先定义一下名词: 迁移前的java基础教程Broker:OriginBroker、 迁移后的副本TargetBroker
前提
在这之前如果你比较python基础教程了解分区重分配的原理的话,下面的可能更好理解;
推荐你阅读一下下面几篇c#教程文章(如果你点不进去说明我还没有发布)
如果你不想费那个精力,那直接vb.net教程看下面我画的这张图,你自己...
kafka常见异常问题总结:KeepErrorCode = NoNode for...
神芷迦蓝寺
04-01
4100
主要异常
kafka是我们常见的数据订阅中间件系统,但清除累计的日志文件也是件技术活
笔者在清除日志的时候,不小心把zookeeper log 目录下version-2里的文件删除了,就引发了一系列惨案:
kafka数据报错,无法再产生数据,报错详情:
kafka报错:
Error Path:/admin Error:KeeperErrorCode = NoNode for /brokers
Error Path:/admin Error:KeeperErrorCode = NoNode for
有关kafka主节点无法启动
qq_50859962的博客
11-07
350
kafka无法正常启动
最近新配置是kafka,在第一次启动时,第一台机器报错 ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.multi(Ljava/lang/Iterable;Lorg/apache/zookee
storm集成kafka报错【KeeperErrorCode = NoNode for /kafka/brokers/topics/test/partitions】
NeverGiveup54的专栏
01-22
1万+
问题重现:
前置条件:使用的是CDH的KAFKA-0.8.2.0-1.kafka1.4.0.p0.56版本
最近在集成kafka-storm demo的时候,zookeeper总是报错。如下:
7382 [Thread-17-word-reader] INFO b.s.d.executor - Opening spout word-reader:(6)
7382 [Thread
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:编程工作室
设计师:CSDN官方博客
返回首页
汤高
CSDN认证博客专家
CSDN认证企业博客
码龄8年
暂无认证
159
原创
5万+
周排名
160万+
总排名
122万+
访问
等级
9433
积分
955
粉丝
923
获赞
411
评论
2241
收藏
私信
关注
热门文章
hash算法原理详解
234235
Java面试笔试题大汇总(最全+详细答案)
90564
Java接入Spark之创建RDD的两种方式和操作RDD
45372
Hadoop2.6(新版本)----MapReduce工作原理
34395
Python快速学习第二天
28176
分类专栏
大数据生态系统技术
60篇
数据结构与算法
5篇
Java网络编程
11篇
23天征服--23种设计模式
22篇
Spark
6篇
Web Service
5篇
Java技术
8篇
JavaScript
26篇
数据库
4篇
Java EE
7篇
Java线程
7篇
网络编程
3篇
Struts2
4篇
Java设计模式
23篇
软件环境搭建
2篇
Java面试题
2篇
数据结构与算法
5篇
大型数据库技术
Mybatis
2篇
JDK源码分析
中间件
1篇
Redis
3篇
hbase集群安装
4篇
大数据与云计算
50篇
Java疑难杂症
6篇
kafka
5篇
scala
storm
8篇
spark
6篇
linux学习
13篇
工作总结
14篇
Python学习
12篇
quartz
2篇
算法大杂烩
3篇
算法面试题
最新评论
数据挖掘算法之贝叶斯网络
Gao_Yaya:
使用假设:在c已知的情况下,ab独立。然后使用了这个假设证明了,在c条件下,ab独立。这样可以吗?
hash算法原理详解
小白pk菜鸡:
1位、2位、……、6位、7位、8位是指千万位、百万位、……百位、十位、个位。
JDK动态代理的底层实现原理
z563394688:
设置jvm参数让生成的class文件不消失
hash算法原理详解
carth.r:
博主你好,原文中的” 2) 由于哈希函数是一个压缩映象,因此,在一般情况下,很容易产生“冲突”现象,即: key1!=key2,而 f (key1) = f(key2)。“中的” f (key1) = f(key2)“是否应该改成” f (key1) == f(key2)”更好
JDK动态代理的底层实现原理
帅气呢杰哥:
请问楼主,我跑了下代码,看不到有代理对象生成,也就看不了.class文件,这个怎么破?
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
Spring 配置数据库用户名密码加密
Google 面试题分析 | 字典里面的最长单词
Trie树分析
2018年3篇
2017年4篇
2016年137篇
2015年51篇
目录
目录
分类专栏
大数据生态系统技术
60篇
数据结构与算法
5篇
Java网络编程
11篇
23天征服--23种设计模式
22篇
Spark
6篇
Web Service
5篇
Java技术
8篇
JavaScript
26篇
数据库
4篇
Java EE
7篇
Java线程
7篇
网络编程
3篇
Struts2
4篇
Java设计模式
23篇
软件环境搭建
2篇
Java面试题
2篇
数据结构与算法
5篇
大型数据库技术
Mybatis
2篇
JDK源码分析
中间件
1篇
Redis
3篇
hbase集群安装
4篇
大数据与云计算
50篇
Java疑难杂症
6篇
kafka
5篇
scala
storm
8篇
spark
6篇
linux学习
13篇
工作总结
14篇
Python学习
12篇
quartz
2篇
算法大杂烩
3篇
算法面试题
目录
评论
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值