目录

Delta-Lake系列-事务日志的读写

概述

本文记录了搞清楚 Delta Lake 的读写流程的笔记,主要希望帮助老铁们高清粗 Delta Lake 是如何实现 ACID 事务的这些特性的。

事务日志的实现

探索

首先,通读 README,里面有提及,Delta Lake 对外的 API。

The only stable, public APIs currently provided by Delta Lake are through the DataFrameReader/Writer (i.e. spark.read, df.write, spark.readStream and df.writeStream).

对外的 API,只有通过形如 spark.read.format("delta") 之类的形式来使用,其余接口目前皆为内部接口。

关于 Spark 如何通过 format 或者 option 来使用诸如 Parquet 和 Delta Lake 之类的接口,建议可以看看 DataFrameWriter 部分的代码,理清楚里面的代码和调用逻辑,你就基本清楚了,这里不是本文的重点,所以就此略过。

然后我们看看测试用例中,看看是否可以简单找到读写的调用逻辑。

显然 org/apache/spark/sql/delta/LogStoreSuite.scala 有我们需要的东西。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
test("read / write") {
  val tempDir = Utils.createTempDir()
  val store = createLogStore(spark)

  val deltas = Seq(0, 1).map(i => new File(tempDir, i.toString)).map(_.getCanonicalPath)
  store.write(deltas(0), Iterator("zero", "none"))
  store.write(deltas(1), Iterator("one"))

  assert(store.read(deltas(0)) == Seq("zero", "none"))
  assert(store.read(deltas(1)) == Seq("one"))
}

看到了吧,手下创建一个 LogStore 对象,然后调用里面的 write 方法,正是 Delta Lake 写事务日志 Transaction log 的入口。

多读呢,一般不是问题,乐观锁主要针对的是,一写多读的场景。多个写需要阻塞等待,这个是乐观锁的要点。读呢,需要保持可以读到最新的数据,而不阻塞。

Delta Lake 中乐观写,或者叫事务写,主要是基于基于接口来实现的。

org.apache.spark.sql.delta.files.TransactionalWrite

那么重点是在写文件的方法里,也就是 writeFiles

看看 Delta Lake 支持的操作类型

org.apache.spark.sql.delta.DeltaOperations

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
  /** Recorded during batch inserts. Predicates can be provided for overwrites. */
  case class Write(
 
   /** Recorded during streaming inserts. */
  case class StreamingUpdate(
  
    /** Recorded while deleting certain partitions. */
  case class Delete(predicate: Seq[String]) extends Operation("DELETE") {
  
    /** Recorded when truncating the table. */
  case class Truncate() extends Operation("TRUNCATE") {
  
    /** Recorded when fscking the table. */
  case class Fsck(numRemovedFiles: Long) extends Operation("FSCK") {
  
    /** Recorded when converting a table into a Delta table. */
  case class Convert(
  
    /** Recorded when optimizing the table. */
  case class Optimize(
  
    /** Recorded when a merge operation is committed to the table. */
  case class Merge(
  
    /** Recorded when an update operation is committed to the table. */
  case class Update(predicate: Option[String]) extends Operation("UPDATE") {
 
   /** Recorded when the table is created. */
  case class CreateTable(metadata: Metadata, isManaged: Boolean, asSelect: Boolean = false)
  
    /** Recorded when the table properties are set. */
  case class SetTableProperties(
  
    /** Recorded when the table properties are unset. */
  case class UnsetTableProperties(
  
    /** Recorded when columns are added. */
  case class AddColumns(
  
    /** Recorded when columns are changed. */
  case class ChangeColumn(
  
    /** Recorded when columns are replaced. */
  case class ReplaceColumns(
  
    case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") {
    
      object ManualUpdate extends Operation("Manual Update") {
  
  object FileNotificationRetention extends Operation("FILE NOTIFICATION RETENTION") {
  
    /** Recorded when recomputing stats on the table. */
  case class ComputeStats(predicate: Seq[String]) extends Operation("COMPUTE STATS") {
  
    /** Recorded when manually re-/un-/setting ZCube Information for existing files. */
  case class ResetZCubeInfo(predicate: Seq[String], zOrderBy: Seq[String])
  
    case class UpdateSchema(oldSchema: StructType, newSchema: StructType)
      extends Operation("UPDATE SCHEMA") {

Write 是作为 Operation 存在的,需要给乐观事务对象 OptimisticTransaction 提交,实际上,就是其 commit 操作。

commit 之前会有 preCommit 的动作,其主要是检查元数据 metaData 有没有改变。

最后还有 postCommit,主要做 checkpoint

commit 这里有一个重入锁。

TODO

Delta 本质上解决了大数据分析场景中常见的数据更新的问题。

从开源大数据这个生态圈子来看,Hive 早已经做掉了 ACID 事务支持;Uber 发起了hudi,Netflix 搞 Iceberg,双双进入 Apache 孵化器,解决类似问题。

之前的数据湖只支持批量插入,写入失败容易产生脏数据,导致查询分析失败;现在支持事务更新,允许一边读一边更新,甚至多个 writer 同时操作,ACID 的那种,保证一致性地读取和分析查询。

从 delta 的字面意思来看,其核心关注点在于数据更新操作和由此产生的增量数据写入。

不管是批处理还是流处理,背后的大数据存储考虑的都是一写多读,讲究吞吐量,写主要是追加写,写新的数据分区,或者在往已有的分区里面追加数据,总而言之不支持随机修改。

这里面有些重要问题,一个是这个过程会产生大量的 delta 小文件,需要及时做合并;处理场景可能本身还要求事务支持和多版本,满足回溯或回滚;读的话不能只读原来那些 Parquet/ORC 文件了,还得考虑吸收这些 delta 文件。

Delta 这类技术总结一下就是,按列式格式写 base 数据加快分析读,增量更新数据 delta 则采取行式写入支持事务和多版本,然后系统通过后台不断地进行 delta 合并。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。