关于spark:Sparksubmit执行流程了解一下

摘要:本文次要是通过Spark代码走读来理解spark-submit的流程。1.工作命令提交咱们在进行Spark工作提交时,会应用“spark-submit -class .....”款式的命令来提交工作,该命令为Spark目录下的shell脚本。它的作用是查问spark-home,调用spark-class命令。 if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-homefi# disable randomized hash for string in Python 3.3+export PYTHONHASHSEED=0exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"随后会执行spark-class命令,以SparkSubmit类为参数进行工作向Spark程序的提交,而Spark-class的shell脚本次要是执行以下几个步骤: (1)加载spark环境参数,从conf中获取 if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-homefi. "${SPARK_HOME}"/bin/load-spark-env.sh# 寻找javahomeif [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java"else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fifi(2)载入java,jar包等 # Find Spark jars.if [ -d "${SPARK_HOME}/jars" ]; then SPARK_JARS_DIR="${SPARK_HOME}/jars"else SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"fi(3)调用org.apache.spark.launcher中的Main进行参数注入 ...

December 11, 2020 · 2 min · jiezi

关于spark:spark系列2spark-301-AQEAdaptive-Query-Exection分析

AQE简介从spark configuration,到在最早在spark 1.6版本就曾经有了AQE;到了spark 2.x版本,intel大数据团队进行了相应的原型开发和实际;到了spark 3.0时代,Databricks和intel一起为社区奉献了新的AQE spark 3.0.1中的AQE的配置配置项默认值官网阐明剖析spark.sql.adaptive.enabledfalse是否开启自适应查问此处设置为true开启spark.sql.adaptive.coalescePartitions.enabledtrue是否合并邻近的shuffle分区(依据'spark.sql.adaptive.advisoryPartitionSizeInBytes'的阈值来合并)此处默认为true开启,剖析见: 剖析1spark.sql.adaptive.coalescePartitions.initialPartitionNum(none)shuffle合并分区之前的初始分区数,默认为spark.sql.shuffle.partitions的值剖析见:剖析2spark.sql.adaptive.coalescePartitions.minPartitionNum(none)shuffle 分区合并后的最小分区数,默认为spark集群的默认并行度剖析见: 剖析3spark.sql.adaptive.advisoryPartitionSizeInBytes64MB倡议的shuffle分区的大小,在合并分区和解决join数据歪斜的时候用到剖析见:剖析3spark.sql.adaptive.skewJoin.enabledtrue是否开启join中数据歪斜的自适应解决 spark.sql.adaptive.skewJoin.skewedPartitionFactor5数据歪斜判断因子,必须同时满足skewedPartitionFactor和skewedPartitionThresholdInBytes剖析见:剖析4spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MB数据歪斜判断阈值,必须同时满足skewedPartitionFactor和skewedPartitionThresholdInBytes剖析见:剖析4剖析1在OptimizeSkewedJoin.scala中,咱们看到ADVISORY_PARTITION_SIZE_IN_BYTES,也就是spark.sql.adaptive.advisoryPartitionSizeInBytes被援用的中央, (OptimizeSkewedJoin是物理打算中的规定) /** * The goal of skew join optimization is to make the data distribution more even. The target size * to split skewed partitions is the average size of non-skewed partition, or the * advisory partition size if avg size is smaller than it. */ private def targetSize(sizes: Seq[Long], medianSize: Long): Long = { val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize)) // It's impossible that all the partitions are skewed, as we use median size to define skew. assert(nonSkewSizes.nonEmpty) math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length) }其中: ...

December 1, 2020 · 6 min · jiezi

关于spark:spark系列1deltaio到底解决了什么问题

本文转载自 https://mp.weixin.qq.com/s/ZN... 2019 年 10 月 16 日,在荷兰阿姆斯特丹举办的 Spark+AI 欧洲峰会上,Databricks 正式发表将 Delta Lake 捐献给了 Linux 基金会,其成为了该基金会中的一个正式我的项目。咱们期待在往年(2019 年)或者是将来,很快, Delta Lake 将会成为数据湖的支流或者说是事实标准。 在 9 月份颁布的 2019 年最佳开源软件奖名单中,Delta Lake 也榜上有名。正如官网对 Delta Lake 的颁奖评语形容,大家都很诧异,Databricks 公司居然把 Delta Lake 这个外围的拳头产品开源了。Delta Lake 的推出实际上是为了解决 Spark 作为大数据分析平台的诸多痛点,也置信它将会普惠整个 Spark 社区以及其余大数据社区,真正解决数据湖治理的各种关键问题。 很有幸,我参加了 Delta Lake 晚期的开发,尤其是 merge、update、delete 这种要害 DML 的设计和实现。这个我的项目最早启动于 2017 年 6 月。过后,多位用户向咱们埋怨 Spark 的有余和应用的不便,咱们公司的工程师们探讨后发现,是时候去提出咱们本人的存储架构。Spark 作为一种存储和计算拆散的一种计算引擎,之前咱们次要依赖于其余开源或非开源的我的项目去解决各种数据存储的问题,但实际上咱们发现在用户的生产环境中,现有的存储计划都没方法真正的解决数据湖。于是乎,咱们就和客户一起尝试去开发,去解决理论生产环境中的问题。通过四个月的疾速开发,咱们在 2017 年 10 月正式发表了 Delta Lake 产品的诞生。在第二年 6 月份的 Spark+AI 峰会中,Apple 的工程师和咱们的工程师 Michael 一起做了主题演讲,分享了 Apple 在应用 Delta Lake 的一些贵重教训,比如说他们过后用 Delta Lake 解决了 trillion 级别数据的大表的读写。 ...

November 20, 2020 · 4 min · jiezi

关于spark:Spark的五种JOIN策略解析

