kafka入门学习

简介

Kafka 是一种分布式的,基于发布 / 订阅的消息系统。主要设计目标如下:

  • 高可用:多副本机制。
  • 高并发:网络架构设计 三层架构:多selector -> 多线程 -> 队列的设计(NIO)
  • 高性能:
    • 写数据:1.把数据先写入到OS Cache;2.写到磁盘上面是顺序写,性能很高。
    • 读数据:1.根据稀疏索引,快速定位到要消费的数据;2.零拷贝机制 减少数据的拷贝 减少了应用程序与操作系统上下文切换。

page-imgt1

Producer、Consumer

客户端有两种基本类型:

  • 生产者(Producer):创建消息。
  • 消费者(Consumer):负责消费/读取消息。

Topic、Partition

消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,类似于数据库中的表。
默认一个topic有一个分区(partition),自己可设置多个分区(分区分散存储在服务器不同节点上)。

Broker、Cluster

Kafka集群中,一个kafka服务器就是一个broker,它接受生产者发送的消息并存入磁盘。
若干个 Broker 组成一个集群(Cluster),集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。

在集群内,一个分区(partition)由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;
一个分区(partition)可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。

多集群

当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。

上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。

pull 模型

消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?

作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。
一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。
push 模式和 pull 模式各有优劣。

push 模式很难适应消费速率不同的消费者。
因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适。
优点

  • pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,可以自主决定是否批量的从从broker拉取数据。
  • Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,还能选择不同的提交方式从而实现不同的传输语义。

缺点

  • 如果broker没有可供消费的消息,将导致consume不断在循环中轮询,直到新消息到达。

    为了避免这点,Kafka有个参数可以让consumer阻塞直到新消息到达。

消息丢失

https://blog.csdn.net/u014753478/article/details/117932192
https://zhuanlan.zhihu.com/p/459610418
https://zhuanlan.zhihu.com/p/341546586

Kafka 中有三次消息传递过程(这三步中每一步都可能会出现丢失数据的情况):

  1. Producer 端发送消息给 Kafka Broker 端。
  2. Kafka Broker 将消息进行同步,并且持久化数据。
  3. Consumer 端从Kafka Broker 将消息拉取并进行消费。

Producer丢失

  1. 生产者是与leader直接交互,所以先从集群获取topic对应分区的leader元数据;
  2. 获取到leader分区元数据后直接将消息发给过去;
  3. Kafka Broker对应的leader分区收到消息后写入文件持久化;
  4. Follower拉取Leader消息与Leader的数据保持一致;
  5. Follower消息拉取完毕需要给Leader回复ACK确认消息;
  6. Kafka Leader和Follower分区同步完,Leader分区会给生产者回复ACK确认消息。

生产者发送消息到broker是会存在消息丢失的,大多可能是由于网络原因引起的,消息中间件中一般都是通过ack来解决这个问题的,kafka中可以通过设置ack来解决这个问题。

Kafka通过配置request.required.acks属性来确认消息的生产

  • ack==0表示不进行消息接收是否成功的确认;不能保证消息是否发送成功,生成环境基本不会用。
  • ack==1表示当Leader接收成功时确认;只要Leader存活就可以保证不丢失,保证了吞吐量。
  • ack==-1或者all表示Leader和Follower都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。

Broker持久化丢失

Broker 集群接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是「异步批量刷盘的策略」:按照一定的消息量和间隔时间进行刷盘。
首先会将数据存储到 「PageCache」 中,至于什么时候将 Cache 中的数据刷盘是由「操作系统」根据自己的策略决定或者调用 fsync 命令进行强制刷盘。
如果此时 Broker 宕机 Crash 掉,且选举了一个落后 Leader Partition 很多的 Follower Partition 成为新的 Leader Partition,那么落后的消息数据就会丢失。

解决方案

如果broker发生故障,那么此时page cache的数据就会丢失,broker端可以设置刷盘的参数,比如多久刷盘一次,不过这个参数不建议去修改。

最好的方案还是设置多副本,一个分区设置几个副本,当broker故障的时候,如果还有其他副本,那么数据就不会丢失。

Consumer消费丢失

Consumer的消费方式主要分为两种:

  1. 自动提交offset, Automatic Offset Committing
  2. 手动提交offset, Manual Offset Control

