《Spark的RDD原理以及2.0特性的介绍》要点:
本文介绍了Spark的RDD原理以及2.0特性的介绍,希望对您有用。如果有疑问,可以联系我们。
Spark 是 Apache 顶级项目里面最火的大数据处理的计算引擎,它目前是负责大数据计算的工作.包括离线计算或交互式查询、数据挖掘算法、流式计算以及图计算等.全世界有许多公司和组织使用或给社区贡献代码,社区的活跃度见 www.github.com/apache/spark.
2013 年开始 Spark开发团队成立 Databricks,来对 Spark 进行运作和管理,并提供 Cloud 服务.Spark 社区基本保持一个季度一个版本,不出意外的话 Spark 2.0 将在五月底发布.
与 Mapreduce 相比,Spark 具备 DAG 执行引擎以及基于内存的多轮迭代计算等优势,在SQL 层面上,比 Hive/Pig 相比,引入关系数据库的许多特性,以及内存管理技术.另外在 Spark 上所有的计算模型最终都统一基于 RDD 之上运行执行,包括流式和离线计算.Spark 基于磁盘的性能是 MR 的 10 倍,基于内存的性能是 MR 的 100 倍 ❶ (见文后参考阅读❶ ,下同) .
Spark 提供 SQL、机器学习库 MLlib、流计算 Streaming 和图计算 Graphx,同时也支持 Scala、Java、Python 和 R 语言开发的基于 API 的应用程序.
RDD,英文全称叫 Resilient Distributed Datasets.
an RDD is a read-only, partitioned collection of records❸. 字面意思是只读的分布式数据集.
但其实个人觉得可以把 RDD 理解为关系数据库 里的一个个操作,比如 map,filter,Join 等.在 Spark 里面实现了许多这样的 RDD 类,即可以看成是操作类.当我们调用一个 map 接口,底层实现是会生成一个 MapPartitionsRDD 对象,当 RDD 真正执行时,会调用 MapPartitionsRDD 对象里面的 compute 方法来执行这个操作的计算逻辑.但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 这种 action 动作被调用后再会去触发 runJob 动作.
RDD 分为二类:transformation 和 action.
val file = sc.textFile(args(0))
val words = file.flatMap(line => line.split(” “))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))
这段代码生成的 RDD 的执行树是如下图所示:
最终在 saveAsTextFile 方法时才会将整个 RDD 的执行图提交给 DAG 执行引擎,根据相关信息切分成一个一个 Stage,每个 Stage 去执行多个 task,最终完成整个 Job 的执行.
还有一个区别就是,RDD 计算后的中间结果是可以被持久化,当下一次需要使用时,可以直接使用之前持久化好的结果,而不是重新计算,并且这些结果被存储在各个结点的 executor 上.下一次使用时,调度器可以直接把 task 分发到存储持久化数据的结点上,减少数据的网络传输开稍.这种场景在数据挖掘迭代计算是经常出现.如下代码
val links = spark.textFile(…).map(…).persist() var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size)) }
// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum) }
以上代码生成的 RDD 执行树如下图所示:
计算 contribs-0 时需要使用 links 的计算逻辑,当 links 每个分片计算完后,会将这个结果保存到本地内存或磁盘上,下一次 contribs-1 计算要使用 links 的数据时,直接从上一次保存的内存和磁盘上读取就可以了.这个持久化系统叫做 blockManager,类似于在内部再构建了一个 KV 系统,K 表示每个分区 ID 号,V 表示这个分区计算后的结果.
另外在 streaming 计算时,每个 batch 会去消息队列上拉取这个时间段的数据,每个 Recevier 接收过来数据形成 block 块并存放到 blockManager 上,为了可靠性,这个 block 块可以远程备份,后续的 batch 计算就直接在之前已读取的 block 块上进行计算,这样不断循环迭代来完成流处理.
一个 RDD 一般会有以下四个函数组成.
定义为:
def compute(split: Partition, context: TaskContext): Iterator[T]
如在 MapPartitionsRDD 里的实现是如下:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
函数定义
f: (TaskContext, Int, Iterator[T]) => Iterator[U]
protected def getPartitions: Array[Partition]
即这个操作的数据划分为多少个分 区.跟 mapreduce 里的 map 上的 split 类似的.
protected def getDependencies: Seq[Dependency[_]]
依赖分二种:如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若依赖于多个 Child RDD 分区,则称之为 wide dependency.不同的操作根据其特性,可能会产生不同的依赖 ❹.如下图所示
map 操作前后二个 RDD 操作之间的分区是一对一的关系,故产生 narrow dependency,而 join 操作的分区分别对应于它的二个子操作相对应的分区,故产生 wide dependency.当最后要生成具体的 task 运行时,就需要利用这个依赖关系也生成 Stage 的 DAG 图.
4. 获取该操作对应数据的存放位置信息,主要是针对 HDFS 这类有数据源的 RDD.
protected def getPreferredLocations(split: Partition): Seq[String]
Spark 的执行模式有 local、Yarn、Standalone、Mesos 四类.后面三个分别有 cluster 和 client 二种.client 和 cluster 的区别就是指 Driver 是在程序提交客户端还是在集群的 AM 上. 比如常见的 Yarn-cluster 模式如下图所示:
一般来说,运行简单测试或 UT 用的是 local 模式运行,其实就是用多线程模似分布式执行. 如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署.
当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos.
Unifying DataFrames and Datasets in Scala/Java
DataFrame❺ 和 Dataset❽ 的功能是什么?
它们都是提供给用户使用,包括各类操作接口的 API.1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是将二者统一,即保留 Dataset,而把 DataFrame 定义为 Dataset[Row],即是 Dataset 里的元素对象为 Row 的一种(SPARK-13485).
在参考资料❺ 中有介绍 DataFrame,它就是提供了一系列操作 API,与 RDD API 相比较,DataFrame 里操作的数据都是带有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 带来的性能提升,比如 code generation 以及 Tungsten❼ 等.执行过程如下图所示
但是 DataFrame 出来后发现有些情况下 RDD 可以表达的逻辑用 DataFrame 无法表达.比如 要对 group by 或 join 后的结果用自定义的函数,可能用 SQL 是无法表达的.如下代码:
case class ClassData(a: String, b: Int)
case class ClassNullableData(a: String, b: Integer)
val ds = Seq(ClassData(“a”, 1), ClassData(“a”, 2)).toDS()
val agged = ds.groupByKey(d => ClassNullableData(d.a, null))
.mapGroups {
case (key, values) => key.a + values.map(_.b).sum
}
中间处理过程的数据是自定义的类型,并且 groupby 后的聚合逻辑也是自定义的,故用 SQL 比较难以表达,所以提出了 Dataset API.Dataset API 扩展 DataFrame API 支持静态类型和运行已经存在的 Scala 或 Java 语言的用户自定义函数.同时 Dataset 也能享受 Spark SQL 里所有性能 带来的提升.
那么后面发现 Dataset 是包含了 DataFrame 的功能,这样二者就出现了很大的冗余,故在 2.0 时将二者统一,保留 Dataset API,把 DataFrame 表示为 Dataset[Row],即 Dataset 的子集.
因此我们在使用 API 时,优先选择 DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API 也会一直保留着.后续 Spark 上层的库将全部会用 DataFrame,比如 MLlib、Streaming、Graphx 等.
Whole-stage code generation
在参考资料 9 中有几个例子的代码比较,我们看其中一个例子:
elect count(*) from store_sales where ss_item_sk = 1000
那么在翻译成计算引擎的执行计划如下图:
而通常物理计划的代码是这样实现的:
class Filter {
def next(): Boolean = {
var found = false
while (!found && child.next()) {
found = predicate(child.fetch())
}
return found
}
def fetch(): InternalRow = {
child.fetch()
}…
}
但是真正如果我们用 hard code 写的话,代码是这样的:
var count = 0
for (ss_item_sk in store_sales) {
if (ss_item_sk == 1000) {
count += 1
}
}
发现二者相关如下图所示:
那么如何使得计算引擎的物理执行速度能达到 hard code 的性能呢?这就提出了 whole-stage code generation,即对物理执行的多次调用转换为代码 for 循环,类似 hard code 方式,减少中间执行的函数调用次数,当数据记录多时,这个调用次数是很大. 最后这个优化带来的性能提升如下图所示:
从 benchmark 的结果可以看出,使用了该特性后各操作的性能都有很大的提升.
Spark Streaming 是把流式计算看成一个一个的离线计算来完成流式计算,提供了一套 Dstream 的流 API,相比于其他的流式计算,Spark Streaming 的优点是容错性和吞吐量上要有优势❿,关于 Spark Streaming 的详细设计思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 进行详细学习和了解.
在 2.0 以前的版本,用户在使用时,如果有流计算,又有离线计算,就需要用二套 API 去编写程序,一套是 RDD API,一套是 Dstream API.而且 Dstream API 在易用性上远不如 SQL 或 DataFrame.
为了真正将流式计算和离线计算在编程 API 上统一,同时也让 Streaming 作业能够享受 DataFrame/Dataset 上所带来的优势:性能提升和 API 易用,于是提出了 Structured Streaming.最后我们只需要基于 DataFrame/Dataset 可以开发离线计算和流式计算的程序,很容易使得 Spark 在 API 跟业界所说的 DataFlow 来统一离线计算和流式计算效果一样.
比如在做 Batch Aggregation 时我们可以写成下面的代码
那么对于流式计算时,我们仅仅是调用了 DataFrame/Dataset 的不同函数代码,如下:
最后,在 DataFrame/Dataset 这个 API 上可以完成如下图所示的所有应用:
在 http://geek.csdn.net/news/detail/70162 提到的 1.6 问题中 Spillable 集合内存溢出问题在 SPARK-4452 里已解决,BlockManager 死锁问题在 SPARK-12757 里已解决.
最后 2.0 版本还有一些其他的特性,如:
文/王联辉
文章出处——高可用架构微信公众号
转载请注明本页网址:
http://www.vephp.com/jiaocheng/4521.html