JOIN操作是十分常见的数据处理操作,Spark作为一个对立的大数据处理引擎,提供了十分丰盛的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,心愿对你有所帮忙。本文次要包含以下内容: 影响JOIN操作的因素Spark中JOIN执行的5种策略Spark是如何抉择JOIN策略的影响JOIN操作的因素数据集的大小参加JOIN的数据集的大小会间接影响Join操作的执行效率。同样,也会影响JOIN机制的抉择和JOIN的执行效率。 JOIN的条件JOIN的条件会波及字段之间的逻辑比拟。依据JOIN的条件,JOIN可分为两大类:等值连贯和非等值连贯。等值连贯会波及一个或多个须要同时满足的相等条件。在两个输出数据集的属性之间利用每个等值条件。当应用其余运算符(运算连接符不为=)时,称之为非等值连贯。 JOIN的类型在输出数据集的记录之间利用连贯条件之后,JOIN类型会影响JOIN操作的后果。次要有以下几种JOIN类型: 内连贯(Inner Join):仅从输出数据集中输入匹配连贯条件的记录。外连贯(Outer Join):又分为左外连贯、右外链接和全外连贯。半连贯(Semi Join):右表只用于过滤左表的数据而不呈现在后果集中。穿插连贯(Cross Join):穿插联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。穿插联接也称作笛卡尔积。Spark中JOIN执行的5种策略Spark提供了5种JOIN机制来执行具体的JOIN操作。该5种JOIN机制如下所示: Shuffle Hash JoinBroadcast Hash JoinSort Merge JoinCartesian JoinBroadcast Nested Loop JoinShuffle Hash Join简介当要JOIN的表数据量比拟大时,能够抉择Shuffle Hash Join。这样能够将大表进行依照JOIN的key进行重分区,保障每个雷同的JOIN key都发送到同一个分区中。如下图示: 如上图所示:Shuffle Hash Join的根本步骤次要有以下两点: 首先,对于两张参加JOIN的表,别离依照join key进行重分区,该过程会波及Shuffle,其目标是将雷同join key的数据发送到同一个分区,不便分区内进行join。其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,而后依据join key与大表的分区数据记录进行匹配。条件与特点仅反对等值连贯,join key不须要排序反对除了全外连贯(full outer joins)之外的所有join类型须要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比拟大,可能会造成OOM将参数spark.sql.join.prefersortmergeJoin (default true)置为falseBroadcast Hash Join简介也称之为Map端JOIN。当有一张表较小时,咱们通常抉择Broadcast Hash Join,这样能够防止Shuffle带来的开销,从而进步性能。比方事实表与维表进行JOIN时,因为维表的数据通常会很小,所以能够应用Broadcast Hash Join将维表进行Broadcast。这样能够防止数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而进步JOIN的效率。在进行 Broadcast Join 之前,Spark 须要把处于 Executor 端的数据先发送到 Driver 端,而后 Driver 端再把数据播送到 Executor 端。如果咱们须要播送的数据比拟多,会造成 Driver 端呈现 OOM。具体如下图示: Broadcast Hash Join次要包含两个阶段: Broadcast阶段 :小表被缓存在executor中Hash Join阶段:在每个 executor中执行Hash Join条件与特点仅反对等值连贯,join key不须要排序反对除了全外连贯(full outer joins)之外的所有join类型Broadcast Hash Join相比其余的JOIN机制而言,效率更高。然而,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,须要在Driver端缓存数据,所以当小表的数据量较大时,会呈现OOM的状况被播送的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)被播送表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scalalongMetric("dataSize") += dataSize if (dataSize >= (8L << 30)) { throw new SparkException( s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB") }基表不能被broadcast,比方左连贯时,只能将右表进行播送。形如:fact_table.join(broadcast(dimension_table),能够不应用broadcast提醒,当满足条件时会主动转为该JOIN形式。Sort Merge Join简介该JOIN机制是Spark默认的,能够通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先应用Sort Merge Join。个别在两张大表进行JOIN时,应用该形式。Sort Merge Join能够缩小集群中的数据传输,该形式不会先加载所有数据的到内存,而后进行hashjoin,然而在JOIN之前须要对join key进行排序。具体图示: ...

November 18, 2020 · 3 min · jiezi

关于spark:Spark-SQL百万级数据批量读写入MySQL

Spark SQL读取MySQL的形式Spark SQL还包含一个能够应用JDBC从其余数据库读取数据的数据源。与应用JdbcRDD相比,应优先应用此性能。这是因为后果作为DataFrame返回,它们能够在Spark SQL中轻松解决或与其余数据源连贯。JDBC数据源也更易于应用Java或Python,因为它不须要用户提供ClassTag。 能够应用Data Sources API将近程数据库中的表加载为DataFrame或Spark SQL长期视图。用户能够在数据源选项中指定JDBC连贯属性。 user和password通常作为用于登录数据源的连贯属性。除连贯属性外,Spark还反对以下不辨别大小写的选项: 属性名称解释url要连贯的JDBC URLdbtable读取或写入的JDBC表query指定查问语句driver用于连贯到该URL的JDBC驱动类名partitionColumn, lowerBound, upperBound如果指定了这些选项,则必须全副指定。另外, numPartitions必须指定numPartitions表读写中可用于并行处理的最大分区数。这也确定了并发JDBC连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过coalesce(numPartitions)在写入之前进行调用将其升高到此限度queryTimeout默认为0,查问超时工夫fetchsizeJDBC的获取大小,它确定每次要获取多少行。这能够帮忙进步JDBC驱动程序的性能batchsize默认为1000,JDBC批处理大小,这能够帮忙进步JDBC驱动程序的性能。isolationLevel事务隔离级别,实用于以后连贯。它能够是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED。此选项仅实用于写作。sessionInitStatement在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句,应用它来实现会话初始化代码。truncate这是与JDBC writer相干的选项。当SaveMode.Overwrite启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为falsepushDownPredicate用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种状况下,Spark将尽可能将过滤器下推到JDBC数据源。源码SparkSession/** * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a * `DataFrame`. * {{{ * sparkSession.read.parquet("/path/to/file.parquet") * sparkSession.read.schema(schema).json("/path/to/file.json") * }}} * * @since 2.0.0 */ def read: DataFrameReader = new DataFrameReader(self)DataFrameReader // ...省略代码... /** *所有的数据由RDD的一个分区解决,如果你这个表很大,很可能会呈现OOM *能够应用DataFrameDF.rdd.partitions.size办法查看 */ def jdbc(url: String, table: String, properties: Properties): DataFrame = { assertNoSpecifiedSchema("jdbc") this.extraOptions ++= properties.asScala this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table) format("jdbc").load() }/** * @param url 数据库url * @param table 表名 * @param columnName 分区字段名 * @param lowerBound `columnName`的最小值,用于分区步长 * @param upperBound `columnName`的最大值,用于分区步长. * @param numPartitions 分区数量 * @param connectionProperties 其余参数 * @since 1.4.0 */ def jdbc( url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame = { this.extraOptions ++= Map( JDBCOptions.JDBC_PARTITION_COLUMN -> columnName, JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString, JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString, JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString) jdbc(url, table, connectionProperties) } /** * @param predicates 每个分区的where条件 * 比方:"id <= 1000", "score > 1000 and score <= 2000" * 将会分成两个分区 * @since 1.4.0 */ def jdbc( url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame = { assertNoSpecifiedSchema("jdbc") val params = extraOptions.toMap ++ connectionProperties.asScala.toMap val options = new JDBCOptions(url, table, params) val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) => JDBCPartition(part, i) : Partition } val relation = JDBCRelation(parts, options)(sparkSession) sparkSession.baseRelationToDataFrame(relation) }示例 private def runJdbcDatasetExample(spark: SparkSession): Unit = { // 从JDBC source加载数据(load) val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/test") .option("dbtable", "mytable") .option("user", "root") .option("password", "root") .load() val connectionProperties = new Properties() connectionProperties.put("user", "root") connectionProperties.put("password", "root") val jdbcDF2 = spark.read .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties) // 指定读取schema的数据类型 connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties) }值得注意的是,下面的形式如果不指定分区的话,Spark默认会应用一个分区读取数据,这样在数据量特地大的状况下,会呈现OOM。在读取数据之后,调用DataFrameDF.rdd.partitions.size办法能够查看分区数。 ...

November 18, 2020 · 3 min · jiezi

关于spark:k8s系列5KubernetesClientException-too-old-resource-version-原因分析

背景公司目前在基于k8s做调度(基于io.fabric8:kubernetes-client:4.2.0),在运行的过程中,遇到了如下问题: OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason: [OkHttp WebSocket https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Current reconnect backoff is 1000 milliseconds (T0)[reconnectAttempt|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a199 - 2020-11-17T06:39:13.874Z -[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06:39:13.873 [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254)[kubernetes-client-4.2.2.jar:?]at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]单凭这个问题其实没什么,然而代码中是: ...

November 18, 2020 · 4 min · jiezi

关于spark:go系列1go的安装

因为之前我的项目依赖到了k8s,且与k8s的集成比拟多,而k8s的源码是go写的,网上搜寻了一大堆对于go的信息, 发现很多云原生的我的项目都是由go写的,所以又来折腾以下go语言,因为之前用brew install mac装置了go,也用了pkg包装置了go,所以环境够的很乱,至此,特定来清理一下 装置形式brew install go官网下载对应的安装包,间接装置留神: 第一种形式装置,go可执行文件在/usr/local/bin下,其实这个目录下的go是一个软连贯, ➜ bin ll /usr/local/bin |grep golrwxr-xr-x 1 ljh admin 26 9 28 10:46 go -> ../Cellar/go/1.15.2/bin/golrwxr-xr-x 1 ljh admin 29 9 28 10:46 godoc -> ../Cellar/go/1.15.2/bin/godoclrwxr-xr-x 1 ljh admin 29 9 28 10:46 gofmt -> ../Cellar/go/1.15.2/bin/gofmt咱们发现其实这个是链接到了brew install的go的装置目录下了 第二种形式装置,go的可执行文件在/usr/local/go/bin下,这个是go pkg包默认的装置目录 然而无论怎么装置,都得配置GOPATH/GOBIN/PATH,如下, 这三个变量稍后做解释 #goexport GOPATH=/Users/ljh/goexport GOBIN=$GOPATH/binexport PATH=$PATH:$GOBIN这个时候,运行go env GO111MODULE=""GOARCH="amd64"GOBIN="/Users/ljh/go/bin"GOCACHE="/Users/ljh/Library/Caches/go-build"GOENV="/Users/ljh/Library/Application Support/go/env"GOEXE=""GOFLAGS=""GOHOSTARCH="amd64"GOHOSTOS="darwin"GOINSECURE=""GOMODCACHE="/Users/ljh/go/pkg/mod"GONOPROXY=""GONOSUMDB=""GOOS="darwin"GOPATH="/Users/ljh/go"GOPRIVATE=""GOPROXY="https://proxy.golang.org,direct"GOROOT="/usr/local/Cellar/go/1.15.2/libexec"GOSUMDB="sum.golang.org"GOTMPDIR=""GOTOOLDIR="/usr/local/Cellar/go/1.15.2/libexec/pkg/tool/darwin_amd64"GCCGO="gccgo"AR="ar"CC="clang"CXX="clang++"CGO_ENABLED="1"GOMOD=""CGO_CFLAGS="-g -O2"CGO_CPPFLAGS=""CGO_CXXFLAGS="-g -O2"CGO_FFLAGS="-g -O2"CGO_LDFLAGS="-g -O2"PKG_CONFIG="pkg-config"GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/1l/mwvs7rf563x72kqcv7l6rb840000gn/T/go-build705548469=/tmp/go-build -gno-record-gcc-switches -fno-common"发现还有一个GOROOT,上面咱们来解释一下 ...

November 11, 2020 · 1 min · jiezi

关于spark:mlflow系列5一站式开源测试平台MeterSphere

背景因为咱们公司部署mlflow 服务的须要,而且之前的mlflow的钻研以及局部上线,都是我亲手操刀的,尽管mlflow算法服务是开源完满的部署下来了, 然而这里波及到该mlflow服务的性能问题,也就是该服务能反对的最大QPS,因为之前比较忙,所以间接用jmeter做的压测,这当然是能够满足要求的, 然而操作起来比拟麻烦: 下载对应的jmeter tgz包,解压到指标机器上启动bin/jmeter服务,在弹出的jconsole设置一系列参数上传生成的jmx测试文件到linux服务器因为个别linux是不开图形界面的,所以得开启CLI模式进行测试得本人手动分析测试报告这一系列操作很繁琐,因为最近有工夫,所以网上搜寻到了一款开源一站式测试平台MeterSphere 装置MeterSphere的装置在官网上很分明,文档也很清晰 咱们剖析一下重点:装置的机器上必须得装置Docker和docker-compose 应用应用在官网文档也很具体 咱们剖析一下重点: 这里的最大并发数,能够调整,默认是50,否则影响前面的应用: 如果不调整,设置超过50时,就会报错 这里的并发用户数/压测时长 会笼罩xml文件中ThreadGroup.num_threads/ThreadGroup.ramp_time的值 这里的RPS下限是设置压测的rps下限 以下截图展现以下测试报告: 当然还能够把测试报告 以pdf模式导出进行展现, 用到这里,作为开发的我,感觉还是不错的

November 10, 2020 · 1 min · jiezi

关于spark:DevOps是什么

DevOps的维基百科定义是: DevOps是一组过程、办法与零碎的统称,用于促成开发、技术经营和品质保障(QA)部门之间的沟通、合作与整合。 具体每个步骤就是: plancodebuildtestreleasedeployoperatormonitorplancode... 一张图总结: 也就是说 开发 测试 部署 操作 监控 一系列的操作造成一个闭环, 具体的流程就是,运维人员在我的项目开发阶段就渗入到我的项目中,从而提供跟好的部署监控等运维计划,而开发人员也得在运维部署阶段渗入到运维工作中, 其实这样对于开发和运维人员的能力要求都有所提高,运维得懂开发,开发也得懂运维常识 而具体在施行过程中,每个环节都很多的软件撑持,如 jenkins:build/relase kubernetes:deploy sentry:monitorjmeter :test

November 6, 2020 · 1 min · jiezi

关于spark:mlflow-upgrade升级-Cannot-add-foreign-key-constraint

背景在mlflow upgrade这篇文章中,咱们说到了mlflow 降级的步骤, 很侥幸,零打碎敲, 并没有产生什么谬误, 明天要说的就是降级过程中如果遇到mysql Cannot add foreign key constraint的谬误该怎么解决 其中: mlflow 从1.4.0降级到1.11.0 mysql版本 5.7.21-log 遇到如下问题: (mlflow-1.11.0) ➜ mlflow db upgrade mysql://root:root@localhost/mlflow-online2020/11/04 14:20:13 INFO mlflow.store.db.utils: Updating database tablesINFO [alembic.runtime.migration] Context impl MySQLImpl.INFO [alembic.runtime.migration] Will assume non-transactional DDL.INFO [alembic.runtime.migration] Running upgrade 0a8213491aaa -> 728d730b5ebd, add registered model tags tableTraceback (most recent call last): File "/Users/ljh/opt/miniconda3/envs/mlflow-1.11.0/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context cursor, statement, parameters, context File "/Users/ljh/opt/miniconda3/envs/mlflow-1.11.0/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute cursor.execute(statement, parameters) File "/Users/ljh/opt/miniconda3/envs/mlflow-1.11.0/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute res = self._query(query) File "/Users/ljh/opt/miniconda3/envs/mlflow-1.11.0/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query db.query(q) File "/Users/ljh/opt/miniconda3/envs/mlflow-1.11.0/lib/python3.6/site-packages/MySQLdb/connections.py", line 259, in query _mysql.connection.query(self, query)MySQLdb._exceptions.IntegrityError: (1215, 'Cannot add foreign key constraint')...sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1215, 'Cannot add foreign key constraint')[SQL:CREATE TABLE registered_model_tags ( `key` VARCHAR(250) NOT NULL, value VARCHAR(5000), name VARCHAR(256) NOT NULL, CONSTRAINT registered_model_tag_pk PRIMARY KEY (`key`, name), FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE cascade)问题剖析咱们间接把以上的CREATE TABLE registered_model_tags 语句复制到mysql的客户端执行,发现也是Cannot add foreign key constraint谬误, 继续执行SHOW ENGINE INNODB STATUS 截取Status字段中一部分: ...

November 4, 2020 · 2 min · jiezi

关于spark:存算分离下写性能提升10倍以上EMR-Spark引擎是如何做到的

引言随着大数据技术架构的演进,存储与计算拆散的架构能更好的满足用户对升高数据存储老本,按需调度计算资源的诉求,正在成为越来越多人的抉择。相较 HDFS,数据存储在对象存储上能够节约存储老本,但与此同时,对象存储对海量文件的写性能也会差很多。 腾讯云弹性 MapReduce(EMR) 是腾讯云的一个云端托管的弹性开源泛 Hadoop 服务,反对 Spark、Hbase、Presto、Flink、Druid 等大数据框架。 近期,在反对一位 EMR 客户时,遇到典型的存储计算拆散利用场景。客户应用了 EMR 中的 Spark 组件作为计算引擎,数据存储在对象存储上。在帮忙客户技术调优过程中,发现了 Spark 在海量文件场景下写入性能比拟低,影响了架构的整体性能体现。 在深入分析和优化后,咱们最终将写入性能大幅晋升,特地是将写入对象存储的性能晋升了 10 倍以上,减速了业务解决,取得了客户好评。 本篇文章将介绍在存储计算拆散架构中,腾讯云 EMR Spark 计算引擎如何晋升在海量文件场景下的写性能,心愿与大家一起交换。文章作者:钟德艮,腾讯后盾开发工程师。 一、问题背景Apache Spark 是专为大规模数据处理而设计的疾速通用的计算引擎,可用来构建大型的、低提早的数据分析应用程序。Spark 是 UC Berkeley AMP lab (加州大学伯克利分校的 AMP 实验室)所开源的类 Hadoop MapReduce 的通用并行框架,Spark 领有 Hadoop MapReduce 所具备的长处。 与 Hadoop 不同,Spark 和 Scala 可能严密集成,其中的 Scala 能够像操作本地汇合对象一样轻松地操作分布式数据集。只管创立 Spark 是为了反对分布式数据集上的迭代作业,然而实际上它是对 Hadoop 的补充,能够在 Hadoop 文件系统中并行运行,也能够运行在云存储之上。 在这次技术调优过程中,咱们钻研的计算引擎是 EMR 产品中的 Spark 组件,因为其优异的性能等长处,也成为越来越多的客户在大数据计算引擎的抉择。 存储上,客户抉择的是对象存储。在数据存储方面,对象存储领有牢靠,可扩大和更低成本等个性,相比 Hadoop 文件系统 HDFS,是更优的低成本存储形式。海量的温冷数据更适宜放到对象存储上,以降低成本。 ...

November 2, 2020 · 2 min · jiezi

关于spark:mlflow-升级upgrade

背景mlflow 的更新迭代速度还是很快的,均匀一个月一个大版本的更新,截止到11月1号,曾经更新到了1.11.0版本 咱们查看mlflow release,就能看到早在1.10.0版本,就提供了对model registry的更好的feature反对,以及可能对试验进行逻辑删除操作, 而这些features 在mlflow 1.4.0是没有的,特地是删除试验的个性,如果试验很多的状况下,咱们看到的试验是横七竖八的,很不不便咱们进行治理,所以咱们进行mlflow的降级 降级以及筹备参照之前mlflow的搭建应用 ,咱们先建设mlflow 1.4.0 和mlflow 1.11.0的conda环境 假如你曾经建设好了对应的conda环境,且别离为mlflow-1.4.0 和mlflow-1.11.0 则执行: conda activate mlflow-1.11.0参考mlflow db upgrade ,执行 mlflow db upgrade mysql://user:passwd@host:port/db如:mlflow db upgrade mysql://root:root@localhost/mlflow其中 名词解释user数据库的用户名passwd数据库的明码host数据库的主机地址port数据库的端口,如默认为3306则能够省略db数据库的database如果执行胜利则会看到如下输入信息: 2020/11/02 10:24:50 INFO mlflow.store.db.utils: Updating database tablesINFO [alembic.runtime.migration] Context impl MySQLImpl.INFO [alembic.runtime.migration] Will assume non-transactional DDL.INFO [alembic.runtime.migration] Running upgrade 2b4d017a5e9b -> cfd24bdc0731, Update run status constraint with killedINFO [alembic.runtime.migration] Running upgrade cfd24bdc0731 -> 0a8213491aaa, drop_duplicate_killed_constraintWARNI [0a8213491aaa_drop_duplicate_killed_constraint_py] Failed to drop check constraint. Dropping check constraints may not be supported by your SQL database. Exception content: (MySQLdb._exceptions.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'CHECK status' at line 1")[SQL: ALTER TABLE runs DROP CHECK status](Background on this error at: http://sqlalche.me/e/f405)INFO [alembic.runtime.migration] Running upgrade 0a8213491aaa -> 728d730b5ebd, add registered model tags tableINFO [alembic.runtime.migration] Running upgrade 728d730b5ebd -> 27a6a02d2cf1, add model version tags tableINFO [alembic.runtime.migration] Running upgrade 27a6a02d2cf1 -> 84291f40a231, add run_link to model_version如果此时再在mlflow 1.4.0的环境下 再执行: ...

November 2, 2020 · 2 min · jiezi

关于spark:mlflow的搭建使用

背景mlflow是Databrick开源的机器学习治理平台,它很好的解藕了算法训练和算法模型服务,使得算法工程师专一于模型的训练,而不须要过多的关注于服务的, 而且在咱们公司曾经有十多个服务稳固运行了两年多。 搭建mlflow的搭建次要是mlflow tracking server的搭建,tracking server次要是用于模型的元数据以及模型的数据存储 咱们这次以minio作为模型数据的存储后盾,mysql作为模型元数据的存储,因为这种模式能满足线上的需要,不仅仅是用于测试 minio的搭建 参考我之前的文章MinIO的搭建应用,并且创立名为mlflow的bucket,便于后续操作mlflow的搭建 conda的装置 参照install conda,依据本人的零碎装置不同的conda环境mlfow tracking server装置 # 创立conda环境 并装置 python 3.6 conda create -n mlflow-1.11.0 python==3.6#激活conda环境conda activate mlflow-1.11.0# 装置mlfow tracking server python须要的依赖包pip install mlflow==1.11.0 pip install mysqlclientpip install boto3mlflow tracking server的启动 暴露出minio url以及须要的ID和KEY,因为mlflow tracking server在上传模型文件时须要 export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLEexport AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEYexport MLFLOW_S3_ENDPOINT_URL=http://localhost:9001mlflow server \ --backend-store-uri mysql://root:AO,h07ObIeH-@localhost/mlflow_test \ --host 0.0.0.0 -p 5002 \ --default-artifact-root s3://mlflow拜访localhost:5002, 就能看到如下界面: 应用拷贝以下的wine.py文件 import osimport warningsimport sysimport pandas as pdimport numpy as npfrom sklearn.metrics import mean_squared_error, mean_absolute_error, r2_scorefrom sklearn.model_selection import train_test_splitfrom sklearn.linear_model import ElasticNetimport mlflow.sklearndef eval_metrics(actual, pred): rmse = np.sqrt(mean_squared_error(actual, pred)) mae = mean_absolute_error(actual, pred) r2 = r2_score(actual, pred) return rmse, mae, r2if __name__ == "__main__": warnings.filterwarnings("ignore") np.random.seed(40) # Read the wine-quality csv file (make sure you're running this from the root of MLflow!) wine_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wine-quality.csv") data = pd.read_csv(wine_path) # Split the data into training and test sets. (0.75, 0.25) split. train, test = train_test_split(data) # The predicted column is "quality" which is a scalar from [3, 9] train_x = train.drop(["quality"], axis=1) test_x = test.drop(["quality"], axis=1) train_y = train[["quality"]] test_y = test[["quality"]] alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5 l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5 mlflow.set_tracking_uri("http://localhost:5002") client = mlflow.tracking.MlflowClient() mlflow.set_experiment('http_metrics_test') with mlflow.start_run(): lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42) lr.fit(train_x, train_y) predicted_qualities = lr.predict(test_x) (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities) print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio)) print(" RMSE: %s" % rmse) print(" MAE: %s" % mae) print(" R2: %s" % r2) mlflow.log_param("alpha", alpha) mlflow.log_param("l1_ratio", l1_ratio) mlflow.log_metric("rmse", rmse) mlflow.log_metric("r2", r2) mlflow.log_metric("mae", mae) mlflow.sklearn.log_model(lr, "model")留神: ...

October 30, 2020 · 2 min · jiezi

关于spark:akka-cluster-splitbrainresolverSBR

背景最近我的项目中,用akka(2.6.8) cluster在k8s做分布式的部署,,其中遇到unreachable node 如果始终未手动重启,则会导致其余的node退出不到cluster中来, 具体的操作为其中的一个非seed node节点因为pod 重启导致,部署到了其余的节点上,而之前的node(ip),cluster则会始终去连贯该node(ip),从而导致异样 具体起因剖析首先咱们先看一下概念Gossip Convergence,如下: Gossip convergence cannot occur while any nodes are unreachable. The nodes need to become reachable again, or moved to the down and removed states (see the Cluster Membership Lifecycle section). This only blocks the leader from performing its cluster membership management and does not influence the application running on top of the cluster. For example this means that during a network partition it is not possible to add more nodes to the cluster. The nodes can join, but they will not be moved to the up state until the partition has healed or the unreachable nodes have been downed.翻译过去就是: 当任何节点都不可达时,Gossip convergence就不达成统一。节点须要再次变得reachable,或转移到down和removed状态。这仅阻止领导者执行其集群成员资格治理,并且不会影响在集群顶部运行的应用程序。例如,这意味着在网络分 区期间不可能将更多节点增加到群集。节点能够退出,但在分区修复或无法访问的节点已敞开之前,它们将不会移入up状态。 很显著,akka就是要保障每个节点是reachable或者down,这样能力进行一致性协商 ...

October 21, 2020 · 2 min · jiezi

关于spark:Uber-jvm-profiler-使用

背景uber jvm profiler是用于在分布式监控收集jvm 相干指标,如:cpu/memory/io/gc信息等 装置确保装置了maven和JDK>=8前提下,间接mvn clean package java application阐明 间接以java agent的部署就能够应用 应用 java -javaagent:jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.KafkaOutputReporter,brokerList='kafka1:9092',topicPrefix=demo_,tag=tag-demo,metricInterval=5000,sampleInterval=0 -cp target/jvm-profiler-1.0.0.jar 选项解释 |参数|阐明| |------|-----| |reporter|reporter类别, 此处间接默认为com.uber.profiling.reporters.KafkaOutputReporter就能够| |brokerList|如reporter为com.uber.profiling.reporters.KafkaOutputReporter,则brokerList为kafka列表,以逗号分隔| |topicPrefix|如reporter为com.uber.profiling.reporters.KafkaOutputReporter,则topicPrefix为kafka topic的前缀| |tag|key为tag的metric,会输入到reporter中| |metricInterval|metric report的频率,依据理论状况设置,单位为ms| |sampleInterval|jvm堆栈metrics report的频率,依据理论状况设置,单位为ms| 后果展现 "nonHeapMemoryTotalUsed": 11890584.0, "bufferPools": [ { "totalCapacity": 0, "name": "direct", "count": 0, "memoryUsed": 0 }, { "totalCapacity": 0, "name": "mapped", "count": 0, "memoryUsed": 0 } ], "heapMemoryTotalUsed": 24330736.0, "epochMillis": 1515627003374, "nonHeapMemoryCommitted": 13565952.0, "heapMemoryCommitted": 257425408.0, "memoryPools": [ { "peakUsageMax": 251658240, "usageMax": 251658240, "peakUsageUsed": 1194496, "name": "Code Cache", "peakUsageCommitted": 2555904, "usageUsed": 1173504, "type": "Non-heap memory", "usageCommitted": 2555904 }, { "peakUsageMax": -1, "usageMax": -1, "peakUsageUsed": 9622920, "name": "Metaspace", "peakUsageCommitted": 9830400, "usageUsed": 9622920, "type": "Non-heap memory", "usageCommitted": 9830400 }, { "peakUsageMax": 1073741824, "usageMax": 1073741824, "peakUsageUsed": 1094160, "name": "Compressed Class Space", "peakUsageCommitted": 1179648, "usageUsed": 1094160, "type": "Non-heap memory", "usageCommitted": 1179648 }, { "peakUsageMax": 1409286144, "usageMax": 1409286144, "peakUsageUsed": 24330736, "name": "PS Eden Space", "peakUsageCommitted": 67108864, "usageUsed": 24330736, "type": "Heap memory", "usageCommitted": 67108864 }, { "peakUsageMax": 11010048, "usageMax": 11010048, "peakUsageUsed": 0, "name": "PS Survivor Space", "peakUsageCommitted": 11010048, "usageUsed": 0, "type": "Heap memory", "usageCommitted": 11010048 }, { "peakUsageMax": 2863661056, "usageMax": 2863661056, "peakUsageUsed": 0, "name": "PS Old Gen", "peakUsageCommitted": 179306496, "usageUsed": 0, "type": "Heap memory", "usageCommitted": 179306496 } ], "processCpuLoad": 0.0008024004394748531, "systemCpuLoad": 0.23138430784607697, "processCpuTime": 496918000, "appId": null, "name": "24103@machine01", "host": "machine01", "processUuid": "3c2ec835-749d-45ea-a7ec-e4b9fe17c23a", "tag": "mytag", "gc": [ { "collectionTime": 0, "name": "PS Scavenge", "collectionCount": 0 }, { "collectionTime": 0, "name": "PS MarkSweep", "collectionCount": 0 }} ...

October 16, 2020 · 3 min · jiezi

关于spark:spark-on-k8skubernetes-DynamicResourceAllocation

随着大数据时代的到来,以及kubernetes的愈发炽热,好多公司曾经把spark利用从yarn迁徙到k8s,当然也踩了不少的坑, 当初咱们来剖析一下spark on k8s的DynamicResourceAllocation这个坑留神:该文基于spark 3.0.0剖析 spark on yarn 中的DynamicResourceAllocationspark on yarn对于DynamicResourceAllocation调配来说,从spark 1.2版本就曾经开始反对了. 对于spark相熟的人都晓得,如果咱们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务, 对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下: <property> <name>yarn.nodemanager.aux-services</name> <value>spark_shuffle</value></property><property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value></property><property> <name>spark.shuffle.service.port</name> <value>7337</value></property>重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark利用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的成果 咱们间接从CoarseGrainedExecutorBackend中SparkEnv创立开始说,每一个executor的启动,必然会通过CoarseGrainedExecutorBackend main办法,而main中就波及到SparkEnv的创立 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)而sparkEnv的创立就波及到BlockManager的创立。沿着代码往下走,最终 val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)val blockManager = new BlockManager( executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, externalShuffleClient)在blockManager的initialize办法中,就会进行registerWithExternalShuffleServer ...

October 16, 2020 · 2 min · jiezi

关于spark:spark-on-k8s-与-spark-on-k8s-operator的对比

对于目前基于k8s的的spark利用,次要采纳两种形式运行spark原生反对的 spark on k8s基于k8s的operator的 spark on k8s operator前者是spark社区反对k8s这种资源管理框架而引入的k8s client的实现 后者是k8s社区为了反对spark而开发的一种operator 区别spark on k8sspark on k8s operator社区反对spark社区GoogleCloudPlatform非官方反对版本要求spark>=2.3,Kubernetes>=1.6spark>2.3,Kubernetes>=1.13装置依照官网装置,须要k8s pod的create list edit delete权限,且须要本人编译源码进行镜像的构建,构建过程繁琐须要k8s admin装置incubator/sparkoperator,须要pod create list edit delete的权限应用间接spark submit提交,如:上面code 1,反对client和cluster模式,spark on k8s通过yaml配置文件模式提交,反对client和cluster模式,提交如code2,具体参数参考spark operator configuration长处合乎sparker的形式进行工作提交,对于习惯了spark的使用者来说,应用起来更棘手k8s配置文件形式提交工作,复用性强毛病运行完后driver的资源不会主动开释运行完后driver的资源不会主动开释实现形式对于spark提交形式来说,无论是client提交还是cluster提交,都是继承SparkApplication。以client提交,子类则是JavaMainApplication,该形式以反射运行,对于k8s工作来剖析,clusterManager为KubernetesClusterManager,该形式和向yarn提交工作的形式没什么区别;以cluster形式提交,对于k8s工作来说,spark程序的入口为KubernetesClientApplication,client端会建设clusterIp为None的service,executor跟该service进行rpc,如工作的提交的交互,且会建设以driver-conf-map后缀的configMap,该configMap在建设spark driver pod的时候,以volumn挂载的模式被援用,而该文件的内容最终在driver提交工作的时候以--properties-file模式提交给spark driver,从而spark.driver.host等配置项就传输给了driver,与此同时也会建设以-hadoop-config为后缀的configMap,可是 k8s 镜像怎么辨别是运行executor还是driver的呢?一切都在dockerfile(具体构建的时候依据hadoop和kerbeors环境的不一样进行区别配置)和entrypoint中,其中shell中是辨别driver和executor的;采纳k8s CRD Controller的机制,自定义CRD,依据operator SDK,监听对应的增删改查event,如监听到对应的CRD的创立事件,则依据对应yaml文件配置项,建设pod,进行spark工作的提交,具体的实现,可参考spark on k8s operator design,具体以cluster和client模式提交的原理和spark on k8s统一,因为镜像复用的是spark的官网镜像code 1 ---bin/spark-submit \ --master k8s://https://192.168.202.231:6443 \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=2 \ --conf "spark.kubernetes.namespace=dev" \ --conf "spark.kubernetes.authenticate.driver.serviceAccountName=lijiahong" \ --conf "spark.kubernetes.container.image=harbor.k8s-test.uc.host.dxy/dev/spark-py:cdh-2.6.0-5.13.1" \ --conf "spark.kubernetes.container.image.pullSecrets=regsecret" \ --conf "spark.kubernetes.file.upload.path=hdfs:///tmp" \ --conf "spark.kubernetes.container.image.pullPolicy=Always" \ hdfs:///tmp/spark-examples_2.12-3.0.0.jarcode 2---apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata: name: spark-pi namespace: devspec: type: Scala mode: cluster image: "gcr.io/spark-operator/spark:v3.0.0" imagePullPolicy: Always mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar" sparkVersion: "3.0.0" restartPolicy: type: Never volumes: - name: "test-volume" hostPath: path: "/tmp" type: Directory driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.0.0 serviceAccount: lijiahong volumeMounts: - name: "test-volume" mountPath: "/tmp" executor: cores: 1 instances: 1 memory: "512m" labels: version: 3.0.0 volumeMounts: - name: "test-volume" mountPath: "/tmp"本文由博客群发一文多发等经营工具平台 OpenWrite 公布

October 16, 2020 · 1 min · jiezi

关于spark:kubernetesk8s-scheduler-backend调度的实现

背景 随着k8s快来越炽热,以及主动部署,主动伸缩等长处,咱们明天来探讨一下,基于k8s的backend的调度怎么来实现组件图 组件阐明整个数据流就是消费者-生产者模型 组件解释kubernetesClient跟k8s进行交互,如:工作的提交,杀工作podsPollingSnapshotSource从k8s中拉取pod的工作状态,存储到podSnapshotStorepodsWatchSnapshotSource监控工作的watcher,以获取工作状态,存储到podSnapshotStorepodSnapshotStorepod状态的存储podStatepod外部状态转换podsSnapshotpod 的状态镜像taskPodsLifecycleManager从podSnapshotStore生产pod的状态,以便依据工作的状态进行后续操作特地阐明 对于podsWatchSnapshotSource的实现,咱们是基于k8s watch机制实现的,然而存在一个问题: 如果某一时刻,podsWatchSnapshotSource产生了故障导致了该组件产生了重启,那么问题来了,重启这段时间就会失落event, 这里咱们采纳k8s的resourceVersion机制,如果咱们定时存储resourceVersion,且在重启的时候读取,就能做到断点续传的作用 留神一点的是:该resourceVersion在 Kubernetes 服务器的保留是有限度的。应用etcd2的旧集群最多可保留1000次更改。 默认状况下,应用etcd3的较新集群会在最近5分钟内保留更改,如果超过了该resourceVersion超过了服务器的resourceVersion的值 则会报错数据流程图 流程阐明backend通过被调用reviveOffer获取能获取到的backend资源.获取到资源后,通过kubernetesClient向k8s提交工作缩小对应向k8s 提交工作的资源量更新backend外部的对应job状态为Running状态,如果该存在job状态为Runnnig状态,则更新对应的job状态为updated状态podsWatchSnapshotSource 监控方才提交的工作,获取工作更新的状态,存储到podSnapshotStore中,以便后续工作的解决podsPollingSnapshotSource 定时拉取利用提交的所有工作,存储到podSnapshotStore中,以便进行final工作的清理podSnapshotStore 对工作状态更新为外部状态,并对订阅此podSnapshotStore的snapshot进行函数回调taskPodsLifecycleManager 订阅了上述的snapshot,对该snapshot进行解决: 1.如果工作状态为podFailed或者PodSucceeded时,更新backend job的内猪状态,如果存在对应的Running的job,调用k8s api删除该pod,以及删除该pod所占用的资源(cpus,mem等),如果存在对应updated的job状态,则把updated的状态更新为Running状态,避免外界工作的更新,导致工作的资源量更新不统一 2.调用kubernetesTaskSchedulerBackend的statusUpdate办法进行工作的更新进行解决UML类继承图 和spark on k8s的区别因为公司有本人的调度平台,所以次要从调度的粒度来进行比照: spark on k8s调度的是executor级别的,是粗粒度调度 k8s backend 调度的是job级别,每个job一个pod container,属于细粒度的精准调度 本文由博客群发一文多发等经营工具平台 OpenWrite 公布

October 16, 2020 · 1 min · jiezi

关于spark:Livy探究六-RPC的实现

Livy基于netty构建了一个RPC通信层。本篇咱们来探索一下Livy的RPC层的实现细节。读者该当具备netty编程的基础知识。 RPC相干的代码次要在rsc目录和org.apache.livy.rsc包中。 KryoMessageCodecKryo是一种对象序列化和反序列化工具。通信单方须要相互发送音讯,livy抉择了Kryo作为音讯的编解码器,并在netty框架中实现编码和解码接口: class KryoMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...} @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}}当申请音讯到来时,netty首先会调用decode对音讯进行解码;当音讯要发送到对端的最初关头,netty会调用encode对音讯进行编码。 SaslHandlerlivy的rpc通信反对基于sasl的认证。所以在livy的rpc实现中,有一个叫SaslHandler的SimpleChannelInboundHandler。在正式通信前,客户端和服务端须要通过一次认证的过程。这里不列举代码,然而将认证的过程做一个剖析。回顾一下第三篇中外围架构细节局部的时序图,一个session的创立过程为:livyServer启动一个RpcServer1和一个SparkSubmit(提交driver)。这时有个细节是,livyServer会生成一个clientId,记录在内存中,并把clientId通过配置文件传给driver。driver启动后要连贯RpcServer1,就要带上这个clientId。livy通过SaslMessage音讯来封装clientId: static class SaslMessage { final String clientId; final byte[] payload; SaslMessage() { this(null, null); } SaslMessage(byte[] payload) { this(null, payload); } SaslMessage(String clientId, byte[] payload) { this.clientId = clientId; this.payload = payload; }}driver会先发送SaslMessage给RpcServer1,livyServer收到后,从本人内存中寻找是否存在SaslMessage.clientId,如果存在就算认证通过了。driver接下来才得以进一步发送其余音讯。 所以,一个rpc信道的建设分为未认证阶段和认证实现阶段。livy是基于netty实现的通信层,咱们晓得netty是通过增加pipeline的形式增加解决环节的。在服务端实现bind,或者客户端实现connect后的pipeline是这样的: 客户端通过发送hello发动"认证"(认证的逻辑下面提到了)。认证实现后,SaslHandler会从pipeline中移除,并增加新的业务handler,称为RpcDispatcher。RpcDispatcher依据性能不同有不同的实现。上面的代码片段中,SaslHandler将本身从netty的pipeline中移除: abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {... @Override protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg) throws Exception { LOG.debug("Handling SASL challenge message..."); ... // If negotiation is complete, remove this handler from the pipeline, and register it with // the Kryo instance to handle encryption if needed. ctx.channel().pipeline().remove(this); ... }...}上面的代码片段,在netty中增加须要的RpcDispatcher: ...

October 7, 2020 · 1 min · jiezi

关于spark:Livy探究五-解释器的实现

本篇咱们深刻源码,探索一下livy解释器的实现原理。 ReplDriverReplDriver是真正最终运行的Driver程序对应的类(其基类是第三篇中提到的RSCDrvier)。在这一层,重点关注handle系列办法: def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = { ...}def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = { ...}这些办法其实负责解决各种类型的request,例如BaseProtocol.ReplJobRequest就是解决执行代码申请。后面有篇提到的RpcServer,负责基于netty启动服务端,并且绑定解决申请的类,其外部的dispatcher会负责通过反射,找到对应的handle办法并调用。 对于RPC,这里只是提一下,前面的篇章再跟大家一起剖析细节本篇的重点是探索REPL,所以咱们重点从BaseProtocol.ReplJobRequest解决办法跟入: def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = { session.execute(EOLUtils.convertToSystemEOL(msg.code), msg.codeType)}这里调用了session对象的execute,所以持续进去看session对象 SessionReplDriver持有Session对象的实例,在ReplDriver初始化阶段实例化,并调用了session.start()办法: session会创立SparkInterpreter,并调用SparkInterpreter.start。 session的execute办法最终会调用SparkInterpreter.execute。 SparkInterpreter在Livy中SparkInterpreter是一种Interpreter(接口)。同样是Interpreter的还有: PythonInterpreterSparkRInterpreterSQLInterpreter... SparkInterpreter.start次要干的事件就是初始化SparkILoop。SparkILoop是org.apache.spark.repl包下的类,它其实就是spark自身实现REPL的外围类。livy在这里其实只是包装了spark自身曾经实现的性能。另外一件事件,就是第三篇中提到的在解释器中bind变量,上面的代码就是bind变量的过程: 下面代码中的bind办法和execute办法就是外围办法,其实现办法就是间接调用SparkILoop的对应办法: // execute其实最初调到interpret// code就是要执行的代码override protected def interpret(code: String): Result = { sparkILoop.interpret(code)}// name: 变量名// tpe: 变量类型// value: 变量对象实在援用// modifier: 变量各种修饰符override protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { sparkILoop.beQuietDuring { sparkILoop.bind(name, tpe, value, modifier) }}到这里其实思路曾经比拟清晰了,咱们失去上面的档次关系图: ...

October 4, 2020 · 1 min · jiezi

关于spark:Livy探究四-从es读取数据

在后面的所有例子中,咱们只是运行了livy官网给的两个例子。这篇咱们要尝试运行一些有意义的代码。 如没有非凡阐明,当前所有的试验都是在yarn-cluster模式下运行的。咱们打算尝试运行上面代码: sparkSession.read.format("org.elasticsearch.spark.sql").options(Map( "es.nodes" -> "192.168.21.41:9200", "es.resource" -> "xxxxxxxxxxxxx")).load().show()这段代码用spark sql加载了elasticsearch的某个index,并应用show()打印几行数据。 为了实现这个试验,有两个问题必须解决: 大家晓得spark sql能够扩大DataSource,elasticsearch官网为spark开发的DataSource在elasticsearch-spark-20_2.11-x.x.x.jar外面。所以要运行下面的代码,必须保障这个jar包被正确加载到。在之前的例子中,咱们用sc示意以后的SparkContext对象,而这里咱们须要的是SparkSession对象。当初咱们还不晓得应该如何援用“以后SparkSession”对象。这两个问题,livy的文档没有波及。然而没关系,从源码外面找答案。 首先,种种迹象表明livy会主动将LIVY_HOME/rsc-jars目录下的jar包上传。于是咱们先把elasticsearch-spark-20_2.11-x.x.x.jar传到LIVY_HOME/rsc-jars目录下。 而后,从源码org/apache/livy/repl/AbstractSparkInterpreter.scala中能够找到SparkSession对象的bind: ...bind("spark", sparkEntries.sparkSession().getClass.getCanonicalName, sparkEntries.sparkSession(), List("""@transient"""))bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))execute("import org.apache.spark.SparkContext._")execute("import spark.implicits._")execute("import spark.sql")execute("import org.apache.spark.sql.functions._")...能够看到,这里将SparkSession对象bind到spark变量上,而把SparkContext对象bind到sc变量上。 于是咱们的代码应该写成: spark.read.format("org.elasticsearch.spark.sql").options(Map( "es.nodes" -> "192.168.21.41:9200", "es.resource" -> "xxxxxxxxxxxxx")).load().show()接下来,还是用python来提交代码运行: data = {'code': 'sc.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes" -> "192.168.21.41:9200", "es.resource" -> "777_zabbix_item2020_09_23_09_50_41")).load().show()'}r = requests.post(statements_url, data=json.dumps(data), headers=headers)从webui上查看运行后果: 能够看到show()成绩打印了后果 从spark-web-ui上找到环境页面,查看spark.yarn.dist.jars,能够看到,elasticsearch-spark-20_2.11-x.x.x.jar被加了进来: 总结从这个试验,咱们把握了自定义的jar包应该如何利用livy上传到集群上;还晓得了SparkSession对象bind的变量是spark。

