Running shell commands from a Spark Streaming program
— 焉知非鱼

I have a Spark FileStreaming program. Whenever it detects that a batch has completed, it runs hdfs mv commands:
import scala.sys.process._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class StreamingMonitor(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    try {
      // Read the first line of the date file on HDFS
      val day = Seq("sh", "-c", "hdfs dfs -cat /tmp/apps/days.txt | head -1").!!.trim
      // Move that day's nation partition, if it exists
      Seq("sh", "-c", s"""
        if hdfs dfs -test -e /a26_adapter_data/nation/vintype=A26/d=${day}; then
          hdfs dfs -mv /a26_adapter_data/nation/vintype=A26/d=${day} /daily_parquet/nation/vintype=A26
        fi
      """).!!
      // Move that day's enterprise partition, if it exists
      Seq("sh", "-c", s"""
        if hdfs dfs -test -e /a26_adapter_data/enterprise/vintype=A26/dt=${day}; then
          hdfs dfs -mv /a26_adapter_data/enterprise/vintype=A26/dt=${day} /daily_parquet/enterprise/vintype=A26
        fi
      """).!!
      // Drop the first line from the date file
      Seq("sh", "-c", "hdfs dfs -cat /tmp/apps/days.txt | sed '1d' | hdfs dfs -put -f - /tmp/apps/days.txt").!!
    } catch {
      case e: Exception =>
        println(e)
    }
  }
}
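Here days.txt acts as a simple work queue: `head -1` peeks at the next date, and `sed '1d'` piped back through `hdfs dfs -put -f` pops it. A minimal local sketch of the same peek/pop pattern, using an ordinary file in place of HDFS (the filename work_queue.txt is made up for illustration):

```shell
set -e
# Build a stand-in queue file (in the real program this lives on HDFS)
printf '2024-01-01\n2024-01-02\n2024-01-03\n' > work_queue.txt

# "Peek": read the first line, as the Scala code does with hdfs dfs -cat | head -1
day=$(head -1 work_queue.txt | tr -d '\n')
echo "processing day: $day"

# "Pop": rewrite the file without its first line (hdfs dfs -put -f in the original)
sed '1d' work_queue.txt > work_queue.tmp && mv work_queue.tmp work_queue.txt

head -1 work_queue.txt   # the next entry is now first
```

The `-put -f` overwrite in the original plays the role of the `mv` here: the rewritten file replaces the old one atomically from the reader's point of view.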
It failed. Checking the logs showed the following error:
mv: Permission denied: user=yarn, access=WRITE, inode="/a26_adapter_data/nation/vintype=A26":hdfs:supergroup:drwxr-xr-x
Fix: grant write access on the directory tree named in the error, for example by recursively chmod'ing it to 777 as the hdfs superuser:

sudo -u hdfs hdfs dfs -chmod -R 777 /a26_adapter_data/
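The denial follows the POSIX-style permission model HDFS uses: mv must remove an entry from the source directory, which requires WRITE on that directory, and /a26_adapter_data/nation/vintype=A26 is owned by hdfs with mode drwxr-xr-x, so user=yarn only gets r-x. The same rule can be reproduced with local directories (demo_src and demo_dst are throwaway names for illustration):

```shell
mkdir -p demo_src demo_dst
touch demo_src/part-00000
chmod 555 demo_src            # r-x only: mirrors what drwxr-xr-x gives user=yarn

# mv must unlink the entry from demo_src, which needs WRITE on demo_src,
# so (as a non-root user) this first attempt is denied
mv demo_src/part-00000 demo_dst/ 2>/dev/null && moved=yes || moved=no
echo "mv with r-x source dir: $moved"

chmod 755 demo_src            # restore the write bit, as the chmod fix does on HDFS
mv demo_src/part-00000 demo_dst/ 2>/dev/null || true
```

Note that chmod 777 is the bluntest fix; a narrower alternative would be changing ownership of the tree to the user running the job, but the broad chmod is what resolved it here.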