18 changes: 15 additions & 3 deletions src/executorlib/standalone/interactive/communication.py
@@ -1,6 +1,7 @@
import logging
import sys
from socket import gethostname
from time import time

⚠️ Potential issue | 🟠 Major

Enforce the requested timeout precisely (avoid 1s overshoot).

poll() blocks for _time_out_ms even when timeout is much smaller (e.g., 0.01s), so the call can exceed the requested limit by up to _time_out_ms. This breaks the timeout contract and makes tests slower/flaky. Consider polling with the remaining timeout and using a monotonic clock to avoid wall‑clock jumps.

🔧 Proposed fix
-from time import time
+from time import monotonic
@@
-    def receive_dict(self, timeout: Optional[int] = None) -> dict:
+    def receive_dict(self, timeout: Optional[float] = None) -> dict:
@@
-            timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
+            timeout (float, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
                                       default time out set during initialization is used.
@@
-        time_start = time()
+        time_start = monotonic()
         while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
+            poll_timeout_ms = self._time_out_ms
+            if timeout is not None:
+                elapsed = monotonic() - time_start
+                remaining = timeout - elapsed
+                if remaining <= 0:
+                    raise TimeoutError("SocketInterface reached timeout.")
+                poll_timeout_ms = min(self._time_out_ms, int(remaining * 1000))
+            response_lst = self._poller.poll(poll_timeout_ms)
             if not self._spawner.poll():
                 raise ExecutorlibSocketError(
                     "SocketInterface crashed during execution."
                 )
-            if timeout is not None and (time() - time_start) > timeout:
-                raise TimeoutError("SocketInterface reached timeout.")

Also applies to: 71-91, 103-113

🤖 Prompt for AI Agents
In `@src/executorlib/standalone/interactive/communication.py` at line 4, the code
imports time() and polls in fixed _time_out_ms slices, which can overshoot the
requested timeout. Replace wall-clock time() with a monotonic clock
(time.monotonic, or `from time import monotonic`) and, in the methods of this
module whose polling loops call poll(), compute the remaining timeout as
(deadline - monotonic_now) and pass min(remaining, _time_out_ms) to poll() on
each iteration, so poll() never blocks longer than the remaining allowed time;
keep the conversions between milliseconds and seconds consistent.
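
For reference, a self-contained sketch of the remaining-budget polling pattern described above (a hedged illustration: the name poll_with_deadline and the default_timeout_ms parameter are hypothetical, not part of SocketInterface):

from time import monotonic
from typing import Optional

import zmq


def poll_with_deadline(
    poller: zmq.Poller, default_timeout_ms: int, timeout: Optional[float]
) -> list:
    """Poll until an event arrives without blocking past the remaining budget."""
    deadline = None if timeout is None else monotonic() + timeout
    events: list = []
    while not events:
        poll_ms = default_timeout_ms
        if deadline is not None:
            remaining = deadline - monotonic()
            if remaining <= 0:
                raise TimeoutError("SocketInterface reached timeout.")
            # Cap the poll slice at the remaining budget so a small timeout
            # (e.g. 0.01 s) cannot overshoot by a full default slice.
            poll_ms = min(default_timeout_ms, max(1, int(remaining * 1000)))
        events = poller.poll(poll_ms)
    return events

In the real method the loop would also keep the existing self._spawner.poll() crash check inside the body.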

from typing import Any, Callable, Optional

import cloudpickle
@@ -67,20 +68,27 @@ def send_dict(self, input_dict: dict):
self._logger.warning("Send dictionary of size: " + str(sys.getsizeof(data)))
self._socket.send(data)

def receive_dict(self) -> dict:
def receive_dict(self, timeout: Optional[int] = None) -> dict:
"""
Receive a dictionary from a connected client process.

Args:
timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
default time out set during initialization is used.

Returns:
dict: dictionary with response received from the connected client
"""
response_lst: list[tuple[Any, int]] = []
time_start = time()
while len(response_lst) == 0:
response_lst = self._poller.poll(self._time_out_ms)
if not self._spawner.poll():
raise ExecutorlibSocketError(
"SocketInterface crashed during execution."
)
if timeout is not None and (time() - time_start) > timeout:
raise TimeoutError("SocketInterface reached timeout.")
data = self._socket.recv(zmq.NOBLOCK)
if self._logger is not None:
self._logger.warning(
@@ -92,19 +100,23 @@ def receive_dict(self) -> dict:
else:
raise output["error"]

def send_and_receive_dict(self, input_dict: dict) -> dict:
def send_and_receive_dict(
self, input_dict: dict, timeout: Optional[int] = None
) -> dict:
"""
Combine both the send_dict() and receive_dict() function in a single call.

Args:
input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the
connected client from listening.
timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
default time out set during initialization is used.

Returns:
dict: dictionary with response received from the connected client
"""
self.send_dict(input_dict=input_dict)
return self.receive_dict()
return self.receive_dict(timeout=timeout)

def bind_to_random_port(self) -> int:
"""
53 changes: 42 additions & 11 deletions src/executorlib/task_scheduler/file/shared.py
@@ -2,6 +2,7 @@
import os
import queue
from concurrent.futures import Future
from time import time
from typing import Any, Callable, Optional

from executorlib.standalone.command import get_cache_execute_command
@@ -81,6 +82,7 @@ def execute_tasks_h5(
process_dict: dict = {}
cache_dir_dict: dict = {}
file_name_dict: dict = {}
timeout_dict: dict = {}
while True:
task_dict = None
with contextlib.suppress(queue.Empty):
@@ -97,19 +99,22 @@
for key, value in memory_dict.items()
if not value.done()
}
_check_timeout(timeout_dict=timeout_dict, memory_dict=memory_dict)
if (
terminate_function is not None
and terminate_function == terminate_subprocess
):
for task in process_dict.values():
terminate_function(task=task)
for task_key, task in process_dict.items():
if task_key not in timeout_dict:
terminate_function(task=task)
elif terminate_function is not None:
for queue_id in process_dict.values():
terminate_function(
queue_id=queue_id,
config_directory=pysqa_config_directory,
backend=backend,
)
for task_key, queue_id in process_dict.items():
if task_key not in timeout_dict:
terminate_function(
queue_id=queue_id,
config_directory=pysqa_config_directory,
backend=backend,
)
future_queue.task_done()
future_queue.join()
break
@@ -123,9 +128,9 @@
task_resource_dict.update(
{k: v for k, v in resource_dict.items() if k not in task_resource_dict}
)
cache_key = task_resource_dict.pop("cache_key", None)
cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))
error_log_file = task_resource_dict.pop("error_log_file", None)
cache_key, cache_directory, error_log_file, timeout = (
_get_resource_parameters(task_resource_dict=task_resource_dict)
)
task_key, data_dict = serialize_funct(
fn=task_dict["fn"],
fn_args=task_args,
@@ -170,6 +175,8 @@ def execute_tasks_h5(
backend=backend,
cache_directory=cache_directory,
)
if timeout is not None:
timeout_dict[task_key] = time() + timeout
Comment on lines +178 to +179

⚠️ Potential issue | 🔴 Critical

Fix per-task timeout evaluation and cleanup.

Current logic only fires when all timeouts expire and all running tasks have timeouts, which can suppress timeouts (mixed tasks) and even hang shutdown when a timed-out task never completes. Also, timeout entries aren’t cleaned up.

✅ Proposed fix
 def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None:
-    if (
-        len(timeout_dict) > 0
-        and all([time() > timeout for timeout in timeout_dict.values()])
-        and all([key in timeout_dict for key in memory_dict])
-    ):
-        for key in memory_dict:
-            if key in timeout_dict:
-                memory_dict[key].set_exception(
-                    TimeoutError("Task execution exceeded the specified timeout.")
-                )
+    if not timeout_dict:
+        return
+    now = time()
+    for key, deadline in list(timeout_dict.items()):
+        if key not in memory_dict:
+            timeout_dict.pop(key, None)
+            continue
+        if now > deadline:
+            memory_dict[key].set_exception(
+                TimeoutError("Task execution exceeded the specified timeout.")
+            )
+            timeout_dict.pop(key, None)

Also applies to: 271-280

🤖 Prompt for AI Agents
In `@src/executorlib/task_scheduler/file/shared.py` around lines 177-178, the
per-task timeout handling needs to be evaluated and cleaned up individually: in
the loop that sets timeout_dict[task_key] when timeout is not None, also check
each running task against timeout_dict[task_key] on every iteration (using
time() > timeout_dict[task_key]) and act on that single task's timeout
immediately (for example, mark it timed out or cancel it) rather than waiting
for all timeouts to expire; remove the task_key entry from timeout_dict once the
task finishes or its timeout is cleared so stale entries do not accumulate; and
apply the same per-task check and cleanup to the other block around lines
271-280 so that mixed workloads (some timed, some not) cannot suppress
individual timeouts or hang shutdown.
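
Taken on its own, the suggested behaviour is a per-entry sweep rather than an all-or-nothing check; a minimal standalone sketch (the helper name check_timeouts is hypothetical, with Future objects standing in for the values of memory_dict):

from concurrent.futures import Future
from time import time


def check_timeouts(timeout_dict: dict[str, float], memory_dict: dict[str, Future]) -> None:
    """Fail each overdue task individually and drop stale deadline entries."""
    now = time()
    for key, deadline in list(timeout_dict.items()):
        if key not in memory_dict or memory_dict[key].done():
            # The task already finished or was dropped, so its deadline is stale.
            timeout_dict.pop(key, None)
        elif now > deadline:
            # Only this task times out; tasks without deadlines are untouched.
            memory_dict[key].set_exception(
                TimeoutError("Task execution exceeded the specified timeout.")
            )
            timeout_dict.pop(key, None)

Because each entry is handled independently, a mix of timed and untimed tasks cannot mask an individual timeout, and popping finished keys keeps timeout_dict from accumulating stale entries.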

file_name_dict[task_key] = os.path.join(
cache_directory, task_key + "_o.h5"
)
@@ -186,6 +193,7 @@
for key, value in memory_dict.items()
if not value.done()
}
_check_timeout(timeout_dict=timeout_dict, memory_dict=memory_dict)


def _check_task_output(
@@ -259,3 +267,26 @@ def _convert_args_and_kwargs(
else:
task_kwargs[key] = arg
return task_args, task_kwargs, future_wait_key_lst


def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None:
if (
len(timeout_dict) > 0
and all(time() > timeout for timeout in timeout_dict.values())
and all(key in timeout_dict for key in memory_dict)
):
for key, future in memory_dict.items():
if key in timeout_dict:
future.set_exception(
TimeoutError("Task execution exceeded the specified timeout.")
)


def _get_resource_parameters(
task_resource_dict: dict,
) -> tuple[Optional[str], str, Optional[str], Optional[int]]:
cache_key = task_resource_dict.pop("cache_key", None)
cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))
error_log_file = task_resource_dict.pop("error_log_file", None)
timeout = task_resource_dict.pop("timeout", None)
return cache_key, cache_directory, error_log_file, timeout
@@ -72,6 +72,7 @@ def __init__(
executor_kwargs["future_queue"] = self._future_queue
executor_kwargs["spawner"] = spawner
executor_kwargs["queue_join_on_shutdown"] = False
timeout = executor_kwargs.pop("timeout", None)
self._process_kwargs = executor_kwargs
self._max_workers = max_workers
self_id = random.getrandbits(128)
@@ -85,6 +86,7 @@
| {
"worker_id": worker_id,
"stop_function": lambda: _interrupt_bootup_dict[self_id],
"timeout": timeout,
},
)
for worker_id in range(self._max_workers)
@@ -211,6 +213,7 @@ def _execute_multiple_tasks(
worker_id: Optional[int] = None,
stop_function: Optional[Callable] = None,
restart_limit: int = 0,
timeout: Optional[int] = None,
**kwargs,
) -> None:
"""
@@ -239,6 +242,8 @@
distribution.
stop_function (Callable): Function to stop the interface.
restart_limit (int): The maximum number of restarting worker processes.
timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
default time out set during initialization is used.
"""
interface = interface_bootup(
command_lst=get_interactive_execute_command(
@@ -283,6 +288,8 @@
f.set_exception(exception=interface_initialization_exception)
else:
# The interface failed during the execution
if timeout is not None:
task_dict["timeout"] = timeout
interface.status = execute_task_dict(
task_dict=task_dict,
future_obj=f,
2 changes: 2 additions & 0 deletions src/executorlib/task_scheduler/interactive/onetoone.py
@@ -190,6 +190,8 @@ def _wrap_execute_task_in_separate_process(
dictionary containing the future objects and the number of cores they require
"""
resource_dict = task_dict.pop("resource_dict").copy()
if "timeout" in resource_dict:
task_dict["timeout"] = resource_dict.pop("timeout")
f = task_dict.pop("future")
if "cores" not in resource_dict or (
resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
29 changes: 27 additions & 2 deletions src/executorlib/task_scheduler/interactive/shared.py
@@ -102,7 +102,12 @@ def _execute_task_without_cache(
bool: True if the task was submitted successfully, False otherwise.
"""
try:
future_obj.set_result(interface.send_and_receive_dict(input_dict=task_dict))
future_obj.set_result(
interface.send_and_receive_dict(
input_dict=task_dict,
timeout=_get_timeout_from_task_dict(task_dict=task_dict),
)
)
except Exception as thread_exception:
if isinstance(thread_exception, ExecutorlibSocketError):
return False
@@ -143,7 +148,10 @@
if file_name not in get_cache_files(cache_directory=cache_directory):
try:
time_start = time.time()
result = interface.send_and_receive_dict(input_dict=task_dict)
result = interface.send_and_receive_dict(
input_dict=task_dict,
timeout=_get_timeout_from_task_dict(task_dict=task_dict),
)
data_dict["output"] = result
data_dict["runtime"] = time.time() - time_start
dump(file_name=file_name, data_dict=data_dict)
@@ -157,3 +165,20 @@
_, _, result = get_output(file_name=file_name)
future_obj.set_result(result)
return True


def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
"""
Extract timeout value from the task_dict if present.

Args:
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
{"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}

Returns:
Optional[int]: timeout value if present in the resource_dict, None otherwise.
"""
if "timeout" in task_dict:
return task_dict.pop("timeout")
else:
return None
Comment on lines +170 to +184

⚠️ Potential issue | 🟡 Minor

Use float for timeout typing to match call sites.

Tests pass fractional seconds (e.g., 0.01), so Optional[int] is misleading and can trip type-checkers.

🔧 Proposed fix
-def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
+def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
@@
-        Optional[int]: timeout value if present in the resource_dict, None otherwise.
+        Optional[float]: timeout value if present in the resource_dict, None otherwise.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
    """
    Extract timeout value from the task_dict if present.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}

    Returns:
        Optional[float]: timeout value if present in the resource_dict, None otherwise.
    """
    if "timeout" in task_dict:
        return task_dict.pop("timeout")
    else:
        return None
🤖 Prompt for AI Agents
In `@src/executorlib/task_scheduler/interactive/shared.py` around lines 170-184,
the return type of _get_timeout_from_task_dict is currently Optional[int], but
callers pass fractional seconds; change the annotation to Optional[float]
(keeping the same behavior of popping "timeout" from task_dict and returning its
value or None) so static type-checkers and readers see the correct type for
timeout, and make sure the associated docstring mentions float seconds.
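
As a small illustration of the mismatch (a hypothetical call site, not code from this PR), a strict type checker flags fractional seconds against the current annotation:

from typing import Optional


def receive(timeout: Optional[int] = None) -> None:
    # Stand-in for a signature annotated with Optional[int].
    ...


receive(timeout=0.01)  # flagged by mypy: "float" is not compatible with "Optional[int]"

Annotating the parameter and return value as Optional[float] accepts both whole and fractional seconds, matching the tests below, which use a timeout of 0.01 seconds.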

12 changes: 12 additions & 0 deletions tests/test_cache_fileexecutor_serial.py
@@ -31,6 +31,11 @@ def get_error(a):
raise ValueError(a)


def reply(i):
sleep(1)
return i


@unittest.skipIf(
skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
@@ -57,6 +62,13 @@ def test_executor_dependence_mixed(self):
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())

def test_executor_timeout(self):
with FileTaskScheduler(execute_function=execute_in_subprocess) as exe:
fs1 = exe.submit(reply, 2, resource_dict={"timeout": 0.01})
with self.assertRaises(TimeoutError):
fs1.result()
self.assertTrue(fs1.done())

def test_create_file_executor_error(self):
with self.assertRaises(TypeError):
create_file_executor()
12 changes: 12 additions & 0 deletions tests/test_mpiexecspawner.py
@@ -77,6 +77,18 @@ def test_pympiexecutor_two_workers(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_pympiexecutor_timeout(self):
with BlockAllocationTaskScheduler(
max_workers=2,
executor_kwargs={"timeout": 0.01},
spawner=MpiExecSpawner,
) as exe:
cloudpickle_register(ind=1)
fs_1 = exe.submit(sleep_one, 1)
with self.assertRaises(TimeoutError):
fs_1.result()
self.assertTrue(fs_1.done())

def test_max_workers(self):
with BlockAllocationTaskScheduler(
max_workers=2,
28 changes: 28 additions & 0 deletions tests/test_singlenodeexecutor_noblock.py
@@ -10,6 +10,11 @@ def calc(i):
return i


def reply(i):
sleep(1)
return i


def resource_dict(resource_dict):
return resource_dict

@@ -70,6 +75,17 @@ def test_meta_executor_single(self):
self.assertTrue(fs_1.done())
self.assertTrue(fs_2.done())

def test_time_out(self):
with SingleNodeExecutor(
max_cores=1,
block_allocation=False,
) as exe:
cloudpickle_register(ind=1)
fs_1 = exe.submit(reply, 1, resource_dict={"timeout": 0.01})
with self.assertRaises(TimeoutError):
fs_1.result()
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(TypeError):
SingleNodeExecutor(
@@ -120,6 +136,18 @@ def test_init_function(self):
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
self.assertEqual(worker_id, 0)

def test_time_out(self):
with SingleNodeExecutor(
max_cores=1,
block_allocation=True,
resource_dict={"timeout": 0.01},
) as exe:
cloudpickle_register(ind=1)
fs_1 = exe.submit(reply, 1)
with self.assertRaises(TimeoutError):
fs_1.result()
self.assertTrue(fs_1.done())

def test_init_function_two_workers(self):
with SingleNodeExecutor(
max_cores=2,