Kafka -- Java消费者管理TCP连接

创建TCP连接

  1. 消费者端的主要程序入口是KafkaConsumer,但构建KafkaConsumer实例不会创建任何TCP连接
    • 构建KafkaProducer实例时,会在后台默默地启动一个Sender线程,Sender线程负责Socket连接的创建
    • 在Java构造函数中启动线程,会造成this指针逃逸,是一个隐患
  2. 消费者的TCP连接是在调用KafkaConsumer.poll方法时被创建的,poll方法内部有3个时机可以创建TCP连接

发起FindCoordinator请求时

  1. 消费者组有个组件叫作协调者(Coordinator)
    • 驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理
  2. 当消费者程序首次启动调用poll方法时,需要向Kafka集群(集群中的任意Broker)发送FindCoordinator请求
    • 社区优化:消费者程序会向集群中当前负载最小的那台Broker发送请求
    • 单向负载评估(非最优解):消费者连接的所有Broker中,谁的待发送请求最少,谁的负载就越小

连接Coordinator时

  1. Broker处理完FindCoordinator请求后,会返回对应的响应结果,显式地告诉消费者哪个Broker是真正的Coordinator
  2. 消费者向真正的Coordinator所在的Broker发起Socket连接
  3. 成功接入Coordinator后,Coordinator开启组协调操作(加入组、等待组分配、心跳请求处理、位移获取和提交)

消费数据时

  1. 消费者会为每个要消费的分区创建与该分区领导者副本所在Broker的Socket连接
  2. 假设消费者要消费5个分区的数据,这5个分区各自的领导者副本分布在4台Broker上
    • 那么消费者在消费时会创建与这4台Broker的Socket连接

TCP连接数

日志详解

[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者程序创建的第一个TCP连接,该Socket用于发送FindCoordinator请求
此时消费者对要连接的Kafka集群一无所知,因此它连接的Broker节点的ID为-1,表示不知道要连接的Broker的任何信息


[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=’t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)

消费者复用刚刚创建的Socket连接,向Kafka集群发送元数据请求以获取整个集群的信息


[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)

消费者程序开始发送FindCoordinator请求给第一步中连接的Broker,即localhost:9092(nodeId为-1


[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)

十几毫秒后,消费者程序成功地获悉Coordinator所在的Broker,即node_id=2,host=localhost,port=9094


[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者此时已经知道协调者Broker的连接信息了,发起第二个Socket连接,创建连向localhost:9094的TCP连接
只有连接了Coordinator,消费者才能正常地开启消费组的各种功能以及后续的消息消费
此时的id是由Integer.MAX_VALUE减去Coordinator所在的Broker的Id计算出来的,即2147483647 - 2 = 2147483645
这种节点ID的标记方式是Kafka社区特意为之,目的是要让组协调请求真正的数据获取请求使用不同的Socket连接


[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者又分别创建了新的TCP连接,主要用于实际的消息获取

3类TCP连接

  1. 确定协调者获取集群元数据
  2. 连接协调者,令其执行组成员管理操作
  3. 执行实际的消息获取

关闭TCP连接

  1. 与生产者类似,消费者关闭Socket分为主动关闭Kafka自动关闭
    • 主动关闭
      • 手动调用KafkaConsumer.close或者执行kill(-2/-9)命令
    • 自动关闭
      • 消费端参数connection.max.idle.ms,默认是9分钟
      • 如果使用循环的方式来调用poll方法来消费消息,上面的所有请求都会定期发送到Broker,达到长连接的效果
  2. 第三类TCP连接成功创建后,消费者程序就会废弃第一类TCP连接,之后定期请求元数据,会改用第三类TCP连接
    • 第一类TCP连接会在后台被默默关闭,运行一段时间的消费者只会有后面两类TCP连接存在
0%