乐趣区

关于spark:Livy探究四-从es读取数据

在后面的所有例子中,咱们只是运行了 livy 官网给的两个例子。这篇咱们要尝试运行一些有意义的代码。

如没有非凡阐明,当前所有的试验都是在 yarn-cluster 模式下运行的。

咱们打算尝试运行上面代码:

sparkSession.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

这段代码用 spark sql 加载了 elasticsearch 的某个 index,并应用 show() 打印几行数据。

为了实现这个试验,有两个问题必须解决:

  1. 大家晓得 spark sql 能够扩大 DataSource,elasticsearch 官网为 spark 开发的 DataSource 在 elasticsearch-spark-20_2.11-x.x.x.jar 外面。所以要运行下面的代码,必须保障这个 jar 包被正确加载到。
  2. 在之前的例子中,咱们用 sc 示意以后的 SparkContext 对象,而这里咱们须要的是 SparkSession 对象。当初咱们还不晓得应该如何援用“以后 SparkSession”对象。

这两个问题,livy 的文档没有波及。然而没关系,从源码外面找答案。

首先,种种迹象表明 livy 会主动将 LIVY_HOME/rsc-jars 目录下的 jar 包上传。于是咱们先把 elasticsearch-spark-20_2.11-x.x.x.jar 传到 LIVY_HOME/rsc-jars 目录下。

而后,从源码 org/apache/livy/repl/AbstractSparkInterpreter.scala 中能够找到 SparkSession 对象的bind

...
bind("spark",
 sparkEntries.sparkSession().getClass.getCanonicalName,
 sparkEntries.sparkSession(),
 List("""@transient"""))
bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))
execute("import org.apache.spark.SparkContext._")
execute("import spark.implicits._")
execute("import spark.sql")
execute("import org.apache.spark.sql.functions._")
...

能够看到,这里将 SparkSession 对象 bind 到 spark 变量上,而把 SparkContext 对象 bind 到 sc 变量上。

于是咱们的代码应该写成:

spark.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

接下来,还是用 python 来提交代码运行:

data = {'code': 'sc.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes"->"192.168.21.41:9200","es.resource"->"777_zabbix_item2020_09_23_09_50_41")).load().show()'}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)

从 webui 上查看运行后果:

能够看到 show() 成绩打印了后果

从 spark-web-ui 上找到环境页面,查看 spark.yarn.dist.jars,能够看到,elasticsearch-spark-20_2.11-x.x.x.jar 被加了进来:

总结

从这个试验,咱们把握了自定义的 jar 包应该如何利用 livy 上传到集群上;还晓得了 SparkSession 对象 bind 的变量是spark

退出移动版