陈冲
新手
新手
  • UID449
  • 粉丝0
  • 关注0
  • 发帖数2
阅读:4965回复:0

搭建 Spark 交互引擎

楼主#
更多 发布于:2019-07-30 15:18


搭建 Spark 交互引擎




前言


最近,帮助公司搭建了一套基于 Spark 的即席查询平台,受到 Databricks分析平台 的启发,将 Spark Shell 和 Spark SQL 通过一个共同的接口整合在了一起,用户可以采用 SQL 和 Scala Code 混合编程的方式来实现自己相关的分析需求。其中这套平台的实现基石是 Spark REPL,而 Spark REPL  则是在 Scala REPL 中加入自己的 Spark 相关环境,比如说: SparkSession、SparkContext以及一些通用包、隐式转换的引入。Livy 对于 Spark REPL 有了一套比较健全的实现机制,笔者根据实际的项目需求,将这部分代码抽取出来、再加工。相关的代码在 githup 中所示。


理解 Spark REPL




走进 Spark-Shell

跟踪 Spark-Shell 脚本的执行情况,发现最后调用的是 org.apache.spark.repl.Main.main() ,跟踪代码流程,发现其执行的核心步骤:
// 代码里面没有,笔者自行补充的,因为在 sparkILoop.process(settings) 中会初始化 SparkSession, 如果没有 Spark 环境的话,会报错。
SparkSession
      .builder()
      .master("local")
      .appName("sas")
      .getOrCreate()
val sparkILoop = new SparkILoop
val settings = new GenericRunnerSettings((msg: String) => Console.err.println(msg))
val interpArguments = List(
     "-Yrepl-class-based",
     "-Yrepl-outdir", "G:/spark",
     "-classpath", System.getProperty("java.class.path")
   ) ++ args.toList
settings.processArguments(interpArguments, true)
sparkILoop.process(settings)
通过以上代码可以发现,Spark REPL 的核心 是 SparkILoop ,我们想要的只是通过纯代码的形式和 Spark 进行交互,而不需要交互界面功能以及去掉 welcome 打印信息等等,我们对 sparkILoop.process(settings) 代码进行了深层次的研究,发现实际所需的操作为:

sparkILoop.settings = settings
sparkILoop.createInterpreter()
var intp: IMain = sparkILoop.intp
intp.initializeSynchronous()
// output -> res0: Int = 2
intp.interpret("1+1")


完整代码

val sparkILoop = new SparkILoop
val settings = new GenericRunnerSettings((msg: String) => Console.err.println(msg))
val interpArguments = List(
      "-Yrepl-class-based",
      "-Yrepl-outdir", "G:/spark",
      "-classpath", System.getProperty("java.class.path")
    ) ++ args.toList
settings.processArguments(interpArguments, true)

//  sparkILoop.process(settings)
sparkILoop.settings = settings
sparkILoop.createInterpreter()
var intp: IMain = sparkILoop.intp
intp.initializeSynchronous()
// 测试
intp.interpret("1+1")


添加 Spark 执行环境

继续研究 SparkILoop 代码,发现其提供 bind() ,用于变量的绑定



val spark = SparkSession
      .builder()
      .master("local")
      .appName("demo")
      .getOrCreate()
sparkILoop.bind("spark",spark.getClass.getCanonicalName,
      spark,
      List("""@transient"""))
sparkILoop.bind("sc","org.apache.spark.SparkContext",
      spark.sparkContext,
      List("""@transient"""))
 //  Spark 环境测试
intp.interpret(
      """
        |println(spark.sessionState.conf.getAllConfs)
      """.stripMargin)
游客

返回顶部