概述
本文记录了搞清楚 Delta Lake 的读写流程的笔记,主要希望帮助老铁们高清粗 Delta Lake 是如何实现 ACID 事务的这些特性的。
事务日志的实现
探索
首先,通读 README,里面有提及,Delta Lake 对外的 API。
The only stable, public APIs currently provided by Delta Lake are through the DataFrameReader
/Writer
(i.e. spark.read
, df.write
, spark.readStream
and df.writeStream
).
对外的 API,只有通过形如 spark.read.format("delta")
之类的形式来使用,其余接口目前皆为内部接口。
关于 Spark 如何通过 format 或者 option 来使用诸如 Parquet 和 Delta Lake 之类的接口,建议可以看看 DataFrameWriter 部分的代码,理清楚里面的代码和调用逻辑,你就基本清楚了,这里不是本文的重点,所以就此略过。
然后我们看看测试用例中,看看是否可以简单找到读写的调用逻辑。
显然 org/apache/spark/sql/delta/LogStoreSuite.scala
有我们需要的东西。
1
2
3
4
5
6
7
8
9
10
11
|
test("read / write") {
val tempDir = Utils.createTempDir()
val store = createLogStore(spark)
val deltas = Seq(0, 1).map(i => new File(tempDir, i.toString)).map(_.getCanonicalPath)
store.write(deltas(0), Iterator("zero", "none"))
store.write(deltas(1), Iterator("one"))
assert(store.read(deltas(0)) == Seq("zero", "none"))
assert(store.read(deltas(1)) == Seq("one"))
}
|
看到了吧,手下创建一个 LogStore 对象,然后调用里面的 write 方法,正是 Delta Lake 写事务日志 Transaction log 的入口。
多读呢,一般不是问题,乐观锁主要针对的是,一写多读的场景。多个写需要阻塞等待,这个是乐观锁的要点。读呢,需要保持可以读到最新的数据,而不阻塞。
Delta Lake 中乐观写,或者叫事务写,主要是基于基于接口来实现的。
org.apache.spark.sql.delta.files.TransactionalWrite
那么重点是在写文件的方法里,也就是 writeFiles
。
看看 Delta Lake 支持的操作类型
org.apache.spark.sql.delta.DeltaOperations
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/** Recorded during batch inserts. Predicates can be provided for overwrites. */
case class Write(
/** Recorded during streaming inserts. */
case class StreamingUpdate(
/** Recorded while deleting certain partitions. */
case class Delete(predicate: Seq[String]) extends Operation("DELETE") {
/** Recorded when truncating the table. */
case class Truncate() extends Operation("TRUNCATE") {
/** Recorded when fscking the table. */
case class Fsck(numRemovedFiles: Long) extends Operation("FSCK") {
/** Recorded when converting a table into a Delta table. */
case class Convert(
/** Recorded when optimizing the table. */
case class Optimize(
/** Recorded when a merge operation is committed to the table. */
case class Merge(
/** Recorded when an update operation is committed to the table. */
case class Update(predicate: Option[String]) extends Operation("UPDATE") {
/** Recorded when the table is created. */
case class CreateTable(metadata: Metadata, isManaged: Boolean, asSelect: Boolean = false)
/** Recorded when the table properties are set. */
case class SetTableProperties(
/** Recorded when the table properties are unset. */
case class UnsetTableProperties(
/** Recorded when columns are added. */
case class AddColumns(
/** Recorded when columns are changed. */
case class ChangeColumn(
/** Recorded when columns are replaced. */
case class ReplaceColumns(
case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") {
object ManualUpdate extends Operation("Manual Update") {
object FileNotificationRetention extends Operation("FILE NOTIFICATION RETENTION") {
/** Recorded when recomputing stats on the table. */
case class ComputeStats(predicate: Seq[String]) extends Operation("COMPUTE STATS") {
/** Recorded when manually re-/un-/setting ZCube Information for existing files. */
case class ResetZCubeInfo(predicate: Seq[String], zOrderBy: Seq[String])
case class UpdateSchema(oldSchema: StructType, newSchema: StructType)
extends Operation("UPDATE SCHEMA") {
|
Write 是作为 Operation 存在的,需要给乐观事务对象 OptimisticTransaction 提交,实际上,就是其 commit 操作。
commit 之前会有 preCommit 的动作,其主要是检查元数据 metaData 有没有改变。
最后还有 postCommit,主要做 checkpoint
commit 这里有一个重入锁。
TODO
Delta 本质上解决了大数据分析场景中常见的数据更新的问题。
从开源大数据这个生态圈子来看,Hive 早已经做掉了 ACID 事务支持;Uber 发起了hudi,Netflix 搞 Iceberg,双双进入 Apache 孵化器,解决类似问题。
之前的数据湖只支持批量插入,写入失败容易产生脏数据,导致查询分析失败;现在支持事务更新,允许一边读一边更新,甚至多个 writer 同时操作,ACID 的那种,保证一致性地读取和分析查询。
从 delta 的字面意思来看,其核心关注点在于数据更新操作和由此产生的增量数据写入。
不管是批处理还是流处理,背后的大数据存储考虑的都是一写多读,讲究吞吐量,写主要是追加写,写新的数据分区,或者在往已有的分区里面追加数据,总而言之不支持随机修改。
这里面有些重要问题,一个是这个过程会产生大量的 delta 小文件,需要及时做合并;处理场景可能本身还要求事务支持和多版本,满足回溯或回滚;读的话不能只读原来那些 Parquet/ORC 文件了,还得考虑吸收这些 delta 文件。
Delta 这类技术总结一下就是,按列式格式写 base 数据加快分析读,增量更新数据 delta 则采取行式写入支持事务和多版本,然后系统通过后台不断地进行 delta 合并。
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。