October 3, 2020 · 1 min · jiezi

关于spark:Livy探究三-核心架构细节探索

在前两篇中,咱们通过官网的example体验了livy的性能,留下了一个疑难,到底livy是如何做到的呢?这一篇从源码外面找一下答案。 在间接剖析源码前,先把论断通过时序图画进去,有个直观的映像: 客户端创立session,LivyServer收到申请后启动一个RpcServer。RpcServer会程序抉择一个从10000~10010之间的可用端口启动监听,假如此时是10000。LivyServer随后通过SparkSubimit提交Application。Application在远端,最终会启动RSCDrvier。RSCDrvier首先也从10000~10010之间程序抉择一个可用的端口,启动RpcServer。图中打出了要害的日志,第二篇中已经提到过这个日志。RSCDrvier实现bind后,反向连贯到LivyServer端的RpcServer。图中打出了要害的日志,第二篇中同样提到过。RSCDrvier次要向LivyServer所在的RpcServer上报本人bind的端口和ip。这一步其实就是最要害的步骤。RpcServer收到申请后将RSCDrvier的端口和ip封装成ContextInfo返回给LivyServer。同时敞开RpcServer。LivyServer通过RSCDrvier的端口和ip连贯到RSCDriver,从而实现tcp连贯。至此session建设实现以上就是简化的外围工作原理,能够通过netstat证实一下网络连接关系。vm3198是livyServer,vm3196是driver所在机器 下图是driver上的相干连贯: 能够看到driver启用了10000端口的监听 下图是livyServer上的相干连贯: 能够看到livyServer有一条连贯到driver的链路,端口是能够对应上的 读者可能留神到,这里Driver的监听端口并不是10001,而是10000。这是因为尽管livyServer会率先占用10000,但因为Driver与livyServer不在一台机器上,所以对于Driver来说,10000过后并没有被占用,所以就应用10000端口了留神到,咱们在livyServer上并没有找到10000端口的监听。这是因为,一旦driver将本人的地址回发过来(通过回发RemoteDriverAddress音讯),livyServer的Rpc监听就敞开了。 读者可能会思考,RSCDrvier是如何晓得livyServer的Rpc监听端点的呢?答案在启动Spark工作时上送的配置文件,咱们摘取其中要害的配置项: spark.__livy__.livy.rsc.launcher.address=vm3198spark.__livy__.livy.rsc.launcher.port=10000launcher.address/port;就是livyServer启动的Rpc监听端点。从RSCDriver的源码能够看到,程序从配置文件中读取了信息: ...// Address for the RSC driver to connect back with it's connection info.LAUNCHER_ADDRESS("launcher.address", null)LAUNCHER_PORT("launcher.port", -1)...String launcherAddress = livyConf.get(LAUNCHER_ADDRESS);int launcherPort = livyConf.getInt(LAUNCHER_PORT);...LOG.info("Connecting to: {}:{}", launcherAddress, launcherPort);总结本篇咱们探索了livy的外围工作机制,理解了建设session时,在livyServer和Driver之间是如何建设连贯关系的。更多对于rpc通信的细节有机会还会再基于源码具体开展剖析

