Wait the light to fall

Running shell commands in a Spark Streaming program

焉知非鱼

I have a Spark file-streaming program that runs an HDFS mv command whenever it detects that a batch has completed:

import scala.sys.process._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class StreamingMonitor(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    try {
      val day = Seq("sh", "-c", "hdfs dfs -cat /tmp/apps/days.txt | head -1").!!.trim // read the first line of the date file

      Seq("sh", "-c", s"""
if hdfs dfs -test -e /a26_adapter_data/nation/vintype=A26/d=${day}; then
    hdfs dfs -mv /a26_adapter_data/nation/vintype=A26/d=${day} /daily_parquet/nation/vintype=A26
fi
""").!!

      Seq("sh", "-c", s"""
if hdfs dfs -test -e /a26_adapter_data/enterprise/vintype=A26/dt=${day}; then
    hdfs dfs -mv /a26_adapter_data/enterprise/vintype=A26/dt=${day} /daily_parquet/enterprise/vintype=A26
fi
""").!!

      Seq("sh", "-c", "hdfs dfs -cat /tmp/apps/days.txt | sed '1d' | hdfs dfs -put -f - /tmp/apps/days.txt").!! // drop the first line and write the rest back
    } catch {
      case e: Exception =>
        println(e)
    }
  }
}
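As an alternative to interpolating day into an sh -c script, the existence check and the move can be run as plain argument lists, so nothing inside day is ever interpreted by a shell. This is a minimal sketch, assuming the hdfs CLI is on the PATH of the user running the listener; the object HdfsMover and its methods are hypothetical helpers, not part of Spark or Hadoop.

```scala
import scala.sys.process._

// Hypothetical helper: runs the same HDFS commands as the listener above,
// but as argv lists instead of shell scripts, so no shell parsing happens.
object HdfsMover {
  // argv for `hdfs dfs -test -e <path>` (exit code 0 means the path exists)
  def testCmd(path: String): Seq[String] =
    Seq("hdfs", "dfs", "-test", "-e", path)

  // argv for `hdfs dfs -mv <src> <dstDir>`
  def mvCmd(src: String, dstDir: String): Seq[String] =
    Seq("hdfs", "dfs", "-mv", src, dstDir)

  // Moves src into dstDir only if src exists; returns whether a move ran.
  // `.!` returns the exit code; `.!!` throws if mv exits non-zero,
  // e.g. on "Permission denied".
  def moveIfExists(src: String, dstDir: String): Boolean =
    if (testCmd(src).! == 0) { mvCmd(src, dstDir).!!; true }
    else false
}
```

Inside onBatchCompleted, the nation move would then become HdfsMover.moveIfExists(s"/a26_adapter_data/nation/vintype=A26/d=$day", "/daily_parquet/nation/vintype=A26"). This does not change which user runs the command, so the permission error below would still occur.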

It did not work. The log showed the following error:

mv: Permission denied: user=yarn, access=WRITE, inode="/a26_adapter_data/nation/vintype=A26":hdfs:supergroup:drwxr-xr-x
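The tail of the message explains the denial: the directory is owned by hdfs in group supergroup with mode drwxr-xr-x. HDFS checks owner bits if the user is the owner, else group bits if the user is in the group, else "other" bits, so yarn (assuming it is neither the owner nor a supergroup member) gets r-x and no write bit. A small sketch of that check, using a hypothetical canWrite helper that only looks at the owner/group/other bits and ignores ACLs and the superuser:

```scala
// Hypothetical helper: checks write permission from an ls -l style
// mode string such as "drwxr-xr-x", using HDFS/POSIX class selection.
object ModeCheck {
  // mode(0) is the file type; owner, group and other triples follow.
  def canWrite(mode: String, isOwner: Boolean, inGroup: Boolean): Boolean = {
    require(mode.length == 10, s"unexpected mode string: $mode")
    val triple =
      if (isOwner) mode.substring(1, 4)      // owner bits only
      else if (inGroup) mode.substring(4, 7) // group bits only
      else mode.substring(7, 10)             // "other" bits
    triple(1) == 'w' // the middle character of a triple is the write bit
  }
}
```

For yarn against drwxr-xr-x, ModeCheck.canWrite("drwxr-xr-x", isOwner = false, inGroup = false) is false; after chmod 777 the mode reads drwxrwxrwx and the same call is true.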

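Solution: give the yarn user write access to the directory named in the error. That inode is the parent of the mv source, not the destination (an HDFS rename needs WRITE on the directory containing the source as well as on the one receiving it), which is why the command below recursively sets 777 on the source tree /a26_adapter_data/: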

sudo -u hdfs hdfs dfs -chmod -R 777 /a26_adapter_data/