阅读:4965回复:0
搭建 Spark 交互引擎搭建 Spark 交互引擎 前言 最近,帮助公司搭建了一套基于 Spark 的即席查询平台,受到 Databricks分析平台 的启发,将 Spark Shell 和 Spark SQL 通过一个共同的接口整合在了一起,用户可以采用 SQL 和 Scala Code 混合编程的方式来实现自己相关的分析需求。其中这套平台的实现基石是 Spark REPL,而 Spark REPL 则是在 Scala REPL 中加入自己的 Spark 相关环境,比如说: SparkSession、SparkContext以及一些通用包、隐式转换的引入。Livy 对于 Spark REPL 有了一套比较健全的实现机制,笔者根据实际的项目需求,将这部分代码抽取出来、再加工。相关的代码在 githup 中所示。 理解 Spark REPL 走进 Spark-Shell 跟踪 Spark-Shell 脚本的执行情况,发现最后调用的是 org.apache.spark.repl.Main.main() ,跟踪代码流程,发现其执行的核心步骤: // 代码里面没有,笔者自行补充的,因为在 sparkILoop.process(settings) 中会初始化 SparkSession, 如果没有 Spark 环境的话,会报错。通过以上代码可以发现,Spark REPL 的核心 是 SparkILoop ,我们想要的只是通过纯代码的形式和 Spark 进行交互,而不需要交互界面功能以及去掉 welcome 打印信息等等,我们对 sparkILoop.process(settings) 代码进行了深层次的研究,发现实际所需的操作为: sparkILoop.settings = settings 完整代码 val sparkILoop = new SparkILoop 添加 Spark 执行环境 继续研究 SparkILoop 代码,发现其提供 bind() ,用于变量的绑定 val spark = SparkSession .builder() .master("local") .appName("demo") .getOrCreate() sparkILoop.bind("spark",spark.getClass.getCanonicalName, spark, List("""@transient""")) sparkILoop.bind("sc","org.apache.spark.SparkContext", spark.sparkContext, List("""@transient""")) // Spark 环境测试 intp.interpret( """ |println(spark.sessionState.conf.getAllConfs) """.stripMargin) |
|