October 3, 2020 · 1 min · jiezi

关于spark:Livy探究二-运行模式

上一篇的例子采纳Livy默认的Local模式运行Spark工作。本篇咱们尝试一下其余模式。因为批改配置文件须要重启,而每次重启服务都要设置SPARK_HOME和HADOOP_CONF_DIR比拟麻烦。咱们首先批改一下conf/livy-env.sh: cp conf/livy-env.sh.template conf/livy-env.shvi conf/livy-env.sh# 将环境变量增加进来,每次重启会主动使环境变量失效HADOOP_CONF_DIR=/etc/hadoop/confSPARK_HOME=/home/spark-2.4.1-bin-hadoop2.7许多apache软件都采纳同样的套路,例如spark, zeppelin。所以弄的货色多了,即便没有文档领导也能猜测出配置的形式Standalone集群模式首先咱们须要部署一个spark的standalone集群,此处略过部署的过程。假如集群的master地址如下: spark://vm3198:7077批改conf/livy.conf,设置如下参数,指向部署好的spark集群: livy.spark.master = spark://vm3198:7077重启服务 bin/livy-server stopbin/livy-server start用第一篇中的命令创立session,并运行两个例子,能够发现是可能胜利的,这里略过这个过程了。重点来看一看提交到集群上的利用。察看spark集群上的利用咱们看到livy在集群上提交了一个application叫livy-session-0: 这个session关联了1个driver和2个executor: driver其实运行在livy所在的服务器上,作为livy的子过程,由livy治理。尽管从过程关系上与local模式没什么区别。然而咱们晓得,实质上,local模式其实是在一个过程中通过多个线程来运行driver和executor的;而standalone模式时,这个过程仅仅运行driver,而真正的job是在spark集群运行的。显然,standalone模式更正当一些。 笔者尝试通过批改livy.spark.deploy-mode = cluster,然而这种模式下无奈胜利运行session。所以standalone模式中,只能采纳client模式yarn模式咱们晓得,生产环境最好配合yarn来运行spark工作。所以必须要试验一下yarn模式。因为yarn-client模式实质上与standalone区别不大。所以间接抉择yarn-cluster模式。 批改conf/livy.conf,设置如下参数,设置yarn-cluster模式: livy.spark.master = yarn-cluster起初通过日志发现Warning: Master yarn-cluster is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead. 所以更好的配置是livy.spark.master = yarn,并且 livy.spark.deploy-mode = cluster因为咱们提前设置了HADOOP_CONF_DIR,所以显然livy是能够晓得yarn的RM地位的。重启livy后,创立一个session。咱们通过yarn的webui界面能够看到启动的Spark利用: 进一步到spark界面查看executor: 留神到这次同样启动了1个driver和2个executor,然而区别在于driver并不是启动在livy所在服务器的。这与yarn-cluster模式的行为始终。 再次查看livy的webui,看到刚刚创立的这个利用: 这里留神到一个细节,Logs列有两个链接,一个是session,一个是drvier。点进去看,能够察觉到: session日志显示的是提交spark工作时client打印的日志drvier日志跳转到yarn日志,显示的是driver运行输入的日志进一步,咱们还是通过python代码提交两个job。查看ui界面看到两个工作曾经执行胜利: 查看livy服务器上与livy无关的过程,之前无论是local模式还是standalone模式都存在一个SparkSubmit过程。而这次,在yarn-cluster模式下,并没有这个过程。那么问题来了,咱们通过restful接口,提交的代码,到底是如何传输到driver过程,并执行的呢?察看日志咱们大略找到写蛛丝马迹。在driver端,找到如下日志: ...20/10/01 20:04:18 INFO driver.RSCDriver: Connecting to: vm3198:1000020/10/01 20:04:18 INFO driver.RSCDriver: Starting RPC server...20/10/01 20:04:18 INFO rpc.RpcServer: Connected to the port 10001...带着这些问题,下一篇咱们一起去源码中找一下线索。 ...

October 2, 2020 · 1 min · jiezi

关于spark:如何实现Spark-on-Kubernetes

简介: 大数据时代,以Oracle为代表的数据库中间件曾经逐步无奈适应企业数字化转型的需要,Spark将会是比拟好的大数据批处理引擎。而随着Kubernetes越来越火,很多数字化企业曾经把在线业务搬到了Kubernetes之上,并心愿在此之上建设一套对立的、残缺的大数据基础架构。那么Spark on Kubernetes面临哪些挑战?又该如何解决? 云原生背景介绍与思考“数据湖”正在被越来越多人提起,只管定义并不对立,但企业已纷纷投入实际,无论是在云上自建还是应用云产品。 阿里云大数据团队认为:数据湖是大数据和AI时代交融存储和计算的全新体系。为什么这么说?在数据量爆发式增长的明天,数字化转型成为IT行业的热点,数据须要更深度的价值开掘,因而确保数据中保留的原始信息不失落,应答将来一直变动的需要。以后以Oracle为代表的数据库中间件曾经逐步无奈适应这样的需要,于是业界也一直地产生新的计算引擎,以便应答大数据时代的到来。企业开始纷纷自建开源Hadoop数据湖架构,原始数据对立寄存在对象存储OSS或HDFS零碎上,引擎以Hadoop和Spark开源生态为主,存储和计算一体。 图1是基于ECS底座的EMR架构,这是一套十分残缺的开源大数据生态,也是近10年来每个数字化企业必不可少的开源大数据解决方案。 次要分为以下几层: ECS物理资源层,也就是Iaas层。数据接入层,例如实时的Kafka,离线的Sqoop。存储层,包含HDFS和OSS,以及EMR自研的缓存减速JindoFS。计算引擎层,包含熟知的Spark,Presto、Flink等这些计算引擎。数据应用层,如阿里自研的Dataworks、PAI以及开源的Zeppelin,Jupyter。每一层都有比拟多的开源组件与之对应,这些层级组成了最经典的大数据解决方案,也就是EMR的架构。咱们对此有以下思考: 是否可能做到更好用的弹性,也就是客户能够齐全依照本人业务理论的峰值和低谷进行弹性扩容和缩容,保障速度足够快,资源足够短缺。不思考现有情况,看将来几年的倒退方向,是否还须要反对所有的计算引擎和存储引擎。这个问题也十分理论,一方面是客户是否有能力保护这么多的引擎,另一方面是某些场景下是否用一种通用的引擎即可解决所有问题。举个例子来说,Hive和Mapreduce,诚然现有的一些客户还在用Hive on Mapreduce,而且规模也的确不小,然而将来Spark会是一个很好的替代品。存储与计算拆散架构,这是公认的将来大方向,存算拆散提供了独立的扩展性,客户能够做到数据入湖,计算引擎按需扩容,这样的解耦形式会失去更高的性价比。基于以上这些思考,咱们考虑一下云原生的这个概念,云原生比拟有前景的实现就是Kubernetes,所以有时候咱们一提到云原生,简直就等价于是Kubernetes。随着Kubernetes的概念越来越火,客户也对该技术充斥了趣味,很多客户曾经把在线的业务搬到了Kubernetes之上,并且心愿在这种相似操作系统上,建设一套对立的、残缺的大数据基础架构。所以咱们总结为以下几个特色: 心愿可能基于Kubernetes来容纳在线服务、大数据、AI等基础架构,做到运维体系统一化。 存算拆散架构,这个是大数据引擎能够在Kubernetes部署的前提,将来的趋势也都在向这个方向走。 通过Kubernetes的天生隔离个性,更好的实现离线与在线混部,达到降本增效目标。 Kubernetes生态提供了十分丰盛的工具,咱们能够省去很多工夫搞根底运维工作,从而能够分心来做业务。 EMR计算引擎 on ACK图2是EMR计算引擎 on ACK的架构。ACK就是阿里云版本的Kubernetes,在兼容社区版本的API同时,做了大量的优化。在本文中不会辨别ACK和Kubernetes这两个词,能够认为代表同一个概念。 基于最开始的探讨,咱们认为比拟有心愿的大数据批处理引擎是Spark和Presto,当然咱们也会随着版本迭代逐渐的退出一些比拟有前景的引擎。 EMR计算引擎提供以Kubernetes为底座的产品状态,实质上来说是基于CRD+Operator的组合,这也是云原生最根本的哲学。咱们针对组件进行分类,分为service组件和批处理组件,比方Hive Metastore就是service组件,Spark就是批处理组件。 图中绿色局部是各种Operator,技术层面在开源的根底上进行了多方面的改良,产品层面针对ACK底座进行了各方面的兼容,可能保障用户在集群中很不便的进行管控操作。左边的局部,包含Log、监控、数据开发、ECM管控等组件,这里次要是集成了阿里云的一些基础设施。咱们再来看下边的局部: 引入了JindoFS作为OSS缓存减速层,做计算与存储拆散的架构。买通了现有EMR集群的HDFS,不便客户利用已有的EMR集群数据。引入Shuffle Service来做Shuffle 数据的解耦,这也是EMR容器化区别于开源计划的比拟大的亮点,之后会重点讲到。这里明确一下,因为自身Presto是无状态的MPP架构,在ACK中部署是绝对容易的事件,本文次要探讨Spark on ACK的解决方案。 Spark on Kubernetes的挑战整体看,Spark on Kubernetes面临以下问题: 我集体认为最重要的,就是Shuffle的流程,依照目前的Shuffle形式,咱们是没方法关上动静资源个性的。而且还须要挂载云盘,云盘面临着Shuffle数据量的问题,挂的比拟大会很节约,挂的比拟小又反对不了Shuffle Heavy的工作。调度和队列治理问题,调度性能的掂量指标是,要确保当大量作业同时启动时,不应该有性能瓶颈。作业队列这一概念对于大数据畛域的同学应该十分相熟,他提供了一种治理资源的视图,有助于咱们在队列之间管制资源和共享资源。读写数据湖相比拟HDFS,在大量的Rename,List等场景下性能会有所降落,同时OSS带宽也是一个不可避免的问题。针对以上问题,咱们别离来看下解决方案。 Spark on Kubernetes的解决方案Remote Shuffle Service架构Spark Shuffle的问题,咱们设计了Shuffle 读写拆散架构,称为Remote Shuffle Service。首先探讨一下为什么Kubernetes不心愿挂盘呢,咱们看一下可能的选项: 如果用是Docker的文件系统,问题是不言而喻的,因为性能慢不说,容量也是极其无限,对于Shuffle过程是非常不敌对的。如果用Hostpath,相熟Spark的同学应该晓得,是不可能启动动静资源个性的,这个对于Spark资源是一个很大的节约,而且如果思考到后续迁徙到Serverless K8s,那么从架构上自身就是不反对Hostpath的。如果是Executor挂云盘的PV,同样情理,也是不反对动静资源的,而且须要提前晓得每个Executor的Shuffle数据量,挂的大比拟节约空间,挂的小Shuffle数据又不肯定可能包容下。所以Remote Shuffle架构针对这一痛点、对现有的Shuffle机制有比拟大的优化,图3两头有十分多的控制流,咱们不做具体的探讨。次要来看数据流,对于Executor所有的Mapper和Reducer,也就是图中的蓝色局部是跑在了K8s容器里,两头的架构是Remote Shuffle Service,蓝色局部的Mapper将Shuffle数据近程写入service里边,打消了Executor的Task对于本地磁盘的依赖。Shuffle Service会对来自不同Mapper的同一partition的数据进行merge操作,而后写入到分布式文件系统中。等到Reduce阶段,Reducer通过读取程序的文件,能够很好地晋升性能。这套零碎最次要的实现难点就是控制流的设计,还有各方面的容错,数据去重,元数据管理等等工作。 简而言之,咱们总结为以下3点: Shuffle数据通过网络写出,两头数据计算与存储拆散架构DFS 2正本,打消Fetch Failed引起的重算,Shuffle Heavy作业更加稳固Reduce阶段程序读磁盘,防止现有版本的随机IO,大幅晋升性能Remote Shuffle Service性能咱们在这里展现一下对于性能的问题,图4和图5是Terasort Benchmark。之所以选取Terasrot这种workload来测试,是因为它只有3个stage,而且是一个大Shuffle的工作,大家能够十分有体感的看出对于Shuffle性能的变动。 图4中,蓝色局部是Shuffle Service版本的运行工夫,橙色局部是原版Shuffle的运行工夫。咱们测试了2T,4T,10T的数据,能够察看到随着数据量越来越大,Shuffle Service劣势就越来越显著。图5红框局部阐明了作业的性能晋升次要体现在Reduce阶段,可见10T的Reduce Read从1.6小时降落到了1小时。起因前边曾经解释的很分明了,相熟Spark shuffle机制的同学晓得,原版的sort shuffle是M*N次的随机IO,在这个例子中,M是12000,N是8000,而Remote Shuffle就只有N次程序IO,这个例子中是8000次,所以这是晋升性能的基本所在。 ...

September 28, 2020 · 1 min · jiezi

关于spark:案例解析丨-Spark-Hive-自定义函数应用

摘要:Spark目前反对UDF,UDTF,UDAF三种类型的自定义函数。1. 简介 Spark目前反对UDF,UDTF,UDAF三种类型的自定义函数。UDF应用场景:输出一行,返回一个后果,一对一,比方定义一个函数,性能是输出一个IP地址,返回一个对应的省份。UDTF应用场景: 输出一行,返回多行(hive),一对多, 而sparkSQL中没有UDTF, spark中用flatMap即可实现该性能。UDAF: 输出多行,返回一行, aggregate(次要用于聚合性能,比方groupBy,count,sum), 这些是spark自带的聚合函数,然而简单绝对简单。 Spark底层其实以CatalogFunction构造封装了一个函数,其中FunctionIdentifier形容了函数名字等根本信息,FunctionResource形容了文件类型(jar或者file)和文件门路;Spark的SessionCatalog提供了函数注册,删除,获取等一些列接口,Spark的Executor在接管到函数执行sql申请时,通过缓存的CatalogFunction信息,找到CatalogFunction中对应的jar地址以及ClassName, JVM动静加载jar,并通过ClassName反射执行对应的函数。 图1. CatalogFunction构造体 图2. 注册加载函数逻辑 Hive的HiveSessionCatalog是继承Spark的SessionCatalog,对Spark的基本功能做了一层装璜以适配Hive的基本功能,其中包含函数性能。HiveSimpleUDF对应UDF,HiveGenericUDF对应GenericUDF,HiveUDAFFunction对应AbstractGenericUDAFResolve以及UDAF,HiveGenericUDTF对应GenericUDTF 图3. Hive装璜spark函数逻辑 2. UDF UDF是最罕用的函数,应用起来绝对比较简单,次要分为两类UDF:简略数据类型,继承UDF接口;简单数据类型,如Map,List,Struct等数据类型,继承GenericUDF接口。 简略类型实现UDF时,可自定义若干个名字evaluate为的办法,参数和返回类型依据须要本人设置。因为UDF接口默认应用DefaultUDFMethodResolver去办法解析器获取办法,解析器是依据用户输出参数和写死的名字evaluate去反射寻找办法元数据。当然用户也能够自定义解析器解析办法。 图4. 自定义UDF简略示例 图5.默认UDF办法解析器 3. UDAF UDAF是聚合函数,目前实现形式次要有三种:实现UDAF接口,比拟老的简答实现形式,目前曾经被废除;实现UserDefinedAggregateFunction,目前应用比拟广泛形式,按阶段实现接口汇集数据;实现AbstractGenericUDAFResolver,实现绝对UserDefinedAggregateFunction形式略微简单点,还须要实现一个计算器Evaluator(如通用计算器GenericUDAFEvaluator),UDAF的逻辑解决次要产生在Evaluator。 UserDefinedAggregateFunction定义输入输出数据结构,实现初始化缓冲区(initialize),聚合单条数据(update),聚合缓存区(merge)以及计算最终后果(evaluate)。 图6.自定义UDAF简略示例 4. UDTF UDTF简略粗犷的了解是一行生成多行的主动函数,能够生成多行多列,又被称为表生成函数。目前实现形式是实现GenericUDTF接口,实现2个接口,initialize接口参数校验,列的定义,process接口承受一行数据,切割数据。 图7.自定义UDTF简略示例

September 14, 2020 · 1 min · jiezi

关于spark:SAP-Fiori-Elements如何基于domain-fixed-value创建下拉菜单

Several days ago I wrote a blog How to build a drop down list using Smart template + CDS view which introduces how to create a drop down list based on values from a backend table. For example, the status list in above screenshot comes from the three entries in database table ZSTATUS_FIXEDVAL. And now a new requirement is to use the fixed value defined in an ABAP domain instead. ...

September 4, 2020 · 4 min · jiezi

关于spark:Spark任务的coreexecutormemory资源配置方法

动态调配:OS(操作系统预留) 1 core 1Gcore 并发能力 <=5executor AM预留1个executor 余executor=总executor-1memory 预留每个executor0.07比例MemoryOverhead max(384M, 0.07 × spark.executor.memory)ExecutorMemory (总m-1G(OS))/nodes_num-MemoryOverhead 例子1 硬件资源: 6 节点,每个节点16 cores, 64 GB 内存 每个节点在计算资源时候,给操作系统和Hadoop的过程预留1core,1GB,所以每个节点剩下15个core和63GB 内存。 core的个数,决定一个executor可能并发工作的个数。所以通常认为,一个executor越多的并发工作可能失去更好的性能,但有钻研显示一个利用并发工作超过5,导致更差的性能。所以core的个数暂设置为5个。 5个core是表明executor并发工作的能力,并不是说一个零碎有多少个core,即便咱们一个CPU有32个core,也设置5个core不变。 executor个数,接下来,一个executor调配 5 core,一个node有15 core,从而咱们计算一个node上会有3 executor(15 / 5),而后通过每个node的executor个数失去整个工作能够调配的executors个数。 咱们有6个节点,每个节点3个executor,6 × 3 = 18个executors,额定预留1个executor给AM,最终要配置17个executors。 最初spark-submit启动脚本中配置 –num-executors = 17 memory,配置每个executor的内存,一个node,3 executor, 63G内存可用,所以每个executor可配置内存为63 / 3 = 21G 从Spark的内存模型角度,Executor占用的内存分为两局部:ExecutorMemory和MemoryOverhead,预留出MemoryOverhead的内存量之后,才是ExecutorMemory的内存。 MemoryOverhead的计算公式: max(384M, 0.07 × spark.executor.memory) 因而 MemoryOverhead值为0.07 × 21G = 1.47G > 384M最终executor的内存配置值为 21G – 1.47 ≈ 19 GB至此, Cores = 5, Executors= 17, Executor Memory = 19 GB ...