Consumer自动提交机制是根据一定的时间间隔,将收到的消息进行commit。commit的过程和消费过程是异步的,也就是说可能存在消费过程未成功,但是commit已经提交了。此时消息就丢失了。

解决方式是把自动提交改为手动提交。可以保证消息至少被消费一次,但是此时可能出现重复消费的情况。

消息重复

Producer重复发送

Consumer重复消费

原因

数据消费完没有及时提交offset到broker。
消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

解决方案

取消自动提交:
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

下游做幂等:
一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。

zk的作用

Zookeeper是分布式协调(它不是数据库)。
kafka中使用了zookeeper的分布式锁和分布式配置及统一命名的分布式协调解决方案。
在kafka的broker集群中的controller的选择,是通过zk的临时节点争抢获得的。
brokerlD等如果自增的话也是通过zk的节点version实现的全局唯一。
kafka中broker中的状态数据也是存储在zk中,不过这里要注意,zk不是数据库,所以存储的属于元数据
而,新旧版本变化中,就把曾经的offset从zk中迁移出了zk

Rebalance/重平衡

https://zhuanlan.zhihu.com/p/108564156
https://cloud.tencent.com/developer/article/1852157

触发条件

消费组有多个消费者,消费组在消费一个Topic的时候,kafka为了保证消息消费不重不漏,kafka将每个partition唯一性地分配给了消费者。但是如果某个消费组在消费的途中有消费者宕机或者有新的消费者加入的时候那么partition分配就是不公平的,可能导致某些消费者负载特别重,某些消费者又没有负载的情况。

Kafka有一种专门的机制处理这种情况,这种机制称为Rebalance机制。

当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:

  • 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机。
  • 消费者无法在指定的时间之内完成消息的消费。
  • 消费组订阅的Topic发生了变化。
  • 订阅的Topic的partition发生了变化。

Coordinator

Coordinator是Rebalance机制中非常重要的一个角色。

每个消费组都会有一个coordinator,Coordinator负责处理管理组内的消费者和位移管理,Coordinator并不负责消费组内的partition分配。消费者通过心跳的方式告知Coordinator自己仍然处于存活状态,Coordinator以session. timeout. ms参数的频率检测消费组group内消费者存活情况,该参数的默认值是10s,如果该值太大,那么coordinator需要非常长时间才能检测到消费者宕机。

消费组leader

在Kafka的消费端,会有一个消费者协调器以及消费组,组协调器(Group Coordinator)需要为消费组内的消费者选举出一个消费组的leader。
如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,选举方式如下:

1
2
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption

在组协调器中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。而leader的取值为HashMap中的第一个键值对的key(等同于随机)。

消费组的Leader和Coordinator没有关联。
消费组的leader负责Rebalance过程中消费分配方案的制定。

Rebalance流程

Rebalance 发生时,Group 下所有Consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。
在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。

Join

所有成员都向Group Coordinator发送JoinGroup请求,请求加入消费组。
一旦所有成员都发送了JoinGroup请求,Coordinator会从中选择一个Consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

Sync

这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。
一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator。(非leader也会发SyncGroup请求,只是内容为空。)

coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

避免Rebalance

后两个时机是可以人为避免的(订阅topic或者Topic的partition发生了变化)。发生Rebalance最常见的原因是消费组成员个数发生变化。这其中消费者成员正常的添加和停掉导致Rebalance,也是无法避免。

但是在某些情况下,Consumer实例会被Coordinator错误地认为已停止从而被踢出Group。从而导致rebalance。
这种情况可以通过Consumer端的参数session.timeout.msmax.poll.interval.ms进行配置。

Consumer还提供了控制发送心跳请求频率的参数,就是heartbeat.interval.ms
这个值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更快地知道是否开启Rebalance,因为Coordinator通知各个Consumer实例是否开启Rebalance就是将REBALANCE_NEEDED标志封装进心跳请求的响应体中。

总之,要为业务处理逻辑留下充足的时间使Consumer不会因为处理这些消息的时间太长而引发Rebalance,但也不能时间设置过长导致Consumer宕机但迟迟没有被踢出Group。

如何保证可靠性

  • 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息A,然后写入消息B,那么消费者会先读取消息A再读取消息B。
  • 当消息写入所有in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有in-sync状态副本写入才返回。
  • 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
  • 消费者只能读取到已提交的消息。