消费者Rebalance机制

优质博文:IT-BLOG-CN

一、消费者Rebalance机制

Apache Kafka中,消费者组
Consumer Group会在以下几种情况下发生重新平衡Rebalance
【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费者组时,Kafka会触发重新平衡,以重新分配分区给消费者。
【2】消费者崩溃或失去连接: 如果Kafka检测到某个消费者崩溃或失去连接(例如,由于网络问题或消费者进程被终止),它会触发重新平衡。
【3】主题的分区数量发生变化: 如果一个主题的分区数量增加或减少,Kafka会触发重新平衡,以确保新的分区被分配给消费者组中的消费者。
【4】消费者组协调器变更: 消费者组协调器是负责管理消费者组的一个Kafka Broker。如果消费者组协调器发生变更(例如,协调器所在的Broker崩溃),也会触发重新平衡。
【5】消费者组成员发送心跳失败: 消费者需要定期向消费者组协调器发送心跳heartbeat以表明它们仍然活跃。如果心跳失败,协调器会认为该消费者已经失去连接,从而触发重新平衡。

rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance

Kafka在高峰期重平衡rebalancing会导致消费者组的停顿,影响系统的性能和稳定性。为了避免在高峰期发生重平衡,可以采取以下几种策略:
【1】优化分区分配策略: 使用RangeAssignorStickyAssignor等分区分配策略来减少重平衡的频率和影响。

RangeAssignorKafka默认的分区分配策略之一,它将分区按范围分配给消费者。

我们通过一个具体的例子来说明RangeAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么RangeAssignor会重新分配分区,以确保分区尽量按顺序和均匀地分配给剩余的消费者。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,RangeAssignor将分区按顺序重新分配给剩余的消费者,确保每个消费者分配到的分区尽量连续。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,RangeAssignor会再次按顺序和均匀地分配分区。新的分配可能如下:

C1: P0, P1
C3: P2, P3
C4: P4, P5

在这个过程中,RangeAssignor将分区重新分配,以确保每个消费者分配到的分区尽量连续和均匀。

通过这个例子,我们可以看到RangeAssignor的分配策略:
1、将分区按顺序分配给消费者。
2、当消费者组成员变化时,重新分配分区,以确保分区尽量按顺序和均匀地分配给所有消费者。
3、分区分配尽量保持连续性。
这种策略的好处是分区分配简单且稳定,减少了分区在消费者组成员变化时的重新分配范围,从而减少了重平衡的频率和影响。

以下是配置RangeAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class RangeAssignorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置分区分配策略为 RangeAssignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(List.of("example-topic"));

        // 消费消息的逻辑
        // ...
    }
}

StickyAssignorKafka 2.4及以上版本引入的一种分区分配策略,它的目标是尽量保持分区分配的稳定性,减少重平衡的频率。

我们通过一个具体的例子来说明StickyAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么StickyAssignor会尽量保持现有的分区分配不变,并重新分配C2的分区。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,StickyAssignor尽量保持C1C3的分区分配不变,只是将C2的分区重新分配给其他消费者。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,StickyAssignor会尝试保持现有的分区分配不变,并将分区尽量均匀地分配给所有消费者。新的分配可能如下:

C1: P0, P1
C3: P4, P5
C4: P2, P3

在这个过程中,StickyAssignor保持了C1C3的分区不变,并将C2的分区重新分配给C4

通过这个例子,我们可以看到StickyAssignor的分配策略:
1、尽量保持现有的分区分配不变。
2、当消费者组成员变化时,尽量最小化分区在消费者之间的移动。
3、尽量保持分区分配的平衡性。
这种策略的好处是减少了重平衡带来的影响,提高了分区分配的稳定性,减少了因分区移动带来的数据重新加载和处理的开销。

以下是配置StickyAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class StickyAssignorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置分区分配策略为 StickyAssignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(List.of("example-topic"));

        // 消费消息的逻辑
        // ...
    }
}

或者在配置中进行指定

group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor

【2】增加session.timeout.msheartbeat.interval.ms:增加session.timeout.msheartbeat.interval.ms的值,这样可以减少消费者因为心跳超时而被认为失效,从而触发重平衡。

1、session.timeout.ms是消费者与Kafka broker之间的会话超时时间。如果在这个时间内Kafka broker没有收到某个消费者的心跳,broker就会认为该消费者已经失效,并触发重平衡
2、heartbeat.interval.ms是消费者发送心跳给Kafka broker的时间间隔。心跳是消费者向broker表示自己仍然活跃的方式。

session.timeout.ms=30000
heartbeat.interval.ms=3000

3、heartbeat.interval.ms的值通常要远小于session.timeout.ms的值。这样可以确保在会话超时之前,消费者有多次机会发送心跳。一般建议session.timeout.ms至少是heartbeat.interval.ms10倍,以确保有足够的时间进行多次心跳尝试。

【3】合理配置消费者组:确保消费者组中的消费者数量稳定,避免频繁地增加或减少消费者。尽量在低峰期进行消费者的添加或移除操作。

