Wait the light to fall

Kafka KSQL 用例

焉知非鱼

有一次, 在威马项目, 项目经理说, xxx, 你能不能给我查一下 kafka 中, 这个车某一时间段的数据? 我说不好查, 查询 Kafka 中某一条记录, 需要 Consumer 程序来消费。虽然 kt 之类的 Kafka 命令行工具可以查询 Kafka 中数据. 但是能力有限, 它只能从某个 offset 开始查询, 满足不了我们的过滤条件。

幸运的是, ksql 可以让我们像查询 sql 一样来查询 kafka, 毕竟写 sql, 谁不会呢?于是我们到它的官网, 看到 ksql 这个产品的宣传是 Streaming SQL for Apache Kafka。 毕竟底层用的是 Kafka-Streams, 所以 ksql 支持流式查询。我们下载最新的 ksql。我们将程序解压到 ~/opt/confluent-5.0.0

启动 zk #

cd ~/opt/confluent-5.0.0
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

启动 kafka #

cd ~/opt/confluent-5.0.0
bin/kafka-server-start -daemon etc/kafka/server.properties

创建 topic 和 data #

confluent 自带了一个 ksql-datagen 工具,可以创建和产生相关的 topic 和数据,

  • 创建 pageviews,数据格式为 delimited
cd ~/opt/confluent-5.0.0/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
  • 创建 users,数据格式为 json
cd ~/opt/confluent-5.0.0/bin
./ksql-datagen quickstart=users format=json topic=users maxInterval=100

启动 ksql #

cd ~/opt/confluent-5.0.0/
bin/ksql-server-start -daemon etc/ksql/ksql-server.properties

连接 ksql #

cd ~/opt/confluent-5.0.0/
bin/ksql http://127.0.0.1:8088

创建 stream 和 table #

  • stream

根据 topic pageviews 创建一个 stream 为 pageviews_original,value_format 为 DELIMITED

ksql>CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
  • table

根据 topic users 创建一个 table 为 users_original,value_format 为 json

ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH (kafka_topic='users', value_format='JSON', key = 'userid');

查询数据 #

ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
ksql> SELECT * FROM pageviews_original LIMIT 3;

ksql 默认是从 kafka 最新的数据查询消费的,如果你想从开头查询,则需要在会话上进行设置:SET ‘auto.offset.reset’ = ’earliest’;

持久化查询 #

持久化查询可以源源不断的把查询出的数据发送到你指定的 topic 中去,查询的时候在 select 前面添加 create stream 关键字即可创建持久化查询。

ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;
  • 查询新 stream
ksql> SHOW STREAMS;

可以看到新创建了stream PAGEVIEWS2,并且创建了topic PAGEVIEWS2

  • 查询执行任务
ksql> SHOW QUERIES;

可以看到 ID 为 CSAS_PAGEVIEWS2_0 的任务在执行,并且有显示执行的语句。

消费新数据 #

cd cd ~/opt/confluent-5.0.0/bin
./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic PAGEVIEWS2

可以看到PAGEVIEWS2 topic里面正是我们通过select筛选出来的数据

终止查询任务 #

ksql> TERMINATE CSAS_PAGEVIEWS2_0;