
KSQL Stream Processing CookBook

焉知非鱼


Data Filtering #

KSQL streaming queries run continuously. You can persist the output of a streaming query to a Kafka topic with the KSQL CREATE STREAM AS syntax. KSQL takes events from one Kafka topic in real time, transforms them, and continuously writes them to another topic.

This example shows how to filter the stream of data from an inbound topic based on the geographic origin of each record.

Directions #

In this example, the source event stream is named purchases.

{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}

1. In KSQL, register the purchases stream:

ksql> CREATE STREAM purchases \
      (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
       product VARCHAR, order_total_usd VARCHAR, town VARCHAR, country VARCHAR) \
       WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

2. Inspect the first few messages as they arrive:

SELECT * FROM PURCHASES LIMIT 5;

3. Filter to show only records where the country is Germany:

SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY='Germany';
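The effect of this WHERE clause can be sketched outside KSQL in plain Python. The sample records below are illustrative, modeled on the purchases payloads shown later in this recipe:

```python
# Illustrative sketch of the KSQL filter: keep only records whose
# country is Germany. The sample records are made up for this example.
purchases = [
    {"order_id": 64, "product": "Table Cloth 90x90 Colour", "town": "Berlin", "country": "Germany"},
    {"order_id": 65, "product": "Pear - Halves", "town": "Newton", "country": "United Kingdom"},
    {"order_id": 66, "product": "Soup Campbells Mexicali Tortilla", "town": "Atlanta", "country": "United States"},
]

# Equivalent of: SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY='Germany'
germany_orders = [
    (p["order_id"], p["product"], p["town"], p["country"])
    for p in purchases
    if p["country"] == "Germany"
]

print(germany_orders)  # only order 64 passes the filter
```

The difference from a batch filter is that KSQL applies this predicate continuously, to every new message arriving on the topic.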

4. Create a new KSQL stream containing only orders from Germany:

CREATE STREAM PURCHASES_GERMANY AS SELECT * FROM PURCHASES WHERE COUNTRY='Germany';

5. The new stream PURCHASES_GERMANY populates a Kafka topic of the same name, as shown here:

ksql> LIST TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1                  | 0         | 0
 PURCHASES_GERMANY  | true       | 4          | 1                  | 0         | 0
 purchases          | true       | 1          | 1                  | 1         | 1
------------------------------------------------------------------------------------------------
ksql>

For more examples, see the stream-processing-cookbook.

Working with Nested JSON Data #

KSQL supports both flat and hierarchical (nested) data structures. In this example, the source data is in nested JSON format. As of Confluent Platform 5.0, KSQL supports the STRUCT data type, which lets you model and access nested data structures directly.

Directions #

The source event stream is called user_logons.

{
  "user": {
    "first_name": "Lars",
    "last_name": "Treagus",
    "email": "ltreagus0@timesonline.co.uk"
  },
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}

1. In KSQL, register the user_logons stream. Note that STRUCT is used to define the nested user element (first_name, last_name, email).

ksql> CREATE STREAM user_logons \
      (user STRUCT<\
            first_name VARCHAR, \
            last_name VARCHAR, \
            email VARCHAR>, \
       ip_address VARCHAR, \
       logon_date VARCHAR) \
WITH (KAFKA_TOPIC='user_logons', VALUE_FORMAT='JSON');

2. Use the -> operator to access the nested columns.

ksql> SELECT user->first_name AS USER_FIRST_NAME, \
            user->last_name AS USER_LAST_NAME, \
            user->email AS USER_EMAIL, \
            ip_address, \
            logon_date \
        FROM user_logons;
-- Lars | Treagus | ltreagus0@timesonline.co.uk | 242.115.235.56 | 2018-02-05T19:45:59Z
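The -> operator is plain nested-field access. A minimal Python sketch of the same flattening, using the sample user_logons record from above:

```python
# Flatten the nested "user" element of a user_logons record, mirroring
# what the KSQL -> operator does. The record is the sample from above.
event = {
    "user": {
        "first_name": "Lars",
        "last_name": "Treagus",
        "email": "ltreagus0@timesonline.co.uk",
    },
    "ip_address": "242.115.235.56",
    "logon_date": "2018-02-05T19:45:59Z",
}

flattened = {
    "USER_FIRST_NAME": event["user"]["first_name"],  # user->first_name
    "USER_LAST_NAME": event["user"]["last_name"],    # user->last_name
    "USER_EMAIL": event["user"]["email"],            # user->email
    "IP_ADDRESS": event["ip_address"],
    "LOGON_DATE": event["logon_date"],
}
print(flattened)
```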

3. (Optional) Persist the flattened structure to a new Kafka topic, continuously updated as new messages arrive on the source topic:

ksql> CREATE STREAM user_logons_all_cols AS \
        SELECT user->first_name AS USER_FIRST_NAME, \
                    user->last_name AS USER_LAST_NAME, \
                    user->email AS USER_EMAIL, \
                    ip_address, \
                    logon_date \
                FROM user_logons;

Setting the Kafka Message Key #

Kafka messages are key/value pairs. The key is typically used for partitioning, and it is especially important if you model a Kafka topic as a table in KSQL (or as a KTable in Kafka Streams) for querying or joining.
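Why the key matters for partitioning can be illustrated with a simplified sketch. Kafka's default partitioner actually uses a murmur2 hash of the key bytes; here hashlib.md5 stands in purely for illustration (it is deterministic across runs, unlike Python's salted built-in hash()):

```python
# Simplified sketch of key-based partitioning: the key hash, modulo the
# partition count, chooses the partition. Messages with a null key get
# no such stable assignment, which is why keys matter for ordering.
import hashlib

NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    """Deterministically map a key to a partition (illustrative, not murmur2)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

# The same key always lands on the same partition, so all events for a
# given order_id stay ordered within one partition.
print(partition_for("1317"), partition_for("1318"))
```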

It is common to need to take data from a Kafka topic and create a derived version of it whose key is based on a field within the message payload itself. This may be because the data needs a different key from the one already set, or because the producing application did not set any key at all. An example of the latter is Kafka Connect's JDBC connector.

Directions #

1. Inspect the source purchases data with the PRINT command. After a few messages have been displayed, press Ctrl-C to cancel.

Note that ROWKEY, the system column representing the Kafka message key, is null:

ksql> PRINT 'purchases';
 Format:JSON
 {"ROWTIME":1543232251151,"ROWKEY":"null","order_id":64,"customer_name":"Denna Hoopper","date_of_birth":"1929-08-09T13:23:58Z","product":"Table Cloth 90x90 Colour","order_total_usd":"2.86","town":"Berlin","country":"Germany"}
 {"ROWTIME":1543232251621,"ROWKEY":"null","order_id":65,"customer_name":"Emera Fairham","date_of_birth":"1990-01-07T09:38:11Z","product":"Pear - Halves","order_total_usd":"0.58","town":"Newton","country":"United Kingdom"}
 {"ROWTIME":1543232252125,"ROWKEY":"null","order_id":66,"customer_name":"Stefano Gerauld","date_of_birth":"1973-02-11T05:17:18Z","product":"Soup Campbells Mexicali Tortilla","order_total_usd":"4.23","town":"Atlanta","country":"United States"}

2. The key we want to use for this data is order_id. First, register the existing topic as a KSQL stream by supplying its schema:

CREATE STREAM purchases \
 (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
 product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) \
 WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

3. Query the stream; note again that ROWKEY is null:

ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES LIMIT 5;
 null | 975 | Wine - Red, Colio Cabernet | Saint Louis | United States
 null | 976 | Straws - Cocktale | Dallas | United States
 null | 977 | Magnotta - Bel Paese White | Jamaica | United States
 null | 978 | Cumin - Whole | Huntsville | United States
 null | 979 | Beef - Top Sirloin - Aaa | Saint Louis | United States
 Limit Reached
 Query terminated

4. Use PARTITION BY to create a new KSQL stream (backed by a Kafka topic) with the re-keyed data:

CREATE STREAM PURCHASES_BY_ORDER_ID AS \
SELECT * FROM PURCHASES \
PARTITION BY ORDER_ID;

If you want to convert all the existing messages in the topic, run SET 'auto.offset.reset' = 'earliest'; before executing this statement. This instructs KSQL to read from the earliest message available in the topic when populating the new stream.
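Conceptually, PARTITION BY re-keys each message by pulling the key out of the payload. A minimal Python sketch of that transformation, with field names following the purchases schema above (the sample values are illustrative):

```python
# Sketch of re-keying: each (key, value) pair on the output topic gets
# its key taken from the order_id field of the value. Keys start as None,
# as produced by, e.g., the Kafka Connect JDBC connector.
records = [
    (None, {"order_id": 1248, "product": "Hagen Daza - Dk Choocolate", "country": "Germany"}),
    (None, {"order_id": 1249, "product": "Sesame Seed", "country": "United States"}),
]

# Equivalent of: CREATE STREAM ... AS SELECT * FROM PURCHASES PARTITION BY ORDER_ID
# (the key is written as a string, matching the ROWKEY values PRINT shows below)
rekeyed = [(str(value["order_id"]), value) for _key, value in records]

for key, value in rekeyed:
    print(key, value["product"])
```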

5. Query the new stream; note that ROWKEY now matches ORDER_ID:

ksql> SELECT ROWKEY, ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES_BY_ORDER_ID LIMIT 5;
 1248 | 1248 | Hagen Daza - Dk Choocolate | Hamburg | Germany
 1249 | 1249 | Sesame Seed | Youngstown | United States
 1250 | 1250 | Rum - White, Gg White | Stockton | United States
 1251 | 1251 | Flower - Carnations | Kansas City | United States
 1252 | 1252 | Wine - White, Pelee Island | Dallas | United States
 Limit Reached
 Query terminated
 ksql>

6. Inspect the underlying Kafka topic, which has the same name. After a few messages have been displayed, press Ctrl-C to cancel.

Note that ROWKEY, the system column representing the Kafka message key, now matches the desired value of ORDER_ID:

ksql> PRINT 'PURCHASES_BY_ORDER_ID';
 Format:JSON
 {"ROWTIME":1543232884300,"ROWKEY":"1317","ORDER_ID":1317,"CUSTOMER_NAME":"Guillermo McNally","DATE_OF_BIRTH":"1992-06-26T10:57:35Z","PRODUCT":"Pasta - Rotini, Dry","ORDER_TOTAL_USD":4.74,"TOWN":"Garland","COUNTRY":"United States"}
 {"ROWTIME":1543232884804,"ROWKEY":"1318","ORDER_ID":1318,"CUSTOMER_NAME":"Elwira Belverstone","DATE_OF_BIRTH":"1978-01-08T06:23:08Z","PRODUCT":"Schnappes - Peach, Walkers","ORDER_TOTAL_USD":4.9,"TOWN":"Largo","COUNTRY":"United States"}
 {"ROWTIME":1543232885308,"ROWKEY":"1319","ORDER_ID":1319,"CUSTOMER_NAME":"Mollie Jaycocks","DATE_OF_BIRTH":"1985-02-13T10:03:55Z","PRODUCT":"Pork - Shoulder","ORDER_TOTAL_USD":5.41,"TOWN":"Hartford","COUNTRY":"United States"}
 ^C{"ROWTIME":1543232885815,"ROWKEY":"1320","ORDER_ID":1320,"CUSTOMER_NAME":"Barbara Caldeiro","DATE_OF_BIRTH":"1981-07-16T04:49:59Z","PRODUCT":"Cake - Dulce De Leche","ORDER_TOTAL_USD":0.19,"TOWN":"Lynchburg","COUNTRY":"United States"}
 Topic printing ceased