目录

XGBoost-Operator源码分析

概述

分布式的 XGBoost 可以用 Spark 来跑,当然也支持用其他分布式的方法去跑,比如用 XGBoost Operator,可以很轻松的实现 XGBoost 算法的分布式执行。

代码实现

目前在 Kubeflow 的框架下去开发一个机器学习相关的 Operator 已经比较容易了,首先 kubebuilder 打造好 Operator 的框架,然后通过 Kubeflow 社区抽象的 common 包,在新的 Operator 下调整业务逻辑还是比较简单的。XGBoost Operator 也是在这样的背景下诞生的,所以可以看到其源码是相对 tf-operator 这些 Kubeflow 早起的项目,代码会更加简练清晰一点。

重点分析 XGBoost Operator 的 Reconcile 协调方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (r *ReconcileXGBoostJob) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Fetch the XGBoostJob instance
	xgboostjob := &v1alpha1.XGBoostJob{}
	err := r.Get(context.Background(), request.NamespacedName, xgboostjob)
	if err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	// Check reconcile is required.
	needSync := r.satisfiedExpectations(xgboostjob)

	if !needSync || xgboostjob.DeletionTimestamp != nil {
		log.Info("reconcile cancelled, job does not need to do reconcile or has been deleted",
			"sync", needSync, "deleted", xgboostjob.DeletionTimestamp != nil)
		return reconcile.Result{}, nil
	}
	// Set default priorities for xgboost job
	scheme.Scheme.Default(xgboostjob)

	// Use common to reconcile the job related pod and service
	err = r.xgbJobController.ReconcileJobs(xgboostjob, xgboostjob.Spec.XGBReplicaSpecs, xgboostjob.Status.JobStatus, &xgboostjob.Spec.RunPolicy)

	if err != nil {
		logrus.Warnf("Reconcile XGBoost Job error %v", err)
		return reconcile.Result{}, err
	}

	return reconcile.Result{}, err
}

实际上,自定义资源对象 XGBoostJob 由 XGBoost Operator 的 Reconcile 方法来协调就可以了,因为这个方法的背后,是 Kubeflow 的 common 包,会统一再做 Pod/Service 的协调的,所以开发者只要专注自定义资源的协调就够了。

就这?对的,就是挺简单的。

测试

下面运行一个 XGBoost Operator 提供的 Demo

按照官方文档,build 镜像。

1
docker build -f Dockerfile -t kubeflow/xgboost-dist-rabit-test:1.2 ./

镜像里主要运行的代码是 xgboost_smoke_test.py

Master 正常运行的日志。

/xgboost-operator%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/image_1e7pu3uf14pd12va7uiflq1m5c9.png

Worker 正常运行的日志。

/xgboost-operator%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/image_1e7pu4no398r16111d6hd81lvem.png

这个 smoke test 仅仅是建立一个 rabit 拓扑并进行通信的简单例子,运行成功说明 XGBoost Operator 的部署也是成功的,因为 worker 之间以及与 master 通过 pod ip 是可以建立 tcp 连接的。

总结

目前在 Kubeflow Common 包的框架下开发一个分布式的机器学习 Operator 还是比较方便的。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。