在 IDEA 中运行 Flink 程序

我们需要把 Flink 安装路径中的 optlib 添加到项目的 Libraries 中。

依次点击 File -> Project Structure -> Project Settings -> Libraries, 然后点击右侧栏中的加号 , 选择 Java, 在弹出的对话框中选择 Flink 安装路径。

我的 Flink 安装在 /usr/local/Cellar/apache-flink/1.8.0/libexec, 默认是隐藏的, 所以要点击 Macintosh HD, 同时按住 Command + Shift + . 即可显示出隐藏目录, 分别添加 opt 目录和 lib 目录到相应的模块的 Libraries 中。这样就可以在 IDEA 中直接运行 Flink 程序了。

1
tree /usr/local/Cellar/apache-flink/1.8.0/libexec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
├── bin
│   ├── config.sh
│   ├── start-cluster.sh
│   ├── ...
├── conf
│   ├── flink-conf.yaml
│   ├── log4j-cli.properties
│   ├── ...
├── lib
│   ├── flink-dist_2.11-1.8.0.jar
│   ├── log4j-1.2.17.jar
│   └── slf4j-log4j12-1.7.15.jar
├── libexec
│   ├── config.sh
│   ├── flink
│   ├── standalone-job.sh
│   ├── start-cluster.sh
│   └── ...
├── log
│   ├── flink-ohmycloud-client-summer.log
│   ├── flink-root-standalonesession-0-summer.log
│   ├── ...
└── opt
├── flink-cep-scala_2.11-1.8.0.jar
├── flink-cep_2.11-1.8.0.jar
├── flink-gelly-scala_2.11-1.8.0.jar
├── flink-gelly_2.11-1.8.0.jar
├── flink-metrics-datadog-1.8.0.jar
├── flink-metrics-graphite-1.8.0.jar
├── flink-metrics-influxdb-1.8.0.jar
├── flink-metrics-prometheus-1.8.0.jar
├── flink-metrics-slf4j-1.8.0.jar
├── flink-metrics-statsd-1.8.0.jar
├── flink-ml_2.11-1.8.0.jar
├── flink-oss-fs-hadoop-1.8.0.jar
├── flink-python_2.11-1.8.0.jar
├── flink-queryable-state-runtime_2.11-1.8.0.jar
├── flink-s3-fs-hadoop-1.8.0.jar
├── flink-s3-fs-presto-1.8.0.jar
├── flink-sql-client_2.11-1.8.0.jar
├── flink-streaming-python_2.11-1.8.0.jar
├── flink-swift-fs-hadoop-1.8.0.jar
└── flink-table_2.11-1.8.0.jar

我们跑一个程序试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package datastream.org.wm.dtc

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaProducer extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[String] = env.fromElements(
"""{"vin":"LMGAJ1S85J1003265","vintype":"A2APHEV","signals":"{\"时间\":20190510172335,\"数据头\":{\"answerFlag\":\"车载终端控制命令\",\"dataUnitLen\":299,\"orderFlag\":\"实时信息上传\",\"time\":20190510172335,\"vin\":\"LMGAJ1S85J1003265\"}}"}"""
)
val myProducer = new FlinkKafkaProducer[String](
"localhost:9092",
"test",
new SimpleStringSchema
)
stream.addSink(myProducer)
env.execute("start kafka producer")
}

直接在 IDEA 中运行, 就往 kafka 里面发送了一条数据。