Flink-Kafka 内置的反序列化 schemas
— 焉知非鱼- SimpleStringSchema
SimpleStringSchema
把 message 反序列化为字符串。如果你的 message 有键, 则忽略键。
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new SimpleStringSchema(),
properties
)
- JSONDeserializationSchema
JSONDeserializationSchema
使用 jackson 将 message 反序列化为 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode
对象流。你可以使用 .get("property")
方法访问字段。再一次, 键被忽略。
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new JSONDeserializationSchema(),
properties
)
- JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema
与前一个非常类似,但处理带有json编码的键和值的消息。
val myConsumer = new FlinkKafkaConsumer(
kafkaConf.topic,
new JSONKeyValueDeserializationSchema(true),
properties
)
返回的 ObjectNode
包含如下字段:
key
:键中存在的所有字段value
:所有的 message 字段metadata
(可选):暴露消息的offset
,partition
和topic
(将true
传递给构造函数以获取元数据)
例如:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
会被解码为:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}