【4】优化消费者性能:提高消费者的处理能力,确保消费者能够及时处理消息,避免因为处理延迟导致的重平衡。使用异步处理或批量处理来提高消费者的吞吐量。

【5】监控和报警:实时监控Kafka集群和消费者组的状态,设置报警机制,当检测到重平衡风险时,及时采取措施。

【6】使用静态成员Static MembershipKafka 2.3及以上版本支持静态成员功能,可以通过配置group.instance.id来减少重平衡的频率。

group.instance.idKafka 2.4.0引入的一个配置项,用于为每个消费者实例指定一个唯一的标识符。当消费者组中的消费者具有唯一的group.instance.id时,Kafka可以更智能地处理消费者组成员的变化,从而减少不必要的重平衡。

静态成员:通过配置group.instance.id,消费者实例变成了“静态成员”,即使它们暂时断开连接,Kafka也会保留它们的成员身份。这与传统的动态成员(没有group.instance.id)不同,动态成员在断开连接后会被移除,从而触发重平衡。

group.id=my-consumer-group
group.instance.id=consumer-instance-1

【7】调整rebalance.timeout.ms:增加rebalance.timeout.ms的值,确保消费者有足够的时间完成重平衡过程,避免因超时导致的频繁重平衡。

消费者Rebalance分区分配策略

主要包含四种relalance策略:RangeAssignor(范围分配策略),RoundRobinAssignor(轮询分配策略),StickyAssignor(粘性分配策略),CooperativeStickyAssignor(协作粘性分配策略),之前已经讲过两个,这里聊聊剩下的两个

RoundRobinAssignor(轮询分配策略)

RoundRobinAssignor采用轮询的方式将分区分配给消费者。它会将所有分区和消费者按照字典顺序排序,然后依次将每个分区分配给下一个消费者,直到所有分区都被分配完毕。

CooperativeStickyAssignor(协作粘性分配策略)

CooperativeStickyAssignorStickyAssignor的改进版本,它引入了协作重平衡的概念,使得重平衡过程更加平滑,减少了重平衡期间的停顿时间。

二、Rebalance 过程

第一阶段:选择"组协调器"
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer groupcoordinator

第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入groupconsumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/889322.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

若依项目搭建(黑马经验)

欢迎你搜索和了解到若依&#xff0c;这个项目是从黑马课程的一个实践&#xff0c;更多的项目经历和平台搭建期待着我们的共同学习&#xff01; 关于若依 若依是一套全部开源的快速开发平台&#xff0c;毫无保留给个人及企业免费使用。 前端采用Vue、Element UI。后端采用Sprin…

使用Pytorch+Numpy+Matplotlib实现手写字体分类和图像显示

文章目录 1.引用2.内置图片数据集加载3.处理为batch类型4.设置运行设备5.查看数据6.绘图查看数据图片(1)不显示图片标签(2)打印图片标签(3)图片显示标签 7.定义卷积函数8.卷积实例化、损失函数、优化器9.训练和测试损失、正确率(1)训练(2)测试(3)循环(4)损失和正确率曲线(5)输出…

Spark_累加器

分布式共享只写变量 实现原理&#xff1a;  累加器用来把Executor端变量信息聚合到Driver端&#xff0c;在Driver程序中定义的变量&#xff0c;在Executor端的每个Task都会得到这个变量的一份新的副本&#xff0c;每个task更新这些副本的值后&#xff0c;传回Driver端进行mer…

执行node.js获取本机Ip命令,报:Error: Cannot find module ‘ip‘错误

Error: Cannot find module ip是由于没有改模块的依赖包&#xff0c;需要进行安装&#xff0c;以管理员的身份打开命令行&#xff0c;执行npm install ip 获取当前电脑的ip地址 方法一&#xff1a; const ip require("ip")/*** 1:获取当前电脑的ip地址*/ console.…

PPT技巧:保护PPT文件的方法有哪些?

PPT文件制作好之后保证文件不出错应该是很重要的&#xff0c;毕竟是要拿出去展示的&#xff0c;今天分享PPT加密方法给大家。希望能够帮助大家保护好自己的PPT文件。 打开密码 如果想要其他人需要输入正确的密码才能够打开文件查看并编辑&#xff0c;我们可以给PPT文件设置打…

Github优质项目推荐 - 第六期

文章目录 Github优质项目推荐 - 第六期一、【WiFiAnalyzer】&#xff0c;3.4k stars - WiFi 网络分析工具二、【penpot】&#xff0c;33k stars - UI 设计与原型制作平台三、【Inpaint-Anything】&#xff0c;6.4k stars - 修复图像、视频和3D 场景中的任何内容四、【Malware-P…

gitee开源商城diygw-mall

DIYGW可视化开源商城系统。所的界面布局显示都通过低代码可视化开发工具生成源码实现。支持集成微信小程序支付。 DIYGW可视化开源商城系统是一款基于thinkphp8 framework、 element plus admin、uniapp开发而成的前后端分离系统。 开源商城项目源码地址&#xff1a;diygw商城…

stm32定时器中断和外部中断

