Wait the light to fall

在 IDEA 中运行 Flink 程序

焉知非鱼

我们需要把 Flink 安装路径中的 optlib 添加到项目的 Libraries 中。

依次点击 File -> Project Structure -> Project Settings -> Libraries, 然后点击右侧栏中的加号 , 选择 Java, 在弹出的对话框中选择 Flink 安装路径。

我的 Flink 安装在 /usr/local/Cellar/apache-flink/1.8.0/libexec, 默认是隐藏的, 所以要点击 Macintosh HD, 同时按住 Command + Shift + . 即可显示出隐藏目录, 分别添加 opt 目录和 lib 目录到相应的模块的 Libraries 中。这样就可以在 IDEA 中直接运行 Flink 程序了。

tree /usr/local/Cellar/apache-flink/1.8.0/libexec
├── bin
│   ├── config.sh
│   ├── start-cluster.sh
│   ├── ...
├── conf
│   ├── flink-conf.yaml
│   ├── log4j-cli.properties
│   ├── ...
├── lib
│   ├── flink-dist_2.11-1.8.0.jar
│   ├── log4j-1.2.17.jar
│   └── slf4j-log4j12-1.7.15.jar
├── libexec
│   ├── config.sh
│   ├── flink
│   ├── standalone-job.sh
│   ├── start-cluster.sh
│   └── ...
├── log
│   ├── flink-ohmycloud-client-summer.log
│   ├── flink-root-standalonesession-0-summer.log
│   ├── ...
└── opt
    ├── flink-cep-scala_2.11-1.8.0.jar
    ├── flink-cep_2.11-1.8.0.jar
    ├── flink-gelly-scala_2.11-1.8.0.jar
    ├── flink-gelly_2.11-1.8.0.jar
    ├── flink-metrics-datadog-1.8.0.jar
    ├── flink-metrics-graphite-1.8.0.jar
    ├── flink-metrics-influxdb-1.8.0.jar
    ├── flink-metrics-prometheus-1.8.0.jar
    ├── flink-metrics-slf4j-1.8.0.jar
    ├── flink-metrics-statsd-1.8.0.jar
    ├── flink-ml_2.11-1.8.0.jar
    ├── flink-oss-fs-hadoop-1.8.0.jar
    ├── flink-python_2.11-1.8.0.jar
    ├── flink-queryable-state-runtime_2.11-1.8.0.jar
    ├── flink-s3-fs-hadoop-1.8.0.jar
    ├── flink-s3-fs-presto-1.8.0.jar
    ├── flink-sql-client_2.11-1.8.0.jar
    ├── flink-streaming-python_2.11-1.8.0.jar
    ├── flink-swift-fs-hadoop-1.8.0.jar
    └── flink-table_2.11-1.8.0.jar

我们跑一个程序试试:

package datastream.org.wm.dtc

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaProducer extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream: DataStream[String] = env.fromElements(
    """{"vin":"LMGAJ1S85J1003265","vintype":"A2APHEV","signals":"{\"时间\":20190510172335,\"数据头\":{\"answerFlag\":\"车载终端控制命令\",\"dataUnitLen\":299,\"orderFlag\":\"实时信息上传\",\"time\":20190510172335,\"vin\":\"LMGAJ1S85J1003265\"}}"}"""
  )
  val myProducer = new FlinkKafkaProducer[String](
    "localhost:9092",
    "test",
    new SimpleStringSchema
  )
  stream.addSink(myProducer)
  env.execute("start kafka producer")
}

直接在 IDEA 中运行, 就往 kafka 里面发送了一条数据。