Kafka -- Avro + Twitter Bijection

Avro + Kafka Native API

  1. 直接使用 Avro + Kafka 原生 API 比较繁琐:
    • 需要预先编译 Schema
    • 需要基于 Avro 自行实现序列化器和反序列化器

引入依赖

Maven 依赖(pom.xml):
<!-- Twitter Bijection's Avro module, used by the Java snippets in this article for
     GenericRecord <-> byte[] conversion. NOTE(review): the "_2.12" suffix is presumably
     the Scala binary version the artifact was built for; confirm it matches your build. -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.12</artifactId>
<version>0.9.6</version>
</dependency>

Schema

路径:src/main/resources/user.json

Schema 定义(Avro record,包含 id、name、age 三个字段):
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}

发送消息

下面的示例读取 user.json 中的 Schema,用 Bijection 将 GenericRecord 编码为 byte[],并同步发送 10 条消息到 TOPIC:
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
GenericData.Record record = new GenericData.Record(schema);
record.put("id", i);
record.put("name", TOPIC + i);
record.put("age", i);
byte[] bytes = recordInjection.apply(record);
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(TOPIC, bytes);
RecordMetadata metadata = producer.send(producerRecord).get();
log.info("id={}, timestamp={}, partition={}, offset={}",
record.get("id"), metadata.timestamp(), metadata.partition(), metadata.offset());
}
producer.close();

消费消息

示例代码如下:
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
GenericData.Record record = new GenericData.Record(schema);
record.put("id", i);
record.put("name", TOPIC + i);
record.put("age", i);
byte[] bytes = recordInjection.apply(record);
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(TOPIC, bytes);
RecordMetadata metadata = producer.send(producerRecord).get();
log.info("id={}, timestamp={}, partition={}, offset={}",
record.get("id"), metadata.timestamp(), metadata.partition(), metadata.offset());
}
producer.close();
0%