一&#xff0c;中断系统的介绍 中断&#xff1a;在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转而去处理中断程序&#xff0c;处理完成后又返回原来被暂停的位置继续运行 中…

知识图谱入门——7:阶段案例:使用 Protégé、Jupyter Notebook 中的 spaCy 和 Neo4j Desktop 搭建知识图谱

在 Windows 环境中结合使用 Protg、Jupyter Notebook 中的 spaCy 和 Neo4j Desktop&#xff0c;可以高效地实现从自然语言处理&#xff08;NLP&#xff09;到知识图谱构建的全过程。本案例将详细论述环境配置、步骤实现以及一些扩展和不足之处。 源文件已上传我的资源区。 文章…

【深海王国】初中生也能画的电路板?目录合集

Hi٩(๑ ^ o ^ ๑)۶, 各位深海王国的同志们&#xff0c;早上下午晚上凌晨好呀~辛勤工作的你今天也辛苦啦 (o゜▽゜)o☆ 今天大都督为大家带来系列文章《初中生也能画的电路板》&#xff0c;帮你一周内快速入门PCB设计&#xff0c;手把手教你从元器件库添加、电路原理图绘制、…

初阶C语言-结构体

一.结构体的声明 1.结构体类型的声明 1.1结构的基础知识 结构是一些值的集合&#xff0c;这些值称为称为变量。结构的每个成员可以是不同类型的变量。 1.2结构的声明 struct tag //struct是结构体关键字&#xff0c;tag是结构体类型名称 { member - list;//成员变…

minio集群部署

最近接触到minio&#xff0c; 将本地集群部署&#xff0c;分别在ubuntu、centos stream9上进行了搭建&#xff0c;目前看里面的小坑不小&#xff0c;记录以下教程&#xff0c;以备忘、以供他人借鉴。 #### 准备 1、因新版本的minio要求&#xff0c;集群部署必须使用挂载非 roo…

AAA Mysql与redis的主从复制原理

一 &#xff1a;Mysql主从复制 重要的两个日志文件&#xff1a;bin log 和 relay log bin log&#xff1a;二进制日志&#xff08;binnary log&#xff09;以事件形式记录了对MySQL数据库执行更改的所有操作。 relay log&#xff1a;用来保存从节点I/O线程接受的bin log日志…

Java中System类和RunTime类的Api

目录 System 类 1)out 2)err 3)in 4)currentTimeMillis() 5)nanoTime() 6)arraycopy(Object 要从里面复制东西的数组, int 要从里面复制东西数组的索引起始位置, Object 获得复制元素的数组, int 获得复制元素数组的起始索引, int 要复制东西的个数) 7)gc() 8)exit(int status)…

51单片机的无线通信智能车库门【proteus仿真+程序+报告+原理图+演示视频】

1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块红外传感器光照传感器时钟模块步进电机蓝牙按键、LED、蜂鸣器等模块构成。适用于智能车库自动门、无线控制车库门等相似项目。 可实现功能: 1、LCD1602实时显示北京时间和自动/手动模式&#xff0c;以及验证是否成…

【Arduino IDE安装】Arduino IDE的简介和安装详情

目录 &#x1f31e;1. Arduino IDE概述 &#x1f31e;2. Arduino IDE安装详情 &#x1f30d;2.1 获取安装包 &#x1f30d;2.2 安装详情 &#x1f30d;2.3 配置中文 &#x1f30d;2.4 其他配置 &#x1f31e;1. Arduino IDE概述 Arduino IDE&#xff08;Integrated Deve…

使用 Go 和 Gin 框架构建简单的用户和物品管理 Web 服务

使用 Go 和 Gin 框架构建简单的用户和物品管理 Web 服务 在本项目中&#xff0c;我们使用 Go 语言和 Gin 框架构建了一个简单的 Web 服务&#xff0c;能够管理用户和物品的信息。该服务实现了两个主要接口&#xff1a;根据用户 ID 获取用户名称&#xff0c;以及根据物品 ID 获…

模拟实现消息队列(基于SpringBoot实现)

项目代码 提要&#xff1a;此处的消息队列是仿照RabbitMQ实现&#xff08;参数之类的&#xff09;&#xff0c;实现一些基本的操作&#xff1a;创建/销毁交互机&#xff08;exchangeDeclare&#xff0c;exchangeDelete&#xff09;&#xff0c;队列&#xff08;queueDeclare&a…

【电路笔记】-求和运算放大器

求和运算放大器 文章目录 求和运算放大器1、概述2、反相求和放大器3、同相求和放大器4、减法放大器5、应用5.1 音频混合器5.2 数模转换器 (DAC)6、总结1、概述 在我们之前有关运算放大器的大部分文章中,仅将一个输入应用于反相或非反相运算放大器的输入。在本文中,将讨论一种…

Python:条件分支 if 语句全讲解

Python&#xff1a;条件分支 if 语句全讲解 如果我拿出下面的代码&#xff0c;阁下该做何应对&#xff1f; if not reset_excuted and (terminated or truncated):... else:...---- 前言&#xff1a; 消化论文代码的时候看到这个东西直接大脑冻结&#xff0c;没想过会在这么…