fix: instance grpc client once per process in benchmarks #1725
Changes from all commits: 558d13d · 2c0dc3a · dcc7932 · 2be3c4b · 98f85ed
```diff
@@ -300,44 +300,46 @@ def target_wrapper(*args, **kwargs):
     )
 
 
-def _download_files_worker(files_to_download, other_params, chunks, bucket_type):
-    # For regional buckets, a new client must be created for each process.
-    # For zonal, the same is done for consistency.
+# --- Global Variables for Worker Process ---
+worker_loop = None
+worker_client = None
+worker_json_client = None
+
+
+def _worker_init(bucket_type):
+    """Initializes a persistent event loop and client for each worker process."""
+    global worker_loop, worker_client, worker_json_client
     if bucket_type == "zonal":
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        client = loop.run_until_complete(create_client())
-        try:
-            # download_files_using_mrd_multi_coro returns max latency of coros
-            result = download_files_using_mrd_multi_coro(
-                loop, client, files_to_download, other_params, chunks
-            )
-        finally:
-            tasks = asyncio.all_tasks(loop=loop)
-            for task in tasks:
-                task.cancel()
-            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
-            loop.close()
-        return result
+        worker_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(worker_loop)
+        worker_client = worker_loop.run_until_complete(create_client())
     else:  # regional
         from google.cloud import storage
 
-        json_client = storage.Client()
+        worker_json_client = storage.Client()
```
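For context on the change above: `multiprocessing.Pool` accepts an `initializer` callback that runs once in each worker process, so an expensive per-process resource (here, the gRPC client and its event loop) can be stored in module-level globals and reused by every task that worker handles. A minimal self-contained sketch of the pattern, with hypothetical names (`_init_worker`, `_task`, `_worker_resource`):

```python
import multiprocessing
import os

# One instance per worker process; stand-in for the benchmark's gRPC client.
_worker_resource = None


def _init_worker():
    """Runs once per worker process, not once per task."""
    global _worker_resource
    _worker_resource = f"client-created-in-pid-{os.getpid()}"


def _task(i):
    # Every task dispatched to the same process sees the same instance.
    return (i, _worker_resource)


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=2, initializer=_init_worker) as pool:
        for i, resource in pool.map(_task, range(4)):
            print(i, resource)  # at most two distinct "client" strings
```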
Contributor commented on lines +309 to +319:
The resources created in this initializer (`worker_loop`, `worker_client`, `worker_json_client`) are never explicitly cleaned up when the worker processes exit. While terminating the worker processes will cause the OS to reclaim these resources, it's better to perform a graceful shutdown. You can use the `atexit` module to register a cleanup function in each worker. I suggest updating `_worker_init` as follows:

```python
def _worker_init(bucket_type):
    """Initializes a persistent event loop and client for each worker process."""
    global worker_loop, worker_client, worker_json_client
    import atexit

    if bucket_type == "zonal":
        worker_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(worker_loop)
        worker_client = worker_loop.run_until_complete(create_client())

        def _cleanup_zonal():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_client and worker_loop and not worker_loop.is_closed():
                try:
                    worker_loop.run_until_complete(worker_client.close())
                finally:
                    worker_loop.close()

        atexit.register(_cleanup_zonal)
    else:  # regional
        from google.cloud import storage

        worker_json_client = storage.Client()

        def _cleanup_regional():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_json_client:
                worker_json_client.close()

        atexit.register(_cleanup_regional)
```
Collaborator (Author): `json_client` doesn't have `close()`.
Collaborator (Author): My bad, `json_client` does have `close()`.
```diff
 
 
+def _download_files_worker(files_to_download, other_params, chunks, bucket_type):
+    if bucket_type == "zonal":
+        # The loop and client are already initialized in _worker_init.
+        # download_files_using_mrd_multi_coro returns max latency of coros
+        return download_files_using_mrd_multi_coro(
+            worker_loop, worker_client, files_to_download, other_params, chunks
+        )
+    else:  # regional
+        # download_files_using_json_multi_threaded returns max latency of threads
+        return download_files_using_json_multi_threaded(
-            None, json_client, files_to_download, other_params, chunks
+            None, worker_json_client, files_to_download, other_params, chunks
+        )
 
 
-def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type):
-    num_processes = params.num_processes
+def download_files_mp_mc_wrapper(pool, files_names, params, chunks, bucket_type):
     num_coros = params.num_coros  # This is n, number of files per process
 
     # Distribute filenames to processes
     filenames_per_process = [
         files_names[i : i + num_coros] for i in range(0, len(files_names), num_coros)
     ]
 
     args = [
         (
             filenames,
```
```diff
@@ -348,10 +350,7 @@ def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type):
         for filenames in filenames_per_process
     ]
 
-    ctx = multiprocessing.get_context("spawn")
-    with ctx.Pool(processes=num_processes) as pool:
-        results = pool.starmap(_download_files_worker, args)
-
+    results = pool.starmap(_download_files_worker, args)
     return max(results)
```
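With the pool now created by the caller, `download_files_mp_mc_wrapper` no longer spawns and tears down a fresh pool on every invocation, so worker clients survive across repeated benchmark rounds. A minimal sketch with hypothetical names (`_init_worker`, `_task`) showing that a long-lived pool runs its initializer once per worker process, however many `starmap` rounds follow:

```python
import multiprocessing
import os

_init_count = 0  # incremented by the initializer; stays at 1 per worker


def _init_worker():
    global _init_count
    _init_count += 1  # this is where something like create_client() would run


def _task(round_no, item):
    # _init_count remains 1 in each worker, no matter how many rounds run.
    return (os.getpid(), _init_count, round_no, item)


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    pool = ctx.Pool(processes=2, initializer=_init_worker)
    try:
        for round_no in range(3):  # stand-in for repeated benchmark rounds
            print(pool.starmap(_task, [(round_no, i) for i in range(4)]))
    finally:
        pool.close()
        pool.join()
```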
```diff
@@ -386,10 +385,16 @@ def test_downloads_multi_proc_multi_coro(
     logging.info("randomizing chunks")
     random.shuffle(chunks)
 
+    ctx = multiprocessing.get_context("spawn")
+    pool = ctx.Pool(
+        processes=params.num_processes,
+        initializer=_worker_init,
+        initargs=(params.bucket_type,),
+    )
     output_times = []
 
     def target_wrapper(*args, **kwargs):
-        result = download_files_mp_mc_wrapper(*args, **kwargs)
+        result = download_files_mp_mc_wrapper(pool, *args, **kwargs)
         output_times.append(result)
         return output_times
```
```diff
@@ -407,6 +412,8 @@ def target_wrapper(*args, **kwargs):
         ),
     )
     finally:
+        pool.close()
+        pool.join()
         publish_benchmark_extra_info(benchmark, params, true_times=output_times)
         publish_resource_metrics(benchmark, m)
```
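One detail worth noting about the explicit `close()`/`join()` here: `multiprocessing.Pool`'s context manager calls `terminate()` on exit, which kills workers before any `atexit` hooks registered in the initializer (such as the cleanup suggested above) can run, whereas `close()` followed by `join()` lets each worker drain its queue and exit normally. A small self-contained sketch of that distinction, with hypothetical names:

```python
import atexit
import multiprocessing
import os


def _init_worker():
    # Stand-in for _worker_init: register a cleanup hook. It fires only when
    # the worker process exits normally (close/join), not on terminate().
    atexit.register(lambda: print(f"cleanup ran in worker pid {os.getpid()}"))


def _task(i):
    return i * i


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    pool = ctx.Pool(processes=2, initializer=_init_worker)
    try:
        print(pool.map(_task, range(4)))
    finally:
        pool.close()  # stop accepting work; workers exit after draining the queue
        pool.join()   # wait for graceful shutdown, letting atexit hooks run
```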