目录

Spark-Structured-Streaming编程指南

1 Overview

Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的 stream processing engine (流处理引擎)。您可以以静态数据表示批量计算的方式来表达 streaming computation (流式计算)。 Spark SQL 引擎将随着 streaming data 持续到达而增量地持续地运行,并更新最终结果。您可以使用 Scala,Java,Python 或 R 中的 Dataset/DataFrame API 来表示 streaming aggregations (流聚合),event-time windows (事件时间窗口),stream-to-batch joins (流到批处理连接)等。在同一个 optimized Spark SQL engine (优化的 Spark SQL 引擎)上执行计算。最后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),而无需用户理解 streaming 。

在本指南中,我们将向您介绍 programming model (编程模型) 和 APIs 。首先,我们从一个简单的例子开始 - 一个 streaming word count 。

2 快速示例

假设您想要保持从监听 TCP socket 的 data server (数据服务器) 接收的 text data (文本数据)的运行的 word count。让我们看看如何使用 Structured Streaming 表达这一点。你可以在 Scala/Java/Python/R 之中看到完整的代码。 并且如果您下载 Spark ,您可以直接运行这个例子。在任何情况下,让我们逐步了解示例并了解它的工作原理。首先,我们必须导入必要的 classes 并创建一个本地的 SparkSession,这是与 Spark 相关的所有功能的起点。

1
2
3
4
5
6
7
8
9
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._

接下来,我们创建一个 streaming DataFrame ,它表示从监听 localhost:9999 的服务器上接收的 text data (文本数据),并且将 DataFrame 转换以计算 word counts。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 创建表示从连接到 localhost:9999 的输入行 stream 的 DataFrame
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 将 lines 切分为 words
val words = lines.as[String].flatMap(_.split(" "))

// 生成正在运行的 word count
val wordCounts = words.groupBy("value").count()

这个 lines DataFrame 表示一个包含包含 streaming text data (流文本数据) 的无边界表。此表包含了一列名为 “value” 的 strings ,并且 streaming text data 中的每一 line (行)都将成为表中的一 row (行)。请注意,这并不是正在接收的任何数据,因为我们只是设置 transformation (转换),还没有开始。接下来,我们使用 .as[String] 将 DataFrame 转换为 String 的 Dataset ,以便我们可以应用 flatMap 操作将每 line (行)切分成多个 words 。所得到的 words Dataset 包含所有的 words 。最后,我们通过将 Dataset 中 unique values (唯一的值)进行分组并对它们进行计数来定义 wordCounts DataFrame 。请注意,这是一个 streaming DataFrame ,它表示 stream 的正在运行的 word counts 。

我们现在已经设置了关于 streaming data (流数据)的 query (查询)。剩下的就是实际开始接收数据并计算 counts (计数)。为此,我们将其设置为在每次更新时将完整地计数(由 outputMode(“complete”) 指定)发送到控制台。然后使用 start() 启动 streaming computation (流式计算)。

1
2
3
4
5
6
7
// 开始运行将 running counts 打印到控制台的查询
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

执行此代码之后,streaming computation (流式计算)将在后台启动。 query 对象是该 active streaming query(活动流查询)的 handle (句柄),并且我们决定使用 awaitTermination() 来等待查询的终止,以防止查询处于 active (活动)状态时退出。

要实际执行此示例代码,您可以在您自己的 Spark 应用程序 编译代码,或者简单地运行示例。我们正在展示的是后者。您将首先需要运行 Netcat(大多数类 Unix 系统中的一个小型应用程序)作为 data server 通过使用。

1
$ nc -lk 9999

然后,在一个不同的终端,您可以启动示例通过使用

1
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

然后,在运行 netcat 服务器的终端中输入的任何 lines 将每秒计数并打印在屏幕上。它看起来像下面这样。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

3 Programming Model

Structured Streaming 的关键思想是将 live data stream (实时数据流)视为一种正在不断 appended (附加)的表。这形成了一个与 batch processing model (批处理模型)非常相似的新的 stream processing model (流处理模型)。您会将您的 streaming computation (流式计算)表示为在一个静态表上的 standard batch-like query (标准类批次查询),并且 Spark 在 unbounded(无界) 输入表上运行它作为 incremental(增量) 查询。让我们更加详细地了解这个模型。

3.1 基本概念

将 input data stream (输入数据流) 视为 “Input Table”(输入表)。每个在 stream 上到达的 data item (数据项)就像是一个被 appended 到 Input Table 的新的 row 。

  • 图1

对输入的查询将生成 “Result Table” (结果表)。每个 trigger interval (触发间隔)(例如,每 1 秒),新 row (行)将附加到 Input Table ,最终更新 Result Table 。无论何时更新 result table ,我们都希望将 changed result rows (更改的结果行)写入 external sink (外部接收器)。

  • 图2

“Output(输出)” 被定义为写入 external storage (外部存储器)的内容。可以以不同的模式定义 output :

  1. Complete Mode(完全模式) - 整个更新的 Result Table 将被写入外部存储。由 storage connector (存储连接器)决定如何处理整个表的写入。
  2. Append Mode(附加模式) - 只有 Result Table 中自上次触发后附加的新 rows(行) 将被写入 external storage (外部存储)。这仅适用于不期望更改 Result Table 中现有行的查询。
  3. Update Mode(更新模式) - 只有自上次触发后 Result Table 中更新的 rows (行)将被写入 external storage (外部存储)(从 Spark 2.1.1 之后可用)。请注意,这与 Complete Mode (完全模式),因为此模式仅输出自上次触发以来更改的 rows (行)。如果查询不包含 aggregations (聚合),它将等同于 Append mode 。

为了说明这个模型的使用,我们来了解一下上面章节的 快速示例 。第一个 lines DataFrame 是 input table ,并且最后的 wordCounts DataFrame 是 result table 。请注意,streaming lines DataFrame 上的查询生成 wordCounts 是 exactly the same(完全一样的) 因为它将是一个 static DataFrame (静态 DataFrame )。但是,当这个查询启动时, Spark 将从 socket 连接中持续检查新数据。如果有新数据,Spark 将运行一个 “incremental(增量)” 查询,它会结合以前的 running counts (运行计数)与新数据计算更新的 counts ,如下所示。

这种模式与许多其他 stream processing engines (流处理引擎)有着显著不同。许多 streaming systems (流系统)要求用户本身保持运行 aggregations (聚合),因此必须要考虑容错,和数据一致性(at-least-once(至少一次), at-most-once (最多一次),exactly-once (完全一次))。在这个模型中,当有新数据时, Spark 负责更新 Result Table ,从而减轻用户对它的考虑。举个例子,我们来看一下这个模型如何处理对于基于 event-time 的处理和 late arriving (迟到)的数据。

参考资料

  1. https://cloud.tencent.com/developer/article/1014928
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。