Kafka -- 高水位 + Leader Epoch

高水位

水位的定义

  1. 经典教科书
    • 在时刻T,任意创建时间(Event Time)为T',且T'<=T的所有事件都已经到达,那么T就被定义为水位
  2. 《Streaming System》
    • 水位是一个单调增加且表征最早未完成工作的时间戳
  3. 上图中标注为Completed的蓝色部分代表已经完成的工作,标注为In-Flight的红色部分代表正在进行中的工作
    • 两者的边界就是水位线
  4. Kafka中,水位不是时间戳,而是与位置信息绑定的,即用消息位移来表征水位
    • Kafka中也有低水位(Low Watermark),是与Kafka删除消息相关联的概念

高水位的作用

  1. 两个作用
    • 定义消息可见性,即用来标识分区下的哪些消息可以被消费者消费的
    • 帮助Kafka完成副本同步
  2. 上图是某个分区Leader副本的高水位图,在分区高水位以下的消息被认为是已提交消息,反之为未提交消息
    • 消费者只能消费已提交消息,即位移小于8的所有消息
    • 暂不讨论Kafka事务,Kafka的事务机制会影响消费者所能看到的消息的范围,不只是简单依赖高水位来判断
      • 而是依靠LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性
    • 位移值等于高水位的消息也属于未提交消息,即高水位上的消息是不能被消费者消费的
    • 图中还有一个日志末端位移(Log End Offset,LEO)的概念,表示副本写入下一条消息的位移值
      • LEO为15,方框是虚线,表示当前副本只有15条消息,位移从0到14,下一条新消息的位移为15
    • [高水位,LEO)的消息属于未提交消息,在同一个副本对象,高水位值不会大于LEO值
    • 高水位LEO是副本对象的两个重要属性
      • Kafka所有副本对象都有对应的高水位和LEO,而Kafka使用Leader副本的高水位来定义所在分区的高水位

高水位的更新机制

远程副本

  1. 每个副本对象都保存了一组高水位和LEO值,Leader副本所在的Broker还保存了其它Follower副本的LEO值
  2. Kafka把Broker 0上保存的Follower副本又称为远程副本Remote Replica)
  3. Kafka副本机制在运行过程中
    • 会更新
      • Broker 1上Follower副本的高水位和LEO值
      • Broker 0上Leader副本的高水位和LEO以及所有远程副本的LEO
    • 不会更新
      • Broker 0所有远程副本的高水位值,即图中标记为灰色的部分
  4. Broker 0保存远程副本的作用
    • 帮助Leader副本确定高水位,即分区高水位

更新时机

更新对象 更新时机
Broker 0上Leader副本的LEO Leader副本接收到生产者发送的消息,写入到本地磁盘后,会更新其LEO值
Broker 1上Follower副本的LEO Follower副本从Leader副本拉取消息,写入本地磁盘后,会更新其LEO值
Broker 0上远程副本的LEO Follower副本从Leader副本拉取消息时,会告诉Leader副本从哪个位移开始拉取
Leader副本会使用这个位移值来更新远程副本的LEO
Broker 0上Leader副本的高水位 两个更新时机:一个是Leader副本更新其LEO之后,一个是更新完远程副本LEO之后
具体算法:取Leader副本和所有与Leader同步的远程副本LEO中的最小值
Broker 1上Follower副本的高水位 Follower副本成功更新完LEO后,会比较其LEO与Leader副本发来的高水位值
并用两者的较小值去更新自己的高水位
  1. 与Leader副本保持同步,需要满足两个条件
    • 该远程Follower副本在ISR
    • 该远程Follower副本LEO值落后Leader副本LEO值的时间不超过参数replica.lag.time.max.ms10秒
  2. 某个副本能否进入ISR是由第二个条件判断的
    • 2个条件判断是为了应对意外情况:Follower副本已经追上Leader,却不在ISR中
    • 假设Kafka只判断第1个条件,副本F刚刚重启,并且已经具备进入ISR的资格,但此时尚未进入到ISR
      • 由于缺少了副本F的判断,分区高水位有可能超过真正ISR中的副本LEO,而高水位>LEO不允许

Leader副本

  1. 处理生产者请求
    • 写入消息到本地磁盘,更新LEO
    • 更新分区高水位
      • 获取Leader副本所在Broker端保存的所有远程副本LEO值{LEO-1, LEO-2,... LEO-n}
      • 获取Leader副本的LEO值:currentLEO
      • 更新currentHW = min(currentLEO, LEO-1, LEO-2,... LEO-n)
  2. 处理Follower副本拉取消息
    • 读取磁盘(或页缓存)中的消息数据
    • 使用Follower副本发送请求中的位移值来更新远程副本的LEO
    • 更新分区高水位值(与上面一致)

