Flink-Kafka 内置的反序列化 schemas

  • SimpleStringSchema

SimpleStringSchema 把 message 反序列化为字符串。如果你的 message 有键, 则忽略键。

1
2
3
4
5
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new SimpleStringSchema(),
properties
)
  • JSONDeserializationSchema

JSONDeserializationSchema 使用 jackson 将 message 反序列化为 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode 对象流。你可以使用 .get("property") 方法访问字段。再一次, 键被忽略。

1
2
3
4
5
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new JSONDeserializationSchema(),
properties
)
  • JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema 与前一个非常类似,但处理带有json编码的键值的消息。

1
2
3
4
5
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new JSONKeyValueDeserializationSchema(true),
properties
)

返回的 ObjectNode 包含如下字段:

  • key:键中存在的所有字段
  • value:所有的 message 字段
  • metadata(可选):暴露消息的 offset, partitiontopic (将 true 传递给构造函数以获取元数据)

例如:

1
2
3
4
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}

会被解码为:

1
2
3
4
5
6
7
8
9
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}