Writing to Elasticsearch
— 焉知非鱼
If you are running Elasticsearch 6.x or later, writing to Elasticsearch from Spark Structured Streaming is straightforward.
Unlike 5.x and earlier, where you had to implement a custom sink in order to write to Elasticsearch, 6.x works with Spark Structured Streaming out of the box.
The required dependencies are as follows:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.main}</artifactId>
    <version>2.3.1</version>
</dependency>
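If you build with sbt rather than Maven, the equivalent dependencies would look roughly like this (a sketch, not from the original article; Scala 2.11 is assumed so that %% resolves to the same elasticsearch-spark-20_2.11 artifact listed above):

// build.sbt -- assumed sbt equivalent of the Maven dependencies above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4",
  "org.apache.spark"  %% "spark-core"             % "2.3.1",
  "org.apache.spark"  %% "spark-sql"              % "2.3.1",
  "org.apache.spark"  %% "spark-streaming"        % "2.3.1"
)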
With these in place, writing to Elasticsearch is quite simple.
- 1. Create a SparkSession with the Elasticsearch configuration
val spark = SparkSession
  .builder
  // run app locally utilizing all cores
  .master("local[*]")
  .appName(getClass.getName)
  .config("es.nodes", "localhost")
  .config("es.port", "9200")
  .config("es.index.auto.create", "true") // this will ensure that the index is also created on the first POST
  .config("es.nodes.wan.only", "true")    // needed to run against dockerized ES for local tests
  .config("es.net.http.auth.user", "elastic")
  .config("es.net.http.auth.pass", "changeme")
  .getOrCreate()
- 2. Read CSV input from a socket source
import spark.implicits._

val customerEvents: Dataset[Customer] = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "5555")
  // note: maxFilesPerTrigger only limits file-based sources; the socket source ignores this option
  .option("maxFilesPerTrigger", 5)
  .load()
  .as[String]
  .map(line => {
    val cols = line.split(",")
    val age = cols(2).toInt
    Customer(cols(0), cols(1), age, age > 18, System.currentTimeMillis())
  })
- 3. Write to the Elasticsearch sink
customerEvents
  .writeStream
  .outputMode(OutputMode.Append())
  .format("es")
  .option("checkpointLocation", "/tmp/checkpointLocation")
  .option("es.mapping.id", "id")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start("customer/profile")
  .awaitTermination()
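As a side note, the es.* settings do not have to live on the SparkSession as in step 1: elasticsearch-hadoop also picks them up from the options of an individual query, which is exactly how es.mapping.id is passed above. A sketch of the same sink with the connection settings supplied on the writer instead:

// Same sink as above, but with the Elasticsearch connection settings supplied
// per query instead of on the SparkSession (same es.* keys as in step 1).
customerEvents
  .writeStream
  .outputMode(OutputMode.Append())
  .format("es")
  .option("es.nodes", "localhost")
  .option("es.port", "9200")
  .option("es.net.http.auth.user", "elastic")
  .option("es.net.http.auth.pass", "changeme")
  .option("es.mapping.id", "id")
  .option("checkpointLocation", "/tmp/checkpointLocation")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start("customer/profile")
  .awaitTermination()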
The complete code is as follows:
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}

object ESWriter6 {

  case class Customer(id: String,
                      name: String,
                      age: Int,
                      isAdult: Boolean,
                      nowEpoch: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      // run app locally utilizing all cores
      .master("local[*]")
      .appName(getClass.getName)
      .config("es.nodes", "localhost")
      .config("es.port", "9200")
      .config("es.index.auto.create", "true") // this will ensure that the index is also created on the first POST
      .config("es.nodes.wan.only", "true")    // needed to run against dockerized ES for local tests
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "changeme")
      .getOrCreate()

    import spark.implicits._

    val customerEvents: Dataset[Customer] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "5555")
      // note: maxFilesPerTrigger only limits file-based sources; the socket source ignores this option
      .option("maxFilesPerTrigger", 5)
      .load()
      .as[String]
      .map(line => {
        val cols = line.split(",")
        val age = cols(2).toInt
        Customer(cols(0), cols(1), age, age > 18, System.currentTimeMillis())
      })

    customerEvents
      .writeStream
      .outputMode(OutputMode.Append())
      .format("es")
      .option("checkpointLocation", "/tmp/checkpointLocation")
      .option("es.mapping.id", "id")
      .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
      .start("customer/profile")
      .awaitTermination()
  }
}
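Once the query has processed some input, one quick way to verify what landed in Elasticsearch is to read the index back with the same connector in batch mode. A minimal sketch, assuming the SparkSession still carries the es.* settings from step 1 and that the stream has written to customer/profile (the connector's "es" short name is the same format name used by the sink above):

// Read the customer/profile index back as a batch DataFrame to verify the writes
// (uses the same es.* settings already configured on the SparkSession).
val written = spark.read
  .format("es")
  .load("customer/profile")

written.show(truncate = false)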