@jan-janssen jan-janssen commented Jan 24, 2026

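import executorlib
from time import sleep

def reply(x):
    # Sleep for 10 s, which is longer than the 5 s timeout requested below.
    sleep(10)
    return x

with executorlib.SingleNodeExecutor(hostname_localhost=True) as exe:
    # The per-task timeout (in seconds) is passed through resource_dict.
    f = exe.submit(reply, 4, resource_dict={"timeout": 5})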

Summary by CodeRabbit

  • New Features

    • Added per-task timeout support so users can set execution time limits; tasks that exceed their timeout now raise a TimeoutError and are marked done.
  • Tests

    • Added end-to-end timeout tests covering file, MPI, single-node executors and standalone communication to validate timeout behavior.
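From the caller's side, the behaviour summarized above could be handled roughly as follows. This is a minimal sketch that uses a bare `concurrent.futures.Future` as a stand-in for the future returned by `exe.submit(...)`; the helper name `result_or_default` and the exception message are assumptions for illustration and are not part of executorlib.

```python
from concurrent.futures import Future


def result_or_default(future: Future, default=None):
    """Return the task result, or `default` if the task timed out.

    Hypothetical helper, for illustration only.
    """
    try:
        return future.result()
    except TimeoutError:
        return default


# Stand-in for a future whose task exceeded its per-task timeout:
# the summary says such futures carry a TimeoutError and are marked done.
timed_out = Future()
timed_out.set_exception(TimeoutError("task exceeded its timeout"))

# Stand-in for a future whose task finished in time.
finished = Future()
finished.set_result(4)
```

Because the future is marked done, `result()` returns or raises immediately instead of blocking.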


@jan-janssen jan-janssen linked an issue Jan 24, 2026 that may be closed by this pull request

coderabbitai bot commented Jan 24, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Adds optional per-call and per-task timeout support: communication APIs accept an optional timeout, task schedulers propagate and track per-task timeouts, timeouts cause TimeoutError propagation to futures, and tests exercising timeout behavior were added.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Communication Layer (src/executorlib/standalone/interactive/communication.py) | receive_dict(timeout: Optional[int]=None) and send_and_receive_dict(..., timeout: Optional[int]=None) added; polling loop measures elapsed time and raises TimeoutError when deadline elapses. |
| File-Based Scheduler Timeout Tracking (src/executorlib/task_scheduler/file/shared.py) | Added timeout_dict to track absolute deadlines, _check_timeout() to set exceptions on overdue futures, and integrated timeout checks after task completion updates. |
| Interactive Scheduler: block allocation (src/executorlib/task_scheduler/interactive/blockallocation.py) | Pop timeout from executor_kwargs, propagate timeout into worker threads and to tasks; _execute_multiple_tasks(...) gains timeout: Optional[int] = None. |
| Interactive Scheduler: one-to-one (src/executorlib/task_scheduler/interactive/onetoone.py) | Move timeout from resource_dict into top-level task_dict["timeout"] when present. |
| Interactive Scheduler: shared logic (src/executorlib/task_scheduler/interactive/shared.py) | Extract timeout from task_dict and pass it to interface.send_and_receive_dict(..., timeout=...) in cached and non-cached execution paths. |
| Tests: file/MPI/single-node/communication (tests/test_cache_fileexecutor_serial.py, tests/test_mpiexecspawner.py, tests/test_singlenodeexecutor_noblock.py, tests/test_standalone_interactive_communication.py) | Added reply() helpers and multiple tests asserting TimeoutError is raised for very short per-task timeouts and futures are marked done. |
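The one-to-one and shared-logic rows describe moving `timeout` out of `resource_dict` and later popping it from the task dictionary. A minimal sketch of that hand-off, assuming plain dictionaries with the keys shown in this PR; the function names `move_timeout_to_task_dict` and `pop_timeout` are hypothetical and do not mirror the actual executorlib code:

```python
from typing import Optional


def move_timeout_to_task_dict(task_dict: dict) -> dict:
    """Return a copy of task_dict with "timeout" lifted out of resource_dict.

    Hypothetical helper: the input dictionary is left unmodified.
    """
    new_task = dict(task_dict)
    resource_dict = dict(new_task.get("resource_dict", {}))
    if "timeout" in resource_dict:
        new_task["timeout"] = resource_dict.pop("timeout")
    new_task["resource_dict"] = resource_dict
    return new_task


def pop_timeout(task_dict: dict) -> Optional[float]:
    """Remove and return the top-level timeout, or None if absent."""
    return task_dict.pop("timeout", None)


task = {
    "fn": len,
    "args": ([1, 2],),
    "kwargs": {},
    "resource_dict": {"cores": 1, "timeout": 0.5},
}
lifted = move_timeout_to_task_dict(task)
```

Keeping the timeout at the top level means the remaining `resource_dict` only carries resource settings by the time it reaches the worker.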

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Scheduler as TaskScheduler
    participant Shared as SharedLayer
    participant Interface as SocketInterface
    participant Worker as TaskWorker

    Client->>Scheduler: submit(task, resource_dict={timeout:0.01})
    Scheduler->>Scheduler: extract timeout -> task_dict.timeout
    Scheduler->>Shared: _execute_task(task_dict, timeout=0.01)
    Shared->>Interface: send_and_receive_dict(input, timeout=0.01)
    Interface->>Interface: record start time
    Interface->>Worker: send task request
    Worker->>Worker: execute (sleep ~1s)
    Note over Interface: poll for response\ncheck elapsed time each loop
    Interface->>Interface: elapsed > 0.01s
    Interface-->>Shared: raise TimeoutError
    Shared-->>Scheduler: propagate TimeoutError to future
    Scheduler-->>Client: future.done() true, result() raises TimeoutError
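
The flow in the diagram can be approximated with the standard library alone. The sketch below is an assumption-laden stand-in: a thread plays the worker, a `queue.Queue` plays the socket, and the names `send_and_receive` and `execute_task` are hypothetical rather than executorlib's own functions. Note that this sketch does not stop the worker after a timeout; it only stops waiting for it.

```python
import queue
import threading
from concurrent.futures import Future
from time import monotonic, sleep
from typing import Callable, Optional


def send_and_receive(
    fn: Callable, args: tuple, timeout: Optional[float] = None, slice_s: float = 0.05
):
    """Run fn in a worker thread and poll for its response in short slices."""
    responses: queue.Queue = queue.Queue()
    worker = threading.Thread(
        target=lambda: responses.put(fn(*args)), daemon=True
    )
    start = monotonic()  # record start time
    worker.start()  # send task request
    while True:
        wait = slice_s
        if timeout is not None:
            remaining = timeout - (monotonic() - start)
            if remaining <= 0:
                raise TimeoutError("no response before the deadline")
            wait = min(slice_s, remaining)
        try:
            return responses.get(timeout=wait)
        except queue.Empty:
            continue  # nothing yet, check the elapsed time again


def execute_task(
    future: Future, fn: Callable, args: tuple, timeout: Optional[float] = None
) -> None:
    """Propagate either the result or the TimeoutError to the future."""
    try:
        future.set_result(send_and_receive(fn, args, timeout=timeout))
    except TimeoutError as exc:
        future.set_exception(exc)


def slow(x):
    sleep(0.3)
    return x
```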

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I timed the carrot chase today,

Tasks that dawdle—no delay!
A little tick, a gentle stop,
Timeout hops and ends the flop.
Hooray! The scheduler saved the day. 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 31.58% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main feature being implemented across the changeset: timeout support for long-running functions in executorlib. |


codecov bot commented Jan 24, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 93.52%. Comparing base (575a482) to head (e1eaf9b).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #890      +/-   ##
==========================================
+ Coverage   93.42%   93.52%   +0.10%     
==========================================
  Files          38       38              
  Lines        1840     1870      +30     
==========================================
+ Hits         1719     1749      +30     
  Misses        121      121              


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/executorlib/task_scheduler/interactive/blockallocation.py (1)

75-90: Preserve timeout in _process_kwargs for dynamic scaling.

When max_workers is increased, new threads use self._process_kwargs which currently omits timeout, so those workers won’t enforce timeouts.

✅ Proposed fix
-        timeout = executor_kwargs.pop("timeout", None)
-        self._process_kwargs = executor_kwargs
+        timeout = executor_kwargs.pop("timeout", None)
+        self._process_kwargs = executor_kwargs | {"timeout": timeout}
@@
-                    kwargs=executor_kwargs
-                    | {
+                    kwargs=self._process_kwargs
+                    | {
                         "worker_id": worker_id,
                         "stop_function": lambda: _interrupt_bootup_dict[self_id],
-                        "timeout": timeout,
                     },
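
Why storing the timeout in `self._process_kwargs` matters can be seen in a stripped-down model. This is a sketch only: `WorkerPool`, `_worker_kwargs`, and `scale_up` are hypothetical names, workers are represented by their keyword dictionaries rather than threads, and the dict union operator `|` assumes Python 3.9 or newer.

```python
class WorkerPool:
    """Hypothetical stand-in for a block-allocation scheduler."""

    def __init__(self, max_workers: int, executor_kwargs: dict):
        executor_kwargs = dict(executor_kwargs)  # do not mutate the caller's dict
        timeout = executor_kwargs.pop("timeout", None)
        # Keep the timeout in the stored kwargs so that workers created
        # later, during scale-up, receive it as well.
        self._process_kwargs = executor_kwargs | {"timeout": timeout}
        self.workers = [self._worker_kwargs(i) for i in range(max_workers)]

    def _worker_kwargs(self, worker_id: int) -> dict:
        return self._process_kwargs | {"worker_id": worker_id}

    def scale_up(self, max_workers: int) -> None:
        """Add workers until max_workers is reached, reusing the stored kwargs."""
        for worker_id in range(len(self.workers), max_workers):
            self.workers.append(self._worker_kwargs(worker_id))


pool = WorkerPool(max_workers=1, executor_kwargs={"cores": 1, "timeout": 5})
pool.scale_up(3)
```

If the stored kwargs omitted `timeout`, the two workers added by `scale_up` would silently run without a limit.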
🤖 Fix all issues with AI agents
In `@src/executorlib/standalone/interactive/communication.py`:
- Line 4: The code imports time() and uses fixed _time_out_ms slices for poll(),
which can overshoot the requested timeout; replace wall-clock time() with a
monotonic clock (time.monotonic or from time import monotonic) and, inside the
methods that call poll() (the polling loop in this module — see the sections
around lines that call poll() twice, within functions handling the interactive
communication), compute the remaining timeout as (deadline - monotonic_now) and
pass min(remaining_seconds, _time_out_ms_seconds) to poll each iteration so poll
never blocks longer than the remaining allowed time; ensure conversions between
ms and seconds are consistent.

In `@src/executorlib/task_scheduler/file/shared.py`:
- Around line 177-178: The per-task timeout handling needs to be evaluated and
cleaned up individually: in the loop that sets timeout_dict[task_key] when
timeout is not None, ensure you also check each running task against
timeout_dict[task_key] every iteration (using time() > timeout_dict[task_key])
and act on that single task's timeout immediately (e.g., mark it timed
out/cancel it) rather than waiting for all timeouts to expire; remove the
task_key entry from timeout_dict when the task finishes or its timeout is
cleared to avoid stale entries; apply the same per-task check/cleanup logic to
the other block around lines 271-280 so mixed tasks (some timed, some not)
cannot suppress individual timeouts or hang shutdown.

In `@src/executorlib/task_scheduler/interactive/shared.py`:
- Around line 170-184: The return type of _get_timeout_from_task_dict is
currently Optional[int] but callers pass fractional seconds; change the typing
to Optional[float] and update the function signature to reflect Optional[float]
(keep the same behavior of popping "timeout" from task_dict and returning its
value or None) so static type-checkers and readers see the correct type for
timeout; reference function name _get_timeout_from_task_dict and ensure any
associated docstring mentions float seconds.

In `@tests/test_standalone_interactive_communication.py`:
- Around line 30-32: The timeout test is flaky because it calls calc (which can
finish in ~0.01s); replace that call with the slow helper reply() (which sleeps)
so the timeout is reliably triggered: locate the timeout assertions in
tests/test_standalone_interactive_communication.py (the blocks using calc to
test timeouts) and change the invoked function to reply(i) (or pass the same
args) so the test waits the sleep duration and deterministically hits the
timeout; apply the same replacement for the second occurrence around the later
test block.
♻️ Duplicate comments (2)
src/executorlib/task_scheduler/file/shared.py (2)

102-103: Covered by the timeout logic issue below.


195-195: Covered by the timeout logic issue above.

import logging
import sys
from socket import gethostname
from time import time

⚠️ Potential issue | 🟠 Major

Enforce the requested timeout precisely (avoid 1s overshoot).

poll() blocks for _time_out_ms even when timeout is much smaller (e.g., 0.01s), so the call can exceed the requested limit by up to _time_out_ms. This breaks the timeout contract and makes tests slower/flaky. Consider polling with the remaining timeout and using a monotonic clock to avoid wall‑clock jumps.

🔧 Proposed fix
-from time import time
+from time import monotonic
@@
-    def receive_dict(self, timeout: Optional[int] = None) -> dict:
+    def receive_dict(self, timeout: Optional[float] = None) -> dict:
@@
-            timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
+            timeout (float, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
                                       default time out set during initialization is used.
@@
-        time_start = time()
+        time_start = monotonic()
         while len(response_lst) == 0:
-            response_lst = self._poller.poll(self._time_out_ms)
+            poll_timeout_ms = self._time_out_ms
+            if timeout is not None:
+                elapsed = monotonic() - time_start
+                remaining = timeout - elapsed
+                if remaining <= 0:
+                    raise TimeoutError("SocketInterface reached timeout.")
+                poll_timeout_ms = min(self._time_out_ms, int(remaining * 1000))
+            response_lst = self._poller.poll(poll_timeout_ms)
             if not self._spawner.poll():
                 raise ExecutorlibSocketError(
                     "SocketInterface crashed during execution."
                 )
-            if timeout is not None and (time() - time_start) > timeout:
-                raise TimeoutError("SocketInterface reached timeout.")

Also applies to: 71-91, 103-113
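
The core of this suggestion is the per-iteration poll duration. A minimal sketch of that computation as a pure function, assuming a deadline expressed on the monotonic clock; the name `next_poll_timeout_ms` is hypothetical, and rounding up with `math.ceil` is a choice made here (it avoids a zero-length poll near the deadline at the cost of overshooting by under one millisecond), not something the proposed diff does:

```python
import math
from time import monotonic
from typing import Optional


def next_poll_timeout_ms(
    slice_ms: int, deadline: Optional[float], now: Optional[float] = None
) -> int:
    """Return how long the next poll() call may block, in milliseconds.

    slice_ms: the regular polling slice (the role of _time_out_ms above).
    deadline: absolute monotonic time in seconds, or None for no limit.
    now: current monotonic time; injectable to keep the function testable.
    """
    if deadline is None:
        return slice_ms
    now = monotonic() if now is None else now
    remaining_s = deadline - now
    if remaining_s <= 0:
        raise TimeoutError("deadline reached before a response arrived")
    # Never block longer than the time left, converted from s to ms.
    return min(slice_ms, math.ceil(remaining_s * 1000))
```

With a 1000 ms slice and 10 ms left, the poll is limited to about 10 ms instead of the full second.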

Comment on lines +177 to +178
if timeout is not None:
    timeout_dict[task_key] = time() + timeout

⚠️ Potential issue | 🔴 Critical

Fix per-task timeout evaluation and cleanup.

Current logic only fires when all timeouts expire and all running tasks have timeouts, which can suppress timeouts (mixed tasks) and even hang shutdown when a timed-out task never completes. Also, timeout entries aren’t cleaned up.

✅ Proposed fix
 def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None:
-    if (
-        len(timeout_dict) > 0
-        and all([time() > timeout for timeout in timeout_dict.values()])
-        and all([key in timeout_dict for key in memory_dict])
-    ):
-        for key in memory_dict:
-            if key in timeout_dict:
-                memory_dict[key].set_exception(
-                    TimeoutError("Task execution exceeded the specified timeout.")
-                )
+    if not timeout_dict:
+        return
+    now = time()
+    for key, deadline in list(timeout_dict.items()):
+        if key not in memory_dict:
+            timeout_dict.pop(key, None)
+            continue
+        if now > deadline:
+            memory_dict[key].set_exception(
+                TimeoutError("Task execution exceeded the specified timeout.")
+            )
+            timeout_dict.pop(key, None)

Also applies to: 271-280
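
The mixed-task case (one task with a timeout, one without) can be exercised in isolation. The sketch below follows the shape of the proposed fix but is not the repository code: `check_timeouts` is a hypothetical name, the `now` parameter is added so the behaviour can be tested without sleeping, and the extra `future.done()` check is an assumption added here because `set_exception` on an already finished future raises `InvalidStateError`.

```python
from concurrent.futures import Future
from time import time
from typing import Optional


def check_timeouts(
    timeout_dict: dict, memory_dict: dict, now: Optional[float] = None
) -> None:
    """Fail every overdue future individually and drop stale deadlines."""
    now = time() if now is None else now
    for key, deadline in list(timeout_dict.items()):
        future = memory_dict.get(key)
        if future is None or future.done():
            # Task is gone or already finished: forget its deadline.
            timeout_dict.pop(key, None)
        elif now > deadline:
            future.set_exception(
                TimeoutError("Task execution exceeded the specified timeout.")
            )
            timeout_dict.pop(key, None)


# One task with a deadline and one without, as in the mixed case above.
futures = {"timed": Future(), "untimed": Future()}
deadlines = {"timed": 10.0}
check_timeouts(deadlines, futures, now=11.0)
```

After the call, only the `timed` future carries a `TimeoutError`; the `untimed` future is left pending and no longer blocks the check.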

Comment on lines +170 to +184
def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
    """
    Extract timeout value from the task_dict if present.
    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
    Returns:
        Optional[int]: timeout value if present in the resource_dict, None otherwise.
    """
    if "timeout" in task_dict:
        return task_dict.pop("timeout")
    else:
        return None

⚠️ Potential issue | 🟡 Minor

Use float for timeout typing to match call sites.

Tests pass fractional seconds (e.g., 0.01), so Optional[int] is misleading and can trip type-checkers.

🔧 Proposed fix
-def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
+def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
@@
-        Optional[int]: timeout value if present in the resource_dict, None otherwise.
+        Optional[float]: timeout value if present in the resource_dict, None otherwise.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
-    """
-    Extract timeout value from the task_dict if present.
-    Args:
-        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
-                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
-    Returns:
-        Optional[int]: timeout value if present in the resource_dict, None otherwise.
-    """
-    if "timeout" in task_dict:
-        return task_dict.pop("timeout")
-    else:
-        return None
+def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
+    """
+    Extract timeout value from the task_dict if present.
+    Args:
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
+    Returns:
+        Optional[float]: timeout value if present in the resource_dict, None otherwise.
+    """
+    if "timeout" in task_dict:
+        return task_dict.pop("timeout")
+    else:
+        return None

Comment on lines +30 to +32
def reply(i):
    sleep(1)
    return i

⚠️ Potential issue | 🟡 Minor

Make the timeout test deterministic by using the slow helper.

calc can complete within 0.01s, so this test can intermittently pass without a timeout. Use reply() (sleep) to ensure the timeout is actually hit.

🔧 Proposed fix
-        task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
+        task_dict = {"fn": reply, "args": (), "kwargs": {"i": 2}}

Also applies to: 105-133
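
The same principle, pairing a very short timeout with a helper that is guaranteed to outlast it, can be shown with the standard library. This sketch assumes `ThreadPoolExecutor` and `Future.result(timeout=...)` as stand-ins for the executorlib interface under test; that call only bounds the wait and does not stop the task, so it illustrates the test design rather than the feature itself. The class name `TimeoutBehaviourTest` is hypothetical.

```python
import unittest
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from time import sleep


def reply(i):
    # Sleeps far longer than the timeout used below, so the timeout
    # is always hit regardless of machine speed.
    sleep(0.5)
    return i


class TimeoutBehaviourTest(unittest.TestCase):
    def test_slow_helper_always_hits_timeout(self):
        with ThreadPoolExecutor(max_workers=1) as exe:
            future = exe.submit(reply, 2)
            with self.assertRaises(FutureTimeoutError):
                future.result(timeout=0.01)
            # The task itself still completes once it is given enough time.
            self.assertEqual(future.result(), 2)
```

A fast function in place of `reply` could finish inside the 0.01 s window on some runs, which is the flakiness the comment above describes.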


@jan-janssen jan-janssen marked this pull request as draft January 24, 2026 08:26

Development

Successfully merging this pull request may close these issues.

[Feature] Implement a timeout function for FileTaskScheduler
