-
Notifications
You must be signed in to change notification settings - Fork 4
[Feature] Implement time out for long running functions #890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
020c62f
2ed5a11
00ccf1e
b0e0d97
981fa2a
eac521b
e1eaf9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| import os | ||
| import queue | ||
| from concurrent.futures import Future | ||
| from time import time | ||
| from typing import Any, Callable, Optional | ||
|
|
||
| from executorlib.standalone.command import get_cache_execute_command | ||
|
|
@@ -81,6 +82,7 @@ def execute_tasks_h5( | |
| process_dict: dict = {} | ||
| cache_dir_dict: dict = {} | ||
| file_name_dict: dict = {} | ||
| timeout_dict: dict = {} | ||
| while True: | ||
| task_dict = None | ||
| with contextlib.suppress(queue.Empty): | ||
|
|
@@ -97,19 +99,22 @@ def execute_tasks_h5( | |
| for key, value in memory_dict.items() | ||
| if not value.done() | ||
| } | ||
| _check_timeout(timeout_dict=timeout_dict, memory_dict=memory_dict) | ||
| if ( | ||
| terminate_function is not None | ||
| and terminate_function == terminate_subprocess | ||
| ): | ||
| for task in process_dict.values(): | ||
| terminate_function(task=task) | ||
| for task_key, task in process_dict.items(): | ||
| if task_key not in timeout_dict: | ||
| terminate_function(task=task) | ||
| elif terminate_function is not None: | ||
| for queue_id in process_dict.values(): | ||
| terminate_function( | ||
| queue_id=queue_id, | ||
| config_directory=pysqa_config_directory, | ||
| backend=backend, | ||
| ) | ||
| for task_key, queue_id in process_dict.items(): | ||
| if task_key not in timeout_dict: | ||
| terminate_function( | ||
| queue_id=queue_id, | ||
| config_directory=pysqa_config_directory, | ||
| backend=backend, | ||
| ) | ||
| future_queue.task_done() | ||
| future_queue.join() | ||
| break | ||
|
|
@@ -123,9 +128,9 @@ def execute_tasks_h5( | |
| task_resource_dict.update( | ||
| {k: v for k, v in resource_dict.items() if k not in task_resource_dict} | ||
| ) | ||
| cache_key = task_resource_dict.pop("cache_key", None) | ||
| cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory")) | ||
| error_log_file = task_resource_dict.pop("error_log_file", None) | ||
| cache_key, cache_directory, error_log_file, timeout = ( | ||
| _get_resource_parameters(task_resource_dict=task_resource_dict) | ||
| ) | ||
| task_key, data_dict = serialize_funct( | ||
| fn=task_dict["fn"], | ||
| fn_args=task_args, | ||
|
|
@@ -170,6 +175,8 @@ def execute_tasks_h5( | |
| backend=backend, | ||
| cache_directory=cache_directory, | ||
| ) | ||
| if timeout is not None: | ||
| timeout_dict[task_key] = time() + timeout | ||
|
Comment on lines
+178
to
+179
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix per-task timeout evaluation and cleanup. Current logic only fires when all timeouts expire and all running tasks have timeouts, which can suppress timeouts (mixed tasks) and even hang shutdown when a timed-out task never completes. Also, timeout entries aren’t cleaned up. ✅ Proposed fix def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None:
- if (
- len(timeout_dict) > 0
- and all([time() > timeout for timeout in timeout_dict.values()])
- and all([key in timeout_dict for key in memory_dict])
- ):
- for key in memory_dict:
- if key in timeout_dict:
- memory_dict[key].set_exception(
- TimeoutError("Task execution exceeded the specified timeout.")
- )
+ if not timeout_dict:
+ return
+ now = time()
+ for key, deadline in list(timeout_dict.items()):
+ if key not in memory_dict:
+ timeout_dict.pop(key, None)
+ continue
+ if now > deadline:
+ memory_dict[key].set_exception(
+ TimeoutError("Task execution exceeded the specified timeout.")
+ )
+ timeout_dict.pop(key, None)Also applies to: 271-280 🤖 Prompt for AI Agents |
||
| file_name_dict[task_key] = os.path.join( | ||
| cache_directory, task_key + "_o.h5" | ||
| ) | ||
|
|
@@ -186,6 +193,7 @@ def execute_tasks_h5( | |
| for key, value in memory_dict.items() | ||
| if not value.done() | ||
| } | ||
| _check_timeout(timeout_dict=timeout_dict, memory_dict=memory_dict) | ||
|
|
||
|
|
||
| def _check_task_output( | ||
|
|
@@ -259,3 +267,26 @@ def _convert_args_and_kwargs( | |
| else: | ||
| task_kwargs[key] = arg | ||
| return task_args, task_kwargs, future_wait_key_lst | ||
|
|
||
|
|
||
| def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None: | ||
| if ( | ||
| len(timeout_dict) > 0 | ||
| and all(time() > timeout for timeout in timeout_dict.values()) | ||
| and all(key in timeout_dict for key in memory_dict) | ||
| ): | ||
| for key, future in memory_dict.items(): | ||
| if key in timeout_dict: | ||
| future.set_exception( | ||
| TimeoutError("Task execution exceeded the specified timeout.") | ||
| ) | ||
|
|
||
|
|
||
| def _get_resource_parameters( | ||
| task_resource_dict: dict, | ||
| ) -> tuple[Optional[str], str, Optional[str], Optional[int]]: | ||
| cache_key = task_resource_dict.pop("cache_key", None) | ||
| cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory")) | ||
| error_log_file = task_resource_dict.pop("error_log_file", None) | ||
| timeout = task_resource_dict.pop("timeout", None) | ||
| return cache_key, cache_directory, error_log_file, timeout | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -102,7 +102,12 @@ def _execute_task_without_cache( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bool: True if the task was submitted successfully, False otherwise. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| future_obj.set_result(interface.send_and_receive_dict(input_dict=task_dict)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| future_obj.set_result( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| interface.send_and_receive_dict( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| input_dict=task_dict, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout=_get_timeout_from_task_dict(task_dict=task_dict), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as thread_exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(thread_exception, ExecutorlibSocketError): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -143,7 +148,10 @@ def _execute_task_with_cache( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if file_name not in get_cache_files(cache_directory=cache_directory): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time_start = time.time() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = interface.send_and_receive_dict(input_dict=task_dict) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = interface.send_and_receive_dict( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| input_dict=task_dict, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout=_get_timeout_from_task_dict(task_dict=task_dict), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data_dict["output"] = result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data_dict["runtime"] = time.time() - time_start | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dump(file_name=file_name, data_dict=data_dict) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -157,3 +165,20 @@ def _execute_task_with_cache( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _, _, result = get_output(file_name=file_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| future_obj.set_result(result) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Extract timeout value from the task_dict if present. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional[int]: timeout value if present in the resource_dict, None otherwise. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if "timeout" in task_dict: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return task_dict.pop("timeout") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+170
to
+184
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use float for timeout typing to match call sites. Tests pass fractional seconds (e.g., 🔧 Proposed fix-def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
+def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
@@
- Optional[int]: timeout value if present in the resource_dict, None otherwise.
+ Optional[float]: timeout value if present in the resource_dict, None otherwise.📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enforce the requested timeout precisely (avoid 1s overshoot).
poll()blocks for_time_out_mseven whentimeoutis much smaller (e.g., 0.01s), so the call can exceed the requested limit by up to_time_out_ms. This breaks the timeout contract and makes tests slower/flaky. Consider polling with the remaining timeout and using a monotonic clock to avoid wall‑clock jumps.🔧 Proposed fix
Also applies to: 71-91, 103-113
🤖 Prompt for AI Agents