Skip to content

Fix task creation being skipped bug due to timeouts#996

Open
khandelwal-ayush wants to merge 2 commits intolinkedin:masterfrom
khandelwal-ayush:fix-task-reassignment-bug
Open

Fix task creation being skipped bug due to timeouts#996
khandelwal-ayush wants to merge 2 commits intolinkedin:masterfrom
khandelwal-ayush:fix-task-reassignment-bug

Conversation

@khandelwal-ayush
Copy link
Collaborator

Summary

Fix stale Coordinator assignment state after timeout causing missed ConnectorTask creation

When handleAssignmentChange() times out waiting for connector callbacks, it returns early without updating _assignedDatastreamTasks. This leaves stale state from the last successful assignment. On the next
assignment change, the Coordinator computes an incorrect diff and may skip creating ConnectorTasks for re-assigned DatastreamTasks — resulting in unprocessed data.

Root Cause (Production Incident)

Host: lva1-app128114.prod.linkedin.com, affected task: makto-db-152

  1. 2:11 AM — makto-db-152 assigned → ConnectorTask created → _assignedDatastreamTasks updated ✓
  2. 2:15 AM — makto-db-152 unassigned (due to restarts) → ConnectorTask stopped ✓ → but Coordinator timed out before updating _assignedDatastreamTasks ✗ (stale: still contains makto-db-152)
  3. 2:18 AM — makto-db-152 re-assigned → Coordinator diffs against stale state → sees makto-db-152 "already running" → skips creating a new ConnectorTask

Result: No ConnectorTask for makto-db-152, all connected GaaS DB/tables stopped processing.

Fix

Added _assignedDatastreamTasks.clear() in the catch (TimeoutException) block of handleAssignmentChange(). This forces a full reconciliation on the next assignment change — all connectors receive their
complete task list and reconcile against their internal state.

Why clear instead of update to new assignment? After timeout, future.cancel(true) interrupts connector threads. We don't know which connectors finished processing and which didn't. Clearing forces a full
re-dispatch. Connectors handle this idempotently since onAssignmentChange() receives the full task list, not a delta.

This follows the existing pattern in onSessionExpired() (line 693), which performs the same clear for the same reason.

Testing Done

Will be deployed and tested

// ConnectorTask being created. Clearing follows the same pattern as onSessionExpired().
_log.warn("Timeout when doing the assignment. Clearing current assignment state to force full "
+ "reconciliation on next assignment change.", e);
_assignedDatastreamTasks.clear();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some tests to cover this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, let me add

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants