Wait the light to fall

Writing to Elasticsearch

焉知非鱼

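If you are using Elasticsearch 6.x or later, writing to Elasticsearch from Spark Structured Streaming is straightforward.

In 5.x and earlier you had to implement a custom sink to write to Elasticsearch. From 6.x on, the Elasticsearch connector supports Spark Structured Streaming out of the box.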

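The dependencies are as follows, where ${scala.main} is a Maven property for the Scala binary version (2.11 here, matching the elasticsearch-spark-20_2.11 artifact):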

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
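
For an sbt-based project, a roughly equivalent declaration might look like the sketch below. It assumes Scala 2.11, to match the _2.11 suffix of the Elasticsearch artifact above; the 2.11.12 patch version is an assumption and does not come from the original setup.

```scala
// build.sbt (sketch; the 2.11.12 patch version is an assumption)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version (_2.11) to the artifact name
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4",
  "org.apache.spark"  %% "spark-core"             % "2.3.1",
  "org.apache.spark"  %% "spark-sql"              % "2.3.1",
  "org.apache.spark"  %% "spark-streaming"        % "2.3.1"
)
```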

With these in place, writing to Elasticsearch is fairly simple.

  • 1. Create a SparkSession with the Elasticsearch configuration
val spark = SparkSession
  .builder
  // run app locally utilizing all cores
  .master("local[*]")
  .appName(getClass.getName)
  .config("es.nodes", "localhost")
  .config("es.port", "9200")
  .config("es.index.auto.create", "true") // this will ensure that index is also created on first POST
  .config("es.nodes.wan.only", "true") // needed to run against dockerized ES for local tests
  .config("es.net.http.auth.user", "elastic")
  .config("es.net.http.auth.pass", "changeme")
  .getOrCreate()
  • 2. Read CSV input from a socket source
import spark.implicits._

val customerEvents: Dataset[Customer] = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "5555")
  // the socket source has no per-trigger rate limit (maxFilesPerTrigger only
  // applies to file sources), so each batch holds whatever arrived since the last trigger
  .load()
  .as[String]
  .map(line => {
    val cols = line.split(",")
    val age = cols(2).toInt
    Customer(cols(0), cols(1), age, age > 18, System.currentTimeMillis())
  })
  • 3. Write to the Elasticsearch sink
customerEvents
  .writeStream
  .outputMode(OutputMode.Append())
  .format("es")
  .option("checkpointLocation", "/tmp/checkpointLocation")
  .option("es.mapping.id", "id")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start("customer/profile")
  .awaitTermination()
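
The map in step 2 assumes that every line has at least three comma-separated fields and a numeric age; a malformed line would throw inside the task and would typically bring the query down. A minimal sketch of a more defensive parser follows. parseCustomer is a hypothetical helper that is not part of the original code, and the Customer case class mirrors the one in the full listing, including its age > 18 rule.

```scala
import scala.util.Try

object CustomerParsing {
  // Same shape as the Customer case class used in this post
  case class Customer(id: String,
                      name: String,
                      age: Int,
                      isAdult: Boolean,
                      nowEpoch: Long)

  // Hypothetical helper: returns None for lines that do not look like "id,name,age"
  def parseCustomer(line: String,
                    now: Long = System.currentTimeMillis()): Option[Customer] =
    line.split(",").map(_.trim) match {
      case Array(id, name, ageStr, _*) =>
        // a non-numeric age yields None instead of a NumberFormatException
        Try(ageStr.toInt).toOption.map(age => Customer(id, name, age, age > 18, now))
      case _ => None
    }
}
```

In the streaming job this could replace the map call with something like .flatMap(line => CustomerParsing.parseCustomer(line).toList), so that bad lines are silently dropped. Whether dropping is the right policy, rather than routing bad lines somewhere for inspection, depends on the use case.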

The complete code is as follows:

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}

object ESWriter6 {

  case class Customer(id: String,
                      name: String,
                      age: Int,
                      isAdult: Boolean,
                      nowEpoch: Long)
  

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      // run app locally utilizing all cores
      .master("local[*]")
      .appName(getClass.getName)
      .config("es.nodes", "localhost")
      .config("es.port", "9200")
      .config("es.index.auto.create", "true") // this will ensure that index is also created on first POST
      .config("es.nodes.wan.only", "true") // needed to run against dockerized ES for local tests
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "changeme")
      .getOrCreate()

    import spark.implicits._

    val customerEvents: Dataset[Customer] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "5555")
      // the socket source has no per-trigger rate limit (maxFilesPerTrigger only
      // applies to file sources), so each batch holds whatever arrived since the last trigger
      .load()
      .as[String]
      .map(line => {
        val cols = line.split(",")
        val age = cols(2).toInt
        Customer(cols(0), cols(1), age, age > 18, System.currentTimeMillis())
      })

    customerEvents
      .writeStream
      .outputMode(OutputMode.Append())
      .format("es")
      .option("checkpointLocation", "/tmp/checkpointLocation")
      .option("es.mapping.id", "id")
      .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
      .start("customer/profile")
      .awaitTermination()
  }

}
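
To try the job locally, something has to be listening on localhost:5555, because the socket source connects to that port as a client. A tool such as netcat is the usual choice; the sketch below is a small Scala alternative. SampleCustomerServer and its sample rows are hypothetical and only follow the id,name,age layout that the job parses.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

object SampleCustomerServer {
  // Hypothetical sample rows in the "id,name,age" layout the job expects
  def sampleLines(n: Int): Seq[String] =
    (1 to n).map(i => s"$i,customer$i,${15 + i}")

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(5555) // same port as the socket source
    try {
      val client = server.accept() // blocks until the Spark job connects
      val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
      sampleLines(10).foreach { line =>
        out.println(line)
        Thread.sleep(1000) // roughly one event per second
      }
      out.close()
      client.close()
    } finally {
      server.close()
    }
  }
}
```

The server should be started before ESWriter6 so that the port is already open when the socket source tries to connect.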