热线电话:13121318867

登录
首页大数据时代spark入门必读:核心概念介绍及常用RDD操作
spark入门必读:核心概念介绍及常用RDD操作
2020-07-13
收藏

作者:肖冠宇

来源:大数据DT(ID:hzdashuju)

内容摘编自《企业大数据处理:Spark、Druid、Flume与Kafka应用实践》

导读:Spark是由加州大学伯克利分校AMP实验室开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。Spark内部提供了丰富的开发库,集成了数据分析引擎Spark SQL、图计算框架GraphX、机器学习库MLlib、流计算引擎Spark Streaming。

Spark在函数式编程语言Scala中实现,提供了丰富的开发API,支持Scala、Java、Python、R等多种开发语言。同时,Spark提供了多种运行模式,既可以采用独立部署的方式运行,也可以依托Hadoop YARN、Apache Mesos等资源管理器调度任务运行。

目前,Spark已经在金融、交通、医疗、气象等多种领域中广泛使用。

01 Spark概述

1. 核心概念介绍

Spark架构示意图如图2-1所示,下面将分别介绍各核心组件。

Client:客户端进程,负责提交作业。

Driver:一个Spark作业有一个Spark Context,一个Spark Context对应一个Driver进程,作业的main函数运行在Driver中。Driver主要负责Spark作业的解析,以及通过DAGScheduler划分Stage,将Stage转化成TaskSet提交给TaskScheduler任务调度器,进而调度Task到Executor上执行。

Executor:负责执行Driver分发的Task任务。集群中一个节点可以启动多个Executor,每一个Executor可以执行多个Task任务。

Catche:Spark提供了对RDD不同级别的缓存策略,分别可以缓存到内存、磁盘、外部分布式内存存储系统Tachyon等。

Application:提交的一个作业就是一个Application,一个Application只有一个Spark Context。

Job:RDD执行一次Action操作就会生成一个Job。

Task:Spark运行的基本单位,负责处理RDD的计算逻辑。

Stage:DAGScheduler将Job划分为多个Stage,Stage的划分界限为Shuffle的产生,Shuffle标志着上一个Stage的结束和下一个Stage的开始。

TaskSet:划分的Stage会转换成一组相关联的任务集。

RDD(Resilient Distributed Dataset):弹性分布式数据集,可以理解为一种只读的分布式多分区的数组,Spark计算操作都是基于RDD进行的,下面会有详细介绍。

DAG(Directed Acyclic Graph):有向无环图。Spark实现了DAG的计算模型,DAG计算模型是指将一个计算任务按照计算规则分解为若干子任务,这些子任务之间根据逻辑关系构建成有向无环图。

▲图2-1 Spark架构示意图

2. RDD介绍

RDD从字面上理解有些困难,我们可以认为是一种分布式多分区只读的数组,Spark计算操作都是基于RDD进行的。

RDD具有几个特性:只读、多分区、分布式,可以将HDFS块文件转换成RDD,也可以由一个或多个RDD转换成新的RDD,失效自动重构。基于这些特性,RDD在分布式环境下能够被高效地并行处理。

(1)计算类型

在Spark中RDD提供Transformation和Action两种计算类型。Transformation操作非常丰富,采用延迟执行的方式,在逻辑上定义了RDD的依赖关系和计算逻辑,但并不会真正触发执行动作,只有等到Action操作才会触发真正执行操作。Action操作常用于最终结果的输出。

常用的Transformation操作及其描述:

map (func):接收一个处理函数并行处理源RDD中的每个元素,返回与源RDD元素一一对应的新RDD

filter (func):并行处理源RDD中的每个元素,接收一个处理函数,并根据定义的规则对RDD中的每个元素进行过滤处理,返回处理结果为true的元素重新组成新的RDD

flatMap (func):flatMap是map和flatten的组合操作,与map函数相似,不过map函数返回的新RDD包含的元素可能是嵌套类型,flatMap接收一个处理嵌套会将嵌套类型的元素展开映射成多个元素组成新的RDD

mapPartitions (func):与map函数应用于RDD中的每个元素不同,mapPartitions应用于RDD中的每个分区。mapPartitions函数接收的参数为func函数,func接收参数为每个分区的迭代器,返回值为每个分区元素处理之后组成的新的迭代器,func会作用于分区中的每一个元素。有一种典型的应用场景,比如待处理分区中的数据需要写入到数据库,如果使用map函数,每一个元素都会创建一个数据库连接对象,非常耗时并且容易引起问题发生,如果使用mapPartitions函数只会在分区中创建一个数据库连接对象,性能提高明显

mapPartitionsWithIndex(func):作用与mapPartitions函数相同,只是接收的参数func函数需要传入两个参数,分区的索引作为第一个参数传入,按照分区的索引对分区中元素进行处理

