目录

Apache-Griffin系列-简介

概述

Apache Griffin 是一个应用于分布式数据系统中的开源数据质量解决方案。在 Hadoop, Spark, Storm 等分布式系统中,提供了一整套统一的流程来定义和检测数据集的质量并及时报告问题。

Profiling Use Case

假设我们有一张表(demo_src),根据 hour 分区,我们想知道每个小时,数据都是长什么样的。

表结构如下:

1
2
3
4
5
id      bigint
age     int
desc    string
dt      string
hour    string

dt 和 hour 都是分区字段,所以这个表每天都有一个 dt 分区和24个 hour 分区。

Data Preparation

创建一个 Hive 表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE `demo_src`(
  `id` bigint,
  `age` int,
  `desc` string) 
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_src';

数据文件格式如下:

1
2
3
4
1|18|student
2|23|engineer
3|42|cook
...

官方文档很体贴地提供了一个脚本来造假数据。然后就可以将数据 Load 进去 Hive 表里了。

1
LOAD DATA LOCAL INPATH 'demo_src' INTO TABLE demo_src PARTITION (dt='20180912',hour='09');

Define data quality measure

首先是环境配置文件,对应程序中的 envDq。一般就是定义 Spark 任务的输入输出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
  "spark": {
    "log.level": "WARN"
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ]
}

然后定义数据质量,对应 dqConfig,要去大概了解 json 参数的意义。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
  "name": "batch_prof",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "hive",
          "version": "1.2",
          "config": {
            "database": "default",
            "table.name": "demo_tgt"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "profiling",
        "out.dataframe.name": "prof",
        "rule": "src.id.count() AS id_count, src.age.max() AS age_max, src.desc.length().max() AS desc_length_max",
        "out": [
          {
            "type": "metric",
            "name": "prof"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

Measure data quality

提交检测任务到 Spark,配置文件作为参数也需要提交。

1
2
3
4
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 2 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json

Report data quality metrics

然后就可以在 console 里看到计算的日志,等待 job 结束,可以得到 metrics 的统计,并且会保存到 HDFS 上。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。