Kafka -- 生产者管理TCP连接

建立TCP连接

创建KafkaProducer实例

1
2
3
4
5
6
7
8
9
10
11
12
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
// try-with-resources
// 创建KafkaProducer实例时,会在后台创建并启动Sender线程,Sender线程开始运行时首先会创建与Broker的TCP连接
try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
Callback callback = (metadata, exception) -> {
};
producer.send(record, callback);
}
  1. bootstrap.servers是Producer的核心参数之一,指定了Producer启动时要连接的Broker地址
  2. 如果bootstrap.servers指定了1000个Broker,那么Producer启动时会首先创建与这1000个Broker的TCP连接
  3. 因此不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常配置3~4台足够
    • Producer一旦连接到集群中的任意一台Broker,就能拿到整个集群的Broker信息(metadata request
  4. 在创建KafkaProducer实例时启动Sender线程是不合理
    • 在对象构造器中启动线程会造成this指针逃逸,理论上Sender线程能够观测到一个尚未构造完成的KafkaProducer实例
    • 在构造对象时创建线程是没有问题的,但最好不要同时启动线程

相关日志

1
2
3
4
5
6
7
8
9
10
11
12
13
Sender          - Starting Kafka producer I/O thread.
KafkaProducer - Kafka producer started
NetworkClient - Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
NetworkClient - Initiating connection to node localhost:9092 (id: -1 rack: null)
Selector - Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
NetworkClient - Completed connection to node -1. Fetching API versions.
NetworkClient - Initiating API versions fetch from node -1.
NetworkClient - Sending metadata request (type=MetadataRequest, topics=zhongmingmao) to node localhost:9092 (id: -1 rack: null)
KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
NetworkClient - Initiating connection to node 192.168.2.1:9092 (id: 0 rack: null)
Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
Sender - Shutdown of Kafka producer I/O thread has completed.
KafkaProducer - Kafka producer has been closed

其他场景

  1. 其他可能创建TCP连接的场景:更新元数据后消息发送时
  2. 当Producer更新了集群的元数据后,如果发现与某些Broker当前没有连接,那么Producer会创建一个TCP连接
    • 场景1
      • 当Producer尝试向不存在的主题发送消息时,Broker会告诉Producer这个主题不存在
      • 此时Producer会发送metadata requestKafka集群,去尝试获取最新的元数据信息
        • 集群中所有的Broker建立TCP连接
    • 场景2
      • Producer通过metadata.max.age.ms参数定期地去更新元数据信息,默认值300000,即5分钟
  3. 当Producer要发送消息时,Producer发现与目标Broker(依赖负载均衡算法)还没有连接,也会创建一个TCP连接

关闭TCP连接

  1. Producer端关闭TCP连接有两种方式:用户主动关闭Kafka自动关闭
  2. 用户主动关闭
    • 广义的主动关闭,包括用户调用kill -9来杀掉Producer,最推荐的方式:producer.close()
  3. Kafka自动关闭
    • Producer端参数connections.max.idle.ms,默认值540000,即9分钟
    • 如果9分钟内没有任何请求经过某个TCP连接,Kafka会主动把TCP连接关闭
    • connections.max.idle.ms=-1禁用这种机制,TCP连接将成为永久长连接
      • Kafka创建的Socket连接都开启了keepalive
    • 关闭TCP连接的发起方是Kafka客户端,属于被动关闭的场景
      • 被动关闭的后果就是会产生大量的CLOSE_WAIT连接
      • Producer端或Client端没有机会显式地观测到此TCP连接已被中断
0%