August 27, 2020 · 1 min · jiezi

关于spark:Spark-javascala项目打包jar

1.法1:maven打包pom.xml文件 <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- 此处指定main办法入口的class --> <mainClass>ch.kmeans2.SparkStreamingKMeansKafkaExample</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <id>scala-compile-first</id> <goals> <goal>compile</goal> </goals> <configuration> <includes> <include>**/*.scala</include> </includes> </configuration> </execution> <execution> <id>scala-test-compile</id> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins>而后应用mvn clean package命令进行打包,如果提交到集群运行遇到Exception in thread "main" java.lang.AbstractMethodError,思考我的项目中spark的版本和集群运行版本是否始终2.法2:idea打包Project->Module Setting->Artifacts而后Build Artifact输入jar包

August 18, 2020 · 1 min · jiezi

关于spark:Spark-javascala项目打包jar

1.法1:maven打包pom.xml文件 <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- 此处指定main办法入口的class --> <mainClass>ch.kmeans2.SparkStreamingKMeansKafkaExample</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <id>scala-compile-first</id> <goals> <goal>compile</goal> </goals> <configuration> <includes> <include>**/*.scala</include> </includes> </configuration> </execution> <execution> <id>scala-test-compile</id> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins>而后应用mvn clean package命令进行打包,如果提交到集群运行遇到Exception in thread "main" java.lang.AbstractMethodError,思考我的项目中spark的版本和集群运行版本是否始终2.法2:idea打包Project->Module Setting->Artifacts而后Build Artifact输入jar包

August 18, 2020 · 1 min · jiezi

关于spark:Spark优化之小文件是否需要合并

咱们晓得,大部分Spark计算都是在内存中实现的,所以Spark的瓶颈个别来自于集群(standalone, yarn, mesos, k8s)的资源缓和,CPU,网络带宽,内存。Spark的性能,想要它快,就得充分利用好系统资源,尤其是内存和CPU。有时候咱们也须要做一些优化调整来缩小内存占用,例如将小文件进行合并的操作。 一、问题景象咱们有一个15万条总数据量133MB的表,应用SELECT * FROM bi.dwd_tbl_conf_info全表查问耗时3min,另外一个500万条总数据量6.3G的表ods_tbl_conf_detail,查问耗时23秒。两张表均为列式存储的表。 大表查问快,而小表反而查问慢了,为什么会产生如此奇怪的景象呢? 二、问题探询数据量6.3G的表查问耗时23秒,反而数据量133MB的小表查问耗时3min,这十分奇怪。咱们收集了对应的建表语句,发现两者没有太大的差别,大部分为String,两表的列数也相差不大。 CREATE TABLE IF NOT EXISTS `bi`.`dwd_tbl_conf_info` ( `corp_id` STRING COMMENT '', `dept_uuid` STRING COMMENT '', `user_id` STRING COMMENT '', `user_name` STRING COMMENT '', `uuid` STRING COMMENT '', `dtime` DATE COMMENT '', `slice_number` INT COMMENT '', `attendee_count` INT COMMENT '', `mr_id` STRING COMMENT '', `mr_pkg_id` STRING COMMENT '', `mr_parties` INT COMMENT '', `is_mr` TINYINT COMMENT 'R', `is_live_conf` TINYINT COMMENT '')CREATE TABLE IF NOT EXISTS `bi`.`ods_tbl_conf_detail` ( `id` string, `conf_uuid` string, `conf_id` string, `name` string, `number` string, `device_type` string, `j_time` bigint, `l_time` bigint, `media_type` string, `dept_name` string, `UPDATETIME` bigint, `CREATETIME` bigint, `user_id` string, `USERAGENT` string, `corp_id` string, `account` string )因为两张表均为很简略的SELECT查问操作,无任何简单的聚合join操作,也无UDF相干的操作,所以根本确认查问慢的应该产生的读表的时候,咱们将狐疑的点放到了读表操作上。通过查问两个查问语句的DAG和工作散布,咱们发现了不一样的中央。 ...

August 13, 2020 · 1 min · jiezi

关于spark:第七篇Spark平台下基于LDA的kmeans算法实现

本文次要在Spark平台下实现一个机器学习利用,该利用次要波及LDA主题模型以及K-means聚类。通过本文你能够理解到: 文本开掘的根本流程LDA主题模型算法K-means算法Spark平台下LDA主题模型实现Spark平台下基于LDA的K-means算法实现1.文本开掘模块设计1.1文本开掘流程文本剖析是机器学习中的一个很宽泛的畛域,并且在情感剖析、聊天机器人、垃圾邮件检测、举荐零碎以及自然语言解决等方面失去了广泛应用。 文本聚类是信息检索畛域的一个重要概念,在文本开掘畛域有着宽泛的利用。文本聚类可能主动地将文本数据集划分为不同的类簇,从而更好地组织文本信息,能够实现高效的常识导航与浏览。 本文抉择主题模型LDA(Latent Dirichlet Allocation)算法对文档进行分类解决,抉择在Spark平台上通过Spark MLlib实现LDA算法,其中Spark Mllib是Spark提供的机器学习库,该库提供了罕用的机器学习算法。其根本设计思路如下图所示: 1.2文本开掘流程剖析首先是数据源局部,次要的数据包含文档数据和互联网爬虫数据。而后是数据抽取局部,将曾经收集好的数据通过同步工具上传至分布式文件系统HDFS,作为模型训练的数据源。其次是数据摸索与预处理局部,该局部次要是对原始数据集进行预处理,包含分词、停用词过滤、特征向量提取等。再次是模型训练局部,次要包含训练与测试,从而失去一个模型。最初是模型评估,对学得模型进行评估之后,进行线上部署。 2.文本开掘模块算法钻研2.1LDA主题模型算法LDA(Latent Dirichlet allocation)由David M. Blei,Andrew Y. Ng,Michael I. Jordan于2003年提出的基于概率模型的主题模型算法,即隐含狄利克雷散布,它能够将文档集中每篇文档的主题以概率分布的模式给出,将文本向量投射到更容易剖析解决的主题空间当中,去除文本中存在的噪声,是一种罕用的文本剖析技术,能够用来辨认大规模文档集或语料库中潜在的主题信息,通常被用来对大规模文档数据进行建模。通过主题模型和设定的主题数,能够训练出文档汇合中不同的主题所占的比例以及每个主题下的要害词语呈现的概率。从文档汇合中学习失去主题散布和主题比例,能够进一步在数据挖掘工作中应用。 LDA借用词袋的思维,以某一概率选取某个主题,再以某一概率选出主题中的每个单词,通过一直反复该步骤产生文档中的所有语词。该办法对词汇进行了含糊聚类,汇集到一类的词能够间接地示意一个隐含的主题。LDA对文本信息进行了开掘,能用来掂量不同文档之间的潜在关系,也能通过某一类词来表白文档中暗藏的主题。 2.2K均值算法聚类(Clustering)是一种将数据集划分为若干组或类的办法。通过聚类过程将一群形象的对象分为若干组,每一组由类似的对象形成,称之为一个类别。与分类不同(将数据依照当时定义好的分类规范进行划分),聚类是一种无监督学习(unsupervised learning),训练数据集的标签信息是未知的,指标是通过对无标记训练样本依照特定的测度的形似性水平进行聚合,为进一步数据分析提供根底。 K均值(k-means)算法的根本思维是初始随机给定K 个簇核心,即从n个数据对象中抉择k个任意对象作为初始的簇核心,依照最邻近准则把待分类样本点分到各个簇。而后按平均法从新计算各个簇的核心(该类别中的所有数据对象的均值),从而确定新的簇心。始终迭代,直到簇心的挪动间隔小于某个给定的值。 K均值算法采纳了贪婪策略,通过迭代优化来近似求解上式E值,算法流程如下图所示 2.3文本开掘算法优化LDA主题模型算法利用于文档聚类,计算得出的主题能够看做是文档的聚类核心,利用主题模型进行文档聚类,能够无效地组织文档数据集。同时,因为LDA主题模型能够计算出每篇文档在不同主题下的概率分布,因而能够将此主题的概率分布作为文档的特征向量,从而将高维的文档向量投影到低维的特色空间中。 计算文本之间的间隔是传统的K-means算法在进行文本聚类时的关键步骤,而文本通常是非结构化数据,构建的文本向量具备稠密性和维度高的特点,同时,构建文本特征向量并未思考到文字之间的语义关系,因而可能会造成位于同一类簇的文本之间具备非相似性。 因而本文基于LDA主题模型改良K-means算法,首先通过LDA主题模型对文档数据集进行建模,挖掘出每篇文档的主题概率分布,既可能达到文档降维和去除噪声的成果,又能补救通过关键词构建文档特征向量容易造成失落信息的缺点。最初每篇文档的主题概率分布作为K-means算法的输出数据集。 3.试验剖析3.1基于Spark的LDA主题模型算法实现数据集介绍抉择Newsgroups数据集作为该试验的训练集和测试集。Newgroups是一个新闻数据集,该数据集包含大概20000篇新闻文档,总共分为6个大类别,每个大类别又分不同的小类别,小类别共计20个,如下表所示。该新闻数据集曾经成为了学界和业界在机器学习的文本开掘试验中罕用的数据集,比方文本分类和文本聚类。 该数据集共蕴含7个文件,其中3个文件为训练数据(train.data、train.label、train.map),共计11269篇,另外3个文件为测试数据(test.data、test.label、test.map),共计7505篇,另外一个文件为词汇表(vocabulary.txt),其第i行示意编号为i的单词的名称。文件扩大名为.data的文件格式为[docIdx wordIdx count],其中docIdx示意文档编号,wordIdx示意词语的编号,count示意该词语的词频统计。文件扩大名为.label的文件示意文档的主题分类,每行数据代表某篇文档的类别。文件扩大名为.map的文示意类别编号与类别名称的映射关系,其具体格局为[labelName labelId]。 原始数据集解决原始的数据集格局为[docIdx wordIdx count],例如[1,20,2]示意在编号为1的文档中,编号为20的词语的词频是2。LDA承受的参数格局为: [label,(vector_ size, [wiIdx,wjIdx,···wnIdx ],[tfi,tfj,···tfn])]上述格局的数据代表一个带有标签的稠密向量,其中label示意文档编号,vector_ size示意向量的维度,wnIdx示意词n的索引编号,tfn示意词n的词频。须要将原始数据转换成上述的格局,具体步骤如下: Step1:将原始数据集上传至HDFS[kms@kms-1 ~]$ hdfs dfs -put /opt/modules/train_data/lda/train.data /train/lda/Step2:初始化SparkSession并加载数据val spark = SparkSession .builder .appName(s"${this.getClass.getSimpleName}") .getOrCreate() //设置日志级别 Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) //加载原始数据集val rowDS = spark.read.textFile("/train/lda/train.data") Step3:数据集矩阵变换处理//创立形如MatrixEntry(row_index, column_index, value)的MatrixEntryval matrixEntry:RDD[MatrixEntry] = rowDS.rdd.map(_.split(" ")) .map(rowdata => MatrixEntry(rowdata(0).toLong,rowdata(1).toLong,rowdata(2).toDouble))//创立稠密矩阵val sparseMatrix: CoordinateMatrix = new CoordinateMatrix(matrixEntry)//创立LabeledPoint数据集val labelPointData = sparseMatrix.toIndexedRowMatrix.rows.map(r => (r.index, r.vector.asML))val corpusDF = spark.createDataFrame(labelPointData).toDF("label","features")corpusDF.saveAsTextFile("/tarin/lda/labelPointData")解决之后的局部数据集如下所示,其中一行代表一篇文档的特征向量 ...

August 8, 2020 · 2 min · jiezi

关于spark:第六篇Spark-MLlib机器学习1

MLlib是Spark提供的一个机器学习库,通过调用MLlib封装好的算法,能够轻松地构建机器学习利用。它提供了十分丰盛的机器学习算法,比方分类、回归、聚类及举荐算法。除此之外,MLlib对用于机器学习算法的API进行了标准化,从而使将多种算法组合到单个Pipeline或工作流中变得更加容易。通过本文,你能够理解到: 什么是机器学习大数据与机器学习机器学习分类Spark MLLib介绍机器学习是人工智能的一个分支,是一门多畛域交叉学科,波及概率论、统计学、迫近论、凸剖析、计算复杂性实践等多门学科。机器学习实践次要是设计和剖析一些让计算机能够主动“学习”的算法。因为学习算法中波及了大量的统计学实践,机器学习与推断统计学分割尤为亲密,也被称为统计学习实践。算法设计方面,机器学习实践关注能够实现的,卓有成效的学习算法。起源:Mitchell, T. (1997). Machine Learning. McGraw Hill. 什么是机器学习 机器学习的利用已遍布人工智能的各个分支,如专家系统、主动推理、自然语言了解、模式识别、计算机视觉、智能机器人等畛域。机器学习是人工智能的一个分支学科,次要钻研的是让机器从过来的经验中学习教训,对数据的不确定性进行建模,对将来进行预测。机器学习利用的畛域很多,比方搜寻、举荐零碎、垃圾邮件过滤、人脸识别、语音辨认等等。 大数据与机器学习大数据时代,数据产生的速度是十分惊人的。互联网、挪动互联网、物联网、GPS等等都会在无时无刻产生着数据。解决这些数据所须要的存储与计算的能力也在成几何级增长,由此诞生了一系列的以Hadoop为代表的大数据技术,这些大数据技术为解决和存储这些数据提供了牢靠的保障。 数据、信息、常识是由大到小的三个档次。单纯的数据很难说明一些问题,须要加之人们的一些教训,将其转换为信息,所谓信息,也就是为了打消不确定性,咱们常说信息不对称,指的就是在不可能获取足够的信息时,很难打消一些不确定的因素。而常识则是最高阶段,所以数据挖掘也叫常识发现。 机器学习的工作就是利用一些算法,作用于大数据,而后开掘背地所蕴含的潜在的常识。训练的数据越多,机器学习就越能体现出劣势,以前机器学习解决不了的问题,当初通过大数据技术能够失去很好的解决,性能也会大幅度晋升,如语音辨认、图像识别等等。 机器学习分类机器学习次要分为上面几大类: 监督学习(supervised learning)基本上是分类的同义词。学习中的监督来自训练数据集中标记的实例。比方,在邮政编码辨认问题中,一组手写邮政编码图像与其对应的机器可读的转换物用作训练实例,监督分类模型的学习。常见的监督学习算法包含:线性回归、逻辑回归、决策树、奢侈贝叶斯、反对向量机等等。 无监督学习(unsupervised learning)实质上是聚类的同义词。学习过程是无监督的,因为输出实例没有类标记。无监督学习的工作是从给定的数据集中,挖掘出潜在的构造。比方,把猫和狗的照片给机器,不给这些照片打任何标签,然而心愿机器可能将这些照片分分类,最终机器会把这些照片分为两大类,然而并不知道哪些是猫的照片,哪些是狗的照片,对于机器来说,相当于分成了 A、B 两类。常见的无监督学习算法包含:K-means 聚类、主成分剖析(PCA)等。 半监督学习(Semi-supervised learning)半监督学习是一类机器学习技术,在学习模型时,它应用标记的和未标记的实例。让学习器不依赖外界交互、主动地利用未标记样本来晋升学习性能,就是半监督学习。 半监督学习的事实需要十分强烈,因为在事实利用中往往能容易地收集到大量未标记样本,而获取标记却需消耗人力、物力。例如,在进行计算机辅助医学影像剖析时,能够从医院取得大量医学影像, 但若心愿医学专家把影像中的病灶全都标识进去则是不事实的有标记数据少,未标记数据多这个景象在互联网利用中更显著,例如在进行网页举荐时需请用户标记出感兴趣的网页, 但很少有用户愿花很多工夫来提供标记,因而,有标记网页样本少,但互联网上存在有数网页可作为未标记样本来应用。 强化学习(reinforcement learning)又称再励学习、评估学习,是一种重要的机器学习办法,在智能管制机器人及剖析预测等畛域有许多利用。强化学习的常见模型是规范的马尔可夫决策过程(Markov Decision Process, MDP)。 Spark MLLib介绍MLlib是Spark的机器学习库,通过该库能够简化机器学习的工程实际工作。MLlib蕴含了十分丰盛的机器学习算法:分类、回归、聚类、协同过滤、主成分剖析等等。目前,MLlib分为两个代码包:spark.mllib与spark.ml。 spark.mllibSpark MLlib是Spark的重要组成部分,是最后提供的一个机器学习库。该库有一个毛病:如果数据集非常复杂,须要做屡次解决,或者是对新数据须要联合多个曾经训练好的单个模型进行综合计算时,应用Spark MLlib会使程序结构变得复杂,甚至难以了解和实现。 spark.mllib是基于RDD的原始算法API,目前处于保护状态。该库下蕴含4类常见的机器学习算法:分类、回归、聚类、协同过滤。指的留神的是,基于RDD的API不会再增加新的性能。 spark.mlSpark1.2版本引入了ML Pipeline,通过多个版本的倒退,Spark ML克服了MLlib解决机器学习问题的一些有余(简单、流程不清晰),向用户提供了基于DataFrame API的机器学习库,使得构建整个机器学习利用的过程变得简略高效。 Spark ML不是正式名称,用于指代基于DataFrame API的MLlib库 。与RDD相比,DataFrame提供了更加敌对的API。DataFrame的许多益处包含Spark数据源,SQL / DataFrame查问,Tungsten和Catalyst优化以及跨语言的对立API。 Spark ML API提供了很多数据特色处理函数,如特色选取、特色转换、类别数值化、正则化、降维等。另外基于DataFrame API的ml库反对构建机器学习的Pipeline,把机器学习过程一些工作有序地组织在一起,便于运行和迁徙。Spark官网举荐应用spark.ml库。 数据变换数据变换是数据预处理的一项重要工作,比方对数据进行规范化、离散化、衍生指标等等。Spark ML中提供了十分丰盛的数据转换算法,具体能够参考官网,现归纳如下: 下面的转换算法中,词频逆文档频率(TF-IDF)、Word2Vec、PCA是比拟常见的,如果你做过文本开掘解决,那么对此应该并不生疏。 数据规约大数据是机器学习的根底,为机器学习提供短缺的数据训练集。在数据量十分大的时候,须要通过数据规约技术删除或者缩小冗余的维度属性以来达到精简数据集的目标,相似于抽样的思维,尽管放大了数据容量,然而并没有扭转数据的完整性。Spark ML提供的特征选择和降维的办法如下表所示: 抉择特色和降维是机器学习中罕用的伎俩,能够应用上述的办法缩小特色的抉择,打消噪声的同时还可能维持原始的数据结构特色。尤其是主成分分析法(PCA),无论是在统计学畛域还是机器学习畛域,都起到了很重要的作用。 机器学习算法Spark反对分类、回归、聚类、举荐等罕用的机器学习算法。见下表: 总结本文对机器学习进行了总体介绍,次要包含机器学习的基本概念、机器学习的根本分类、Spark机器学习库的介绍。通过本文或者曾经对机器学习有了初步的理解,在下一篇,我将会分享基于Spark ML库构建一个机器学习的利用,次要波及LDA主题模型以及K-means聚类。 公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

