乐趣区

关于elasticsearch:Spark读取ElasticSearch数据优化

Spark 读取 ElasticSearch 数据优化

个别业务上,咱们只会关怀写 ElasticSearch,写也没有用到 spark-elasticsearch 组件。应用的是 ElasticSearch 原生的 bulkProcessor。

查问方面个别在 kibana 敲敲查问代码就够了。

然而某天还是须要用到这个组件,特此记录下。

第一次应用

引入 maven

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.8.0</version>
        </dependency>

写一手简略的代码,根本整个思路有了。

import org.elasticsearch.spark.sql._

// 配置信息
 val spark = SparkSession.builder().appName("ES 读取数据 商品关联视频")
    .config(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    .config(ConfigurationOptions.ES_NODES, "***")
    .config(ConfigurationOptions.ES_PORT, "9200")
    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "***")
    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "***")
    .getOrCreate()
    
val query =
      """{
        |  "query": {
        |    "bool": {
        |      "must": [
        |        {
        |          "term": {
        |            "date": {
        |              "value": "20200816"
        |            }
        |          }
        |        },
        |        {
        |          "term": {
        |          "room_id": {
        |            "value": "6861399602093165326"
        |          }
        |        }
        |        }
        |      ]
        |    }
        |  }
        |}""".stripMargin

    val df = spark.esDF("live-product-room-alias/origin", query)    

放到线上怼一波,发现分区只有一个。跑了 4.6 小时~ 数据量在 1.2 亿左右。太慢了。

第二次应用

加两个参数下来,
意思就是 ES_SCROLL_SIZE 每次能够拿 1W 条,默认只有 50 条~

ES_MAX_DOCS_PER_PARTITION 代表每个分区解决的数据量,这个很要害,加上去之后,发现 spark stage 上的工作分区变成了 126。嗯~ 要的就是这个 feel。
.config(ConfigurationOptions.ES_SCROLL_SIZE, 10000)
.config(ConfigurationOptions.ES_MAX_DOCS_PER_PARTITION, 1000000)

往线上怼一波,报错超时。看到 Duration,发现都是在 1 分钟之后超时,很可能是有某个参数在作怪。发现了 HTTP_TIME_OUT 参数

Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 23, emr-worker-8.cluster-183597, executor 4): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[es-cn-mp91asj1e0007gyzj.elasticsearch.aliyuncs.com:9200]]
  /** HTTP connection timeout */
    String ES_HTTP_TIMEOUT = "es.http.timeout";
    String ES_HTTP_TIMEOUT_DEFAULT = "1m";

第三次应用

那么扭转一下参数好了。

.config(ConfigurationOptions.ES_SCROLL_SIZE, 10000)
.config(ConfigurationOptions.ES_MAX_DOCS_PER_PARTITION, 1000000)
.config(ConfigurationOptions.ES_HTTP_TIMEOUT,"5m")
.config(ConfigurationOptions.ES_SCROLL_KEEPALIVE,"10m")
.config("es.internal.spark.sql.pushdown", value = true)
.config(ConfigurationOptions.ES_READ_SOURCE_FILTER, "room_id,date,product_id,room_ticket_count,total_user")
退出移动版