Follower副本

  1. 从Leader拉取消息
    • 写入消息到本地磁盘
    • 更新LEO
    • 更新高水位
      • 获取Leader发送的高水位值:currentHW
      • 获取步骤2中更新的LEO值:currentLEO
      • 更新高水位min(currentHW, currentLEO)

副本同步样例

主题是单分区两副本,首先是初始状态,所有值都是0

当生产者向主题分区发送一条消息后,状态变更为

此时,Leader副本成功将消息写入到本地磁盘,将LEO值更新为1(更新高水位值为0,并把结果发送给Follower副本)
Follower再次尝试从Leader拉取消息,此时有消息可以拉取,Follower副本也成功更新LEO为1(并将高水位更新为0)
此时,Leader副本和Follower副本的LEO都是1,但各自的高水位依然是0,需要等到下一轮的拉取中被更新

在新一轮的拉取请求中,由于位移值为0的消息已经拉取成功,因此Follower副本这次拉取请求的位移值为1
Leader副本接收到此请求后,更新远程副本LEO1,然后更新Leader高水位值为1
最后,Leader副本会将当前更新过的高水位值1发送给Follower副本,Follower副本接收到后,也会将自己的高水位值更新为1

Leader Epoch

基本概念

  1. 上面的副本同步过程中,Follower副本的高水位更新需要一轮额外的拉取请求才能实现
    • 如果扩展到多个Follower副本,可能需要多轮拉取请求
    • Leader副本高水位更新Follower副本高水位更新时间上存在错配
      • 这种错配是很多数据丢失数据不一致问题的根源
      • 因此,社区在0.11版本正式引入了Leader Epoch概念,来规避高水位更新错配导致的各种不一致问题
  2. Leader Epoch可以大致认为是Leader版本,由两部分数据组成
    • Epoch
      • 一个单调递增的版本号
      • 每当副本领导权发生变更时,都会增加该版本号
      • 小版本号的Leader被认为是过期Leader,不能再行使Leader权利
    • 起始位移(Start Offset)
      • Leader副本在该Epoch值上写入的首条消息的位移
  3. 两个Leader Epoch,<0,0><1,120>
    • <0,0>表示版本号为0,该版本的Leader从位移0开始保存消息,一共保存了120条消息
    • 之后Leader发生了变更,版本号增加到1,新版本的起始位移是120
  4. Broker在内存中为每个分区都缓存Leader Epoch数据,同时还会定期地将这些数据持久化到一个checkpoint文件中
    • Leader副本写入消息到磁盘时,Broker会尝试更新这部分缓存
    • 如果Leader是首次写入消息,那么Broker会向缓存中增加Leader Epoch条目,否则不做更新
    • 这样每次有Leader变更时,新的Leader副本会查询这部分缓存,取出对应的Leader Epoch的起始位移
      • 然后进行相关的逻辑判断,避免数据丢失数据不一致的情况

数据丢失

  1. 开始时,副本A和副本B都处于正常状态,A是Leader副本
  2. 某个的生产者(默认acks设置)向A发送了两条消息,A全部写入成功,Kafka会通知生产者说两条消息全部发送成功
  3. 假设Leader和Follower都写入了这两条消息,而且Leader副本的高水位也更新了,但Follower副本的高水位还未更新
  4. 此时副本B所在的Broker宕机,当它重启回来后,副本B会执行日志截断!!
    • 将LEO值调整为之前的高水位值!!,也就是1
    • 位移值为1的那条消息被副本B从磁盘中删除,此时副本B的底层磁盘文件中只保留1条消息,即位移为0的消息
  5. 副本B执行完日志截断操作后,开始从A拉取消息,此时恰好副本A所在的Broker也宕机了,副本B自然成为新的Leader
    • 当A回来后,需要执行相同的日志截断操作,但不能超过新Leader,即将高水位调整与B相同的值,也就是1
    • 操作完成后,位移值为1的那条消息就从两个副本中被永远抹掉,造成了数据丢失

Leader Epoch规避数据丢失

  1. Follower副本B重启后,需要向A发送一个特殊的请求去获取Leader的LEO值,该值为2
  2. 当获知Leader LEO后,B发现该LEO值大于等于自己的LEO,而且缓存中也没有保存任何起始位移值>2的Epoch条目
    • B无需执行任何日志截断操作
    • 明显改进:副本是否执行日志截断不再依赖于高水位进行判断
  3. A宕机,B成为Leader,当A重启回来后,执行与B相同的逻辑判断,发现同样不需要执行日志截断
    • 至此位移值为1的那条消息在两个副本中均得到保留
    • 后面生产者向B写入新消息后,副本B所在的Broker缓存中会生成新的Leader Epoch条目:[Epoch=1, Offset=2]

小结

  1. 高水位在界定Kafka消息对外可见性以及实现副本机制方面起到非常重要的作用
    • 但设计上的缺陷给Kafka留下了很多数据丢失数据不一致的潜在风险
  2. 为此,社区引入了Leader Epoch机制,尝试规避这类风险,并且效果不错
0%