KSQL Stream Processing Cookbook
— 焉知非鱼
Data Filtering #
KSQL streaming queries run continuously. Using the KSQL CREATE STREAM AS syntax, you can persist the output of a streaming query to a Kafka topic: KSQL takes events from one Kafka topic in real time, transforms them, and continuously writes them to another topic.
This example shows how to filter a stream of data from an inbound topic so that only records originating from a particular geography are retained.
Directions #
In this example, a source stream of events called purchases is used. A sample record:
{
"order_id": 1,
"customer_name": "Maryanna Andryszczak",
"date_of_birth": "1922-06-06T02:21:59Z",
"product": "Nut - Walnut, Pieces",
"order_total_usd": "1.65",
"town": "Portland",
"country": "United States"
}
1. In KSQL, register the purchases stream:
ksql> CREATE STREAM purchases \
(order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
product VARCHAR, order_total_usd VARCHAR, town VARCHAR, country VARCHAR) \
WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');
2. Inspect the first few messages as they arrive:
SELECT * FROM PURCHASES LIMIT 5;
3. Filter to show only records where the country is Germany:
SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY='Germany';
4. Create a new KSQL stream containing only the orders from Germany:
CREATE STREAM PUCHASES_GERMANY AS SELECT * FROM PURCHASES WHERE COUNTRY='Germany';
5. The new stream PUCHASES_GERMANY is backed by a Kafka topic of the same name, as shown here:
ksql> LIST TOPICS;
 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1                  | 0         | 0
 PUCHASES_GERMANY   | true       | 4          | 1                  | 0         | 0
 purchases          | true       | 1          | 1                  | 1         | 1
------------------------------------------------------------------------------------------------
ksql>
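As with any derived stream, you can check the schema KSQL inferred and query the filtered data directly. A quick sanity check, using standard KSQL commands (the stream name matches the one created above):

```sql
-- Show the schema of the derived stream
DESCRIBE PUCHASES_GERMANY;

-- Sample the filtered records; every row should have COUNTRY = 'Germany'
SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PUCHASES_GERMANY LIMIT 5;
```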
For more examples, see the stream-processing-cookbook.
Working with Nested JSON Data #
KSQL supports both flat and hierarchical (nested) data structures. In this example, the source data is in nested JSON format. As of Confluent Platform 5.0, KSQL supports the STRUCT data type, which lets you model and access nested data structures directly.
Directions #
The source stream of events is called user_logons. A sample record:
{
"user": {
"first_name": "Lars",
"last_name": "Treagus",
"email": "ltreagus0@timesonline.co.uk"
},
"ip_address": "242.115.235.56",
"logon_date": "2018-02-05T19:45:59Z"
}
1. In KSQL, register the user_logons stream. Note the use of STRUCT to define the nested user element (first_name, last_name, email):
ksql> CREATE STREAM user_logons \
(user STRUCT<\
first_name VARCHAR, \
last_name VARCHAR, \
email VARCHAR>, \
ip_address VARCHAR, \
logon_date VARCHAR) \
WITH (KAFKA_TOPIC='user_logons', VALUE_FORMAT='JSON');
2. Use the -> operator to access the nested columns:
ksql> SELECT user->first_name AS USER_FIRST_NAME, \
user->last_name AS USER_LAST_NAME, \
user->email AS USER_EMAIL, \
ip_address, \
logon_date \
FROM user_logons;
-- Lars | Treagus | ltreagus0@timesonline.co.uk | 242.115.235.56 | 2018-02-05T19:45:59Z
3. (Optional) Persist the flattened structure to a new Kafka topic, continuously updated as new messages arrive on the source topic:
ksql> CREATE STREAM user_logons_all_cols AS \
SELECT user->first_name AS USER_FIRST_NAME, \
user->last_name AS USER_LAST_NAME, \
user->email AS USER_EMAIL, \
ip_address, \
logon_date \
FROM user_logons;
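The derived stream is backed by a Kafka topic of the same name (uppercased by default, since unquoted KSQL identifiers are uppercased), so the flattened records can be inspected directly:

```sql
-- Print the flattened records from the backing topic (press Ctrl-C to stop)
PRINT 'USER_LOGONS_ALL_COLS';
```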
Setting the Kafka Message Key #
Kafka messages are key/value pairs. The key is typically used for partitioning, and it is particularly important if you model a Kafka topic as a table in KSQL (or a KTable in Kafka Streams) for queries or joins.
It is often necessary to take the data in a Kafka topic and create a derived version of it that includes a key based on a field within the message payload itself. This may be because the data needs a different key than the one already set, or because the producing application did not set a key at all. An example of the latter is Kafka Connect's JDBC connector.
Directions #
1. Inspect the source purchases data with the PRINT command. After a few messages have been shown, press Ctrl-C to cancel.
Note that ROWKEY, the system column holding the Kafka message key, is null:
ksql> PRINT 'purchases';
Format:JSON
{"ROWTIME":1543232251151,"ROWKEY":"null","order_id":64,"customer_name":"Denna Hoopper","date_of_birth":"1929-08-09T13:23:58Z","product":"Table Cloth 90x90 Colour","order_total_usd":"2.86","town":"Berlin","country":"Germany"}
{"ROWTIME":1543232251621,"ROWKEY":"null","order_id":65,"customer_name":"Emera Fairham","date_of_birth":"1990-01-07T09:38:11Z","product":"Pear - Halves","order_total_usd":"0.58","town":"Newton","country":"United Kingdom"}
{"ROWTIME":1543232252125,"ROWKEY":"null","order_id":66,"customer_name":"Stefano Gerauld","date_of_birth":"1973-02-11T05:17:18Z","product":"Soup Campbells Mexicali Tortilla","order_total_usd":"4.23","town":"Atlanta","country":"United States"}
2. The key we want to use for the data is order_id. First, register the existing topic as a KSQL stream by supplying its schema:
CREATE STREAM purchases \
(order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) \
WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');
3. Query the stream, and note again that ROWKEY is null:
ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES LIMIT 5;
null | 975 | Wine - Red, Colio Cabernet | Saint Louis | United States
null | 976 | Straws - Cocktale | Dallas | United States
null | 977 | Magnotta - Bel Paese White | Jamaica | United States
null | 978 | Cumin - Whole | Huntsville | United States
null | 979 | Beef - Top Sirloin - Aaa | Saint Louis | United States
Limit Reached
Query terminated
4. Use PARTITION BY to create a new KSQL stream (backed by a Kafka topic) with the re-keyed data:
CREATE STREAM PURCHASES_BY_ORDER_ID AS \
SELECT * FROM PURCHASES \
PARTITION BY ORDER_ID;
If you want to process all of the existing messages in the topic, run SET 'auto.offset.reset' = 'earliest'; before executing this statement. This instructs KSQL to read from the earliest message available in the topic when populating the new stream.
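The offset-reset setting is issued in the KSQL CLI as a statement of its own, before the CREATE STREAM statement:

```sql
-- Read from the beginning of the source topic when populating new streams
SET 'auto.offset.reset' = 'earliest';
```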
5. Query the new stream, and note that ROWKEY now matches ORDER_ID:
ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES_BY_ORDER_ID LIMIT 5;
1248 | 1248 | Hagen Daza - Dk Choocolate | Hamburg | Germany
1249 | 1249 | Sesame Seed | Youngstown | United States
1250 | 1250 | Rum - White, Gg White | Stockton | United States
1251 | 1251 | Flower - Carnations | Kansas City | United States
1252 | 1252 | Wine - White, Pelee Island | Dallas | United States
Limit Reached
Query terminated
ksql>
6. Inspect the underlying Kafka topic of the same name. After a few messages have been shown, press Ctrl-C to cancel.
Note that ROWKEY, the system column holding the Kafka message key, now matches the desired value of ORDER_ID:
ksql> PRINT 'PURCHASES_BY_ORDER_ID';
Format:JSON
{"ROWTIME":1543232884300,"ROWKEY":"1317","ORDER_ID":1317,"CUSTOMER_NAME":"Guillermo McNally","DATE_OF_BIRTH":"1992-06-26T10:57:35Z","PRODUCT":"Pasta - Rotini, Dry","ORDER_TOTAL_USD":4.74,"TOWN":"Garland","COUNTRY":"United States"}
{"ROWTIME":1543232884804,"ROWKEY":"1318","ORDER_ID":1318,"CUSTOMER_NAME":"Elwira Belverstone","DATE_OF_BIRTH":"1978-01-08T06:23:08Z","PRODUCT":"Schnappes - Peach, Walkers","ORDER_TOTAL_USD":4.9,"TOWN":"Largo","COUNTRY":"United States"}
{"ROWTIME":1543232885308,"ROWKEY":"1319","ORDER_ID":1319,"CUSTOMER_NAME":"Mollie Jaycocks","DATE_OF_BIRTH":"1985-02-13T10:03:55Z","PRODUCT":"Pork - Shoulder","ORDER_TOTAL_USD":5.41,"TOWN":"Hartford","COUNTRY":"United States"}
^C{"ROWTIME":1543232885815,"ROWKEY":"1320","ORDER_ID":1320,"CUSTOMER_NAME":"Barbara Caldeiro","DATE_OF_BIRTH":"1981-07-16T04:49:59Z","PRODUCT":"Cake - Dulce De Leche","ORDER_TOTAL_USD":0.19,"TOWN":"Lynchburg","COUNTRY":"United States"}
Topic printing ceased
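With ORDER_ID now set as the message key, the re-keyed topic can be modeled as a KSQL table for queries and joins, which was the motivation described at the start of this recipe. A sketch using KSQL 5.x syntax, assuming the topic created above (the table name PURCHASES_TBL is illustrative):

```sql
-- Register the re-keyed topic as a table; the KEY property names the
-- value field that mirrors the Kafka message key
CREATE TABLE PURCHASES_TBL \
  (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
   product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) \
  WITH (KAFKA_TOPIC='PURCHASES_BY_ORDER_ID', VALUE_FORMAT='JSON', KEY='ORDER_ID');
```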