• 沒有找到結果。

5.26 Spark2x

5.26.1 Spark2x 基本原理

说明

Spark2x组件适用于MRS 3.x及后续版本。

简介

Spark是基于内存的分布式计算框架。在迭代计算的场景下,数据处理过程中的数据可 以存储在内存中,提供了比MapReduce高10到100倍的计算能力。Spark可以使用 HDFS作为底层存储,使用户能够快速地从MapReduce切换到Spark计算平台上去。

Spark提供一站式数据分析能力,包括小批量流式处理、离线批处理、SQL查询、数据 挖掘等,用户可以在同一个应用中无缝结合使用这些能力。Spark2x的开源新特性请参 考Spark2x开源新特性。

Spark的特点如下:

● 通过分布式内存计算和DAG(无回路有向图)执行引擎提升数据处理能力,比 MapReduce性能高10倍到100倍。

● 提供多种语言开发接口(Scala/Java/Python),并且提供几十种高度抽象算子,

可以很方便构建分布式的数据处理应用。

● 结合SQL、Streaming等形成数据处理栈,提供一站式数据处理能力。

● 完美契合Hadoop生态环境,Spark应用可以运行在Standalone、Mesos或者YARN 上,能够接入HDFS、HBase、Hive等多种数据源,支持MapReduce程序平滑转 接。

结构

Spark的架构如图5-101所示,各模块的说明如表5-23所示。

5-101 Spark 架构

5-23 基本概念说明

模块 说明

Cluster Manager 集群管理器,管理集群中的资源。Spark支持多种集群管理器,

Spark自带的Standalone集群管理器、Mesos或YARN。Spark 集群默认采用YARN模式。

Application Spark应用,由一个Driver Program和多个Executor组成。

Deploy Mode 部署模式,分为cluster和client模式。cluster模式下,Driver会 在集群内的节点运行;而在client模式下,Driver在客户端运行

(集群外)。

Driver Program 是Spark应用程序的主进程,运行Application的main()函数并 创建SparkContext。负责应用程序的解析、生成Stage并调度 Task到Executor上。通常SparkContext代表Driver Program。

Executor 在Work Node上启动的进程,用来执行Task,管理并处理应用 中使用到的数据。一个Spark应用一般包含多个Executor,每个 Executor接收Driver的命令,并执行一到多个Task。

Worker Node 集群中负责启动并管理Executor以及资源的节点。

Job 一个Action算子(比如collect算子)对应一个Job,由并行计算 的多个Task组成。

Stage 每个Job由多个Stage组成,每个Stage是一个Task集合,由DAG 分割而成。

Task 承载业务逻辑的运算单元,是Spark平台中可执行的最小工作单 元。一个应用根据执行计划以及计算量分为多个Task。

Spark 原理

Spark的应用运行架构如图5-102所示,运行流程如下所示:

1. 应用程序(Application)是作为一个进程的集合运行在集群上的,由Driver进行 协调。

2. 在运行一个应用时,Driver会去连接集群管理器(Standalone、Mesos、YARN)

申请运行Executor资源,并启动ExecutorBackend。然后由集群管理器在不同的应 用之间调度资源。Driver同时会启动应用程序DAG调度、Stage划分、Task生成。

3. 然后Spark会把应用的代码(传递给SparkContext的JAR或者Python定义的代 码)发送到Executor上。

4. 所有的Task执行完成后,用户的应用程序运行结束。

5-102 Spark 应用运行架构

Spark采用Master和worker的模式,如图5-103所示。用户在Spark客户端提交应用程 序,调度器将Job分解为多个Task发送到各个Worker中执行,各个Worker将计算的结 果上报给Driver(即Master),Driver聚合结果返回给客户端。

5-103 Spark 的 Master 和 Worker

在此结构中,有几个说明点:

● 应用之间是独立的。

每个应用有自己的executor进程,Executor启动多个线程,并行地执行任务。无 论是在调度方面,或者是executor方面。各个Driver独立调度自己的任务;不同的 应用任务运行在不同的JVM上,即不同的Executor。

● 不同Spark应用之间是不共享数据的,除非把数据存储在外部的存储系统上(比如 HDFS)。

● 因为Driver程序在集群上调度任务,所以Driver程序最好和worker节点比较近,比 如在一个相同的局部网络内。

Spark on YARN有两种部署模式:

● YARN-Cluster模式下,Spark的Driver会运行在YARN集群内的ApplicationMaster 进程中,ApplicationMaster已经启动之后,提交任务的客户端退出也不会影响任 务的运行。

● YRAN-Client模式下,Driver启动在客户端进程内,ApplicationMaster进程只用来 向YARN集群申请资源。

Spark Streaming 原理

Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式 数据的能力。当前Spark支持两种数据处理方式:Direct Streaming和Receiver方式。

Direct Streaming计算流程

Direct Streaming方式主要通过采用Direct API对数据进行处理。以Kafka Direct接口为 例,与启动一个Receiver来连续不断地从Kafka中接收数据并写入到WAL中相比,

Direct API简单地给出每个batch区间需要读取的偏移量位置。然后,每个batch的Job 被运行,而对应偏移量的数据在Kafka中已准备好。这些偏移量信息也被可靠地存储在 checkpoint文件中,应用失败重启时可以直接读取偏移量信息。

5-104 Direct Kafka 接口数据传输

需要注意的是,Spark Streaming可以在失败后重新从Kafka中读取并处理数据段。然 而,由于语义仅被处理一次,重新处理的结果和没有失败处理的结果是一致的。

因此,Direct API消除了需要使用WAL和Receivers的情况,且确保每个Kafka记录仅被 接收一次,这种接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一 起。总体来说,这些特性使得流处理管道拥有高容错性、高效性及易用性,因此推荐 使用Direct Streaming方式处理数据。

Receiver计算流程

在一个Spark Streaming应用开始时(也就是Driver开始时),相关的

StreamingContext(所有流功能的基础)使用SparkContext启动Receiver成为长驻运 行任务。这些Receiver接收并保存流数据到Spark内存中以供处理。用户传送数据的生 命周期如图5-105所示:

5-105 数据传输生命周期

1. 接收数据(蓝色箭头)

Receiver将数据流分成一系列小块,存储到Executor内存中。另外,在启用预写日 志(Write-ahead Log,简称WAL)以后,数据同时还写入到容错文件系统的预写 日志中。

2. 通知Driver(绿色箭头)

接收块中的元数据(Metadata)被发送到Driver的StreamingContext。这个元数 据包括:

– 定位其在Executor内存中数据位置的块Reference ID。

– 若启用了WAL,还包括块数据在日志中的偏移信息。

3. 处理数据(红色箭头)

对每个批次的数据,StreamingContext使用Block信息产生RDD及其Job。

StreamingContext通过运行任务处理Executor内存中的Block来执行Job。

4. 周期性地设置检查点(橙色箭头)

5. 为了容错的需要,StreamingContext会周期性地设置检查点,并保存到外部文件 系统中。

容错性

Spark及其RDD允许无缝地处理集群中任何Worker节点的故障。鉴于Spark Streaming 建立于Spark之上,因此其Worker节点也具备了同样的容错能力。然而,由于Spark Streaming的长正常运行需求,其应用程序必须也具备从Driver进程(协调各个Worker 的主要应用进程)故障中恢复的能力。使Spark Driver能够容错是件很棘手的事情,因 为可能是任意计算模式实现的任意用户程序。不过Spark Streaming应用程序在计算上 有一个内在的结构:在每批次数据周期性地执行同样的Spark计算。这种结构允许把应 用的状态(亦称Checkpoint)周期性地保存到可靠的存储空间中,并在Driver重新启 动时恢复该状态。

对于文件这样的源数据,这个Driver恢复机制足以做到零数据丢失,因为所有的数据都 保存在了像HDFS这样的容错文件系统中。但对于像Kafka和Flume等其他数据源,有 些接收到的数据还只缓存在内存中,尚未被处理,就有可能会丢失。这是由于Spark应 用的分布操作方式引起的。当Driver进程失败时,所有在Cluster Manager中运行的 Executor,连同在内存中的所有数据,也同时被终止。为了避免这种数据损失,Spark Streaming引进了WAL功能。

WAL通常被用于数据库和文件系统中,用来保证任何数据操作的持久性,即先将操作 记入一个持久的日志,再对数据施加这个操作。若施加操作的过程中执行失败了,则 通过读取日志并重新施加前面预定的操作,系统就得到了恢复。下面介绍了如何利用 这样的概念保证接收到的数据的持久性。

Kafka数据源使用Receiver来接收数据,是Executor中的长运行任务,负责从数据源接 收数据,并且在数据源支持时还负责确认收到数据的结果(收到的数据被保存在 Executor的内存中,然后Driver在Executor中运行来处理任务)。

当启用了预写日志以后,所有收到的数据同时还保存到了容错文件系统的日志文件 中。此时即使Spark Streaming失败,这些接收到的数据也不会丢失。另外,接收数据 的正确性只在数据被预写到日志以后Receiver才会确认,已经缓存但还没有保存的数据 可以在Driver重新启动之后由数据源再发送一次。这两个机制确保了零数据丢失,即所 有的数据或者从日志中恢复,或者由数据源重发。

如果需要启用预写日志功能,可以通过如下动作实现:

● 通过“streamingContext.checkpoint”(path-to-directory)设置checkpoint的目 录,这个目录是一个HDFS的文件路径,既用作保存流的checkpoint,又用作保存 预写日志。

● 设置SparkConf的属性“spark.streaming.receiver.writeAheadLog.enable”为

“true”(默认值是“false”)。

在WAL被启用以后,所有Receiver都获得了能够从可靠收到的数据中恢复的优势。建 议缓存RDD时不采取多备份选项,因为用于预写日志的容错文件系统很可能也复制了 数据。

说明

在启用了预写日志以后,数据接收吞吐率会有降低。由于所有数据都被写入容错文件系统,文件 系统的写入吞吐率和用于数据复制的网络带宽,可能就是潜在的瓶颈了。在此情况下,最好创建 更多的Recevier增加数据接收的并行度,或使用更好的硬件以增加容错文件系统的吞吐率。

恢复流程

当一个失败的Driver重启时,按如下流程启动:

5-106 计算恢复流程

1. 恢复计算(橙色箭头)

使用checkpoint信息重启Driver,重新构造SparkContext并重启Receiver。

2. 恢复元数据块(绿色箭头)

为了保证能够继续下去所必备的全部元数据块都被恢复。

3. 未完成作业的重新形成(红色箭头)

由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生RDD和对应的 作业。

4. 读取保存在日志中的块数据(蓝色箭头)

4. 读取保存在日志中的块数据(蓝色箭头)