概述
国际惯例,学习一个新的框架,应该先找找官方文档有没有 QucikStart 之类的文档,尤其像砖厂这种公司,文档应该不会少的。大家可以打开 Delta Lake 官网查看 QuickStart,按照文档迅速过一次。
QuickStart走读
Set up Apache Spark with Delta Lake
因为要方便跑 demo,我这里选择用 spark-shell 来交互式探索一下 Delta Lake 的功能。
按照文档介绍,Delta Lake 是需要 Spark 2.4.2 或以上版本的,所以大家最好去官网下载一个预先编译的 Spark 包。
按照上图,输入命令 bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0
就可以启动加载了 Delta Lake 的 spark shell 了。
关于 --packages
的用法,是因为 Spark 有个专门解析选项参数的工具叫做 SparkSubmitOptionParser
,他可以解析到依赖,并且先在本地仓库找,没有的话就会根据你的 Maven 配置到远程拉取,这里 Spark 内部做了一些事情。
Create a table
创建一个 Delta 类型的表方法很简单,如下。
1
2
|
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
|
然后我们到目录下看看。
1
2
3
4
5
6
7
8
|
# tree
.
├── _delta_log
│ └── 00000000000000000000.json
├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet
├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet
├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet
└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet
|
从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。
1
2
3
4
5
6
7
|
{"commitInfo":{"timestamp":1556253526941,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"3ae73d1f-4d33-4378-8f98-ba94b8204de9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556253525810}}
{"add":{"path":"part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556253526000,"dataChange":true}}
|
Update the table data
为了方便展示,下图可以看到,用于 update 的数据是不一样的,见红色方框。
此时,表存储路径下的文件有了一些变化。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# tree
.
├── _delta_log
│ ├── 00000000000000000000.json
│ └── 00000000000000000001.json
├── part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet
├── part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet
├── part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet
├── part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet
├── part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet
├── part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet
├── part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet
└── part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet
|
再看一下 transaction log 的变化。涉及到了
1
2
3
4
5
6
7
8
9
|
{"commitInfo":{"timestamp":1556326035295,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
{"remove":{"path":"part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
|
这里特别提醒一下,可以看到事务日志有两个操作,一个是 add,一个是 remove。特别留意一下时间戳,update 文件都是先 add 再 remove 的,所以 add 的时间1556326033000是更早于 remove 的时间1556326035294。
1
2
|
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
|
Read data
默认读取的,都是最新的文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
scala> val df = spark.read.format("delta").load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show()
+---+
| id|
+---+
| 8|
| 9|
| 5|
| 7|
| 6|
+---+
|
Read older versions of data using Time Travel
Delta 为读取旧版本的数据,定义了个名字,叫做 Time travel,后面源码分析的时候会经常提及吧。Delta 提供了一个 option 来描述。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
scala> val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show()
+---+
| id|
+---+
| 3|
| 4|
| 0|
| 2|
| 1|
+---+
|
Write a stream of data to a table
通过 Structure Streaming 也可以写流式的表,可以提供 Exactly-one。流式表是采用 append 方式来追加数据到原表里的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# ll _delta_log
total 160
-rw-r--r-- 1 runzhliu wheel 1.1K 4 27 08:47 00000000000000000000.json
-rw-r--r-- 1 runzhliu wheel 1.4K 4 27 08:47 00000000000000000001.json
-rw-r--r-- 1 runzhliu wheel 466B 4 27 09:21 00000000000000000002.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000003.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000004.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000005.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000006.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000007.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000008.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000009.json
-rw-r--r-- 1 runzhliu wheel 14K 4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000010.json
-rw-r--r-- 1 runzhliu wheel 977B 4 27 09:22 00000000000000000011.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000012.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000013.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000014.json
-rw-r--r-- 1 runzhliu wheel 25B 4 27 09:22 _last_checkpoint
# ll _delta_log
total 168
-rw-r--r-- 1 runzhliu wheel 1.1K 4 27 08:47 00000000000000000000.json
-rw-r--r-- 1 runzhliu wheel 1.4K 4 27 08:47 00000000000000000001.json
-rw-r--r-- 1 runzhliu wheel 466B 4 27 09:21 00000000000000000002.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000003.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000004.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000005.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000006.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000007.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000008.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000009.json
-rw-r--r-- 1 runzhliu wheel 14K 4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000010.json
-rw-r--r-- 1 runzhliu wheel 977B 4 27 09:22 00000000000000000011.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000012.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000013.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000014.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000015.json
-rw-r--r-- 1 runzhliu wheel 25B 4 27 09:22 _last_checkpoint
|
可以看到流式表的事务日志是不断的在增加的。需要注意的是,流表在写的时候,是不影响读的,后面源码分析的时候,也会针对这个进行探索。
Read a stream of changes from a table
流式表可以边写边读,这里就不贴接结果了。
总结
以上就是 Delta Lake 官网的 Quick Start 的内容,建议大家可以按照以上内容来快速测试一下。
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。