diff --git a/internal/pkg/appconfig/types.go b/internal/pkg/appconfig/types.go index 6d36984..707910c 100644 --- a/internal/pkg/appconfig/types.go +++ b/internal/pkg/appconfig/types.go @@ -54,6 +54,9 @@ type Config struct { EnableDCGMLog bool DCGMLogLevel string PodResourcesKubeletSocket string + ContainerdSocket string + DockerdSocket string + PodLabelsAllowList []string HPCJobMappingDir string NvidiaResourceNames []string } diff --git a/internal/pkg/transformation/kubernetes.go b/internal/pkg/transformation/kubernetes.go index 5d02fec..fcc215c 100644 --- a/internal/pkg/transformation/kubernetes.go +++ b/internal/pkg/transformation/kubernetes.go @@ -1,24 +1,11 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package transformation import ( "context" "fmt" + "github.com/docker/docker/api/types/container" + _ "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" "log/slog" "net" "regexp" @@ -35,7 +22,8 @@ import ( "github.com/NVIDIA/dcgm-exporter/internal/pkg/appconfig" "github.com/NVIDIA/dcgm-exporter/internal/pkg/collector" "github.com/NVIDIA/dcgm-exporter/internal/pkg/deviceinfo" - "github.com/NVIDIA/dcgm-exporter/internal/pkg/nvmlprovider" + "github.com/containerd/containerd" + "github.com/containerd/containerd/namespaces" ) var ( @@ -59,6 +47,9 @@ func (p *PodMapper) Name() string { } func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo deviceinfo.Provider) error { + // 获取 Kubelet socket 路径 + podLabelsAllowList := p.Config.PodLabelsAllowList + slog.Debug(fmt.Sprintf("Pod labels allow list as: %+v", podLabelsAllowList)) socketPath := p.Config.PodResourcesKubeletSocket _, err := os.Stat(socketPath) if os.IsNotExist(err) { @@ -66,13 +57,14 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic return nil } - // TODO: This needs to be moved out of the critical path. + // 连接到 Kubelet c, cleanup, err := connectToServer(socketPath) if err != nil { return err } defer cleanup() + // 列出 Pods pods, err := p.listPods(c) if err != nil { return err @@ -82,10 +74,15 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic deviceToPod := p.toDeviceToPod(pods, deviceInfo) - slog.Debug(fmt.Sprintf("Device to pod mapping: %+v", deviceToPod)) + // 获取 Containerd 信息 + containerdInfo, err := p.getDockerInfo() + if err != nil { + return fmt.Errorf("failed to get containerd info: %w", err) + } - // Note: for loop are copies the value, if we want to change the value - // and not the copy, we need to use the indexes + slog.Debug(fmt.Sprintf("Containerd Device Information: %+v", containerdInfo)) + + // 遍历 Metrics 并更新 Attributes for counter := range metrics { for j, val := range metrics[counter] { deviceID, err := val.GetIDOfType(p.Config.KubernetesGPUIdType) @@ -95,6 +92,7 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic podInfo, exists := deviceToPod[deviceID] if exists { + // 添加 Pod 的 Attributes if !p.Config.UseOldNamespace { metrics[counter][j].Attributes[podAttribute] = podInfo.Name metrics[counter][j].Attributes[namespaceAttribute] = podInfo.Namespace @@ -104,6 +102,32 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic metrics[counter][j].Attributes[oldNamespaceAttribute] = podInfo.Namespace metrics[counter][j].Attributes[oldContainerAttribute] = podInfo.Container } + + // 通过 Pod 名字和 namespace 匹配 Containerd 的信息 + for _, container := range containerdInfo { + if container.PodName == podInfo.Name && container.PodNamespace == podInfo.Namespace { + // 追加来自 Containerd 的标签、注解和环境变量 + for k, v := range container.Labels { + if slices.Contains(podLabelsAllowList, k) { + normalizedKey := normalizeLabelName(k) + metrics[counter][j].Attributes["label_"+normalizedKey] = v + } + } + for k, v := range container.Annotations { + if slices.Contains(podLabelsAllowList, k) { + normalizedKey := normalizeLabelName(k) + metrics[counter][j].Attributes["annotation_"+normalizedKey] = v + } + } + for k, v := range container.Env { + // 判断键是否在允许的列表中 + if slices.Contains(podLabelsAllowList, k) { + normalizedKey := normalizeLabelName(k) + metrics[counter][j].Attributes["env_"+normalizedKey] = v + } + } + } + } } } } @@ -113,7 +137,7 @@ func (p *PodMapper) Process(metrics collector.MetricsByCounter, deviceInfo devic func connectToServer(socket string) (*grpc.ClientConn, func(), error) { resolver.SetDefaultScheme("passthrough") - conn, err := grpc.NewClient( + conn, err := grpc.Dial( socket, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { @@ -150,10 +174,8 @@ func (p *PodMapper) toDeviceToPod( for _, pod := range devicePods.GetPodResources() { for _, container := range pod.GetContainers() { for _, device := range container.GetDevices() { - resourceName := device.GetResourceName() if resourceName != appconfig.NvidiaResourceName && !slices.Contains(p.Config.NvidiaResourceNames, resourceName) { - // Mig resources appear differently than GPU resources if !strings.HasPrefix(resourceName, appconfig.NvidiaMigResourcePrefix) { continue } @@ -166,35 +188,6 @@ func (p *PodMapper) toDeviceToPod( } for _, deviceID := range device.GetDeviceIds() { - if strings.HasPrefix(deviceID, appconfig.MIG_UUID_PREFIX) { - migDevice, err := nvmlprovider.Client().GetMIGDeviceInfoByID(deviceID) - if err == nil { - giIdentifier := deviceinfo.GetGPUInstanceIdentifier(deviceInfo, migDevice.ParentUUID, - uint(migDevice.GPUInstanceID)) - deviceToPodMap[giIdentifier] = podInfo - } - gpuUUID := deviceID[len(appconfig.MIG_UUID_PREFIX):] - deviceToPodMap[gpuUUID] = podInfo - } else if gkeMigDeviceIDMatches := gkeMigDeviceIDRegex.FindStringSubmatch(deviceID); gkeMigDeviceIDMatches != nil { - var gpuIndex string - var gpuInstanceID string - for groupIdx, group := range gkeMigDeviceIDMatches { - switch groupIdx { - case 1: - gpuIndex = group - case 2: - gpuInstanceID = group - } - } - giIdentifier := fmt.Sprintf("%s-%s", gpuIndex, gpuInstanceID) - deviceToPodMap[giIdentifier] = podInfo - } else if strings.Contains(deviceID, gkeVirtualGPUDeviceIDSeparator) { - deviceToPodMap[strings.Split(deviceID, gkeVirtualGPUDeviceIDSeparator)[0]] = podInfo - } else if strings.Contains(deviceID, "::") { - gpuInstanceID := strings.Split(deviceID, "::")[0] - deviceToPodMap[gpuInstanceID] = podInfo - } - // Default mapping between deviceID and pod information deviceToPodMap[deviceID] = podInfo } } @@ -203,3 +196,159 @@ func (p *PodMapper) toDeviceToPod( return deviceToPodMap } + +// 获取 Containerd 中的容器信息 +func (p *PodMapper) getContainerdInfo() (map[string]ContainerInfo, error) { + socketPath := p.Config.ContainerdSocket + _, err := os.Stat(socketPath) + if os.IsNotExist(err) { + slog.Info("No Containerd socket, ignoring") + return nil, nil + } + + client, err := containerd.New(socketPath) + if err != nil { + return nil, fmt.Errorf("failed to connect to containerd: %w", err) + } + defer client.Close() + + ctx := namespaces.WithNamespace(context.Background(), "moby") + containers, err := client.Containers(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list containers: %w", err) + } + + containerInfo := make(map[string]ContainerInfo) + for _, container := range containers { + // 获取容器信息 + info, err := container.Info(ctx) + if err != nil { + continue + } + + // 只保留 "io.cri-containerd.kind" 为 "container" 的容器 + if kind, ok := info.Labels["io.cri-containerd.kind"]; !ok || kind != "container" { + continue + } + + labels := info.Labels + spec, err := container.Spec(ctx) + if err != nil { + continue + } + + // 提取 Pod 名字和 namespace + podName := labels["io.kubernetes.pod.name"] + podNamespace := labels["io.kubernetes.pod.namespace"] + if podName == "" || podNamespace == "" { + continue + } + + annotations := spec.Annotations + env := make(map[string]string) + for _, e := range spec.Process.Env { + parts := strings.SplitN(e, "=", 2) + if len(parts) == 2 { + env[parts[0]] = parts[1] + } + } + + // 保存容器信息 + containerInfo[info.ID] = ContainerInfo{ + PodName: podName, + PodNamespace: podNamespace, + Labels: labels, + Annotations: annotations, + Env: env, + } + } + + return containerInfo, nil +} + +// 获取 Docker 容器信息 +func (p *PodMapper) getDockerInfo() (map[string]ContainerInfo, error) { + socketPath := p.Config.DockerdSocket + // 检查 Docker socket 是否存在 + _, err := os.Stat(socketPath) + if os.IsNotExist(err) { + slog.Info("No Dockerd socket, ignoring") + return nil, nil + } + + // 创建 Docker 客户端 + cli, err := client.NewClientWithOpts( + client.WithHost("unix://"+socketPath), + client.WithVersion("1.40"), // Docker API 版本 + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to Docker: %w", err) + } + defer cli.Close() + + // 获取所有容器 + containers, err := cli.ContainerList(context.Background(), container.ListOptions{All: true}) + if err != nil { + return nil, fmt.Errorf("failed to list Docker containers: %w", err) + } + + containerInfo := make(map[string]ContainerInfo) + for _, container := range containers { + // 提取容器的标签和环境变量 + labels := container.Labels + envVars, err := cli.ContainerInspect(context.Background(), container.ID) + if err != nil { + slog.Warn(fmt.Sprintf("Failed to inspect container %s: %v", container.ID, err)) + continue + } + + // 提取 Pod 名字和 namespace + podName := labels["io.kubernetes.pod.name"] + podNamespace := labels["io.kubernetes.pod.namespace"] + if podName == "" || podNamespace == "" { + continue + } + + annotations := envVars.Config.Labels // 这里假设 annotations 在 Labels 中 + env := make(map[string]string) + for _, e := range envVars.Config.Env { + parts := strings.SplitN(e, "=", 2) + if len(parts) == 2 { + env[parts[0]] = parts[1] + } + } + + // 保存容器信息 + containerInfo[container.ID] = ContainerInfo{ + PodName: podName, + PodNamespace: podNamespace, + Labels: labels, + Annotations: annotations, + Env: env, + } + } + + return containerInfo, nil +} + +// 容器信息结构 +type ContainerInfo struct { + PodName string + PodNamespace string + Labels map[string]string + Annotations map[string]string + Env map[string]string +} + +func normalizeLabelName(name string) string { + // 将 `.` 和 `-` 替换为 `_` + name = strings.ReplaceAll(name, ".", "_") + name = strings.ReplaceAll(name, "-", "_") + + // 确保标签名称不以数字开头 + if len(name) > 0 && name[0] >= '0' && name[0] <= '9' { + name = "_" + name + } + + return name +} diff --git a/pkg/cmd/app.go b/pkg/cmd/app.go index 7da26ca..ee19af9 100644 --- a/pkg/cmd/app.go +++ b/pkg/cmd/app.go @@ -85,6 +85,9 @@ const ( CLIEnableDCGMLog = "enable-dcgm-log" CLIDCGMLogLevel = "dcgm-log-level" CLIPodResourcesKubeletSocket = "pod-resources-kubelet-socket" + CLIContainerdSocket = "containerd-socket" + CLIDockerdSocket = "dockerd-socket" + CLIPodLabelsAllowList = "pod-labels-allow-list" CLIHPCJobMappingDir = "hpc-job-mapping-dir" CLINvidiaResourceNames = "nvidia-resource-names" ) @@ -244,6 +247,24 @@ func NewApp(buildVersion ...string) *cli.App { Usage: "Path to the kubelet pod-resources socket file.", EnvVars: []string{"DCGM_POD_RESOURCES_KUBELET_SOCKET"}, }, + &cli.StringFlag{ + Name: CLIContainerdSocket, + Value: "/var/run/docker/containerd/containerd.sock", + Usage: "Path to the containerd socket file.", + EnvVars: []string{"DCGM_CONTAINERD_SOCKET"}, + }, + &cli.StringFlag{ + Name: CLIDockerdSocket, + Value: "/var/run/docker.sock", + Usage: "Path to the dockerd socket file.", + EnvVars: []string{"DCGM_DOCKERD_SOCKET"}, + }, + &cli.StringSliceFlag{ + Name: CLIPodLabelsAllowList, + Value: cli.NewStringSlice(), + Usage: "allow pod labels list.", + EnvVars: []string{"DCGM_POD_LABELS_ALLOW_LIST"}, + }, &cli.StringFlag{ Name: CLIHPCJobMappingDir, Value: "", @@ -604,6 +625,9 @@ func contextToConfig(c *cli.Context) (*appconfig.Config, error) { EnableDCGMLog: c.Bool(CLIEnableDCGMLog), DCGMLogLevel: dcgmLogLevel, PodResourcesKubeletSocket: c.String(CLIPodResourcesKubeletSocket), + ContainerdSocket: c.String(CLIContainerdSocket), + DockerdSocket: c.String(CLIDockerdSocket), + PodLabelsAllowList: c.StringSlice(CLIPodLabelsAllowList), HPCJobMappingDir: c.String(CLIHPCJobMappingDir), NvidiaResourceNames: c.StringSlice(CLINvidiaResourceNames), }, nil