Spark入门
概述
Spark 是开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。
1.1 Spark 基础核心概念
- Client:
- Driver:
- Executor: 负责执行 Driver 分发的 Task 任务。集群中一个节点可以启动多个 Executor,每个 Executor 可以实行多个 Task 任务。
- Catche: Spark 提供了对 RDD 不同级别的缓存策略,分别可以缓存到内存、磁盘、外部分布式存储系统等。
- Application: 提交的一个作业就是一个 Application。
- Job: RDD 执行一次 Action 操作就会生成一个 Job。
- Task: Spark 运行的基本单位,负责处理 RDD 的计算逻辑。
- Stage: DAGScheduler 将 Job 划分为多个 Stage,Stage 的划分界限为 Shuffle 的产生,Shuffle 标志着上一个 Stage 的结束和下一个 Stage 的开始。
- TaskSet: 划分的 Stage 会转换成一组相关联的任务集合。
- RDD:
- DAG: 有向无环图。
1.1.1 RDD 介绍
是一种分布式多分区只读数组。具有几个特性,只读、多分区、分布式。
1.1.2 计算类型
Spark 中 RDD 提供了 Transformation 和 Action 两种计算类型。前者操作非常丰富,采用延迟执行的方式,在逻辑上定义了 RDD 的依赖关系和计算逻辑,并不会触发执行动作,只有等到 Action 操作才会触发真正执行操作。
1.1.3 缓存
Spark 中的 RDD 可以缓存到内存或者磁盘上,提供缓存的主要目的是减少 同一数据集被多次使用的网络传输次数,提高 Spark 的计算性能。RDD 的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。
1.1.4 依赖关系
- 窄依赖:父 RDD 的分区只对应一个子 RDD 的分区,如果子 RDD 只有部分分区数据损坏或者丢失,只需从对应的父 RDD 重新计算恢复。
- 宽依赖:子 RDD d分区依赖于父 RDD 的所有分区,如果子 RDD 部分分区甚至全部分区数据损坏或丢失,需要从所有父 RDD 重新计算,相对依赖而言复出的代价更高,所以应尽量避免。
- Lineage:每个 RDD 都会记录自己依赖的父 RDD 的信息,一旦出现数据损坏或者丢失可以从父 RDD 迅速恢复。
1.2 运行模式
- Local:
- On Yarn 模式: Spark On Yarn 有两种模式,分别为 yarn-client 和 yarn-cluster 模式。前者中 Driver 运行在客户端,其作业运行日志在客户端查看,适合返回小数据量结果集交互式场景使用。后者的 Driver 运行在集群中的某个节点,节点的选择由 YARN 来调度,作业日志通过 yarn 管理名称查看,适合非交互模式。
2 Shuffle详解
Shuffle 最早出现在 MapReduce 框架中,负责连接 Map 阶段的输出与 Reduce 阶段的输入。Shuffle 阶段涉及磁盘IO、网络传输、内存使用等多重资源的调用,所以 Shuffle 阶段执行效率影响整个作业的执行效率。大部分优化也都针对 Shuffle 阶段进行。 在 Spark 中,Shuffle Write 相当于 Map,Shuffle Read 相当于 Read。前者会将 Map Task 中间结果数据写入到本地磁盘,而后者会从前者阶段拉取数据到内存中进行并行计算。
2.1 Shuffle Write的实现方式
2.1.1 基于Hash的实现
每个 Map 任务都生成与 Reducec 任务数据相同的文件数,对 Key 取哈希值分别写入对应的文件里。生成的文件数是 Map 任务数与 Reduce 任务数的积。写文件过程中,每个文件都要占用一部分缓冲区。大量的小文件会占用很多的缓冲区,造成不必要的内存开销,同时大量的随机写操作也会大大降低磁盘IO的性能。
2.1.2 基于Sort的实现
为了解决基于 Hash 的实现方式的诸多问题,引入基于 Sort 的实现方式,每个 Map 任务生成两个文件,一个是数据文件,一个是索引文件,生成的文件数是 Map 任务数的两倍。数据文件中的数据按照 Key 分区在不同分区之间排序,统一分区中的数据不排序,索引文件记录了文件中每个分区的偏移量和范围,当 Reduce 任务读取数据时候,先读取索引文件找到对应的分区数据偏移量和范围,然后从数据文件读取指定的数据。其优缺点包括:
- 顺序读写可以大幅提高磁盘 IO 的性能,不会产生太多小文件,降低文件缓存占内存空间大小,提高内存使用率。
- 多了一次粗粒度的排序
2.2 Shuffle Read实现方式
这个阶段中的 Task 通过直接读取本地 Write 阶段产生的中间结果数据或者通过 HTTP 方式从远程 Write 阶段拉取中间结果数据进行处理。
- 获取需要拉取的数据信息,根据数据本地性原则判断采用哪种级别的拉取方式。
- 判断是否需要在 Map 端聚合。
- Read 阶段的 Task 拉取过来的数据如果涉及聚合或者排序,则会使用 HashMap 结构在内存中存储,如果拉取过来的数据集在 HashMap 中已经存在相同的键则将数据聚合在一起。
3 Spark SQl
3.1 Spark Session
从 Spark 2.0 开始引入了 SparkSession,用于在 Spark SQL 开发过程中初始化上下文,为用户提供统一的入口。2.0开始使用运行的参数设置和获取都可以通过 conf 方法来实现。返回 RuntimeConfig 对象。
3.2 DataFrame
1.3后引入 DataFrame,是一种带有 Schema 元数据的分布式数据集,类似于传统数据库中的二维表。
3.2.1 DataFrame的常用操作
- toDF 作为 DataSet 的一种特殊形式,可以将 RDD 转换成为 DataFrame。
- as 返回一个指定别名的新 DataSet。
- printSchema
- show
- createTempView 和 createOrReplaceTempView 创建临时视图,临时视图随着创建该视图会话的终止自动删除,不会绑定到任何数据库中。
- createGlobalTempView 函数 创建全局临时视图,该视图的声明周期与 Spark 应用程序声明周期相关联。随着 Spark 应用程序的终止自动删除,它与系统保留的数据库 _global_temp 绑定。
3.2.2 DataFrame持久化
DF 可以以不同文件格式输出到指定路径,可以保存到 Hive 表,还可以通过 JDBC 连接输出到数据库表中。DF 的四种保存模式:
- SaveMode.ErrorIfExists: 表示如果输出数据或者目标表已经存在则抛出异常,此为默认保存模式。
- SaveMode.Append: 表示如果输出数据或目标表已经存在,则 DF 数据会追加。
- SaveMode.Overwrite:
- SaveMode.Ignore: 表示如果输出数据或目标表已经存在,则不做任何操作。
3.2.3 DataSet
DataSet 是一个特定域的强类型的不可变数据集,每个 DS 都有一个非类型化视图 DataFrame,也可以说 DataFrame 是 DataSet[Row] 的一种表示形式。DF 可以通过调用 as(Encoder) 函数转换成 DataSet,而 DataSet 则可以通过调用 toDF 函数转换成 DF。两者之间可以互相灵活转换。操作 DS 可以像操作 RDD 一样使用各种转换算子并行操作,采用惰性执行的执行方式,当调用 Action 才会真正执行。 创建 DS 需要显示提供 Encoder 把对象序列化为二进制形式进行存储,而不是使用 Java 序列化或者 Kryo 序列化方式。DataSet 使用专门的编码器序列化对象在网络间传输处理。编码器动态生成代码,可以在编译的时候检查类型,不需要将对象反序列化就可以过滤、排序等曹组,避免了 Shuffle 过程中频繁的序列化和反序列化,有效减少内存的使用和 Java 对象频繁的 GC 的开销。
4 Structured Streaming
设计意图是将流数据结构化,基于 Spark 引擎可扩展和高容错流处理引擎。