August 8, 2020 · 1 min · jiezi

关于spark:第五篇SparkStreaming编程指南2

第四篇|Spark-Streaming编程指南(1)对Spark Streaming执行机制、Transformations与Output Operations、Spark Streaming数据源(Sources)、Spark Streaming 数据汇(Sinks)进行了探讨。本文将连续上篇内容,次要包含以下内容: 有状态的计算基于工夫的窗口操作长久化检查点Checkpoint应用DataFrames & SQL解决流数据有状态的计算updateStateByKey上一篇文章中介绍了常见的无状态的转换操作,比方在WordCount的例子中,输入的后果只与以后batch interval的数据无关,不会依赖于上一个batch interval的计算结果。spark Streaming也提供了有状态的操作: updateStateByKey,该算子会保护一个状态,同时进行信息更新 。该操作会读取上一个batch interval的计算结果,而后将其后果作用到以后的batch interval数据统计中。其源码如下: def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) }该算子只能在key–value对的DStream上应用,须要接管一个状态更新函数 updateFunc作为参数。应用案例如下: object StateWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName(StateWordCount.getClass.getSimpleName) val ssc = new StreamingContext(conf, Seconds(5)) // 必须开启checkpoint,否则会报错 ssc.checkpoint("file:///e:/checkpoint") val lines = ssc.socketTextStream("localhost", 9999) // 状态更新函数 def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = { var oldvalue = stateValue.getOrElse(0) // 获取状态值 // 遍历以后数据,并更新状态 for (newValue <- newValues) { oldvalue += newValue } // 返回最新的状态 Option(oldvalue) } val count = lines.flatMap(_.split(" ")) .map(w => (w, 1)) .updateStateByKey(updateFunc) count.print() ssc.start() ssc.awaitTermination() }}尖叫提醒:下面的代码必须要开启checkpoint,否则会报错:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint() ...

August 8, 2020 · 4 min · jiezi

关于spark:第四篇Spark-Streaming编程指南1

Spark Streaming是构建在Spark Core根底之上的流解决框架,是Spark十分重要的组成部分。Spark Streaming于2013年2月在Spark0.7.0版本中引入,倒退至今曾经成为了在企业中宽泛应用的流解决平台。在2016年7月,Spark2.0版本中引入了Structured Streaming,并在Spark2.2版本中达到了生产级别,Structured Streaming是构建在Spark SQL之上的流解决引擎,用户能够应用DataSet/DataFreame API进行流解决,目前Structured Streaming在不同的版本中倒退速度很快。值得注意的是,本文不会对Structured Streaming做过多解说,次要针对Spark Streaming进行探讨,包含以下内容: Spark Streaming介绍Transformations与Output OperationsSpark Streaming数据源(Sources)Spark Streaming 数据汇(Sinks)Spark Streaming介绍什么是DStreamSpark Streaming是构建在Spark Core的RDD根底之上的,与此同时Spark Streaming引入了一个新的概念:DStream(Discretized Stream,离散化数据流),示意连续不断的数据流。DStream形象是Spark Streaming的流解决模型,在外部实现上,Spark Streaming会对输出数据依照工夫距离(如1秒)分段,每一段数据转换为Spark中的RDD,这些分段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。如下图所示: 如上图,这些底层的RDD转换操作是由Spark引擎来实现的,DStream的操作屏蔽了许多底层的细节,为用户提供了比拟方便使用的高级API。 计算模型在Flink中,批处理是流解决的特例,所以Flink是人造的流解决引擎。而Spark Streaming则不然,Spark Streaming认为流解决是批处理的特例,即Spark Streaming并不是纯实时的流解决引擎,在其外部应用的是microBatch模型,行将流解决看做是在较小工夫距离内(batch interval)的一些列的批处理。对于工夫距离的设定,须要联合具体的业务提早需要,能够实现秒级或者分钟级的距离。 Spark Streaming会将每个短时间距离内接管的数据存储在集群中,而后对其作用一系列的算子操作(map,reduce, groupBy等)。执行过程见下图: 如上图:Spark Streaming会将输出的数据流宰割成一个个小的batch,每一个batch都代表这一些列的RDD,而后将这些batch存储在内存中。通过启动Spark作业来解决这些batch数据,从而实现一个流解决利用。 Spark Streaming的工作机制概览 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上每个Receiver都会负责一个input DStream(比方从文件中读取数据的文件流,比方套接字流,或者从Kafka中读取的一个输出流等等)Spark Streaming通过input DStream与内部数据源进行连贯,读取相干数据执行细节 1.启动StreamingContext2.StreamingContext启动receiver,该receiver会始终运行在Executor的task中。用于连续不断地接收数据源,有两种次要的reciver,一种是牢靠的reciver,当数据被接管并且存储到spark,发送回执确认,另一种是不牢靠的reciver,对于数据源不发送回执确认。接管的数据会被缓存到work节点内存中,也会被复制到其余executor的所在的节点内存中,用于容错解决。3.Streaming context周期触发job(依据batch-interval工夫距离)进行数据处理。4.将数据输入。Spark Streaming编程步骤通过下面的剖析,对Spark Streaming有了初步的意识。那么该如何编写一个Spark Streaming应用程序呢?一个Spark Streaming个别包含一下几个步骤: 1.创立StreamingContext2.创立输出DStream来定义输出源3.通过对DStream利用转换操作和输入操作来定义解决逻辑4.用streamingContext.start()来开始接收数据和解决流程5.streamingContext.awaitTermination()办法来期待解决完结 object StartSparkStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName("Streaming") // 1.创立StreamingContext val ssc = new StreamingContext(conf, Seconds(5)) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) // 2.创立DStream val lines = ssc.socketTextStream("localhost", 9999) // 3.定义流计算解决逻辑 val count = lines.flatMap(_.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) // 4.输入后果 count.print() // 5.启动 ssc.start() // 6.期待执行 ssc.awaitTermination() } }Transformations与Output OperationsDStream是不可变的, 这意味着不能间接扭转它们的内容,而是通过对DStream进行一系列转换(Transformation)来实现预期的利用程序逻辑。 每次转换都会创立一个新的DStream,该DStream示意来自父DStream的转换后的数据。 DStream转换是惰性(lazy)的,这象征只有执行output操作之后,才会去执行转换操作,这些触发执行的操作称之为output operation。 ...

August 8, 2020 · 7 min · jiezi

关于spark:第三篇Spark-SQL编程指南

