Wait the light to fall

Flink-Kafka 内置的反序列化 schemas

焉知非鱼
  • SimpleStringSchema

SimpleStringSchema 把 message 反序列化为字符串。如果你的 message 有键, 则忽略键。

val myConsumer = new FlinkKafkaConsumer(
    kafkaConf.topic, 
    new SimpleStringSchema(), 
    properties
)
  • JSONDeserializationSchema

JSONDeserializationSchema 使用 jackson 将 message 反序列化为 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode 对象流。你可以使用 .get("property") 方法访问字段。再一次, 键被忽略。

val myConsumer = new FlinkKafkaConsumer(
    kafkaConf.topic, 
    new JSONDeserializationSchema(), 
    properties
)
  • JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema 与前一个非常类似,但处理带有json编码的键值的消息。

val myConsumer = new FlinkKafkaConsumer(
    kafkaConf.topic, 
    new JSONKeyValueDeserializationSchema(true), 
    properties
)

返回的 ObjectNode 包含如下字段:

  • key:键中存在的所有字段
  • value:所有的 message 字段
  • metadata(可选):暴露消息的 offset, partitiontopic (将 true 传递给构造函数以获取元数据)

例如:

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}

会被解码为:

{
    "key":{"keyField1":1,"keyField2":2},
    "value":{"valueField1":1,"valueField2":{"foo":"bar"}},
    "metadata":{
        "offset":43,
        "topic":"json-topic",
        "partition":0
    }
}