目录

Promethues-Metrics和Spark指标

概述

本文简述一下 Prometheus 的 Metrics。

Metrics

首先,什么是 Metrics?

Metrics(指标),可以理解为它是一个工具,用来记录系统正在发生的事情,比如说记录某某队列的瞬时大小,或者是某类请求的平均时长。指标系统在推荐系统甚至是 Spark 里都是作为一个基础设施存在的。

然后,为什么需要 Metrics?

在编写应用程序的时候,通常会记录日志以便事后分析,在很多情况下是产生了问题之后,再去查看日志,是一种事后的静态分析。在很多时候,我们可能需要了解整个系统在当前,或者某一时刻运行的情况,比如当前系统中对外提供了多少次服务,这些服务的响应时间是多少,随时间变化的情况是什么样的,系统出错的频率是多少。这些动态的准实时信息对于监控整个系统的运行健康状况来说很重要。

最后,怎么使用 Metrics?

Metrics 又是指一个流行的 Java 库,使用 Metrics 主要需要搞清楚三个概念,registries, source 和 reporter。registries 就是一个登记处,需要把指标收集的源头以及指标发送的目的地注册上才可以使用。

Metrics Core 模块提供5种基本的度量类型:Gauges, Counters, Histograms, Meters 和 Timers 有几个指标类型:

Metrics类型

Gauges

A gauge is an instantaneous measurement of a value.

Gauges 是最简单的度量类,只有一个简单的返回值,他用来记录一些对象或者事物的瞬时值。瞬时值可以忽大忽小,像仪表盘一样,比如 CPU 的使用率。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class GaugeTest {
    public static Queue<String> q = new LinkedList<String>();
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        ConsoleReporter reporter = ConsoleReporter.forRegistry(registry).build();
        reporter.start(1, TimeUnit.SECONDS);

        registry.register(MetricRegistry.name(GaugeTest.class, "queue", "size"), 
        new Gauge<Integer>() {
            public Integer getValue() {
                return q.size();
            }
        });
        while(true) {
            Thread.sleep(1000);
            q.add("Job-xxx");
        }
    }
}

Console 中打印的结果:

1
2
3
-- Gauges ------------------------------------------------
com.test.data.metrics.GaugeTest.queue.size
             value = 6

Counters

A counter is just a gauge for an AtomicLong instance.

Counter 也是一种计数器,Counter 只是用 Gauge 封装了 AtomicLong。同样可以增加或者减少。关于 Counter 和 Gauge 的区别,有一个 issue 讨论。 获得队列长度(此处的获取要比使用 Gauge 通过 size() 方法获取高效很多,后者 size() 方法的获取大多数是 O(n)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CounterTest {
public static Queue<String> q = new LinkedBlockingQueue<String>();
public static Counter pendingJobs;
public static Random random = new Random();
public static void addJob(String job) {
    pendingJobs.inc();
    q.offer(job);
}
public static String takeJob() {
    pendingJobs.dec();
    return q.poll();
}
public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    ConsoleReporter reporter = ConsoleReporter.forRegistry(registry).build();
    reporter.start(1, TimeUnit.SECONDS);
    pendingJobs = registry.counter(MetricRegistry.name(Queue.class,"pending-jobs","size"));
    int num = 1;
    while(true){
        Thread.sleep(200);
        if (random.nextDouble() > 0.7){
            String job = takeJob();
            System.out.println("take job : "+job);
        }else{
            String job = "Job-"+num;
            addJob(job);
            System.out.println("add job : "+job);
        }
        num++;
    }
}
}

Console 打印的结果:

1
2
3
4
5
6
7
8
9
add job : Job-15
add job : Job-16
take job : Job-8
take job : Job-10
add job : Job-19
18-7-26 16:11:31 ============================================
-- Counters ----------------------------------------------
java.util.Queue.pending-jobs.size
             count = 5

Histograms

A histogram measures the statistical distribution of values in a stream of data. In addition to minimum, maximum, mean, etc., it also measures median, 75th, 90th, 95th, 98th, 99th, and 99.9th percentiles.

Histograms 用来测量数据的分布情况。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class HistogramTest {
    public static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        ConsoleReporter reporter = ConsoleReporter.forRegistry(registry).build();
        reporter.start(1, TimeUnit.SECONDS);
        Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
        registry.register(MetricRegistry.name(HistogramTest.class, "request", "histogram"), histogram);
        
        while(true){
            Thread.sleep(1000);
            histogram.update(random.nextInt(100000));
        }
    }
}

Console 打印的结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
-- Histograms --------------------------------------------
java.util.Queue.queue.histogram
             count = 56
               min = 1122
               max = 99650
              mean = 48735.12
            stddev = 28609.02
            median = 49493.00
              75% <= 72323.00
              95% <= 90773.00
              98% <= 94011.00
              99% <= 99650.00
            99.9% <= 99650.00

Meters

A meter measures the rate of events over time (e.g., “requests per second”). In addition to the mean rate, meters also track 1-, 5-, and 15-minute moving averages.