在《第二篇|Spark Core编程指南》一文中,对Spark的外围模块进行了解说。本文将探讨Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的根底之上构建的,于2014年5月公布。从名称上能够看出,该模块是Spark提供的关系型操作API,实现了SQL-on-Spark的性能。对于一些相熟SQL的用户,能够间接应用SQL在Spark上进行简单的数据处理。通过本文,你能够理解到: Spark SQL简介DataFrame API&DataSet APICatalyst Optimizer优化器Spark SQL基本操作Spark SQL的数据源RDD与DataFrame互相转换Thrift server与Spark SQL CLISpark SQL简介Spark SQL是Spark的其中一个模块,用于结构化数据处理。与根本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了无关数据结构和正在执行的计算的更多信息,Spark SQL会应用这些额定的信息来执行额定的优化。应用SparkSQL的形式有很多种,包含SQL、DataFrame API以及Dataset API。值得注意的是,无论应用何种形式何种语言,其执行引擎都是雷同的。实现这种对立,意味着开发人员能够轻松地在不同的API之间来回切换,从而使数据处理更加地灵便。 DataFrame API&DataSet APIDataFrame APIDataFrame代表一个不可变的分布式数据汇合,其外围目标是让开发者面对数据处理时,只关怀要做什么,而不必关怀怎么去做,将一些优化的工作交由Spark框架自身去解决。DataFrame是具备Schema信息的,也就是说能够被看做具备字段名称和类型的数据,相似于关系型数据库中的表,然而底层做了很多的优化。创立了DataFrame之后,就能够应用SQL进行数据处理。 用户能够从多种数据源中结构DataFrame,例如:结构化数据文件,Hive中的表,内部数据库或现有RDD。DataFrame API反对Scala,Java,Python和R,在Scala和Java中,row类型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。 DataSet APIDataSet是Spark 1.6中增加的新接口,是DataFrame的扩大,它具备RDD的长处(强类型输出,反对弱小的lambda函数)以及Spark SQL的优化执行引擎的长处。能够通过JVM对象构建DataSet,而后应用函数转换(map,flatMap,filter)。值得注意的是,Dataset API在Scala和 Java中可用,Python不反对Dataset API。 另外,DataSet API能够缩小内存的应用,因为Spark框架晓得DataSet的数据结构,因而在长久化DataSet时能够节俭很多的内存空间。 Catalyst Optimizer优化器在Catalyst中,存在两种类型的打算: 逻辑打算(Logical Plan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑打算定义了一系列的用户代码所须要的属性(查问字段)和束缚(where条件),然而不定义该如何执行。具体如下图所示: 物理打算(Physical Plan):物理打算是从逻辑打算生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑打算中的JOIN会被转换为物理打算中的sort merge JOIN。须要留神,Spark会生成多个物理打算,而后抉择老本最低的物理打算。具体如下图所示: 在Spark SQL中,所有的算子操作会被转换成AST(abstract syntax tree,形象语法树),而后将其传递给Catalyst优化器。该优化器是在Scala的函数式编程根底会上构建的,Catalyst反对基于规定的(rule-based)和基于老本的(cost-based)优化策略。 Spark SQL的查问打算包含4个阶段(见下图): 1.剖析2.逻辑优化3.物理打算4.生成代码,将查问局部编译成Java字节码留神:在物理打算阶段,Catalyst会生成多个打算,并且会计算每个打算的老本,而后比拟这些打算的老本的大小,即基于老本的策略。在其余阶段,都是基于规定的的优化策略。 剖析Unresolved Logical plan --> Logical plan。Spark SQL的查问打算首先起始于由SQL解析器返回的AST,或者是由API构建的DataFrame对象。在这两种状况下,都会存在未解决的属性援用(某个查问字段可能不存在,或者数据类型谬误),比方查问语句:SELECT col FROM sales,对于字段col的类型,或者该字段是否是一个无效的字段,只有等到查看该sales表时才会分明。当不能确定一个属性字段的类型或者没可能与输出表进行匹配时,称之为未解决的。Spark SQL应用Catalyst的规定以及Catalog对象(可能拜访数据源的表信息)来解决这些属性。首先会构建一个Unresolved Logical Plan树,而后作用一系列的规定,最初生成Logical Plan。 ...

August 8, 2020 · 6 min · jiezi

关于spark:第二篇Spark-core编程指南

在《第一篇|Spark概览》一文中,对Spark的整体风貌进行了论述。本文将深刻探索Spark的外围组件--Spark core,Spark Core是Spark平台的根底通用执行引擎,所有其余性能均建设在该引擎之上。它不仅提供了内存计算性能来进步速度,而且还提供了通用的执行模型以反对各种应用程序,另外,用户能够应用Java,Scala和Python API开发应用程序。Spark core是建设在对立的形象RDD之上的,这使得Spark的各个组件能够随便集成,能够在同一个应用程序中应用不同的组件以实现简单的大数据处理工作。本文次要探讨的内容有: 什么是RDD RDD的设计初衷RDD的基本概念与次要特点宽依赖与窄依赖stage划分与作业调度RDD操作算子 TransformationsActions共享变量 播送变量累加器长久化综合案例什么是RDD设计初衷RDD(Resilient Distributed Datasets)的设计之初是为了解决目前存在的一些计算框架对于两类利用场景的解决效率不高的问题,这两类利用场景是迭代式算法和交互式数据挖掘。在这两种利用场景中,通过将数据保留在内存中,能够将性能进步到几个数量级。对于迭代式算法而言,比方PageRank、K-means聚类、逻辑回归等,常常须要重用两头后果。另一种利用场景是交互式数据挖掘,比方在同一份数据集上运行多个即席查问。大部分的计算框架(比方Hadoop),应用两头计算结果的形式是将其写入到一个内部存储设备(比方HDFS),这会减少额定的负载(数据复制、磁盘IO和序列化),由此会减少利用的执行工夫。 RDD能够无效地反对少数利用中的数据重用,它是一种容错的、并行的数据结构,能够让用户显性地将两头后果长久化到内存中,并且能够通过分区来优化数据的寄存,另外,RDD反对丰盛的算子操作,用户能够很容易地应用这些算子对RDD进行操作。 基本概念一个RDD是一个分布式对象汇合,其本质是一个只读的、分区的记录汇合。每个RDD能够分成多个分区,不同的分区保留在不同的集群节点上(具体如下图所示)。RDD是一种高度受限的共享内存模型,即RDD是只读的分区记录汇合,所以也就不能对其进行批改。只能通过两种形式创立RDD,一种是基于物理存储的数据创立RDD,另一种是通过在其余RDD上作用转换操作(transformation,比方map、filter、join等)失去新的RDD。 RDD不须要被物化,它通过血缘关系(lineage)来确定其是从RDD计算得来的。另外,用户能够管制RDD的长久化和分区,用户能够将须要被重用的RDD进行长久化操作(比方内存、或者磁盘)以进步计算效率。也能够依照记录的key将RDD的元素散布在不同的机器上,比方在对两个数据集进行JOIN操作时,能够确保以雷同的形式进行hash分区。 次要特点基于内存RDD是位于内存中的对象汇合。RDD能够存储在内存、磁盘或者内存加磁盘中,然而,Spark之所以速度快,是基于这样一个事实:数据存储在内存中,并且每个算子不会从磁盘上提取数据。 分区分区是对逻辑数据集划分成不同的独立局部,分区是分布式系统性能优化的一种技术手段,能够缩小网络流量传输,将雷同的key的元素散布在雷同的分区中能够缩小shuffle带来的影响。RDD被分成了多个分区,这些分区散布在集群中的不同节点。 强类型RDD中的数据是强类型的,当创立RDD的时候,所有的元素都是雷同的类型,该类型依赖于数据集的数据类型。 懒加载Spark的转换操作是懒加载模式,这就意味着只有在执行了action(比方count、collect等)操作之后,才会去执行一些列的算子操作。 不可批改RDD一旦被创立,就不能被批改。只能从一个RDD转换成另外一个RDD。 并行化RDD是能够被并行操作的,因为RDD是分区的,每个分区散布在不同的机器上,所以每个分区能够被并行操作。 长久化因为RDD是懒加载的,只有action操作才会导致RDD的转换操作被执行,进而创立出绝对应的RDD。对于一些被重复使用的RDD,能够对其进行长久化操作(比方将其保留在内存或磁盘中,Spark反对多种长久化策略),从而进步计算效率。 宽依赖和窄依赖RDD中不同的操作会使得不同RDD中的分区产不同的依赖,次要有两种依赖:宽依赖和窄依赖。宽依赖是指一个父RDD的一个分区对应一个子RDD的多个分区,窄依赖是指一个父RDD的分区对应与一个子RDD的分区,或者多个父RDD的分区对应一个子RDD分区。对于宽依赖与窄依赖,如下图所示: Stage划分窄依赖会被划分到同一个stage中,这样能够以管道的模式迭代执行。宽依赖所依赖的分区个别有多个,所以须要跨节点传输数据。从容灾方面看,两种依赖的计算结果复原的形式是不同的,窄依赖只须要复原父RDD失落的分区即可,而宽依赖则须要思考复原所有父RDD失落的分区。 DAGScheduler会将Job的RDD划分到不同的stage中,并构建一个stage的依赖关系,即DAG。这样划分的目标是既能够保障没有依赖关系的stage能够并行执行,又能够保障存在依赖关系的stage程序执行。stage次要分为两种类型,一种是ShuffleMapStage,另一种是ResultStage。其中ShuffleMapStage是属于上游的stage,而ResulStage属于最上游的stage,这意味着上游的stage先执行,最初执行ResultStage。 ShuffleMapStageShuffleMapStage是DAG调度流程的两头stage,它能够蕴含一个或者多个ShuffleMapTask,用与生成Shuffle的数据,ShuffleMapStage能够是ShuffleMapStage的前置stage,但肯定是ResultStage的前置stage。局部源码如下: private[spark] class ShuffleMapStage( id: Int, rdd: RDD[_], numTasks: Int, parents: List[Stage], firstJobId: Int, callSite: CallSite, val shuffleDep: ShuffleDependency[_, _, _], mapOutputTrackerMaster: MapOutputTrackerMaster) extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) { // 省略代码 }}ResultStageResultStage能够应用指定的函数对RDD中的分区进行计算并失去最终后果,ResultStage是最初执行的stage,比方打印数据到控制台,或者将数据写入到内部存储设备等。局部源码如下: private[spark] class ResultStage( id: Int, rdd: RDD[_], val func: (TaskContext, Iterator[_]) => _, val partitions: Array[Int], parents: List[Stage], firstJobId: Int, callSite: CallSite) extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {// 省略代码}下面提到Spark通过剖析各个RDD的依赖关系生成DAG,通过各个RDD中的分区之间的依赖关系来决定如何划分stage。具体的思路是:在DAG中进行反向解析,遇到宽依赖就断开、遇到窄依赖就把以后的RDD退出到以后的stage中。行将窄依赖划分到同一个stage中,从而造成一个pipeline,晋升计算效率。所以一个DAG图能够划分为多个stage,每个stage都代表了一组关联的,相互之间没有shuffle依赖关系的工作组成的task汇合,每个task汇合会被提交到TaskScheduler进行调度解决,最终将工作散发到Executor中进行执行。 ...

August 8, 2020 · 6 min · jiezi

关于spark:第一篇Spark概览

Apache Spark最后在2009年诞生于美国加州大学伯克利分校的APM实验室,并于2010年开源,现在是Apache软件基金会下的顶级开源我的项目之一。Spark的指标是设计一种编程模型,可能疾速地进行数据分析。Spark提供了内存计算,缩小了IO开销。另外Spark是基于Scala编写的,提供了交互式的编程体验。通过10年的倒退,Spark成为了煊赫一时的大数据处理平台,目前最新的版本是Spark3.0。本文次要是对Spark进行一个总体概览式的介绍,后续内容会对具体的细节进行展开讨论。本文的次要内容包含: [x] Spark的关注度剖析[x] Spark的特点[x] Spark的一些重要概念[x] Spark组件概览[x] Spark运行架构概览[x] Spark编程初体验Spark的关注热度剖析详情下图展现了近1年内在国内对于Spark、Hadoop及Flink的搜寻趋势 近1年内寰球对于Spark、Hadoop及Flink的搜寻趋势,如下: 近1年国内对于Spark、Hadoop及Flink的搜寻热度区域散布状况(按Flink搜寻热度降序排列): 近1年寰球对于Spark、Hadoop及Flink的搜寻热度区域散布状况(按Flink搜寻热度降序排列): 剖析从下面的4幅图能够看出,近一年无论是在国内还是寰球,对于Spark的搜寻热度始终是比Hadoop和Flink要高。近年来Flink倒退迅猛,其在国内有阿里的背书,Flink人造的流解决特点使其成为了开发流式利用的首选框架。能够看出,尽管Flink在国内很火,然而放眼寰球,热度依然不迭Spark。所以学习并把握Spark技术依然是一个不错的抉择,技术有很多的相似性,如果你曾经把握了Spark,再去学习Flink的话,置信你会有种似曾相识的感觉。 Spark的特点速度快Apache Spark应用DAG调度程序、查问优化器和物理执行引擎,为批处理和流解决提供了高性能。 易于应用反对应用Java,Scala,Python,R和SQL疾速编写应用程序。Spark提供了80多个高级操作算子,可轻松构建并行应用程序。 通用性Spark提供了十分丰盛的生态栈,包含SQL查问、流式计算、机器学习和图计算等组件,这些组件能够无缝整合在一个利用中,通过一站部署,能够应答多种简单的计算场景 运行模式多样Spark能够应用Standalone模式运行,也能够运行在Hadoop,Apache Mesos,Kubernetes等环境中运行。并且能够拜访HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等多种数据源中的数据。 Spark的一些重要概念RDD弹性分布式数据集(Resilient Distributed Dataset),是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型 DAG有向无环图(Directed Acyclic Graph),反映RDD之间的依赖关系 Application用户编写的Spark程序,由 driver program 和 executors 组成 Application jar用户编写的应用程序JAR包Driver program用程序main()函数的过程,能够创立SparkContextCluster manager集群管理器,属于一个内部服务,用于资源申请调配(如:standalone manager, Mesos, YARN)Deploy mode部署模式,决定Driver过程在哪里运行。如果是cluster模式,会由框架自身在集群外部某台机器上启动Driver过程。如果是client模式,会在提交程序的机器上启动Driver过程 Worker node集群中运行应用程序的节点Executor运行在Worknode节点上的一个过程,负责运行具体的工作,并为应用程序存储数据 Task运行在executor中的工作单元Job一个job蕴含多个RDD及一些列的运行在RDD之上的算子操作,job须要通过action操作进行触发(比方save、collect等)Stage每一个作业会被分成由一些列task组成的stage,stage之间会相互依赖Spark组件概览Spark生态系统次要包含Spark Core、SparkSQL、SparkStreaming、MLlib和GraphX等组件,具体如下图所示: Spark CoreSpark core是Spark的外围,蕴含了Spark的基本功能,如内存计算、任务调度、部署模式、存储管理等。SparkCore提供了基于RDD的API是其余高级API的根底,次要性能是实现批处理。 Spark SQLSpark SQL次要是为了解决结构化和半结构化数据而设计的,SparkSQL容许用户在Spark程序中应用SQL、DataFrame和DataSetAPI查问结构化数据,反对Java、Scala、Python和R语言。因为DataFrame API提供了对立的拜访各种数据源的形式(包含Hive、Avro、Parquet、ORC和JDBC),用户能够通过雷同的形式连贯任何数据源。另外,Spark SQL能够应用hive的元数据,从而实现了与Hive的完满集成,用户能够将Hive的作业间接运行在Spark上。Spark SQL能够通过spark-sql的shell命令拜访。 SparkStreamingSparkStreaming是Spark很重要的一个模块,可实现实时数据流的可伸缩,高吞吐量,容错流解决。在外部,其工作形式是将实时输出的数据流拆分为一系列的micro batch,而后由Spark引擎进行解决。SparkStreaming反对多种数据源,如kafka、Flume和TCP套接字等 MLlibMLlib是Spark提供的一个机器学习库,用户能够应用Spark API构建一个机器学习利用,Spark尤其善于迭代计算,性能是Hadoop的100倍。该lib蕴含了常见机器学习算法,比方逻辑回归、反对向量机、分类、聚类、回归、随机森林、协同过滤、主成分剖析等。 GraphXGraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,GraphX性能良好,领有丰盛的性能和运算符,能在海量数据上自若地运行简单的图算法。GraphX内置了许多图算法,比方驰名的PageRank算法。 Spark运行架构概览从整体来看,Spark利用架构包含以下几个次要局部: Driver programMaster nodeWork nodeExecutorTasksSparkContext在Standalone模式下,运行架构如下图所示: ...

August 8, 2020 · 2 min · jiezi

关于spark:Spark-StreamingSpark第一代实时计算引擎

尽管SparkStreaming曾经进行更新,Spark的重点也放到了 Structured Streaming ,但因为Spark版本过低或者其余技术选型问题,可能还是会抉择SparkStreaming。SparkStreaming对于工夫窗口,事件工夫尽管撑持较少,但还是能够满足局部的实时计算场景的,SparkStreaming材料较多,这里也做一个简略介绍。 一. 什么是Spark Streaming Spark Streaming在过后是为了与过后的Apache Storm竞争,也让Spark能够用于流式数据的解决。依据其官网文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming反对的数据输出源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简略的TCP套接字等等。数据输出后能够用Spark的高度形象原语如:map、reduce、join、window等进行运算。而后果也能保留在很多中央,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完满交融。当然Storm目前曾经慢慢淡出,Flink开始大放异彩。 Spark与Storm的比照 二、SparkStreaming入门Spark Streaming 是 Spark Core API 的扩大,它反对弹性的,高吞吐的,容错的实时数据流的解决。数据能够通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也能够通过例如 map,reduce,join,window 等的高级函数组成的简单算法解决。最终,解决后的数据能够输入到文件系统,数据库以及实时仪表盘中。事实上,你还能够在 data streams(数据流)上应用 [机器学习] 以及 [图计算] 算法。在外部,它工作原理如下,Spark Streaming 接管实时输出数据流并将数据切分成多个 batch(批)数据,而后由 Spark 引擎解决它们以生成最终的 stream of results in batches(分批流后果)。 Spark Streaming 提供了一个名为 discretized stream 或 DStream 的高级形象,它代表一个间断的数据流。DStream 能够从数据源的输出数据流创立,例如 Kafka,Flume 以及 Kinesis,或者在其余 DStream 上进行高层次的操作以创立。在外部,一个 DStream 是通过一系列的 [RDDs] 来示意。 本指南通知你如何应用 DStream 来编写一个 Spark Streaming 程序。你能够应用 Scala,Java 或者 Python(Spark 1.2 版本后引进)来编写 Spark Streaming 程序。 ...

August 6, 2020 · 5 min · jiezi

关于spark:用Spark进行实时流计算

Spark Streaming VS Structured StreamingSpark Streaming是Spark最后的流解决框架,应用了微批的模式来进行流解决。 提供了基于RDDs的Dstream API,每个工夫距离内的数据为一个RDD,源源不断对RDD进行解决来实现流计算 Apache Spark 在 2016 年的时候启动了 Structured Streaming 我的项目,一个基于 Spark SQL 的全新流计算引擎 Structured Streaming,让用户像编写批处理程序一样简略地编写高性能的流处理程序。 Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是试验版本,从Spark2.2开始为稳固版本) 从Spark-2.X版本后,Spark Streaming就进入保护模式,看见Spark曾经将大部分精力投入到了全新的Structured Streaming中,而一些新个性也只有Structured Streaming才有,这样Spark才有了与Flink一战的能力。 1、Spark Streaming 有余Processing Time 而不是 Event Time首先解释一下,Processing Time 是数据达到 Spark 被解决的工夫,而 Event Time 是数据自带的属性,个别示意数据产生于数据源的工夫。比方 IoT 中,传感器在 12:00:00 产生一条数据,而后在 12:00:05 数据传送到 Spark,那么 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。咱们晓得 Spark Streaming 是基于 DStream 模型的 micro-batch 模式,简略来说就是将一个渺小时间段,比如说 1s,的流数据以后批数据来解决。如果咱们要统计某个时间段的一些数据统计,毫无疑问应该应用 Event Time,然而因为 Spark Streaming 的数据切割是基于 Processing Time,这样就导致应用 Event Time 特地的艰难。 ...

August 4, 2020 · 2 min · jiezi

关于spark:isEmpty类型的action算子需要cache吗