union (otherDataset):将两个RDD进行合并,返回结果为RDD中元素(不去重)

intersection (otherDataset):对两个RDD进行取交集运算,返回结果为RDD无重复元素

distinct ([numTasks])):对RDD中元素去重

groupByKey ([numTasks]):在KV类型的RDD中按Key分组,将相同Key的元素聚集到同一个分区内,此函数不能接收函数作为参数,只接收一个可选参数任务数,所以不能在RDD分区本地进行聚合计算,如需按Key对Value聚合计算,只能对groupByKey返回的新RDD继续使用其他函数运算

reduceByKey (func, [numTasks]):对KV类型的RDD按Key分组,接收两个参数,第一个参数为处理函数,第二个参数为可选参数设置reduce的任务数。reduceByKey函数能够在RDD分区本地提前进行聚合运算,这有效减少了shuffle过程传输的数据量。相对于groupByKey函数更简洁高效

aggregateByKey (zeroValue)(seqOp, combOp):对KV类型的RDD按Key分组进行reduce计算,可接收三个参数,第一个参数是初始化值,第二个参数是分区内处理函数,第三个参数是分区间处理函数

sortByKey ([ascending], [numTasks]):对KV类型的RDD内部元素按照Key进行排序,排序过程会涉及Shuffle

join (otherDataset, [numTasks]):对KV类型的RDD进行关联,只能是两个RDD之间关联,超过两个RDD关联需要使用多次join函数,join函数只会关联出具有相同Key的元素,相当于SQL语句中的inner join

cogroup (otherDataset, [numTasks]):对KV类型的RDD进行关联,cogroup处理多个RDD关联比join更加优雅,它可以同时传入多个RDD作为参数进行关联,产生的新RDD中的元素不会出现笛卡尔积的情况,使用fullOuterJoin函数会产生笛卡尔积

coalesce (numPartitions):对RDD重新分区,将RDD中的分区数减小到参数numPartitions个,不会产生shuffle。在较大的数据集中使用filer等过滤操作后可能会产生多个大小不等的中间结果数据文件,重新分区并减小分区可以提高作业的执行效率,是Spark中常用的一种优化手段

repartition (numPartitions):对RDD重新分区,接收一个参数——numPartitions分区数,是coalesce函数设置shuffle为true的一种实现形式

repartitionAndSortWithinPartitions (partitioner):接收一个分区对象(如Spark提供的分区类HashPartitioner)对RDD中元素重新分区并在分区内排序

常用的Action操作及其描述:

reduce(func):处理RDD两两之间元素的聚集操作

collect():返回RDD中所有数据元素

count():返回RDD中元素个数

first():返回RDD中的第一个元素

take(n):返回RDD中的前n个元素

saveAsTextFile(path):将RDD写入文本文件,保存至本地文件系统或者HDFS

saveAsSequenceFile(path):将KV类型的RDD写入SequenceFile文件,保存至本地文件系统或者HDFS

countByKey():返回KV类型的RDD每个Key包含的元素个数

foreach(func):遍历RDD中所有元素,接收参数为func函数,常用操作是传入println函数打印所有元素

HDFS文件生成Spark RDD,经过map、filter、join等多次Transformation操作,最终调用saveAsTextFile Action操作将结果集输出到HDFS,并以文件形式保存。RDD的流转过程如图2-2所示。

▲图2-2 RDD的流转过程示意图

(2)缓存

在Spark中RDD可以缓存到内存或者磁盘上,提供缓存的主要目的是减少同一数据集被多次使用的网络传输次数,提高Spark的计算性能。Spark提供对RDD的多种缓存级别,可以满足不同场景对RDD的使用需求。RDD的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。

在代码中可以使用persist()方法或cache()方法缓存RDD。cache()方法默认将RDD缓存到内存中,cache()方法和persist()方法都可以用unpersist()方法来取消RDD缓存。示例如下:

val fileDataRdd = sc.textFile("hdfs://data/hadoop/test.text")

fileDataRdd.cache()        // 缓存RDD到内存

或者

fileDataRdd.persist(StorageLevel.MEMORY_ONLY)

fileDataRdd..unpersist()        // 取消缓存

Spark的所有缓存级别定义在org.apache.spark.storage.StorageLevel对象中,如下所示。

