Skip to content

Commit 94cd704

Browse files
committed
test(storage): improve throughput calculation in microbenchmarks
Refactor time-based read microbenchmarks to calculate throughput using actual elapsed time (`time.perf_counter()`) instead of the requested duration. This provides more accurate throughput metrics. Also updates utility functions to handle a list of durations for calculating throughput across multiple iterations.
1 parent 7bb29e5 commit 94cd704

2 files changed

Lines changed: 109 additions & 20 deletions

File tree

packages/google-cloud-storage/tests/perf/microbenchmarks/_utils.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import os
1616
import socket
1717
import statistics
18-
from typing import Any, List, Optional
18+
from typing import Any, List, Optional, Union
1919

2020
import psutil
2121

@@ -28,7 +28,7 @@ def publish_benchmark_extra_info(
2828
benchmark_group: str = "read",
2929
true_times: List[float] = [],
3030
download_bytes_list: Optional[List[int]] = None,
31-
duration: Optional[int] = None,
31+
duration: Optional[Union[float, List[float]]] = None,
3232
) -> None:
3333
"""
3434
Helper function to publish benchmark parameters to the extra_info property.
@@ -50,9 +50,19 @@ def publish_benchmark_extra_info(
5050

5151
if download_bytes_list is not None:
5252
assert duration is not None, (
53-
"Duration must be provided if total_bytes_transferred is provided."
53+
"Duration must be provided if download_bytes_list is provided."
5454
)
55-
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
55+
if isinstance(duration, list):
56+
assert len(download_bytes_list) == len(duration), (
57+
"Download bytes and duration lists must have the same length."
58+
)
59+
throughputs_list = [
60+
x / d / (1024 * 1024) for x, d in zip(download_bytes_list, duration)
61+
]
62+
else:
63+
throughputs_list = [
64+
x / duration / (1024 * 1024) for x in download_bytes_list
65+
]
5666
min_throughput = min(throughputs_list)
5767
max_throughput = max(throughputs_list)
5868
mean_throughput = statistics.mean(throughputs_list)

packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py

Lines changed: 95 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import random
2121
import time
2222
from io import BytesIO
23+
from typing import List, NamedTuple, Optional
2324

2425
import pytest
2526

@@ -39,11 +40,68 @@
3940
all_params = config._get_params()
4041

4142

43+
class DownloadResult(NamedTuple):
    """Outcome of one worker's measured (post-warmup) download phase.

    Timestamps come from ``time.perf_counter()`` and are only meaningful
    relative to each other within the same process.
    """

    # Bytes downloaded after the warmup phase ended (warmup bytes excluded).
    total_bytes: int
    # perf_counter() timestamp at which measurement began (end of warmup).
    measured_start_time: float
    # perf_counter() timestamp taken after the last completed download.
    measured_end_time: float
47+
48+
4249
async def create_client():
4350
"""Initializes async client and gets the current event loop."""
4451
return AsyncGrpcClient()
4552

4653

54+
def _aggregate_download_results(results: List[DownloadResult]) -> DownloadResult:
    """Combine per-worker download results into one aggregate result.

    The aggregate spans the earliest measured start to the latest measured
    end across all workers, with byte counts summed.

    Raises:
        ValueError: If ``results`` is empty or the aggregate elapsed time
            is not positive.
    """
    if not results:
        raise ValueError("At least one download result is required.")

    combined = DownloadResult(
        total_bytes=sum(r.total_bytes for r in results),
        measured_start_time=min(r.measured_start_time for r in results),
        measured_end_time=max(r.measured_end_time for r in results),
    )
    if combined.measured_end_time <= combined.measured_start_time:
        raise ValueError("Measured elapsed time must be positive.")

    return combined
69+
70+
71+
def _calculate_average_throughput_mib_s(
72+
download_bytes_list: List[int], download_elapsed_times: List[float]
73+
) -> float:
74+
total_bytes_downloaded = sum(download_bytes_list)
75+
total_elapsed_time = sum(download_elapsed_times)
76+
if total_elapsed_time <= 0:
77+
raise ValueError("Total measured elapsed time must be positive.")
78+
79+
return (total_bytes_downloaded / total_elapsed_time) / (1024 * 1024)
80+
81+
82+
def _record_measured_start(
83+
measured_start_time: Optional[float], current_time: float
84+
) -> float:
85+
if measured_start_time is None:
86+
return current_time
87+
return measured_start_time
88+
89+
90+
def _build_download_result(
    total_bytes_downloaded: int,
    measured_start_time: Optional[float],
    measured_end_time: Optional[float],
) -> DownloadResult:
    """Package raw loop measurements into a DownloadResult.

    Raises:
        ValueError: If either timestamp is still ``None``, i.e. no
            download finished inside the measured interval.
    """
    never_measured = measured_start_time is None or measured_end_time is None
    if never_measured:
        raise ValueError("No downloads completed during the measured interval.")

    return DownloadResult(
        total_bytes_downloaded, measured_start_time, measured_end_time
    )
103+
104+
47105
# --- Global Variables for Worker Process ---
48106
worker_loop = None
49107
worker_client = None
@@ -78,15 +136,20 @@ def _download_time_based_json(client, filename, params):
78136

79137
offset = 0
80138
is_warming_up = True
81-
start_time = time.monotonic()
139+
start_time = time.perf_counter()
82140
warmup_end_time = start_time + params.warmup_duration
83141
test_end_time = warmup_end_time + params.duration
142+
measured_start_time = None
143+
measured_end_time = None
84144

85-
while time.monotonic() < test_end_time:
86-
current_time = time.monotonic()
145+
while time.perf_counter() < test_end_time:
146+
current_time = time.perf_counter()
87147
if is_warming_up and current_time >= warmup_end_time:
88148
is_warming_up = False
89149
total_bytes_downloaded = 0 # Reset counter after warmup
150+
measured_start_time = _record_measured_start(
151+
measured_start_time, current_time
152+
)
90153

91154
bytes_in_iteration = 0
92155
# For JSON, we can't batch ranges like gRPC, so we download one by one
@@ -110,8 +173,11 @@ def _download_time_based_json(client, filename, params):
110173

111174
if not is_warming_up:
112175
total_bytes_downloaded += bytes_in_iteration
176+
measured_end_time = time.perf_counter()
113177

114-
return total_bytes_downloaded
178+
return _build_download_result(
179+
total_bytes_downloaded, measured_start_time, measured_end_time
180+
)
115181

116182

117183
async def _download_time_based_async(client, filename, params):
@@ -122,15 +188,20 @@ async def _worker_coro():
122188
total_bytes_downloaded = 0
123189
offset = 0
124190
is_warming_up = True
125-
start_time = time.monotonic()
191+
start_time = time.perf_counter()
126192
warmup_end_time = start_time + params.warmup_duration
127193
test_end_time = warmup_end_time + params.duration
194+
measured_start_time = None
195+
measured_end_time = None
128196

129-
while time.monotonic() < test_end_time:
130-
current_time = time.monotonic()
197+
while time.perf_counter() < test_end_time:
198+
current_time = time.perf_counter()
131199
if is_warming_up and current_time >= warmup_end_time:
132200
is_warming_up = False
133201
total_bytes_downloaded = 0 # Reset counter after warmup
202+
measured_start_time = _record_measured_start(
203+
measured_start_time, current_time
204+
)
134205

135206
ranges = []
136207
if params.pattern == "rand":
@@ -153,13 +224,16 @@ async def _worker_coro():
153224

154225
if not is_warming_up:
155226
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
156-
return total_bytes_downloaded
227+
measured_end_time = time.perf_counter()
228+
return _build_download_result(
229+
total_bytes_downloaded, measured_start_time, measured_end_time
230+
)
157231

158232
tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
159233
results = await asyncio.gather(*tasks)
160234

161235
await mrd.close()
162-
return sum(results)
236+
return _aggregate_download_results(results)
163237

164238

165239
def _download_files_worker(process_idx, filename, params, bucket_type):
@@ -175,7 +249,8 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type):
175249
args = [(i, files_names[i], params, bucket_type) for i in range(len(files_names))]
176250

177251
results = pool.starmap(_download_files_worker, args)
178-
return sum(results)
252+
agg_res = _aggregate_download_results(results)
253+
return agg_res.total_bytes, agg_res.measured_end_time - agg_res.measured_start_time
179254

180255

181256
@pytest.mark.parametrize(
@@ -198,9 +273,14 @@ def test_downloads_multi_proc_multi_coro(
198273
)
199274

200275
download_bytes_list = []
276+
download_elapsed_times = []
201277

202278
def target_wrapper(*args, **kwargs):
203-
download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs))
279+
total_bytes, measured_elapsed_time = download_files_mp_mc_wrapper(
280+
pool, *args, **kwargs
281+
)
282+
download_bytes_list.append(total_bytes)
283+
download_elapsed_times.append(measured_elapsed_time)
204284
return
205285

206286
try:
@@ -214,10 +294,9 @@ def target_wrapper(*args, **kwargs):
214294
finally:
215295
pool.close()
216296
pool.join()
217-
total_bytes_downloaded = sum(download_bytes_list)
218-
throughput_mib_s = (
219-
total_bytes_downloaded / params.duration / params.rounds
220-
) / (1024 * 1024)
297+
throughput_mib_s = _calculate_average_throughput_mib_s(
298+
download_bytes_list, download_elapsed_times
299+
)
221300
benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}"
222301
print(
223302
f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s"
@@ -226,6 +305,6 @@ def target_wrapper(*args, **kwargs):
226305
benchmark,
227306
params,
228307
download_bytes_list=download_bytes_list,
229-
duration=params.duration,
308+
duration=download_elapsed_times,
230309
)
231310
publish_resource_metrics(benchmark, m)

0 commit comments

Comments
 (0)