有的时候,须要判断rdd.isEmpty(),以决定是否须要后续操作。而这个isEmpty办法是个action算子。也就是说如果rdd不为空,须要做后续操作的话,那么这个rdd的创立过程可能就执行了两遍。那么rdd须要cache吗? 进入isEmpty办法 def isEmpty(): Boolean = withScope { partitions.length == 0 || take(1).length == 0 }如果这个rdd是从kafka读出来的,那么partitions.length == 0这个判断就为false,会进入take(num = 1)办法, def take(num: Int): Array[T] = withScope { // 扫描范畴扩充因子 val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2) if (num == 0) { new Array[T](0) } else { // 保留take的后果 val buf = new ArrayBuffer[T] // 总共的分区个数 val totalParts = this.partitions.length // 浏览过的分区个数 var partsScanned = 0 // 后果中的记录数小于take须要的num,并且浏览过的分区数小于总分区数 while (buf.size < num && partsScanned < totalParts) { // 应该浏览的分区个数 // 最开始为1,也就是先尝试从第0个分区取记录,如果一个这个分区的记录数不够,再浏览其余分区 var numPartsToTry = 1L val left = num - buf.size if (partsScanned > 0) { // 进入到这个判断里阐明不是第一次循环,上次浏览的分区取出来的记录数量还不够num,这时就须要扩充应该本次应该浏览的分区数了 if (buf.isEmpty) { numPartsToTry = partsScanned * scaleUpFactor } else { // 曾经浏览过的分区个数 * (残余要拜访的记录数与曾经拜访过的记录数的比值),再扩充50%,得出还须要浏览的分区个数 numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) } } // p是应该浏览的分区索引数组,表明哪些分区应该被浏览 val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) // 按指定的分区执行“小规模”job // 这里it.take(left)会让各分区的迭代器只迭代以后buf所须要的记录数。依据迭代器模式,可知这里并不会遍历整个分区的数据再从中拿出left条记录 val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) // 将job的后果塞进buf中 // 这里应用_.take(num - buf.size),保障buf的记录数量不会超过num res.foreach(buf ++= _.take(num - buf.size)) partsScanned += p.size } buf.toArray从源码中可见,如果take的num不超过第0个分区里的记录数,那么会产生一次“小规模job”,总共拜访过的记录数=num;如果超过了,就会再在更大的范畴(更多分区中)查找更少的残余须要take进去的记录数,从而产生一个“中等规模job”,可能使总共拜访过的记录数>num; ...

August 1, 2020 · 1 min · jiezi

关于spark:isEmpty类型的action算子需要cache吗

有的时候,须要判断rdd.isEmpty(),以决定是否须要后续操作。而这个isEmpty办法是个action算子。也就是说如果rdd不为空,须要做后续操作的话,那么这个rdd的创立过程可能就执行了两遍。那么rdd须要cache吗? 进入isEmpty办法 def isEmpty(): Boolean = withScope { partitions.length == 0 || take(1).length == 0 }如果这个rdd是从kafka读出来的,那么partitions.length == 0这个判断就为false,会进入take(num = 1)办法, def take(num: Int): Array[T] = withScope { // 扫描范畴扩充因子 val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2) if (num == 0) { new Array[T](0) } else { // 保留take的后果 val buf = new ArrayBuffer[T] // 总共的分区个数 val totalParts = this.partitions.length // 浏览过的分区个数 var partsScanned = 0 // 后果中的记录数小于take须要的num,并且浏览过的分区数小于总分区数 while (buf.size < num && partsScanned < totalParts) { // 应该浏览的分区个数 // 最开始为1,也就是先尝试从第0个分区取记录,如果一个这个分区的记录数不够,再浏览其余分区 var numPartsToTry = 1L val left = num - buf.size if (partsScanned > 0) { // 进入到这个判断里阐明不是第一次循环,上次浏览的分区取出来的记录数量还不够num,这时就须要扩充应该本次应该浏览的分区数了 if (buf.isEmpty) { numPartsToTry = partsScanned * scaleUpFactor } else { // 曾经浏览过的分区个数 * (残余要拜访的记录数与曾经拜访过的记录数的比值),再扩充50%,得出还须要浏览的分区个数 numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) } } // p是应该浏览的分区索引数组,表明哪些分区应该被浏览 val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) // 按指定的分区执行“小规模”job // 这里it.take(left)会让各分区的迭代器只迭代以后buf所须要的记录数。依据迭代器模式,可知这里并不会遍历整个分区的数据再从中拿出left条记录 val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) // 将job的后果塞进buf中 // 这里应用_.take(num - buf.size),保障buf的记录数量不会超过num res.foreach(buf ++= _.take(num - buf.size)) partsScanned += p.size } buf.toArray从源码中可见,如果take的num不超过第0个分区里的记录数,那么会产生一次“小规模job”,总共拜访过的记录数=num;如果超过了,就会再在更大的范畴(更多分区中)查找更少的残余须要take进去的记录数,从而产生一个“中等规模job”,可能使总共拜访过的记录数>num; ...

August 1, 2020 · 1 min · jiezi

SparkSpark优化笔记

该笔记来自 美团 spark性能优化指南 开发准则Broadcast与map联合应用 代替原始 join小表 broadcast 到每个excutor, 各个excutor本地间接调用小表,防止shuffle // 传统的join操作会导致shuffle操作。 // 因为两个RDD中,雷同的key都须要通过网络拉取到一个节点上,由一个task进行join操作。 val rdd3 = rdd1.join(rdd2) // Broadcast+map的join操作,不会导致shuffle操作。 // 应用Broadcast将一个数据量较小的RDD作为播送变量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,能够从rdd2DataBroadcast中,获取rdd2的所有数据。 // 而后进行遍历,如果发现rdd2中某条数据的key与rdd1的以后数据的key是雷同的,那么就断定能够进行join。// 此时就能够依据本人须要的形式,将rdd1以后数据与rdd2中能够连贯的数据,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 留神,以上操作,倡议仅仅在rdd2的数据量比拟少(比方几百M,或者一两G)的状况下应用。// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。尽量应用map-side预聚合的算子所谓的map-side预聚合,说的是在每个节点本地对雷同的key进行一次聚合操作,相似于MapReduce中的本地combiner。如应用reduceByKey或aggregateByKey 代替groupByKey 应用foreachPartitions代替foreach一次函数调用解决一个partition的所有数据,而不是一次函数调用解决一条数据。如在foreach函数中,将RDD中所有数据写MySQL,那么如果是一般的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创立一个数据库连贯,此时就势必会频繁地创立和销毁数据库连贯,性能是十分低下;然而如果用foreachPartitions算子一次性解决一个partition的数据,那么对于每个partition,只有创立一个数据库连贯即可,而后执行批量插入操作,此时性能是比拟高的。实际中发现,对于1万条左右的数据量写MySQL,性能能够晋升30%以上 filter后应用coalesce 缩小分区通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比方30%以上的数据),倡议应用coalesce算子,手动缩小RDD的partition数量,将RDD中的数据压缩到更少的partition中去 应用repartitionAndSortWithinPartitions代替repartition与sort类操作repartitionAndSortWithinPartitions是Spark官网举荐的一个算子,官网倡议,如果须要在repartition重分区之后,还要进行排序,倡议间接应用repartitionAndSortWithinPartitions算子。因为该算子能够一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 应用Kryo优化序列化性能Spark中,次要有三个中央波及到了序列化: 在算子函数中应用到内部变量时,该变量会被序列化后进行网络传输(见“准则七:播送大变量”中的解说)。将自定义的类型作为RDD的泛型类型时(比方JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因而这种状况下,也要求自定义的类必须实现Serializable接口。应用可序列化的长久化策略时(比方MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。Spark同时反对应用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官网介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Kryo要求最好要注册所有须要进行序列化的自定义类型,因而对于开发者来说,这种形式比拟麻烦。 // 创立SparkConf对象。 val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))数据结构尽量应用字符串代替对象;应用原始类型(比方Int、Long)代替字符串;应用数组代替汇合类型;这样尽可能地缩小内存占用,从而升高GC频率,晋升性能。 调优配置num-executors参数阐明:该参数用于设置Spark作业总共要用多少个Executor过程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能依照你的设置来在集群的各个工作节点上,启动相应数量的Executor过程。这个参数十分之重要,如果不设置的话,默认只会给你启动大量的Executor过程,此时你的Spark作业的运行速度是十分慢的。参数调优倡议:每个Spark作业的运行个别设置50~100个左右的Executor过程比拟适合,设置太少或太多的Executor过程都不好。设置的太少,无奈充分利用集群资源;设置的太多的话,大部分队列可能无奈给予充沛的资源。executor-memory参数阐明:该参数用于设置每个Executor过程的内存。Executor内存的大小,很多时候间接决定了Spark作业的性能,而且跟常见的JVM OOM异样,也有间接的关联。参数调优倡议:每个Executor过程的内存设置4G~8G较为适合。然而这只是一个参考值,具体的设置还是得依据不同部门的资源队列来定。能够看看本人团队的资源队列的最大内存限度是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,防止你本人的Spark作业占用了队列所有的资源,导致别的同学的作业无奈运行。executor-cores参数阐明:该参数用于设置每个Executor过程的CPU core数量。这个参数决定了每个Executor过程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因而每个Executor过程的CPU core数量越多,越可能疾速地执行完调配给本人的所有task线程。参数调优倡议:Executor的CPU core数量设置为2~4个较为适合。同样得依据不同部门的资源队列来定,能够看看本人的资源队列的最大CPU core限度是多少,再根据设置的Executor数量,来决定每个Executor过程能够调配到几个CPU core。同样倡议,如果是跟别人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比拟适合,也是防止影响其他同学的作业运行。driver-memory参数阐明:该参数用于设置Driver过程的内存。参数调优倡议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。惟一须要留神的一点是,如果须要应用collect算子将RDD的数据全副拉取到Driver上进行解决,那么必须确保Driver的内存足够大,否则会呈现OOM内存溢出的问题。spark.default.parallelism参数阐明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会间接影响你的Spark作业性能。参数调优倡议:Spark作业的默认task数量为500~1000个较为适合。很多同学常犯的一个谬误就是不去设置这个参数,那么此时就会导致Spark本人依据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比方就几十个task),如果task数量偏少的话,就会导致你后面设置好的Executor的参数都半途而废。试想一下,无论你的Executor过程有多少个,内存和CPU有多大,然而task只有1个或者10个,那么90%的Executor过程可能基本就没有task执行,也就是白白浪费了资源!因而Spark官网倡议的设置准则是,设置该参数为num-executors * executor-cores的2~3倍较为适合,比方Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充沛地利用Spark集群的资源。spark.storage.memoryFraction参数阐明:该参数用于设置RDD长久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,能够用来保留长久化的RDD数据。依据你抉择的不同的长久化策略,如果内存不够时,可能数据就不会长久化,或者数据会写入磁盘。参数调优倡议:如果Spark作业中,有较多的RDD长久化操作,该参数的值能够适当进步一些,保障长久化的数据可能包容在内存中。防止内存不够缓存所有的数据,导致数据只能写入磁盘中,升高了性能。然而如果Spark作业中的shuffle类操作比拟多,而长久化操作比拟少,那么这个参数的值适当升高一些比拟适合。此外,如果发现作业因为频繁的gc导致运行迟缓(通过spark web ui能够察看到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样倡议调低这个参数的值。spark.shuffle.memoryFraction参数阐明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输入后,进行聚合操作时可能应用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现应用的内存超出了这个20%的限度,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地升高性能。参数调优倡议:如果Spark作业中的RDD长久化操作较少,shuffle操作较多时,倡议升高长久化操作的内存占比,进步shuffle操作的内存占比比例,防止shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,升高了性能。此外,如果发现作业因为频繁的gc导致运行迟缓,意味着task执行用户代码的内存不够用,那么同样倡议调低这个参数的值。spark.sql.shuffle.partitions能够缓解数据歪斜 ...

July 15, 2020 · 1 min · jiezi

spark-executor-被yarn杀掉的问题

spark的任务,在运行期间executor总是挂掉。刚开始觉得是数据量太大executor内存不够。但是估算了数据量,觉得不应该出现内存不够。于是,首先尝试通过jvisualvm观察executor的内存分布: 老年代还没填满,进程就会出现挂掉的情况,所以并不是jvm级别的OOM。仔细检查对应NodeManager的日志,发现如下日志: WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=1151,containerID=container_1578970174552_5615_01_000003] is running beyond physical memory limits. Current usage: 4.3 GB of 4 GB physical memory used; 7.8 GB of 8.4 GB virtual memory used. Killing container.Dump of the process-tree for container_1578970174552_5615_01_000003 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 1585 1175 1585 1151 (python) 50 67 567230464 8448 python -m pyspark.daemon |- 1596 1585 1585 1151 (python) 1006 81 1920327680 303705 python -m pyspark.daemon |- 1175 1151 1151 1151 (java) ... |- 1151 1146 1151 1151 (bash) ...INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 1152 for container-id container_1578970174552_5615_01_000004: 4.3 GB of 4 GB physical memory used; 7.8 GB of 8.4 GB virtual memory used日志说,某个container的进程占用物理内存超过的阈值,yarn将其kill掉了。并且这个内存的统计是基于Process Tree的,我们的spark任务会启动python进程,并将数据通过pyspark传输给python进程,换句话说数据即存在jvm,也存在python进程,如果按照进程树统计,意味着会重复至少两倍。很容易超过“阈值”。 ...

June 28, 2020 · 1 min · jiezi

Spark-30发布啦改进SQL弃Python-2更好的兼容ANSI-SQL性能大幅提升

Apache Spark 3.0.0正式发布啦,Apache Spark 3.0是在Spark 2.x的基础上开发的,带来了新的想法和功能。 Spark是一个开源的大数据处理、数据科学、机器学习和数据分析工作负载的统一引擎,自2010年首次发布以来,已经成长为最活跃的开源项目之一;支持Java、Scala、Python、R等语言,并为这些语言提供了相关的SDK。 Spark 3.0中的Spark SQL是这个版本中最活跃的组件,46%的已解决的问题都是是针对Spark SQL的,包括结构化流和MLlib,以及高层API,包括SQL和DataFrames。在经过了大量优化后,Spark 3.0的性能比Spark 2.4快了大约2倍。 Python是目前Spark上使用最广泛的语言;针对Python语言提供的PySpark在PyPI上的月下载量超过500万。在Spark 3.0中,对PySpark的功能和可用性做了不少改进,包括用Python类型提示重新设计pandas UDF API,新的pandas UDF类型,以及更多的Pythonic错误处理。 以下便是Spark 3.0中的功能亮点:包括自适应查询执行,动态分区修剪,ANSI SQL合规性,pandas API的重大改进,结构化流的新UI,调用R用户定义函数的速度提高了40倍,加速器感知的调度器,以及SQL参考文档。 把这些功能按照模块来划分就可以分为以下几个模块: core、Spark SQL、Structured StreamingMLlibSparkRGraphX放弃Python 2和R 3.4以下的版的支持;修复一些已知的问题;core、Spark SQL、Structured Streaming突出功能 加速器感知调度器;自适应查询;动态分区修剪;重新设计的pandas UDF API与类型提示;结构化流用户界面;目录插件API的支持;支持Java 11;支持Hadoop 3;能够更好的兼容ANSI SQL;性能提升 自适应查询;动态分区修剪;优化9项规则;最小化表缓存同步性能优化;将聚合代码分割成小函数;在INSERT和ALTER TABLE ADD PARTITION命令中增加批处理;允许聚合器注册为UDAF;SQL兼容性增强 使用Proleptic Gregorian日历;建立Spark自己的日期时间模式定义;为表插入引入ANSI存储分配策略;在表插入中默认遵循ANSI存储分配规则;添加一个SQLConf: spark.sql.ansi.enabled,用于开启ANSI模式;支持聚合表达式的ANSI SQL过滤子句;支持ANSI SQL OVERLAY功能;支持ANSI嵌套方括号内的注释;超出整数范围时抛出异常;区间算术运算的溢出检查;当无效字符串被转换为数字类型时,抛出异常;使用区间乘法和除法的溢出行为与其他操作一致;为char和decimal添加ANSI类型的别名;SQL解析器定义了ANSI兼容的保留关键字;当ANSI模式开启时,禁止使用保留关键字作为标识符;支持ANSI SQL.LIKE...ESCAPE语法;支持ANSI SQL布尔-谓词语法;PySpark增强版 重新设计的pandas UDFs,并提供类型提示;允许Pandas UDF采用pd.DataFrames的迭代器;支持StructType作为Scalar Pandas UDF的参数和返回类型;通过Pandas UDFs支持Dataframe Cogroup;增加mapInPandas,允许DataFrames的迭代器;部分SQL函数也应该取数据列名;让PySpark的SQL异常更加Pythonic化;扩展性增强 目录插件;数据源V2 API重构;Hive 3.0和3.1的版本的元存储支持;将Spark插件接口扩展到驱动程序;可通过自定义指标来扩展Spark指标系统;为用于扩展列式处理支持提供了开发者API;使用DSV2的内置源迁移:parquet, ORC, CSV, JSON, Kafka, Text, Avro;允许在SparkExtensions中注入函数;连接器增强 ...

June 21, 2020 · 1 min · jiezi

一文详解Spark-Shuffle

前言Spark Shuffle是大众讨论的比较多的话题了。它是Spark任务执行过程中最为重要的过程之一。那么什么是Shuffle呢? Shuffle一般被翻译成数据混洗,是类MapReduce分布式计算框架独有的机制,也是这类分布式计算框架最重要的执行机制。接下来会按照两个层面来谈谈Shuffle机制。分别为: 逻辑层面物理层面逻辑层面主要是从RDD的血缘出发,从DAG的角度来讲解Shuffle,另外也会说明Spark容错机制。物理层面是从执行角度来剖析Shuffle是如何发生的1.RDD血缘与Spark容错从血缘角度出发就需先了解DAG,DAG被称之为有向无环图。在DAG中,最初的RDD被成为基础RDD,在基础RDD之上使用算子的过程中后续生成RDD被成为一个个子RDD,它们之间存在依赖关系。无论哪个RDD出现问题,都可以由这种依赖关系重新计算而成。这种依赖关系就被成为RDD血缘。血缘的表现方式主要分为宽依赖与窄依赖 1.1窄依赖与宽依赖窄依赖的标准定义是:子RDD中的分区与父RDD中的分区只存在一对一的映射关系。宽依赖的标准定义是:子RDD中分区与父RDD中分区存在一对多的映射关系。 从实际算子来说,map,filter,union等就是窄依赖,而groupByKey,reduceByKey就是典型的宽依赖。宽依赖还有个名字,叫shuffle依赖,也就是说宽依赖必然会发生在shuffle操作,shuffle也是划分stage的重要依据。而窄依赖由于不需要发生shuffle,所有计算都是在分区所在节点完成,类似于MR中的ChainMapper。所以说,在如果在程序中选取的算子形成了宽依赖,那么就必然会触发shuffle。 所以当RDD在shuffle过程中某个分区出现了故障,只需要找到当前对应的Stage,而这个Stage必然是某个shuffle算子所进行划分的,找到了这个算子,就离定位错误原因越来越近了。 如上图所示,如果P1_0分区发生故障,那么按照依赖关系,则需要P0_0与P0_1的分区重算,P0_0与P0_1没有持久化,就会不断回溯,直到找到存在的父分区为止。至于为什么要持久化,原因就是当计算逻辑复杂时,就会引发依赖链过长,如果其中的某个RDD发生了问题。若没有进行持久化,Spark则会根据RDD血缘关系进行重头开始计算。重算显然对我们是代价极高的,所以用户可以在计算过程中,适当的调用RDD的checkpoint方法,保存好当前算好的中间结果,这样依赖关系链就会大大的缩短。因为checkpoint其实是会切断血缘的。这就是RDD的血缘机制即RDD的容错机制。而Spark的容错机制则是主要分为资源管理平台的容错和Spark应用的容错。 1.2 Spark的容错机制Spark的应用是基于资源管理平台运行的,所以资源管理平台的容错也是Spark容错的一部分,如Yarn的ResourceManager HA机制。在Spark应用执行的过程中,可能会遇到以下几种失败情况: Driver出错Executor出错Task出错Dirver执行失败是Spark应用最严重的一种情况,因为它标记着整个作业的执行失败,需要开发人员手动重启Driver。而Executor报错通常是所在的Worker出错,这时Driver就会将执行失败的Task调度到另一个Executor继续执行,重新执行的Task会根据RDD的依赖关系继续计算,并将报错的Executor从可用的Executor列表中移除。Spark会对执行失败的Task进行重试,重试3次后若依然出错,则整个作业就会失败。而在这个过程中,数据恢复和重试都依赖于RDD血缘机制。2.Spark Shuffle很多算子都会引起RDD中的数据进行重分区,新的分区被创建,旧的分区被合并或者打碎,在重分区过程中,如果数据发生了跨节点移动,就被称为Shuffle。Spark对Shuffle的实现方式有两种:Hash Shuffle与Sort-based Shuffle,这其实是一个优化的过程。在较老的版本中,Spark Shuffle的方式可以通过spark.shuffle.manager配置项进行配置,而在最新的版本中,已经移除了该配置项,统一称为Sort-based Shuffle。 2.1 Hash Shuffle在Spark 1.6.3之前,Hash Shuffle都是Spark Shuffle的解决方案之一。Shuffle的过程一般分为两个部分:Shuffle Write和Shuffle Fetch,前者是Map任务划分分区,输出中间结果,而后者则是Reduce任务获取到的这些中间结果。Hash Shuffle的过程如图下所示: 图中,Shuffle Write发生在一个节点上,执行shuffle任务的CPU核数为1,可以同时执行两个任务,每个任务输出的分区数与Reducer数相同,即为3。每个分区都有一个缓冲区(bucket)用来接收结果,每个缓冲区的大小由配置spark.shuffle.file.buffer.kb决定。这样每个缓冲区写满后,就会输出到一个文件段中。而Reducer就会去相应的节点拉取文件。这样设计起来其实是不复杂的。但问题也很明显,主要有两个: 生成的文件个数太大。理论上,每个Shuffle任务输出会产生R个文件(由Reduce个数决定),而Shuffle任务的个数往往是由Map任务个数M决定的,所以总共会生成M * R个中间结果文件,而在大型作业中,若是M和R都是很大的数字的话,就会出现文件句柄数突破操作系统的限制。缓冲区占用内存空间过大。 单节点在执行Shuffle任务时缓存区大小消耗(spark.shuffle.file.buffer.kb) × m × R , m为该节点运行的shuffle个数,如果一个核可以执行一个任务,那么m就与cpu核数相等。这对于有32,64核的服务器来说都是不小的内存开销。所有为了解决第一个问题,Spark引入了Flie Consolidation机制,指通过共同输出文件以降低文件数,如下图所示: 每当Shuffle输出时,同一个CPU核心处理的Map任务的中间结果会输出到同分区的一个文件中,然后Reducer只需要一次性将整个文件拿到即可。这样,Shuffle产生的文件数为C(CPU核数)* R。Spark的FileConsolidation机制默认开启,可以通过spark.shuffle.consolidateFiles配置项进行配置。2.2 Sort-based Shuffle即便是引入了FlieConsolidation后,还是无法根本解决中间文件数太大的问题,这时候Sort-based Shuffle才算是真正的引入进来。如图所示: 每个Map任务会最后只输出两个文件(其中一个是索引文件),其中间过程采用MapReduce一样的归并排序,但是会用索引文件记录每个分区的偏移量,输出完成后,Reducer会根据索引文件得到属于自己的分区,这种情况下,shuffle产生的中间结果文件为2 * M(M为Map任务数)。在基于排序的 Shuffle 中, Spark 还提供了一种折中方案——Bypass Sort-based Shuffle,当 Reduce 任务小于 spark.shuffle.sort.bypassMergeThreshold 配置(默认 200)时,Spark Shuffle 开始按照 Hash Shuffle 的方式处理数据,而不用进行归并排序,只是在 Shuffle Write 步骤的最后,将其合并为 1 个文件,并生成索引文件。这样实际上还是会生成大量的中间文件,只是最后合并为 1 个文件并省去排序所带来的开销,该方案的准确说法是 Hash Shuffle 的Shuffle Fetch 优化版。

June 5, 2020 · 1 min · jiezi

Spark源码学习内置RPC框架3

RPC客户端工厂TransportClientFactoryTransportClientFactory是创建TransportClient的工厂类。TransportContext的createClientFactory方法可以创建TransportClientFactory的实例 /** * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning * a new Client. Bootstraps will be executed synchronously, and must run successfully in order * to create a Client. */ public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) { return new TransportClientFactory(this, bootstraps); } public TransportClientFactory createClientFactory() { return createClientFactory(Lists.<TransportClientBootstrap>newArrayList()); }可以看到,TransportContext中有两个重载的createClientFactory方法,它们最终在构造TransportClientFactory时都会传递两个参数:TransportContext和TransportClientBootstrap列表。TransportClientFactory构造器的实现如代码所示。 public TransportClientFactory( TransportContext context, List<TransportClientBootstrap> clientBootstraps) { this.context = Preconditions.checkNotNull(context); this.conf = context.getConf(); this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); this.connectionPool = new ConcurrentHashMap<>(); this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); this.rand = new Random(); IOMode ioMode = IOMode.valueOf(conf.ioMode()); this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); // TODO: Make thread pool name configurable. this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); }TransportClientFactory构造器中的各个变量如下: ...

May 25, 2020 · 5 min · jiezi