目录

Spark入门

概述

Spark 是开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。

1.1 Spark 基础核心概念

  • Client:
  • Driver:
  • Executor: 负责执行 Driver 分发的 Task 任务。集群中一个节点可以启动多个 Executor,每个 Executor 可以实行多个 Task 任务。
  • Catche: Spark 提供了对 RDD 不同级别的缓存策略,分别可以缓存到内存、磁盘、外部分布式存储系统等。
  • Application: 提交的一个作业就是一个 Application。
  • Job: RDD 执行一次 Action 操作就会生成一个 Job。
  • Task: Spark 运行的基本单位,负责处理 RDD 的计算逻辑。
  • Stage: DAGScheduler 将 Job 划分为多个 Stage,Stage 的划分界限为 Shuffle 的产生,Shuffle 标志着上一个 Stage 的结束和下一个 Stage 的开始。
  • TaskSet: 划分的 Stage 会转换成一组相关联的任务集合。
  • RDD:
  • DAG: 有向无环图。

1.1.1 RDD 介绍

是一种分布式多分区只读数组。具有几个特性,只读、多分区、分布式。

1.1.2 计算类型

Spark 中 RDD 提供了 Transformation 和 Action 两种计算类型。前者操作非常丰富,采用延迟执行的方式,在逻辑上定义了 RDD 的依赖关系和计算逻辑,并不会触发执行动作,只有等到 Action 操作才会触发真正执行操作。

1.1.3 缓存

Spark 中的 RDD 可以缓存到内存或者磁盘上,提供缓存的主要目的是减少 同一数据集被多次使用的网络传输次数,提高 Spark 的计算性能。RDD 的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。

1.1.4 依赖关系

  • 窄依赖:父 RDD 的分区只对应一个子 RDD 的分区,如果子 RDD 只有部分分区数据损坏或者丢失,只需从对应的父 RDD 重新计算恢复。
  • 宽依赖:子 RDD d分区依赖于父 RDD 的所有分区,如果子 RDD 部分分区甚至全部分区数据损坏或丢失,需要从所有父 RDD 重新计算,相对依赖而言复出的代价更高,所以应尽量避免。
  • Lineage:每个 RDD 都会记录自己依赖的父 RDD 的信息,一旦出现数据损坏或者丢失可以从父 RDD 迅速恢复。

1.2 运行模式

  • Local:
  • On Yarn 模式: Spark On Yarn 有两种模式,分别为 yarn-client 和 yarn-cluster 模式。前者中 Driver 运行在客户端,其作业运行日志在客户端查看,适合返回小数据量结果集交互式场景使用。后者的 Driver 运行在集群中的某个节点,节点的选择由 YARN 来调度,作业日志通过 yarn 管理名称查看,适合非交互模式。

2 Shuffle详解

Shuffle 最早出现在 MapReduce 框架中,负责连接 Map 阶段的输出与 Reduce 阶段的输入。Shuffle 阶段涉及磁盘IO、网络传输、内存使用等多重资源的调用,所以 Shuffle 阶段执行效率影响整个作业的执行效率。大部分优化也都针对 Shuffle 阶段进行。 在 Spark 中,Shuffle Write 相当于 Map,Shuffle Read 相当于 Read。前者会将 Map Task 中间结果数据写入到本地磁盘,而后者会从前者阶段拉取数据到内存中进行并行计算。

2.1 Shuffle Write的实现方式

2.1.1 基于Hash的实现

每个 Map 任务都生成与 Reducec 任务数据相同的文件数,对 Key 取哈希值分别写入对应的文件里。生成的文件数是 Map 任务数与 Reduce 任务数的积。写文件过程中,每个文件都要占用一部分缓冲区。大量的小文件会占用很多的缓冲区,造成不必要的内存开销,同时大量的随机写操作也会大大降低磁盘IO的性能。

2.1.2 基于Sort的实现