meter 是用来测量一段时间的时间发生的频次,为了得到频率指标,meters 默认会收集1/5/15分钟的平均值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MeterTest {
    public static Random random = new Random();
    public static void request(Meter meter){
        System.out.println("request");
        meter.mark();
    }
    public static void request(Meter meter, int n){
        while(n > 0){
            request(meter);
            n--;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        ConsoleReporter reporter = ConsoleReporter.forRegistry(registry).build();
        reporter.start(1, TimeUnit.SECONDS);
        Meter meterTps = registry.meter(MetricRegistry.name(MeterTest.class,"request","tps"));
        while(true){
            request(meterTps,random.nextInt(5));
            Thread.sleep(1000);
        }
    }
}

Console 打印的结果:

1
2
3
4
5
6
7
8
9
request
18-7-26 16:23:25 ============================================
-- Meters ------------------------------------------------
com.test.data.metrics.MeterTest.request.tps
             count = 134
         mean rate = 2.13 events/second
     1-minute rate = 2.52 events/second
     5-minute rate = 3.16 events/second
    15-minute rate = 3.32 events/second

Timers

A timer measures both the rate that a particular piece of code is called and the distribution of its duration.

计算一段 code 的运行时间以及其分布(有类似于 Histograms 的功能)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class TimerTest {
    public static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        ConsoleReporter reporter = ConsoleReporter.forRegistry(registry).build();
        reporter.start(1, TimeUnit.SECONDS);
        Timer timer = registry.timer(MetricRegistry.name(TimerTest.class,"get-latency"));
        Timer.Context ctx;
        while(true){
            ctx = timer.time();
            Thread.sleep(random.nextInt(1000));
            ctx.stop();
        }
    }
}

Console 中打印的结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
-- Timers ------------------------------------------------
com.test.data.metrics.TimerTest.get-latency
             count = 38
         mean rate = 1.90 calls/second
     1-minute rate = 1.66 calls/second
     5-minute rate = 1.61 calls/second
    15-minute rate = 1.60 calls/second
               min = 13.90 milliseconds
               max = 988.71 milliseconds
              mean = 519.21 milliseconds
            stddev = 286.23 milliseconds
            median = 553.84 milliseconds
              75% <= 763.64 milliseconds
              95% <= 943.27 milliseconds
              98% <= 988.71 milliseconds
              99% <= 988.71 milliseconds
            99.9% <= 988.71 milliseconds

Metrics Reporter

Reporters are the way that your application exports all the measurements being made by its metrics. metrics-core comes with four ways of exporting your metrics: JMX, console, SLF4J, and CSV.

Metrics Core 提供了四种默认的 Reporter,用于把收集到的指标发送到这些 Reporter 中。此处以 Console 为例。

1
2
3
4
5
final ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
                                                .convertRatesTo(TimeUnit.SECONDS)
                                                .convertDurationsTo(TimeUnit.MILLISECONDS)
                                                .build();
reporter.start(1, TimeUnit.MINUTES);

结合Grafana

什么是 Grafana?

The leading open source software for time series analytics.

Grafana支持多种不同的时序数据库数据源,Grafana对每种数据源提供不同的查询方法,而且能很好的支持每种数据源的特性。

为什么需要 Grafana?

通过 Metrics 收集到各种指标之后,需要做更好的可视化,这样才能更好的观察指标。

以推荐系统为例,推荐系统的指标收集是先把收集到的指标记录到日志里,然后再通过 ES 索引指标相关的日志,最后通过 Grafana 来进行展示。ES 在这里相当于充当了时序数据源,基本的操作可以参考官方指南

Spark的指标体系

Spark 的指标体系非常详细,这也是为什么我们可以在 Spark UI 上看到很多 Job, Task 运行时候的很多信息。Spark 的指标组件,主要是开源项目 metrics 的高层封装。通过注册不同的源和目标,来实现指标的收集和可视化。

以 Spark 2.3.0 为例,指标系统 metricscore 内。可以看看 metrics 包的组成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
├── MetricsConfig.scala
├── MetricsSystem.scala
├── sink
│   ├── ConsoleSink.scala
│   ├── CsvSink.scala
│   ├── GraphiteSink.scala
│   ├── JmxSink.scala
│   ├── MetricsServlet.scala
│   ├── Sink.scala
│   ├── Slf4jSink.scala
│   └── package.scala
└── source
    ├── JvmSource.scala
    ├── Source.scala
    ├── StaticSources.scala
    └── package.scala

Spark 中指标系统分为 Master 和 Worker 两种类型,很容易理解,就是为了不同角色来制定监控的具体指标。指标系统会在 SparkEnv 中创建。通过 createMetircsSystem 可以实例化一个指标系统的全局对象。而这个方法实际上代理了 MetricsConfig 中的 initialize 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// /core/src/main/scala/org/apache/spark/SparkEnv.scala

val metricsSystem = if (isDriver) {
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}

// /core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

def createMetricsSystem(
  instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
}

初始化方法主要是加载属性配置,并且对属性进行初始化转换。从源码注释可以看到,初始化方法除了提供默认的属性之外,还会拉取在其他配置文件中以 spark.metrics.conf 开头的属性组。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// /core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

// Add default properties in case there's no properties file

setDefaultProperties(properties)

loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

// Also look for the properties in provided Spark configuration
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
  case (k, v) if k.startsWith(prefix) =>
    properties.setProperty(k.substring(prefix.length()), v)
  case _ =>
}

sink 包中,主要放着指标发送目标地的一些配置。可以发到像 csv, console, jmx, slf4j 这样的地方来存放指标,后面会有一些可视化的工具来协助。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。