object storageLevel extends scala.AnyRef with scala.Serializable {

val NONE : org.apache.spark.storage.StorageLevel

val DISK_ONLY : org.apache.spark.storage.StorageLevel

val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel

val MEMORY_ONLY : org.apache.spark.storage.StorageLevel

val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel

val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel

val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel

val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel

val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel

val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel

val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel

val OFF_HEAP : org.apache.spark.storage.StorageLevel

Spark各缓存级别及其描述:

MEMORY_ONLY:RDD仅缓存一份到内存,此为默认级别

MEMORY_ONLY_2:将RDD分别缓存在集群的两个节点上,RDD在集群内存中保存两份

MEMORY_ONLY_SER:将RDD以Java序列化对象的方式缓存到内存中,有效减少了RDD在内存中占用的空间,不过读取时会消耗更多的CPU资源

DISK_ONLY:RDD仅缓存一份到磁盘

MEMORY_AND_DISK:RDD仅缓存一份到内存,当内存中空间不足时会将部分RDD分区缓存到磁盘

MEMORY_AND_DISK_2:将RDD分别缓存在集群的两个节点上,当内存中空间不足时会将部分RDD分区缓存到磁盘,RDD在集群内存中保存两份

MEMORY_AND_DISK_SER:将RDD以Java序列化对象的方式缓存到内存中,当内存中空间不足时会将部分RDD分区缓存到磁盘,有效减少了RDD在内存中占用的空间,不过读取时会消耗更多的CPU资源

OFF_HEAP:将RDD以序列化的方式缓存到JVM之外的存储空间Tachyon中,与其他缓存模式相比,减少了JVM垃圾回收开销。Spark执行程序失败不会导致数据丢失,Spark与Tachyon已经能较好地兼容,使用起来方便稳定

(3)依赖关系

窄依赖(Narrow Dependency):父RDD的分区只对应一个子RDD的分区,如图2-3所示,如果子RDD只有部分分区数据损坏或者丢失,只需要从对应的父RDD重新计算恢复。

▲图2-3 窄依赖示意图

宽依赖(Shuffle Dependency):子RDD分区依赖父RDD的所有分区,如图2-4所示。如果子RDD部分分区甚至全部分区数据损坏或丢失,需要从所有父RDD重新计算,相对窄依赖而言付出的代价更高,所以应尽量避免宽依赖的使用。

▲图2-4 宽依赖示意图

Lineage:每个RDD都会记录自己依赖的父RDD信息,一旦出现数据损坏或者丢失将从父RDD迅速重新恢复。

3. 运行模式

Spark运行模式主要有以下几种:

Local模式:本地采用多线程的方式执行,主要用于开发测试。

On Yarn模式:Spark On Yarn有两种模式,分别为yarn-client和yarn-cluster模式。yarn-client模式中,Driver运行在客户端,其作业运行日志在客户端查看,适合返回小数据量结果集交互式场景使用。yarn-cluster模式中,Driver运行在集群中的某个节点,节点的选择由YARN调度,作业日志通过yarn管理名称查看:yarn logs -applicationId,也可以在YARN的Web UI中查看,适合大数据量非交互式场景使用。

提交作业命令:

./bin/spark-submit --class package.MainClass \    # 作业执行主类,需要完成的包路径

--master spark://host:port, mesos://host:port, yarn, or local\Maste

# 运行方式

---deploy-mode client,cluster\ # 部署模式,如果Master采用YARN模式则可以选择使用clent模式或者cluster模式,默认client模式

--driver-memory 1g \          # Driver运行内存,默认1G

---driver-cores 1 \          # Driver分配的CPU核个数

--executor-memory 4g \       # Executor内存大小

--executor-cores 1 \           # Executor分配的CPU核个数

---num-executors \           # 作业执行需要启动的Executor数

---jars \               # 作业程序依赖的外部jar包,这些jar包会从本地上传到Driver然后分发到各Executor classpath中。

lib/spark-examples*.jar \      # 作业执行JAR包

[other application arguments ]       # 程序运行需要传入的参数

作业在yarn-cluster模式下的执行过程如图2-5所示。

▲图2-5 作业在yarn-cluster模式下的执行过程

Client在任何一台能与Yarn通信的入口机向Yarn提交作业,提交的配置中可以设置申请的资源情况,如果没有配置则将采用默认配置。

ResourceManager接收到Client的作业请求后,首先检查程序启动的ApplicationMaster需要的资源情况,然后向资源调度器申请选取一个能够满足资源要求的NodeManager节点用于启动ApplicationMaster进程,ApplicationMaster启动成功之后立即在该节点启动Driver进程。

ApplicationMaster根据提交作业时设置的Executor相关配置参数或者默认配置参数与ResourceManager通信领取Executor资源信息,并与相关NodeManager通信启动Executor进程。

Executor启动成功之后与Driver通信领取Driver分发的任务。

Task执行,运行成功输出结果。

02 Shuffle详解

Shuffle最早出现于MapReduce框架中,负责连接Map阶段的输出与Reduce阶段的输入。Shuffle阶段涉及磁盘IO、网络传输、内存使用等多种资源的调用,所以Shuffle阶段的执行效率影响整个作业的执行效率,大部分优化也都是针对Shuffle阶段进行的。

Spark是实现了MapReduce原语的一种通用实时计算框架。Spark作业中Map阶段的Shuffle称为Shuffle Write,Reduce阶段的Shuffle称为Shuffle Read。

Shuffle Write阶段会将Map Task中间结果数据写入到本地磁盘,而在Shuffle Read阶段中,Reduce Task从Shuffle Write阶段拉取数据到内存中并行计算。Spark Shuffle阶段的划分方式如图2-6所示。

▲图2-6 Spark Shuffle阶段的划分方式

1. Shuffle Write实现方式

(1)基于Hash的实现(hash-based)

每个Map Task都会生成与Reduce Task数据相同的文件数,对Key取Hash值分别写入对应的文件中,如图2-7所示。

生成的文件数FileNum=MapTaskNum×ReduceTaskNum,如果Map Task和Reduce Task数都比较多就会生成大量的小文件,写文件过程中,每个文件都要占用一部分缓冲区,总占用缓冲区大小TotalBufferSize=CoreNum×ReduceTaskNum×FileBufferSize,大量的小文件就会占用更多的缓冲区,造成不必要的内存开销,同时,大量的随机写操作会大大降低磁盘IO的性能。

▲图2-7 基于Hash的实现方式

由于简单的基于Hash的实现方式扩展性较差,内存资源利用率低,过多的小文件在文件拉取过程中增加了磁盘IO和网络开销,所以需要对基于Hash的实现方式进行进一步优化,为此引入了Consolidate(合并)机制。

如图2-8所示,将同一个Core中执行的Task输出结果写入到相同的文件中,生成的文件数FileNum=CoreNum×ReduceTaskNum,这种优化方式减少了生成的文件数目,提高了磁盘IO的吞吐量,但是文件缓存占用的空间并没有减少,性能没有得到明显有效的提高。

▲图2-8 优化后的基于Hash的实现方式

设置方式:

代码中设置:conf.get("spark.shuffle.manager", "hash")

配置文件中设置:在conf/spark-default.conf配置文件中添加spark.shuffle.managerhash

基于Hash的实现方式的优缺点:

优点:实现简单,小数量级数据处理操作方便。

缺点:产生小文件过多,内存利用率低,大量的随机读写造成磁盘IO性能下降。

(2)基于Sort的实现方式(sort-based)

为了解决基于Hash的实现方式的诸多问题,Spark Shuffle引入了基于Sort的实现方式,如图2-9所示。该方式中每个Map Task任务生成两个文件,一个是数据文件,一个是索引文件,生成的文件数FileNum=MapTaskNum×2.

数据文件中的数据按照Key分区在不同分区之间排序,同一分区中的数据不排序,索引文件记录了文件中每个分区的偏移量和范围。当Reduce Task读取数据时,先读取索引文件找到对应的分区数据偏移量和范围,然后从数据文件读取指定的数据。

设置方式:

代码中设置:conf.get("spark.shuffle.manager", "sort")

配置文件中设置:在conf/spark-default.conf配置文件中添加spark.shuffle.manager sort

▲图2-9 基于Sort的实现方式

基于Sort的实现方式的优缺点:

优点:顺序读写能够大幅提高磁盘IO性能,不会产生过多小文件,降低文件缓存占用内存空间大小,提高内存使用率。

缺点:多了一次粗粒度的排序。

2. Shuffle Read实现方式

Shuffle Read阶段中Task通过直接读取本地Shuffle Write阶段产生的中间结果数据或者通过HTTP的方式从远程Shuffle Write阶段拉取中间结果数据进行处理。Shuffle Write阶段基于Hash和基于Sort两种实现方式产生的中间结果数据在Shuffle Read阶段采用同一种实现方式。

获取需要拉取的数据信息,根据数据本地性原则判断采用哪种级别的拉取方式。

判断是否需要在Map端聚合(reduceByKey会在Map端预聚合)。

Shuffle Read阶段Task拉取过来的数据如果涉及聚合或者排序,则会使用HashMap结构在内存中存储,如果拉取过来的数据集在HashMap中已经存在相同的键则将数据聚合在一起。此时涉及一个比较重要的参数——spark.shuffle.spill,决定在内存被写满后是否将数据以文件的形式写入到磁盘,默认值为true,如果设置为false,则有可能会发生OOM内存溢出的风险,建议开启。

排序聚合之后的数据以文件形式写入磁盘将产生大量的文件内数据有序的小文件,将这些小文件重新加载到内存中,随后采用归并排序的方式合并为一个大的数据文件。

本文摘编自《企业大数据处理:Spark、Druid、Flume与Kafka应用实践》,经出版方授权发布。

数据分析咨询请扫描二维码

若不方便扫码,搜微信号:CDAshujufenxi

最新资讯
更多
客服在线
立即咨询