目录

Spark的度量系统是如何实现的

概述

最近接手一个项目,惊讶的发现一个运行了一年多的系统,连日志打的都如此不规范,更不用谈什么监控等体系了,可想而知如果线上出现问题的时候,排查问题会多难。

Spark指标体系

Spark 的指标体系非常详细,这也是为什么我们可以在 Spark UI 上看到很多 Job, Task 运行时候的很多信息。Spark 的指标组件,主要是开源项目 metrics 的高层封装。通过注册不同的源和目标,来实现指标的收集和可视化。

以 Spark 2.3.0 为例,指标系统 metricscore 内。可以看看 metrics 包的组成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
├── MetricsConfig.scala
├── MetricsSystem.scala
├── sink
│   ├── ConsoleSink.scala
│   ├── CsvSink.scala
│   ├── GraphiteSink.scala
│   ├── JmxSink.scala
│   ├── MetricsServlet.scala
│   ├── Sink.scala
│   ├── Slf4jSink.scala
│   └── package.scala
└── source
    ├── JvmSource.scala
    ├── Source.scala
    ├── StaticSources.scala
    └── package.scala

Spark 中指标系统分为 Master 和 Worker 两种类型,很容易理解,就是为了不同角色来制定监控的具体指标。指标系统会在 SparkEnv 中创建。通过 createMetircsSystem 可以实例化一个指标系统的全局对象。而这个方法实际上代理了 MetricsConfig 中的 initialize 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// /core/src/main/scala/org/apache/spark/SparkEnv.scala

val metricsSystem = if (isDriver) {
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}

// /core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

def createMetricsSystem(
  instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
}

初始化方法主要是加载属性配置,并且对属性进行初始化转换。从源码注释可以看到,初始化方法除了提供默认的属性之外,还会拉取在其他配置文件中以 spark.metrics.conf 开头的属性组。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// /core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

// Add default properties in case there's no properties file

setDefaultProperties(properties)

loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

// Also look for the properties in provided Spark configuration
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
  case (k, v) if k.startsWith(prefix) =>
    properties.setProperty(k.substring(prefix.length()), v)
  case _ =>
}

sink 包中,主要放着指标发送目标地的一些配置。可以发到像 csv, console, jmx, slf4j 这样的地方来存放指标,后面会有一些可视化的工具来协助。

参考资料

  1. Spark内核设计的艺术
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。