Wait the light to fall

在 Spark Structured Streaming 中管理 Kafka Offsets

焉知非鱼

HBase 表 #

create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>1209600}

RowKey 布局:

row:              <topicName>:<groupID>:<EPOCH_BATCHTIME_MS>
column family:    offsets
qualifier:        <PARTITION_ID>
value:            <OFFSET_ID>
/*
 Save offsets for each batch into HBase
*/
def saveOffsets(topicName: String,
                groupID: String,
                offsetRanges: Array[OffsetRange],
                hbaseTableName: String,
                batchTime: org.apache.spark.streaming.Time) = {

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn   = ConnectionFactory.createConnection(hbaseConf)
  val table  = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = topicName + ":" + groupID + ":" + String.valueOf(batchTime.milliseconds)
  val put    = new Put(rowKey.getBytes)

  for(offset <- offsetRanges){
    put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
          Bytes.toBytes(offset.untilOffset.toString))
  }

  table.put(put)
  conn.close()
}
/* Returns last committed offsets for all the partitions of a given topic from HBase in  
following  cases.
*/
    
def getLastCommittedOffsets(topicName: String,
                            groupID: String,
                            hbaseTableName: String,
                            zkQuorum: String,
                            zkRootDir: String,
                            sessionTimeout: Int,
                            connectionTimeOut: Int): Map[TopicPartition, Long] = {
 
  val hbaseConf = HBaseConfiguration.create()
  val zkUrl     = zkQuorum+"/"+zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
                                                sessionTimeout,connectionTimeOut)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(topicName
                                                 )).get(topicName).toList.head.size
  zkClientAndConnection._1.close()
  zkClientAndConnection._2.close()
 
  //Connect to HBase to retrieve last committed offsets
  val conn     = ConnectionFactory.createConnection(hbaseConf)
  val table    = conn.getTable(TableName.valueOf(hbaseTableName))
  val startRow = topicName + ":" + groupID + ":" +
                                              String.valueOf(System.currentTimeMillis())
  val stopRow = topicName + ":" + groupID + ":" + 0
  val scan = new Scan()
  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
                                                   stopRow.getBytes).setReversed(true))
  val result = scanner.next()
  var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
  if (result != null){
    // If the result from hbase scanner is not null, set number of partitions from hbase to the  number of cells
    hbaseNumberOfPartitionsForTopic = result.listCells().size()
  }

val fromOffsets = collection.mutable.Map[TopicPartition,Long]()
 
  if(hbaseNumberOfPartitionsForTopic == 0){
    // initialize fromOffsets to beginning
    for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
      fromOffsets += (new TopicPartition(topicName,partition) -> 0)
    }
  } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
  // handle scenario where new partitions have been added to existing kafka topic
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                                        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(topicName,partition) -> fromOffset.toLong)
    }
    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
      fromOffsets += (new TopicPartition(topicName,partition) -> 0)
    }
  } else {
  //initialize fromOffsets from last run
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                                        Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(topicName,partition) -> fromOffset.toLong)
    }
  }
  scanner.close()
  conn.close()
  fromOffsets.toMap
}