-
Notifications
You must be signed in to change notification settings - Fork 4
[Feature] Implement time out for long running functions #890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the 📝 WalkthroughWalkthroughAdds optional per-call and per-task timeout support: communication APIs accept an optional timeout, task schedulers propagate and track per-task timeouts, timeouts cause TimeoutError propagation to futures, and tests exercising timeout behavior were added. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant Scheduler as TaskScheduler
participant Shared as SharedLayer
participant Interface as SocketInterface
participant Worker as TaskWorker
Client->>Scheduler: submit(task, resource_dict={timeout:0.01})
Scheduler->>Scheduler: extract timeout -> task_dict.timeout
Scheduler->>Shared: _execute_task(task_dict, timeout=0.01)
Shared->>Interface: send_and_receive_dict(input, timeout=0.01)
Interface->>Interface: record start time
Interface->>Worker: send task request
Worker->>Worker: execute (sleep ~1s)
Note over Interface: poll for response\ncheck elapsed time each loop
Interface->>Interface: elapsed > 0.01s
Interface-->>Shared: raise TimeoutError
Shared-->>Scheduler: propagate TimeoutError to future
Scheduler-->>Client: future.done() true, result() raises TimeoutError
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #890 +/- ##
==========================================
+ Coverage 93.42% 93.52% +0.10%
==========================================
Files 38 38
Lines 1840 1870 +30
==========================================
+ Hits 1719 1749 +30
Misses 121 121 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/executorlib/task_scheduler/interactive/blockallocation.py (1)
75-90: Preserve timeout in_process_kwargsfor dynamic scaling.When
max_workersis increased, new threads useself._process_kwargswhich currently omitstimeout, so those workers won’t enforce timeouts.✅ Proposed fix
- timeout = executor_kwargs.pop("timeout", None) - self._process_kwargs = executor_kwargs + timeout = executor_kwargs.pop("timeout", None) + self._process_kwargs = executor_kwargs | {"timeout": timeout} @@ - kwargs=executor_kwargs - | { + kwargs=self._process_kwargs + | { "worker_id": worker_id, "stop_function": lambda: _interrupt_bootup_dict[self_id], - "timeout": timeout, },
🤖 Fix all issues with AI agents
In `@src/executorlib/standalone/interactive/communication.py`:
- Line 4: The code imports time() and uses fixed _time_out_ms slices for poll(),
which can overshoot the requested timeout; replace wall-clock time() with a
monotonic clock (time.monotonic or from time import monotonic) and, inside the
methods that call poll() (the polling loop in this module — see the sections
around lines that call poll() twice, within functions handling the interactive
communication), compute the remaining timeout as (deadline - monotonic_now) and
pass min(remaining_seconds, _time_out_ms_seconds) to poll each iteration so poll
never blocks longer than the remaining allowed time; ensure conversions between
ms and seconds are consistent.
In `@src/executorlib/task_scheduler/file/shared.py`:
- Around line 177-178: The per-task timeout handling needs to be evaluated and
cleaned up individually: in the loop that sets timeout_dict[task_key] when
timeout is not None, ensure you also check each running task against
timeout_dict[task_key] every iteration (using time() > timeout_dict[task_key])
and act on that single task's timeout immediately (e.g., mark it timed
out/cancel it) rather than waiting for all timeouts to expire; remove the
task_key entry from timeout_dict when the task finishes or its timeout is
cleared to avoid stale entries; apply the same per-task check/cleanup logic to
the other block around lines 271-280 so mixed tasks (some timed, some not)
cannot suppress individual timeouts or hang shutdown.
In `@src/executorlib/task_scheduler/interactive/shared.py`:
- Around line 170-184: The return type of _get_timeout_from_task_dict is
currently Optional[int] but callers pass fractional seconds; change the typing
to Optional[float] and update the function signature to reflect Optional[float]
(keep the same behavior of popping "timeout" from task_dict and returning its
value or None) so static type-checkers and readers see the correct type for
timeout; reference function name _get_timeout_from_task_dict and ensure any
associated docstring mentions float seconds.
In `@tests/test_standalone_interactive_communication.py`:
- Around line 30-32: The timeout test is flaky because it calls calc (which can
finish in ~0.01s); replace that call with the slow helper reply() (which sleeps)
so the timeout is reliably triggered: locate the timeout assertions in
tests/test_standalone_interactive_communication.py (the blocks using calc to
test timeouts) and change the invoked function to reply(i) (or pass the same
args) so the test waits the sleep duration and deterministically hits the
timeout; apply the same replacement for the second occurrence around the later
test block.
♻️ Duplicate comments (2)
src/executorlib/task_scheduler/file/shared.py (2)
102-103: Covered by the timeout logic issue below.
195-195: Covered by the timeout logic issue above.
| import logging | ||
| import sys | ||
| from socket import gethostname | ||
| from time import time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enforce the requested timeout precisely (avoid 1s overshoot).
poll() blocks for _time_out_ms even when timeout is much smaller (e.g., 0.01s), so the call can exceed the requested limit by up to _time_out_ms. This breaks the timeout contract and makes tests slower/flaky. Consider polling with the remaining timeout and using a monotonic clock to avoid wall‑clock jumps.
🔧 Proposed fix
-from time import time
+from time import monotonic
@@
- def receive_dict(self, timeout: Optional[int] = None) -> dict:
+ def receive_dict(self, timeout: Optional[float] = None) -> dict:
@@
- timeout (int, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
+ timeout (float, optional): Time out for waiting for a message on socket in seconds. If None is provided, the
default time out set during initialization is used.
@@
- time_start = time()
+ time_start = monotonic()
while len(response_lst) == 0:
- response_lst = self._poller.poll(self._time_out_ms)
+ poll_timeout_ms = self._time_out_ms
+ if timeout is not None:
+ elapsed = monotonic() - time_start
+ remaining = timeout - elapsed
+ if remaining <= 0:
+ raise TimeoutError("SocketInterface reached timeout.")
+ poll_timeout_ms = min(self._time_out_ms, int(remaining * 1000))
+ response_lst = self._poller.poll(poll_timeout_ms)
if not self._spawner.poll():
raise ExecutorlibSocketError(
"SocketInterface crashed during execution."
)
- if timeout is not None and (time() - time_start) > timeout:
- raise TimeoutError("SocketInterface reached timeout.")Also applies to: 71-91, 103-113
🤖 Prompt for AI Agents
In `@src/executorlib/standalone/interactive/communication.py` at line 4, The code
imports time() and uses fixed _time_out_ms slices for poll(), which can
overshoot the requested timeout; replace wall-clock time() with a monotonic
clock (time.monotonic or from time import monotonic) and, inside the methods
that call poll() (the polling loop in this module — see the sections around
lines that call poll() twice, within functions handling the interactive
communication), compute the remaining timeout as (deadline - monotonic_now) and
pass min(remaining_seconds, _time_out_ms_seconds) to poll each iteration so poll
never blocks longer than the remaining allowed time; ensure conversions between
ms and seconds are consistent.
| if timeout is not None: | ||
| timeout_dict[task_key] = time() + timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix per-task timeout evaluation and cleanup.
Current logic only fires when all timeouts expire and all running tasks have timeouts, which can suppress timeouts (mixed tasks) and even hang shutdown when a timed-out task never completes. Also, timeout entries aren’t cleaned up.
✅ Proposed fix
def _check_timeout(timeout_dict: dict, memory_dict: dict) -> None:
- if (
- len(timeout_dict) > 0
- and all([time() > timeout for timeout in timeout_dict.values()])
- and all([key in timeout_dict for key in memory_dict])
- ):
- for key in memory_dict:
- if key in timeout_dict:
- memory_dict[key].set_exception(
- TimeoutError("Task execution exceeded the specified timeout.")
- )
+ if not timeout_dict:
+ return
+ now = time()
+ for key, deadline in list(timeout_dict.items()):
+ if key not in memory_dict:
+ timeout_dict.pop(key, None)
+ continue
+ if now > deadline:
+ memory_dict[key].set_exception(
+ TimeoutError("Task execution exceeded the specified timeout.")
+ )
+ timeout_dict.pop(key, None)Also applies to: 271-280
🤖 Prompt for AI Agents
In `@src/executorlib/task_scheduler/file/shared.py` around lines 177 - 178, The
per-task timeout handling needs to be evaluated and cleaned up individually: in
the loop that sets timeout_dict[task_key] when timeout is not None, ensure you
also check each running task against timeout_dict[task_key] every iteration
(using time() > timeout_dict[task_key]) and act on that single task's timeout
immediately (e.g., mark it timed out/cancel it) rather than waiting for all
timeouts to expire; remove the task_key entry from timeout_dict when the task
finishes or its timeout is cleared to avoid stale entries; apply the same
per-task check/cleanup logic to the other block around lines 271-280 so mixed
tasks (some timed, some not) cannot suppress individual timeouts or hang
shutdown.
| def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]: | ||
| """ | ||
| Extract timeout value from the task_dict if present. | ||
| Args: | ||
| task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys | ||
| {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} | ||
| Returns: | ||
| Optional[int]: timeout value if present in the resource_dict, None otherwise. | ||
| """ | ||
| if "timeout" in task_dict: | ||
| return task_dict.pop("timeout") | ||
| else: | ||
| return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use float for timeout typing to match call sites.
Tests pass fractional seconds (e.g., 0.01), so Optional[int] is misleading and can trip type-checkers.
🔧 Proposed fix
-def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]:
+def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]:
@@
- Optional[int]: timeout value if present in the resource_dict, None otherwise.
+ Optional[float]: timeout value if present in the resource_dict, None otherwise.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def _get_timeout_from_task_dict(task_dict: dict) -> Optional[int]: | |
| """ | |
| Extract timeout value from the task_dict if present. | |
| Args: | |
| task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys | |
| {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} | |
| Returns: | |
| Optional[int]: timeout value if present in the resource_dict, None otherwise. | |
| """ | |
| if "timeout" in task_dict: | |
| return task_dict.pop("timeout") | |
| else: | |
| return None | |
| def _get_timeout_from_task_dict(task_dict: dict) -> Optional[float]: | |
| """ | |
| Extract timeout value from the task_dict if present. | |
| Args: | |
| task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys | |
| {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} | |
| Returns: | |
| Optional[float]: timeout value if present in the resource_dict, None otherwise. | |
| """ | |
| if "timeout" in task_dict: | |
| return task_dict.pop("timeout") | |
| else: | |
| return None |
🤖 Prompt for AI Agents
In `@src/executorlib/task_scheduler/interactive/shared.py` around lines 170 - 184,
The return type of _get_timeout_from_task_dict is currently Optional[int] but
callers pass fractional seconds; change the typing to Optional[float] and update
the function signature to reflect Optional[float] (keep the same behavior of
popping "timeout" from task_dict and returning its value or None) so static
type-checkers and readers see the correct type for timeout; reference function
name _get_timeout_from_task_dict and ensure any associated docstring mentions
float seconds.
| def reply(i): | ||
| sleep(1) | ||
| return i |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the timeout test deterministic by using the slow helper.
calc can complete within 0.01s, so this test can intermittently pass without a timeout. Use reply() (sleep) to ensure the timeout is actually hit.
🔧 Proposed fix
- task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
+ task_dict = {"fn": reply, "args": (), "kwargs": {"i": 2}}Also applies to: 105-133
🤖 Prompt for AI Agents
In `@tests/test_standalone_interactive_communication.py` around lines 30 - 32, The
timeout test is flaky because it calls calc (which can finish in ~0.01s);
replace that call with the slow helper reply() (which sleeps) so the timeout is
reliably triggered: locate the timeout assertions in
tests/test_standalone_interactive_communication.py (the blocks using calc to
test timeouts) and change the invoked function to reply(i) (or pass the same
args) so the test waits the sleep duration and deterministically hits the
timeout; apply the same replacement for the second occurrence around the later
test block.
for more information, see https://pre-commit.ci
Summary by CodeRabbit
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.