目录

Delta-Lake系列-QuickStart

概述

国际惯例,学习一个新的框架,应该先找找官方文档有没有 QucikStart 之类的文档,尤其像砖厂这种公司,文档应该不会少的。大家可以打开 Delta Lake 官网查看 QuickStart,按照文档迅速过一次。

QuickStart走读

Set up Apache Spark with Delta Lake

因为要方便跑 demo,我这里选择用 spark-shell 来交互式探索一下 Delta Lake 的功能。

按照文档介绍,Delta Lake 是需要 Spark 2.4.2 或以上版本的,所以大家最好去官网下载一个预先编译的 Spark 包。

/delta-lake%E7%B3%BB%E5%88%97-quickstart/img.png

按照上图,输入命令 bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0 就可以启动加载了 Delta Lake 的 spark shell 了。

关于 --packages 的用法,是因为 Spark 有个专门解析选项参数的工具叫做 SparkSubmitOptionParser,他可以解析到依赖,并且先在本地仓库找,没有的话就会根据你的 Maven 配置到远程拉取,这里 Spark 内部做了一些事情。

Create a table

创建一个 Delta 类型的表方法很简单,如下。

1
2
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

然后我们到目录下看看。

1
2
3
4
5
6
7
8
# tree
.
├── _delta_log
│    └── 00000000000000000000.json
├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet
├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet
├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet
└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet

从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。

1
2
3
4
5
6
7
{"commitInfo":{"timestamp":1556253526941,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"3ae73d1f-4d33-4378-8f98-ba94b8204de9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556253525810}}
{"add":{"path":"part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556253526000,"dataChange":true}}

Update the table data

为了方便展示,下图可以看到,用于 update 的数据是不一样的,见红色方框。

/delta-lake%E7%B3%BB%E5%88%97-quickstart/image_1d9e47ao41j7trca1dh1thgc4u9.png

此时,表存储路径下的文件有了一些变化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# tree
.
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet
├── part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet
├── part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet
├── part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet
├── part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet
├── part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet
├── part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet
└── part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet

再看一下 transaction log 的变化。涉及到了

1
2
3
4
5
6
7
8
9
{"commitInfo":{"timestamp":1556326035295,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
{"remove":{"path":"part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}

这里特别提醒一下,可以看到事务日志有两个操作,一个是 add,一个是 remove。特别留意一下时间戳,update 文件都是先 add 再 remove 的,所以 add 的时间1556326033000是更早于 remove 的时间1556326035294。

1
2
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}

Read data

默认读取的,都是最新的文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
scala> val df = spark.read.format("delta").load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show()
+---+
| id|
+---+
|  8|
|  9|
|  5|
|  7|
|  6|
+---+

Read older versions of data using Time Travel

Delta 为读取旧版本的数据,定义了个名字,叫做 Time travel,后面源码分析的时候会经常提及吧。Delta 提供了一个 option 来描述。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
scala> val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show()
+---+
| id|
+---+
|  3|
|  4|
|  0|
|  2|
|  1|
+---+

Write a stream of data to a table

通过 Structure Streaming 也可以写流式的表,可以提供 Exactly-one。流式表是采用 append 方式来追加数据到原表里的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# ll _delta_log 
total 160
-rw-r--r--  1 runzhliu  wheel   1.1K  4 27 08:47 00000000000000000000.json
-rw-r--r--  1 runzhliu  wheel   1.4K  4 27 08:47 00000000000000000001.json
-rw-r--r--  1 runzhliu  wheel   466B  4 27 09:21 00000000000000000002.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000003.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000004.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000005.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000006.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000007.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000008.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000009.json
-rw-r--r--  1 runzhliu  wheel    14K  4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000010.json
-rw-r--r--  1 runzhliu  wheel   977B  4 27 09:22 00000000000000000011.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000012.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000013.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000014.json
-rw-r--r--  1 runzhliu  wheel    25B  4 27 09:22 _last_checkpoint
# ll _delta_log 
total 168
-rw-r--r--  1 runzhliu  wheel   1.1K  4 27 08:47 00000000000000000000.json
-rw-r--r--  1 runzhliu  wheel   1.4K  4 27 08:47 00000000000000000001.json
-rw-r--r--  1 runzhliu  wheel   466B  4 27 09:21 00000000000000000002.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000003.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000004.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000005.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000006.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000007.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000008.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000009.json
-rw-r--r--  1 runzhliu  wheel    14K  4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000010.json
-rw-r--r--  1 runzhliu  wheel   977B  4 27 09:22 00000000000000000011.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000012.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000013.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000014.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000015.json
-rw-r--r--  1 runzhliu  wheel    25B  4 27 09:22 _last_checkpoint

可以看到流式表的事务日志是不断的在增加的。需要注意的是,流表在写的时候,是不影响读的,后面源码分析的时候,也会针对这个进行探索。

Read a stream of changes from a table

流式表可以边写边读,这里就不贴接结果了。

总结

以上就是 Delta Lake 官网的 Quick Start 的内容,建议大家可以按照以上内容来快速测试一下。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。