import random
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from pyspark import AccumulatorParam
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create the SparkSession with dynamic allocation enabled
spark = (
    SparkSession.builder
    .appName("Dynamic Scaling Test with Request Statistics")
    .config("spark.sql.shuffle.partitions", "200")  # global number of shuffle partitions
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .getOrCreate()
)
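# Note: on most clusters dynamic allocation also needs an external shuffle service
# (spark.shuffle.service.enabled=true) or, on Spark 3.0+,
# spark.dynamicAllocation.shuffleTracking.enabled=true.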

# Pick a random question prompt
def generate_random_questions():
    prompts = [
        "What is the meaning of life?",
        "Tell me a story about AI.",
        "How does quantum computing work?",
        "Explain deep learning in simple terms.",
        "What are the benefits of renewable energy?",
        "What is the future of artificial intelligence?",
        "How does blockchain technology work?",
        "Can you explain the concept of entropy?",
        "What are the applications of machine learning?",
        "How do neural networks learn?",
    ]
    return random.choice(prompts)

# Custom accumulator used to aggregate request statistics across tasks
class RequestStatsAccumulator(AccumulatorParam):
    def zero(self, value):
        return {"total_requests": 0, "total_time": 0.0, "max_time": 0.0, "min_time": float("inf")}

    def addInPlace(self, v1, v2):
        return {
            "total_requests": v1["total_requests"] + v2["total_requests"],
            "total_time": v1["total_time"] + v2["total_time"],
            "max_time": max(v1["max_time"], v2["max_time"]),
            "min_time": min(v1["min_time"], v2["min_time"]),
        }

# Initialize the accumulator with the same "zero" value
request_stats_accumulator = spark.sparkContext.accumulator(
    {"total_requests": 0, "total_time": 0.0, "max_time": 0.0, "min_time": float("inf")},
    RequestStatsAccumulator(),
)
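# Note: the accumulator is updated inside a transformation (mapPartitions), so its
# value is only visible on the driver after an action runs, and re-executed tasks
# may be counted more than once; treat the statistics as approximate.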

# Call the OpenAI-compatible API for every row in a partition
def run_inference_with_http(partition):
    http_server_host = "10.176.35.11"
    http_server_port = 8000

    def make_request(question):
        start_time = time.time()  # record the start time
        payload = {
            "model": "qwen-coder-7b-instruct",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": question},
            ],
            "temperature": 0.7,
            "top_p": 0.8,
            "repetition_penalty": 1.05,
            "max_tokens": 512,
        }
        try:
            response = requests.post(
                f"http://{http_server_host}:{http_server_port}/v1/chat/completions",
                json=payload,
                timeout=15,
            )
            response.raise_for_status()
            result = response.json()
            answer = result["choices"][0]["message"]["content"]
        except requests.exceptions.RequestException as e:
            answer = f"Error: {e}"
        end_time = time.time()  # record the end time
        elapsed_time = end_time - start_time
        # Update the accumulator with this request's statistics
        request_stats_accumulator.add({
            "total_requests": 1,
            "total_time": elapsed_time,
            "max_time": elapsed_time,
            "min_time": elapsed_time,
        })
        return question, answer

    # Generate one question per row and issue the requests concurrently
    questions = [generate_random_questions() for _ in partition]
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(make_request, questions))
    return results
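# Note: each Spark task processes one partition with up to 10 in-flight requests,
# so the total concurrency against the server is roughly
# (number of concurrently running tasks) x max_workers.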

# Simulate a dynamically changing workload
if __name__ == "__main__":
    try:
        for i in range(6):  # change the load roughly every 5 minutes
            print(f"--- Iteration {i + 1}: Adjusting load ---")
            partitions = random.randint(100, 500)
            data_size = random.randint(10000, 50000)
            print(f"Using {partitions} partitions and {data_size} rows of data.")

            data = [("placeholder",)] * data_size
            schema = StructType([StructField("placeholder", StringType(), True)])
            df = spark.createDataFrame(data, schema)
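            # repartition() below sets the number of tasks; the backlog of pending
            # tasks is what prompts dynamic allocation to request more executors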
            rdd = df.rdd.repartition(partitions).mapPartitions(run_inference_with_http)
            # Use an explicit result schema so Spark does not run an extra sampling
            # job just to infer the column types
            result_schema = StructType([
                StructField("question", StringType(), True),
                StructField("answer", StringType(), True),
            ])
            result_df = spark.createDataFrame(rdd, schema=result_schema).cache()
            # count() touches every partition so that every request is actually sent;
            # show() on its own only evaluates enough partitions to return 10 rows
            print(f"Completed {result_df.count()} requests in this iteration.")
            result_df.show(n=10, truncate=False)
            result_df.unpersist()

            print("Sleeping for 5 minutes before next iteration...")
            time.sleep(300)

        # After all iterations finish, print the aggregate request statistics
        stats = request_stats_accumulator.value
        print("\n--- Request Statistics ---")
        print(f"Total Requests Sent: {stats['total_requests']}")
        print(f"Total Time Spent: {stats['total_time']:.2f} seconds")
        print(f"Average Time Per Request: {stats['total_time'] / stats['total_requests']:.2f} seconds")
        print(f"Max Time Per Request: {stats['max_time']:.2f} seconds")
        print(f"Min Time Per Request: {stats['min_time']:.2f} seconds")
    finally:
        spark.stop()