Wait the light to fall

用于 Spark 结构化流的 Cassandra 接收器

焉知非鱼

Cassandra Sink for Spark Structured Streaming

我最近开始使用 Spark,并且必须将结构化流式 API 生成的结果存储在 Cassandra 数据库中。

在这篇文章中,我提供了一个如何在 Spark Structured Streaming 中创建和使用 Cassandra 接收器的简单示例。 我希望它对那些刚刚开始使用 Structured Streaming API 并想知道如何将它与数据库连接的人有用。

应用程序的想法非常简单。 它从 Kafka 读取消息,解析它们并将它们保存到 Cassandra 中。

Why Structured Streaming?

文档中可以看出,Structured Streaming 是一个基于 Spark SQL 引擎的可扩展且容错的流处理引擎。 您可以使用 Dataset/DataFrame API 来表示流聚合,事件时间窗口,流到批处理连接等。在 Spark 中进行流式计算,并允许使用熟悉的 SQL 处理流数据。

What Is the Problem?

Spark Structured Streaming 在 2017 年已经被标记为稳定。因此,它是一个相对较新的 API,并不是所有功能都在那里。例如,有几种类型的内置输出接收器:File,Kafka,Console 和内存接收器。但是,如果您想将流式计算的结果输出到数据库中,则需要使用 foreach 接收器并实现 ForeachWriter 接口。从 Spark 2.3.1 开始,这仅适用于 Scala 和 Java。

在这里,我假设您已经大致了解 Structured Streaming 如何工作,并且知道如何读取和处理流数据,现在可以将其输出到数据库中。如果上述某些步骤不清楚,可以使用一些很好的在线资源来帮助您开始使用结构化流。特别是,官方文档是一个很好的起点。在本文中,我想关注您需要将结果存储在数据库中的最后一步。

我将描述如何为结构化流实现 Cassandra 接收器,提供一个简单示例,并解释如何在集群上运行它。完整代码可在此处获得。

当我最初遇到上述问题时,这个项目非常有帮助。但是,如果您刚开始使用结构化流媒体并且需要一个如何将数据输出到 Cassandra 的简单示例,那么该回购可能看起来很复杂。此外,该代码在 Spark 本地模式下工作,并且需要在群集上运行一些更改。

此外,还有很好的示例说明如何为结构化流创建 JDBC 接收器MongoDB 接收器

Simple Solution

要将数据发送到外部系统,您需要使用 foreach 接收器。 你可以在这里读更多关于它的内容。 简而言之,您需要实现 ForeachWriter 接口。 那就是定义如何打开连接,处理每个数据分区,以及在处理结束时关闭连接。 源代码如下:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output
  val cassandraDriver = new CassandraDriver();
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }
  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record(0)}', '${record(1)}', '${record(2)}')""")
    )
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

你会发现 CassandraDriver 的定义和下面的输出表,但在此之前让我们仔细看看上面的代码是如何工作的。 在这里,为了从 Spark 连接到 Cassandra,我创建了 CassandraDriver 对象,该对象提供对 CassandraConnector 的访问 - CassandraConnector 是 DataStax 中广泛使用的连接器。 你可以在这里读更多关于它的内容。 CassandraConnector 负责打开和关闭与数据库的连接,因此我只需在 CassandraSinkForeach 类中的 open 和 close 方法中打印调试消息。

上面的代码在主应用程序中调用如下:

val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

将为每一行创建 CassandraSinkForeach,每个 worker 将其部分行插入到数据库中。 所以,每个 worker 都运行 val cassandraDriver = new CassandraDriver(); 这就是 CassandraDriver 类的样子:

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession
  import spark.implicits._
  val connector = CassandraConnector(spark.sparkContext.getConf)
  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       fx_marker text,
       timestamp_ms timestamp,
       timestamp_dt date,
       primary key (fx_marker));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

让我们仔细看看上面代码中的 spark 对象。 这是 SparkSessionBuilder 的代码:

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")
    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
    spark
  }
}

在每个 worker 上,SparkSessionBuilder 提供对在驱动程序上创建的SparkSession的访问。 为了使它工作,我们需要使SparkSessionBuilder可序列化并使用@transient lazy val,它允许序列化系统忽略conf和spark对象。 现在,buildSparkSession正在被序列化并发送给每个worker,但是当worker需要conf和spark对象时,它们正在被解析。

现在让我们看一下应用程序主体:

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession
    import spark.implicits._
    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
    /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n 
    {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
    {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
    {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
    {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
    */
    // Read incoming stream
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()
    val schema = StructType(
      Seq(
        StructField("fx_marker", StringType, false),
        StructField("timestamp_ms", StringType, false)
      )
    )
    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))
    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
    // Process data. Create a new date column
    val parsed = jsons
      .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
      .filter("fx_marker != ''")
    // Output results into a database
    val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()
    sink.awaitTermination()
  }
}

当应用程序被发送到执行时,buildSparkSession 被序列化并发送给 worker,但是,conf 和 spark 对象尚未解决。 接下来,driver 在 KafkaToCassandra 中创建 spark 对象,并在 executors 之间分配工作。 worker 从 Kafka 读取数据,进行简单的转换,当 worker 准备将结果写入数据库时,他们解析 conf 和 spark 对象,从而访问在 driver 上创建的 SparkSession。

How to Build and Run the App?

当我从 PySpark 迁移到 Scala 时,我花了一些时间来了解如何构建应用程序。 所以,我将 Maven pom.xml 包含在 repo 中。

您可以通过运行 mvn package 命令使用 Maven 构建应用程序。 之后,您可以使用执行应用程序

./bin/spark-submit \ 
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 \
--class com.insight.app.CassandraSink.KafkaToCassandra \
--master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 \
target/cassandra-sink-0.0.1-SNAPSHOT.jar

此示例在 AWS 群集上运行,因此,如果您要测试它,只需用您的 AWS 实例替换我的 AWS 实例的地址(所有内容类似于ec2-xx-xxx-xx-xx.compute-1.amazonaws.com)。