为了解决基于 Hash 的实现方式的诸多问题,引入基于 Sort 的实现方式,每个 Map 任务生成两个文件,一个是数据文件,一个是索引文件,生成的文件数是 Map 任务数的两倍。数据文件中的数据按照 Key 分区在不同分区之间排序,统一分区中的数据不排序,索引文件记录了文件中每个分区的偏移量和范围,当 Reduce 任务读取数据时候,先读取索引文件找到对应的分区数据偏移量和范围,然后从数据文件读取指定的数据。其优缺点包括:

  • 顺序读写可以大幅提高磁盘 IO 的性能,不会产生太多小文件,降低文件缓存占内存空间大小,提高内存使用率。
  • 多了一次粗粒度的排序

2.2 Shuffle Read实现方式

这个阶段中的 Task 通过直接读取本地 Write 阶段产生的中间结果数据或者通过 HTTP 方式从远程 Write 阶段拉取中间结果数据进行处理。

  1. 获取需要拉取的数据信息,根据数据本地性原则判断采用哪种级别的拉取方式。
  2. 判断是否需要在 Map 端聚合。
  3. Read 阶段的 Task 拉取过来的数据如果涉及聚合或者排序,则会使用 HashMap 结构在内存中存储,如果拉取过来的数据集在 HashMap 中已经存在相同的键则将数据聚合在一起。

3 Spark SQl

3.1 Spark Session

从 Spark 2.0 开始引入了 SparkSession,用于在 Spark SQL 开发过程中初始化上下文,为用户提供统一的入口。2.0开始使用运行的参数设置和获取都可以通过 conf 方法来实现。返回 RuntimeConfig 对象。

3.2 DataFrame

1.3后引入 DataFrame,是一种带有 Schema 元数据的分布式数据集,类似于传统数据库中的二维表。

3.2.1 DataFrame的常用操作

  1. toDF 作为 DataSet 的一种特殊形式,可以将 RDD 转换成为 DataFrame。
  2. as 返回一个指定别名的新 DataSet。
  3. printSchema
  4. show
  5. createTempView 和 createOrReplaceTempView 创建临时视图,临时视图随着创建该视图会话的终止自动删除,不会绑定到任何数据库中。
  6. createGlobalTempView 函数 创建全局临时视图,该视图的声明周期与 Spark 应用程序声明周期相关联。随着 Spark 应用程序的终止自动删除,它与系统保留的数据库 _global_temp 绑定。

3.2.2 DataFrame持久化

DF 可以以不同文件格式输出到指定路径,可以保存到 Hive 表,还可以通过 JDBC 连接输出到数据库表中。DF 的四种保存模式:

  1. SaveMode.ErrorIfExists: 表示如果输出数据或者目标表已经存在则抛出异常,此为默认保存模式。
  2. SaveMode.Append: 表示如果输出数据或目标表已经存在,则 DF 数据会追加。
  3. SaveMode.Overwrite:
  4. SaveMode.Ignore: 表示如果输出数据或目标表已经存在,则不做任何操作。

3.2.3 DataSet

DataSet 是一个特定域的强类型的不可变数据集,每个 DS 都有一个非类型化视图 DataFrame,也可以说 DataFrame 是 DataSet[Row] 的一种表示形式。DF 可以通过调用 as(Encoder) 函数转换成 DataSet,而 DataSet 则可以通过调用 toDF 函数转换成 DF。两者之间可以互相灵活转换。操作 DS 可以像操作 RDD 一样使用各种转换算子并行操作,采用惰性执行的执行方式,当调用 Action 才会真正执行。 创建 DS 需要显示提供 Encoder 把对象序列化为二进制形式进行存储,而不是使用 Java 序列化或者 Kryo 序列化方式。DataSet 使用专门的编码器序列化对象在网络间传输处理。编码器动态生成代码,可以在编译的时候检查类型,不需要将对象反序列化就可以过滤、排序等曹组,避免了 Shuffle 过程中频繁的序列化和反序列化,有效减少内存的使用和 Java 对象频繁的 GC 的开销。

4 Structured Streaming

设计意图是将流数据结构化,基于 Spark 引擎可扩展和高容错流处理引擎。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。