KSQL Stream Processing Cookbook

Data Filtering

KSQL streaming queries run continuously. Using the KSQL CREATE STREAM AS syntax, you can persist the output of a streaming query to a Kafka topic: KSQL takes events from one Kafka topic in real time, transforms them, and continuously writes them to another topic.

This example shows how to filter a stream of data from an inbound topic, keeping only the records that originate from a particular geography.

Directions

This example uses a source stream of purchase events named purchases.

{
"order_id": 1,
"customer_name": "Maryanna Andryszczak",
"date_of_birth": "1922-06-06T02:21:59Z",
"product": "Nut - Walnut, Pieces",
"order_total_usd": "1.65",
"town": "Portland",
"country": "United States"
}

1. In KSQL, register the purchases stream:

ksql> CREATE STREAM purchases \
(order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
product VARCHAR, order_total_usd VARCHAR, town VARCHAR, country VARCHAR) \
WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

2. Inspect the first few messages as they arrive:

SELECT * FROM PURCHASES LIMIT 5;

3. Filter to show only records where the country is Germany:

SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY='Germany';

4. Create a new KSQL stream containing only the orders from Germany:

CREATE STREAM PURCHASES_GERMANY AS SELECT * FROM PURCHASES WHERE COUNTRY='Germany';
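Conceptually, this continuous query applies a predicate to every record flowing through the topic. As a rough illustration only (not how KSQL executes it), the same filter over a batch of JSON events can be sketched in plain Python; the sample events and the `filter_by_country` helper are hypothetical:

```python
import json

# Hypothetical sample events shaped like the purchases topic. In KSQL the
# filter runs continuously over an unbounded stream; this sketch processes
# a fixed list for illustration.
events = [
    {"order_id": 1, "product": "Nut - Walnut, Pieces",
     "town": "Portland", "country": "United States"},
    {"order_id": 2, "product": "Table Cloth 90x90 Colour",
     "town": "Berlin", "country": "Germany"},
]

def filter_by_country(records, country):
    """Rough equivalent of SELECT * FROM PURCHASES WHERE COUNTRY='Germany'."""
    return [r for r in records if r["country"] == country]

germany_orders = filter_by_country(events, "Germany")
print(json.dumps(germany_orders))  # only the Berlin order remains
```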

5. The new stream PURCHASES_GERMANY is populated, backed by a Kafka topic of the same name, as shown here:

ksql> LIST TOPICS;

Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
PURCHASES_GERMANY | true | 4 | 1 | 0 | 0
purchases | true | 1 | 1 | 1 | 1
------------------------------------------------------------------------------------------------
ksql>

For more examples, see the stream-processing-cookbook.

Working with Nested JSON Data

KSQL supports both flat and hierarchical (nested) data structures. In this example, the source data is in nested JSON format. As of Confluent Platform 5.0, KSQL supports the STRUCT data type, which lets you model and access nested data structures directly.

Directions

The source event stream is called user_logons.

{
"user": {
"first_name": "Lars",
"last_name": "Treagus",
"email": "ltreagus0@timesonline.co.uk"
},
"ip_address": "242.115.235.56",
"logon_date": "2018-02-05T19:45:59Z"
}

1. In KSQL, register the user_logons stream. Note that STRUCT is used to define the nested user element (first_name, last_name, email).

ksql> CREATE STREAM user_logons \
(user STRUCT<\
first_name VARCHAR, \
last_name VARCHAR, \
email VARCHAR>, \
ip_address VARCHAR, \
logon_date VARCHAR) \
WITH (KAFKA_TOPIC='user_logons', VALUE_FORMAT='JSON');

2. Use the -> operator to access the nested columns.

ksql> SELECT user->first_name AS USER_FIRST_NAME, \
user->last_name AS USER_LAST_NAME, \
user->email AS USER_EMAIL, \
ip_address, \
logon_date \
FROM user_logons;
-- Lars | Treagus | ltreagus0@timesonline.co.uk | 242.115.235.56 | 2018-02-05T19:45:59Z

3. (Optional) Persist the flattened structure to a new Kafka topic, continuously updated as new messages arrive on the source topic:

ksql> CREATE STREAM user_logons_all_cols AS \
SELECT user->first_name AS USER_FIRST_NAME, \
user->last_name AS USER_LAST_NAME, \
user->email AS USER_EMAIL, \
ip_address, \
logon_date \
FROM user_logons;
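The projection above lifts each field of the nested user struct to a top-level column. The same flattening can be sketched in plain Python over one JSON event; the `flatten_logon` helper is hypothetical, standing in for what the KSQL -> projection expresses declaratively:

```python
import json

# One event in the shape of the user_logons topic.
raw = '''{"user": {"first_name": "Lars",
                   "last_name": "Treagus",
                   "email": "ltreagus0@timesonline.co.uk"},
          "ip_address": "242.115.235.56",
          "logon_date": "2018-02-05T19:45:59Z"}'''

def flatten_logon(event):
    """Mimics the projection user->first_name AS USER_FIRST_NAME, etc.:
    nested struct fields become top-level columns."""
    return {
        "USER_FIRST_NAME": event["user"]["first_name"],
        "USER_LAST_NAME": event["user"]["last_name"],
        "USER_EMAIL": event["user"]["email"],
        "IP_ADDRESS": event["ip_address"],
        "LOGON_DATE": event["logon_date"],
    }

flat = flatten_logon(json.loads(raw))
print(flat["USER_FIRST_NAME"], flat["USER_EMAIL"])  # Lars ltreagus0@timesonline.co.uk
```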

Setting the Kafka Message Key

Kafka messages are key/value pairs. The key is typically used for partitioning, and it is particularly important if a Kafka topic is to be modeled as a table in KSQL (or a KTable in Kafka Streams) for querying or joining.
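Why the key matters for partitioning: Kafka's default partitioner hashes the message key to choose a partition, so records with equal keys always land on the same partition and stay ordered relative to each other. A minimal sketch of that idea, using Python's built-in hash purely for illustration (the real Java client uses murmur2, not this hash):

```python
# Illustrative only: stand-in for Kafka's default partitioner, which maps
# a record to a partition by hashing its key modulo the partition count.
def partition_for(key: str, num_partitions: int) -> int:
    return hash(key) % num_partitions

# Equal keys always map to the same partition, so per-key ordering holds.
p1 = partition_for("order-42", 4)
p2 = partition_for("order-42", 4)
print(p1 == p2)  # True
```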

It is common to need to take data already in a Kafka topic and create a derived version whose key is based on a field within the message payload itself. This may be because the data needs a different key from the one already set, or because the producing application set no key at all. An example of the latter is Kafka Connect's JDBC connector.

Directions

1. Inspect the source purchases data with the PRINT command. After a few messages have been shown, press Ctrl-C to cancel.

Note that ROWKEY, the system column representing the Kafka message key, is null:

ksql> PRINT 'purchases';
Format:JSON
{"ROWTIME":1543232251151,"ROWKEY":"null","order_id":64,"customer_name":"Denna Hoopper","date_of_birth":"1929-08-09T13:23:58Z","product":"Table Cloth 90x90 Colour","order_total_usd":"2.86","town":"Berlin","country":"Germany"}
{"ROWTIME":1543232251621,"ROWKEY":"null","order_id":65,"customer_name":"Emera Fairham","date_of_birth":"1990-01-07T09:38:11Z","product":"Pear - Halves","order_total_usd":"0.58","town":"Newton","country":"United Kingdom"}
{"ROWTIME":1543232252125,"ROWKEY":"null","order_id":66,"customer_name":"Stefano Gerauld","date_of_birth":"1973-02-11T05:17:18Z","product":"Soup Campbells Mexicali Tortilla","order_total_usd":"4.23","town":"Atlanta","country":"United States"}

2. The key we want for the data is order_id. First, register the existing topic as a KSQL stream by supplying its schema:

CREATE STREAM purchases \
(order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) \
WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

3. Query the stream, again noting that ROWKEY is null:

ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES LIMIT 5;
null | 975 | Wine - Red, Colio Cabernet | Saint Louis | United States
null | 976 | Straws - Cocktale | Dallas | United States
null | 977 | Magnotta - Bel Paese White | Jamaica | United States
null | 978 | Cumin - Whole | Huntsville | United States
null | 979 | Beef - Top Sirloin - Aaa | Saint Louis | United States
Limit Reached
Query terminated

4. Use PARTITION BY to create a new KSQL stream, backed by a Kafka topic, with the data re-keyed:

CREATE STREAM PURCHASES_BY_ORDER_ID AS \
SELECT * FROM PURCHASES \
PARTITION BY ORDER_ID;

If you want to process all existing messages in the topic, run SET 'auto.offset.reset' = 'earliest'; before executing this statement. This instructs KSQL to read from the earliest available message in the topic when populating the new stream.
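The effect of PARTITION BY ORDER_ID is to derive a new message key from a field inside the message value, leaving the value unchanged. A rough per-record sketch in plain Python (the `rekey` helper and sample record are hypothetical, not KSQL internals):

```python
# A null-keyed record as it might arrive from, e.g., the JDBC connector.
record_key = None
record_value = {"order_id": 1317, "product": "Pasta - Rotini, Dry",
                "town": "Garland", "country": "United States"}

def rekey(key, value, key_field):
    """Rough equivalent of PARTITION BY: replace the message key with the
    (stringified) value of a field from the payload; the value is untouched."""
    return str(value[key_field]), value

new_key, new_value = rekey(record_key, record_value, "order_id")
print(new_key)  # 1317
```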

5. Query the new stream, noting that ROWKEY now matches ORDER_ID:

ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES_BY_ORDER_ID LIMIT 5;
1248 | 1248 | Hagen Daza - Dk Choocolate | Hamburg | Germany
1249 | 1249 | Sesame Seed | Youngstown | United States
1250 | 1250 | Rum - White, Gg White | Stockton | United States
1251 | 1251 | Flower - Carnations | Kansas City | United States
1252 | 1252 | Wine - White, Pelee Island | Dallas | United States
Limit Reached
Query terminated
ksql>

6. Inspect the underlying Kafka topic of the same name. After a few messages have been shown, press Ctrl-C to cancel.

Note that ROWKEY, the system column representing the Kafka message key, now matches the desired value of ORDER_ID:

ksql> PRINT 'PURCHASES_BY_ORDER_ID';
Format:JSON
{"ROWTIME":1543232884300,"ROWKEY":"1317","ORDER_ID":1317,"CUSTOMER_NAME":"Guillermo McNally","DATE_OF_BIRTH":"1992-06-26T10:57:35Z","PRODUCT":"Pasta - Rotini, Dry","ORDER_TOTAL_USD":4.74,"TOWN":"Garland","COUNTRY":"United States"}
{"ROWTIME":1543232884804,"ROWKEY":"1318","ORDER_ID":1318,"CUSTOMER_NAME":"Elwira Belverstone","DATE_OF_BIRTH":"1978-01-08T06:23:08Z","PRODUCT":"Schnappes - Peach, Walkers","ORDER_TOTAL_USD":4.9,"TOWN":"Largo","COUNTRY":"United States"}
{"ROWTIME":1543232885308,"ROWKEY":"1319","ORDER_ID":1319,"CUSTOMER_NAME":"Mollie Jaycocks","DATE_OF_BIRTH":"1985-02-13T10:03:55Z","PRODUCT":"Pork - Shoulder","ORDER_TOTAL_USD":5.41,"TOWN":"Hartford","COUNTRY":"United States"}
^C{"ROWTIME":1543232885815,"ROWKEY":"1320","ORDER_ID":1320,"CUSTOMER_NAME":"Barbara Caldeiro","DATE_OF_BIRTH":"1981-07-16T04:49:59Z","PRODUCT":"Cake - Dulce De Leche","ORDER_TOTAL_USD":0.19,"TOWN":"Lynchburg","COUNTRY":"United States"}
Topic printing ceased