
Deploying Cross-Node Inference Instances with Aibrix

Overview

Aibrix can deploy cross-node inference instances through its RayClusterFleet CRD. A RayClusterFleet is in turn built on KubeRay's RayCluster, and the inference backend that spans nodes inside the RayCluster is vLLM. The three layers relate as follows:

```
Aibrix (platform)
  └─ KubeRay (task scheduling + resource management)
      └─ vLLM (high-performance inference engine)
```
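For orientation, creating such an instance amounts to applying a RayClusterFleet object. The sketch below is illustrative only: the `orchestration.aibrix.ai/v1alpha1` group/version and field layout follow the upstream Aibrix CRD, while the image and the head/worker specs are placeholders rather than the values used in this experiment (and, as described next, our internally modified Aibrix expects different labels).

```python
# Minimal sketch of creating a RayClusterFleet with the Kubernetes Python
# client. Field names follow the upstream Aibrix CRD; the image, Ray version,
# and group specs are placeholders, not the values used in this experiment.
from kubernetes import client, config

config.load_kube_config()

fleet = {
    "apiVersion": "orchestration.aibrix.ai/v1alpha1",
    "kind": "RayClusterFleet",
    "metadata": {"name": "qwen-coder-7b-instruct"},
    "spec": {
        "replicas": 1,
        "selector": {"matchLabels": {"app": "qwen-coder-7b-instruct"}},
        "template": {
            "metadata": {"labels": {"app": "qwen-coder-7b-instruct"}},
            "spec": {  # a KubeRay RayClusterSpec
                "rayVersion": "2.10.0",  # placeholder
                "headGroupSpec": {
                    "rayStartParams": {"dashboard-host": "0.0.0.0"},
                    "template": {"spec": {"containers": [
                        {"name": "ray-head", "image": "vllm-ray:placeholder"}
                    ]}},
                },
                "workerGroupSpecs": [{
                    "groupName": "small-group",
                    "replicas": 1,
                    "rayStartParams": {},
                    "template": {"spec": {"containers": [
                        {"name": "ray-worker", "image": "vllm-ray:placeholder"}
                    ]}},
                }],
            },
        },
    },
}

client.CustomObjectsApi().create_namespaced_custom_object(
    group="orchestration.aibrix.ai",
    version="v1alpha1",
    namespace="default",
    plural="rayclusterfleets",
    body=fleet,
)
```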

Experiment

The Aibrix version deployed inside our company has been modified, and its instance registration works differently. To limit Aibrix's control-plane involvement, we adjusted some of the RayClusterFleet's labels so that the cross-node inference instance deployed in this experiment merely references the RayClusterFleet without triggering any other unnecessary operations.

After this adjustment, the RayClusterFleet does not pass through the Aibrix gateway, so in the steps below we send requests directly to the Pod IP and port of the RayClusterFleet's head node.

```bash
# k get pods -o wide
NAME                                                                READY   STATUS      RESTARTS   AGE     IP               NODE         
qwen-coder-7b-instruct-75df9c55c5-p9898-head-xwtk5                  1/1     Running     0          4h3m    10.176.35.11     10.175.35.25 
qwen-coder-7b-instruct-75df9c55c5-p9898-small-grou-worker-f6bhp     1/1     Running     0          4h3m    10.176.33.178    10.175.33.23 
```
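With the head pod's IP in hand, a quick smoke test confirms that the vLLM OpenAI-compatible server is answering; vLLM's /v1/models endpoint lists the served models. Port 8000 is vLLM's default and is the port used throughout this experiment.

```python
import requests

HEAD_POD_IP = "10.176.35.11"  # head pod IP from `k get pods -o wide` above

# /v1/models is part of vLLM's OpenAI-compatible API; listing the models
# verifies the server is up before sending real inference traffic
resp = requests.get(f"http://{HEAD_POD_IP}:8000/v1/models", timeout=5)
resp.raise_for_status()
print([m["id"] for m in resp.json()["data"]])  # expect: ['qwen-coder-7b-instruct']
```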

When creating the RayClusterFleet, Aibrix also starts the Ray Dashboard, which comes in handy later for profiling the inference process.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_7.png
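A quick reachability check for the dashboard might look like the sketch below. It assumes Ray's default dashboard port 8265 (this deployment may have changed it); /api/version is the endpoint the Ray job-submission client itself probes.

```python
import requests

HEAD_POD_IP = "10.176.35.11"

# Assumes the default Ray Dashboard port 8265; /api/version returns the
# dashboard's Ray version if the dashboard is up and reachable
resp = requests.get(f"http://{HEAD_POD_IP}:8265/api/version", timeout=5)
print(resp.json())
```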

To make testing convenient, we deploy open-webui against the vLLM endpoint, entering the head node's IP and port as the external connection.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_3.png

Select the model served by the RayClusterFleet under test, qwen-coder-7b-instruct.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_4.png

A normal request and response shows that the model is deployed successfully.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_5.png

Batch Inference with Spark

Once the cross-node inference instance is up, we can use Spark to push batches of requests through it. As a simple example, we deploy a SparkApplication with the Spark Operator; a sketch of the manifest comes first, followed by the PySpark driver script it runs.
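The manifest sketch below is illustrative only: it assumes the Spark Operator's `sparkoperator.k8s.io/v1beta2` API, and the image, script path, service account, and resource sizes are placeholders rather than the values used in this experiment.

```python
# Minimal sketch of deploying the SparkApplication, assuming the Spark
# Operator's v1beta2 API. Image, script location, and sizes are placeholders.
from kubernetes import client, config

config.load_kube_config()

spark_app = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "vllm-batch-inference", "namespace": "default"},
    "spec": {
        "type": "Python",
        "mode": "cluster",
        "image": "spark-py:placeholder",  # must bundle pyspark + requests
        "mainApplicationFile": "local:///opt/app/batch_inference.py",
        "sparkVersion": "3.5.0",
        "dynamicAllocation": {  # mirrors the configs set in the script below
            "enabled": True,
            "minExecutors": 1,
            "maxExecutors": 10,
        },
        "driver": {"cores": 1, "memory": "2g", "serviceAccount": "spark"},
        "executor": {"cores": 2, "memory": "4g"},
    },
}

client.CustomObjectsApi().create_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="default",
    plural="sparkapplications",
    body=spark_app,
)
```

The PySpark driver script that the manifest points at: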

```python
import random
import requests
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from concurrent.futures import ThreadPoolExecutor
from pyspark import AccumulatorParam

# Create a SparkSession with dynamic allocation enabled
spark = (
    SparkSession.builder
    .appName("Dynamic Scaling Test with Request Statistics")
    .config("spark.sql.shuffle.partitions", "200")  # 全局设置分区数
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .getOrCreate()
)

# Pick a random prompt to send as the question
def generate_random_questions():
    prompts = [
        "What is the meaning of life?",
        "Tell me a story about AI.",
        "How does quantum computing work?",
        "Explain deep learning in simple terms.",
        "What are the benefits of renewable energy?",
        "What is the future of artificial intelligence?",
        "How does blockchain technology work?",
        "Can you explain the concept of entropy?",
        "What are the applications of machine learning?",
        "How do neural networks learn?",
    ]
    return random.choice(prompts)

# Custom accumulator that merges per-request timing statistics
class RequestStatsAccumulator(AccumulatorParam):
    def zero(self, value):
        return {"total_requests": 0, "total_time": 0.0, "max_time": 0.0, "min_time": float("inf")}

    def addInPlace(self, v1, v2):
        return {
            "total_requests": v1["total_requests"] + v2["total_requests"],
            "total_time": v1["total_time"] + v2["total_time"],
            "max_time": max(v1["max_time"], v2["max_time"]),
            "min_time": min(v1["min_time"], v2["min_time"]),
        }

# Initialize the accumulator
request_stats_accumulator = spark.sparkContext.accumulator(
    {"total_requests": 0, "total_time": 0.0, "max_time": 0.0, "min_time": float("inf")},
    RequestStatsAccumulator()
)

# Runs inside each partition and calls the OpenAI-compatible API
def run_inference_with_http(partition):
    http_server_host = "10.176.35.11"
    http_server_port = 8000

    def make_request(question):
        start_time = time.time()  # record the start time

        payload = {
            "model": "qwen-coder-7b-instruct",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": question}
            ],
            "temperature": 0.7,
            "top_p": 0.8,
            "repetition_penalty": 1.05,
            "max_tokens": 512
        }

        try:
            response = requests.post(
                f"http://{http_server_host}:{http_server_port}/v1/chat/completions",
                json=payload,
                timeout=15
            )
            response.raise_for_status()
            result = response.json()
            answer = result["choices"][0]["message"]["content"]
        except requests.exceptions.RequestException as e:
            answer = f"Error: {e}"

        end_time = time.time()  # record the end time
        elapsed_time = end_time - start_time

        # Update the accumulator (merged on the driver when the task completes)
        request_stats_accumulator.add({
            "total_requests": 1,
            "total_time": elapsed_time,
            "max_time": elapsed_time,
            "min_time": elapsed_time,
        })

        return question, answer

    # Issue the requests concurrently within the partition
    questions = [generate_random_questions() for _ in partition]
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(make_request, questions))

    return results

# Simulate a dynamically changing load
if __name__ == "__main__":
    try:
        for i in range(6):  # change the load every 5 minutes
            print(f"--- Iteration {i + 1}: Adjusting load ---")

            partitions = random.randint(100, 500)
            data_size = random.randint(10000, 50000)

            print(f"Using {partitions} partitions and {data_size} rows of data.")

            data = [("placeholder",)] * data_size
            schema = StructType([StructField("placeholder", StringType(), True)])
            df = spark.createDataFrame(data, schema)

            rdd = df.rdd.repartition(partitions).mapPartitions(run_inference_with_http)
            result_df = spark.createDataFrame(rdd, schema=["question", "answer"]).cache()
            # Force every partition to run: show() alone may stop after enough
            # rows for the preview, which would undercount the accumulator
            print(f"Completed {result_df.count()} requests in this iteration.")
            result_df.show(n=10, truncate=False)
            result_df.unpersist()

            print("Sleeping for 5 minutes before next iteration...")
            time.sleep(300)

        # After all iterations, print the aggregate request statistics
        stats = request_stats_accumulator.value
        print("\n--- Request Statistics ---")
        print(f"Total Requests Sent: {stats['total_requests']}")
        print(f"Total Time Spent: {stats['total_time']:.2f} seconds")
        print(f"Average Time Per Request: {stats['total_time'] / stats['total_requests']:.2f} seconds")
        print(f"Max Time Per Request: {stats['max_time']:.2f} seconds")
        print(f"Min Time Per Request: {stats['min_time']:.2f} seconds")

    finally:
        spark.stop()
```

vLLM engine metrics output.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img.png
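The same numbers can also be scraped directly: vLLM exposes Prometheus-format metrics at /metrics on its serving port, with metric names prefixed `vllm:` (for example `vllm:num_requests_running`).

```python
import requests

HEAD_POD_IP = "10.176.35.11"

# vLLM serves Prometheus metrics on the same port as its API; filter for
# the engine's own counters and gauges, whose names start with "vllm:"
text = requests.get(f"http://{HEAD_POD_IP}:8000/metrics", timeout=5).text
for line in text.splitlines():
    if line.startswith("vllm:"):
        print(line)
```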

GPU metrics for the ray-head.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_1.png

GPU metrics for the ray-worker.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_2.png

The PySpark job uses Dynamic Resource Allocation, and the number of executors can be seen scaling up and down with the task count.

/aibrix%E9%83%A8%E7%BD%B2%E8%B7%A8%E8%8A%82%E7%82%B9%E6%8E%A8%E7%90%86%E5%AE%9E%E4%BE%8B/img_6.png
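One way to watch this scaling from outside the Spark UI is the driver's monitoring REST API (port 4040 by default). The sketch below assumes the UI has been made reachable locally, e.g. via kubectl port-forward on the driver pod.

```python
import time
import requests

DRIVER_HOST = "localhost"  # e.g. after `kubectl port-forward <driver-pod> 4040:4040`

# The Spark UI's REST API lists the driver plus all live executors under
# /api/v1/applications/<app-id>/executors, so polling it shows dynamic
# allocation adding and removing executors over time
apps = requests.get(f"http://{DRIVER_HOST}:4040/api/v1/applications", timeout=5).json()
app_id = apps[0]["id"]
for _ in range(20):  # poll for ~10 minutes
    execs = requests.get(
        f"http://{DRIVER_HOST}:4040/api/v1/applications/{app_id}/executors", timeout=5
    ).json()
    workers = [e for e in execs if e["id"] != "driver" and e["isActive"]]
    print(f"live executors: {len(workers)}")
    time.sleep(30)
```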

Summary

In this post we deployed a cross-node inference instance through Aibrix, with vLLM as the backend inference engine, and used PySpark to drive batch requests against it, verifying the instance's availability and stability. Going forward, Spark's dynamic scaling mechanism could also be used to adjust resource usage on the fly without stopping the job.