diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml
new file mode 100644
index 0000000000..cc9024fada
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml
@@ -0,0 +1,25 @@
+bundle:
+  name: wal-corrupted-test
+
+resources:
+  jobs:
+    valid_job:
+      name: "valid-job"
+      tasks:
+        - task_key: "task-a"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
+    another_valid:
+      name: "another-valid"
+      tasks:
+        - task_key: "task-b"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt
new file mode 100644
index 0000000000..1192629332
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt
@@ -0,0 +1,56 @@
+=== Creating state file with serial 5 ===
+=== Creating WAL with corrupted entry ===
+=== WAL content ===
+{"lineage":"test-lineage-123","serial": [SERIAL]}
+{"k":"resources.jobs.valid_job","v":{"__id__": "[ID]","state":{"name":"valid-job"}}}
+not valid json - this line should be skipped
+{"k":"resources.jobs.another_valid","v":{"__id__": "[ID]","state":{"name":"another-valid"}}}
+=== Deploy (should recover valid entries, skip corrupted) ===
+
+>>> [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.another_valid.tasks[0].new_cluster
+  in databricks.yml:23:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.valid_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Final state (should have recovered entries) ===
+{
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.another_valid",
+    "resources.jobs.valid_job"
+  ]
+}
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/script b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script
new file mode 100644
index 0000000000..d73595a6f4
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script
@@ -0,0 +1,35 @@
+echo "=== Creating state file with serial 5 ==="
+mkdir -p .databricks/bundle/default
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "test-lineage-123",
+  "serial": 5,
+  "state": {}
+}
+EOF
+
+echo "=== Creating WAL with corrupted entry ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"test-lineage-123","serial":6}
+{"k":"resources.jobs.valid_job","v":{"__id__":"1111","state":{"name":"valid-job"}}}
+not valid json - this line should be skipped
+{"k":"resources.jobs.another_valid","v":{"__id__":"2222","state":{"name":"another-valid"}}}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should recover valid entries, skip corrupted) ==="
+trace $CLI bundle deploy 2>&1 | python3 sort_warnings.py
+
+echo "=== Final state (should have recovered entries) ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys | sort)}'
+
+echo "=== WAL after successful deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL exists (unexpected)"
+else
+    echo "WAL deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/sort_warnings.py b/acceptance/bundle/deploy/wal/corrupted-wal-entry/sort_warnings.py
new file mode 100644
index 0000000000..06a6a0e59c
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/sort_warnings.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+"""Sort warning blocks in CLI output to make test output deterministic.
+
+Warning blocks look like:
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.XXX.tasks[0].new_cluster
+  in databricks.yml:NN:NN
+
+num_workers should be 0 only for single-node clusters...
+  spark_conf:
+    ...
+  custom_tags:
+    ...
+
+This script groups consecutive warning blocks, sorts them by job name, and prints the result.
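+
+It reads the deploy output on stdin and writes the reordered output to stdout.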
+"""
+
+import re
+import sys
+
+
+def main():
+    content = sys.stdin.read()
+    lines = content.split("\n")
+
+    result = []
+    i = 0
+
+    while i < len(lines):
+        line = lines[i]
+
+        # Check if this is the start of a warning block
+        if line.startswith("Warning:"):
+            # Collect all consecutive warning blocks
+            warnings = []
+            while i < len(lines) and (
+                lines[i].startswith("Warning:")
+                or (
+                    warnings
+                    and not lines[i].startswith("Uploading")
+                    and not lines[i].startswith("Deploying")
+                    and not lines[i].startswith(">>>")
+                    and not lines[i].startswith("===")
+                )
+            ):
+                # Collect one complete warning block
+                block = []
+                if lines[i].startswith("Warning:"):
+                    block.append(lines[i])
+                    i += 1
+                    # Collect until next Warning or end marker
+                    while i < len(lines):
+                        if lines[i].startswith("Warning:"):
+                            break
+                        if lines[i].startswith("Uploading") or lines[i].startswith("Deploying"):
+                            break
+                        if lines[i].startswith(">>>") or lines[i].startswith("==="):
+                            break
+                        block.append(lines[i])
+                        i += 1
+                    warnings.append(block)
+                else:
+                    i += 1
+
+            # Sort warnings by the job name in "at resources.jobs.XXX"
+            def get_sort_key(block):
+                for line in block:
+                    match = re.search(r"at resources\.jobs\.(\w+)", line)
+                    if match:
+                        return match.group(1)
+                return ""
+
+            warnings.sort(key=get_sort_key)
+
+            # Output sorted warnings
+            for block in warnings:
+                for line in block:
+                    result.append(line)
+        else:
+            result.append(line)
+            i += 1
+
+    print("\n".join(result), end="")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml
new file mode 100644
index 0000000000..5bbe82835c
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml
@@ -0,0 +1,14 @@
+# WAL with corrupted entry - valid entries should be recovered, corrupted skipped.
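+# The stubs below stand in for the workspace: jobs/reset updates each recovered job and jobs/get returns its settings.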
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/reset"
+Response.Body = '{}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get?job_id=1111"
+Response.Body = '{"job_id": 1111, "settings": {"name": "valid-job"}}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get?job_id=2222"
+Response.Body = '{"job_id": 2222, "settings": {"name": "another-valid"}}'
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml
new file mode 100644
index 0000000000..ebee1d9699
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-crash-test
+
+resources:
+  jobs:
+    job_a:
+      name: "test-job-a"
+      tasks:
+        - task_key: "task-a"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/output.txt b/acceptance/bundle/deploy/wal/crash-after-create/output.txt
new file mode 100644
index 0000000000..9c33326382
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/output.txt
@@ -0,0 +1,38 @@
+=== Creating state directory ===
+=== Creating WAL file (simulating crash after job create) ===
+=== WAL content before deploy ===
+{"lineage":"test-lineage-123","serial": [SERIAL]}
+{"k":"resources.jobs.job_a","v":{"__id__": "[ID]","state":{"name":"test-job-a"}}}
+=== Deploy (should recover from WAL) ===
+
+>>> [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.job_a.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== State file after recovery ===
+{
+  "lineage": "test-lineage-123",
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.job_a"
+  ]
+}
+=== WAL file after successful deploy ===
+WAL file deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/script b/acceptance/bundle/deploy/wal/crash-after-create/script
new file mode 100644
index 0000000000..c583a5eead
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/script
@@ -0,0 +1,24 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating WAL file (simulating crash after job create) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"test-lineage-123","serial":1}
+{"k":"resources.jobs.job_a","v":{"__id__":"1001","state":{"name":"test-job-a"}}}
+EOF
+
+echo "=== WAL content before deploy ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should recover from WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== State file after recovery ==="
+cat .databricks/bundle/default/resources.json | jq -S '{lineage: .lineage, serial: .serial, state_keys: (.state | keys)}'
+
+echo "=== WAL file after successful deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL file exists (unexpected)"
+else
+    echo "WAL file deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.py b/acceptance/bundle/deploy/wal/crash-after-create/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.toml b/acceptance/bundle/deploy/wal/crash-after-create/test.toml
new file mode 100644
index 0000000000..9e20bac15d
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/test.toml
@@ -0,0 +1,11 @@
+# WAL recovery after simulated crash. Job was created but state wasn't finalized.
+# Deploy should recover job from WAL and update it.
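+# jobs/reset handles the update of the recovered job; jobs/get returns its current settings.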
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/reset"
+Response.Body = '{}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job-a"}}'
diff --git a/acceptance/bundle/deploy/wal/empty-wal/databricks.yml b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml
new file mode 100644
index 0000000000..147a1e1482
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-empty-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/empty-wal/out.test.toml b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/empty-wal/output.txt b/acceptance/bundle/deploy/wal/empty-wal/output.txt
new file mode 100644
index 0000000000..91a31fe322
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/output.txt
@@ -0,0 +1,37 @@
+=== Creating state directory ===
+=== Creating empty WAL file ===
+=== Empty WAL file exists ===
+[FILE_INFO] .databricks/bundle/default/resources.json.wal
+=== Deploy (should handle empty WAL gracefully) ===
+
+>>> [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-empty-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Checking WAL file after deploy ===
+Empty WAL deleted (expected)
+=== State file content ===
+{
+  "lineage": "[UUID]",
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.test_job"
+  ]
+}
diff --git a/acceptance/bundle/deploy/wal/empty-wal/script b/acceptance/bundle/deploy/wal/empty-wal/script
new file mode 100644
index 0000000000..f693753ac7
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/script
@@ -0,0 +1,21 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating empty WAL file ==="
+touch .databricks/bundle/default/resources.json.wal
+
+echo "=== Empty WAL file exists ==="
+ls -la .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should handle empty WAL gracefully) ==="
+trace $CLI bundle deploy
+
+echo "=== Checking WAL file after deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL file exists (unexpected)"
+else
+    echo "Empty WAL deleted (expected)"
+fi
+
+echo "=== State file content ==="
+cat .databricks/bundle/default/resources.json | jq -S '{lineage: .lineage, serial: .serial, state_keys: (.state | keys)}'
diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.py b/acceptance/bundle/deploy/wal/empty-wal/test.py
new file mode 100644
index 0000000000..11b15b1a45
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/test.py
@@ -0,0 +1 @@
+print("hello")
diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.toml b/acceptance/bundle/deploy/wal/empty-wal/test.toml
new file mode 100644
index 0000000000..b97264c2be
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/test.toml
@@ -0,0 +1,13 @@
+# Empty WAL file should be deleted and deploy should proceed normally.
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/create"
+Response.Body = '{"job_id": 1001}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
+
+[[Repls]]
+Old = '-rw[^ ]+ \d+ [^ ]+ [^ ]+ \d+ [A-Z][a-z]+ \d+ \d+:\d+'
+New = '[FILE_INFO]'
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml
new file mode 100644
index 0000000000..67079aaef8
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-future-serial-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/output.txt b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt
new file mode 100644
index 0000000000..ffb03147dc
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt
@@ -0,0 +1,29 @@
+=== Creating state file (serial=2) ===
+=== Creating WAL with future serial (serial=5, expected=3) ===
+=== WAL content ===
+{"lineage":"test-lineage-123","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}}
+=== Deploy (should fail with corruption error) ===
+
+>>> errcode [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-future-serial-test/default/files...
+Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL serial (5) is ahead of expected (3), state may be corrupted
+
+
+Exit code: [KILLED]
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/script b/acceptance/bundle/deploy/wal/future-serial-wal/script
new file mode 100644
index 0000000000..7b1784b0c6
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/script
@@ -0,0 +1,28 @@
+echo "=== Creating state file (serial=2) ==="
+mkdir -p .databricks/bundle/default
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "test-lineage-123",
+  "serial": 2,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
+EOF
+
+echo "=== Creating WAL with future serial (serial=5, expected=3) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"test-lineage-123","serial":5}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should fail with corruption error) ==="
+trace errcode $CLI bundle deploy
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/test.py b/acceptance/bundle/deploy/wal/future-serial-wal/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/test.toml b/acceptance/bundle/deploy/wal/future-serial-wal/test.toml
new file mode 100644
index 0000000000..424fe2f127
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/test.toml
@@ -0,0 +1,5 @@
+# WAL with serial ahead of state - indicates corruption, should error.
+# State has serial=2, WAL has serial=5 (expected would be 3).
+
+# No server stubs needed - deploy should fail before any API calls.
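+# The serial check lives in recoverFromWAL (bundle/direct/dstate/wal.go).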
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml
new file mode 100644
index 0000000000..014ec7f886
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-lineage-mismatch-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt
new file mode 100644
index 0000000000..2419e7a612
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt
@@ -0,0 +1,29 @@
+=== Creating state file with lineage-A ===
+=== Creating WAL with lineage-B (mismatch) ===
+=== WAL content ===
+{"lineage":"wal-lineage-bbb","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}}
+=== Deploy (should fail with lineage mismatch error) ===
+
+>>> errcode [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-lineage-mismatch-test/default/files...
+Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL lineage (wal-lineage-bbb) does not match state lineage (state-lineage-aaa)
+
+
+Exit code: [KILLED]
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/script b/acceptance/bundle/deploy/wal/lineage-mismatch/script
new file mode 100644
index 0000000000..b241246e6c
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/script
@@ -0,0 +1,28 @@
+echo "=== Creating state file with lineage-A ==="
+mkdir -p .databricks/bundle/default
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "state-lineage-aaa",
+  "serial": 1,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
+EOF
+
+echo "=== Creating WAL with lineage-B (mismatch) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"wal-lineage-bbb","serial":2}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should fail with lineage mismatch error) ==="
+trace errcode $CLI bundle deploy
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.py b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml
new file mode 100644
index 0000000000..509cc82f09
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml
@@ -0,0 +1,5 @@
+# WAL with different lineage than state - should error.
+# State has lineage "state-lineage-aaa", WAL has lineage "wal-lineage-bbb".
+
+# No server stubs needed - deploy should fail before any API calls.
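+# The lineage check lives in recoverFromWAL (bundle/direct/dstate/wal.go).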
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml b/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml
new file mode 100644
index 0000000000..b4162d8fdf
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-multi-crash-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml b/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/output.txt b/acceptance/bundle/deploy/wal/multiple-crashes/output.txt
new file mode 100644
index 0000000000..3e0426a628
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/output.txt
@@ -0,0 +1,64 @@
+=== Creating state directory ===
+=== Creating WAL file (simulating crash after job create) ===
+=== WAL content ===
+{"lineage":"test-lineage-456","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}}
+=== First deploy attempt (will crash during update) ===
+
+>>> errcode [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files...
+Deploying resources...
+[PROCESS_KILLED]
+
+Exit code: [KILLED]
+=== WAL after first crash ===
+{"lineage":"test-lineage-456","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}}
+=== Second deploy attempt (should succeed) ===
+
+>>> [CLI] bundle deploy --force-lock
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Final state ===
+{
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.test_job"
+  ]
+}
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/script b/acceptance/bundle/deploy/wal/multiple-crashes/script
new file mode 100644
index 0000000000..795e4261e1
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/script
@@ -0,0 +1,32 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating WAL file (simulating crash after job create) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"test-lineage-456","serial":1}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== First deploy attempt (will crash during update) ==="
+trace errcode $CLI bundle deploy
+
+echo "=== WAL after first crash ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    cat .databricks/bundle/default/resources.json.wal
+fi
+
+echo "=== Second deploy attempt (should succeed) ==="
+trace $CLI bundle deploy --force-lock
+
+echo "=== Final state ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
+
+echo "=== WAL after successful deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL exists (unexpected)"
+else
+    echo "WAL deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/test.py b/acceptance/bundle/deploy/wal/multiple-crashes/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/test.toml b/acceptance/bundle/deploy/wal/multiple-crashes/test.toml
new file mode 100644
index 0000000000..2e9973c846
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/multiple-crashes/test.toml
@@ -0,0 +1,11 @@
+# Multiple crashes during recovery - WAL should persist until successful finalize.
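+# KillCaller = 1 below makes the test server kill the CLI process when jobs/reset is hit, simulating the crash.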
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/reset"
+KillCaller = 1
+Response.Body = '{}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml b/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml
new file mode 100644
index 0000000000..413705d40c
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml b/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/output.txt b/acceptance/bundle/deploy/wal/normal-deploy/output.txt
new file mode 100644
index 0000000000..50c1430641
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/output.txt
@@ -0,0 +1,32 @@
+
+>>> [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Checking WAL file after deploy ===
+WAL file deleted after successful deploy (expected)
+=== State file content ===
+{
+  "lineage": "[UUID]",
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.test_job"
+  ]
+}
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/script b/acceptance/bundle/deploy/wal/normal-deploy/script
new file mode 100644
index 0000000000..5acc4d9b58
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/script
@@ -0,0 +1,12 @@
+trace $CLI bundle deploy
+
+echo "=== Checking WAL file after deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL file exists (unexpected - should be deleted after Finalize)"
+    cat .databricks/bundle/default/resources.json.wal
+else
+    echo "WAL file deleted after successful deploy (expected)"
+fi
+
+echo "=== State file content ==="
+cat .databricks/bundle/default/resources.json | jq -S '{lineage: .lineage, serial: .serial, state_keys: (.state | keys)}'
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/test.py b/acceptance/bundle/deploy/wal/normal-deploy/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/normal-deploy/test.toml b/acceptance/bundle/deploy/wal/normal-deploy/test.toml
new file mode 100644
index 0000000000..1299046974
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/normal-deploy/test.toml
@@ -0,0 +1,9 @@
+# WAL is created during deploy, used for state tracking, and deleted after Finalize.
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/create"
+Response.Body = '{"job_id": 1001}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
diff --git a/acceptance/bundle/deploy/wal/stale-wal/databricks.yml b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml
new file mode 100644
index 0000000000..6b24f6fd26
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-stale-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/stale-wal/out.test.toml b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/stale-wal/output.txt b/acceptance/bundle/deploy/wal/stale-wal/output.txt
new file mode 100644
index 0000000000..3722788e52
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/output.txt
@@ -0,0 +1,38 @@
+=== Creating state directory ===
+=== Creating state file (serial=2) ===
+=== Creating stale WAL with old serial (serial=1) ===
+=== WAL content before deploy ===
+{"lineage":"stale-test-lineage","serial": [SERIAL]}
+{"k":"resources.jobs.stale_job","v":{"__id__": "[ID]","state":{"name":"stale-job"}}}
+=== Deploy (should ignore stale WAL) ===
+
+>>> [CLI] bundle deploy
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-stale-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Checking WAL file after deploy ===
+Stale WAL deleted (expected)
+=== State file should NOT contain stale_job ===
+{
+  "serial": [SERIAL],
+  "state_keys": [
+    "resources.jobs.test_job"
+  ]
+}
diff --git a/acceptance/bundle/deploy/wal/stale-wal/script b/acceptance/bundle/deploy/wal/stale-wal/script
new file mode 100644
index 0000000000..d814639a00
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/script
@@ -0,0 +1,40 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating state file (serial=2) ==="
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "stale-test-lineage",
+  "serial": 2,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
+EOF
+
+echo "=== Creating stale WAL with old serial (serial=1) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"stale-test-lineage","serial":1}
+{"k":"resources.jobs.stale_job","v":{"__id__":"9999","state":{"name":"stale-job"}}}
+EOF
+
+echo "=== WAL content before deploy ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should ignore stale WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== Checking WAL file after deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL file exists (unexpected)"
+else
+    echo "Stale WAL deleted (expected)"
+fi
+
+echo "=== State file should NOT contain stale_job ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.py b/acceptance/bundle/deploy/wal/stale-wal/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.toml b/acceptance/bundle/deploy/wal/stale-wal/test.toml
new file mode 100644
index 0000000000..934683ba6d
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/test.toml
@@ -0,0 +1,10 @@
+# Deploy with a stale WAL (old serial) - WAL should be deleted and ignored.
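+# A WAL whose serial is at or below the state serial was already committed, so replaying it would regress state.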
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/reset"
+Response.Body = '{}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml b/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml
new file mode 100644
index 0000000000..063faa8e54
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-summary-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml b/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/output.txt b/acceptance/bundle/deploy/wal/summary-after-crash/output.txt
new file mode 100644
index 0000000000..2e6abf645a
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/output.txt
@@ -0,0 +1,25 @@
+=== Creating state directory ===
+=== Creating WAL file (simulating crash after job create) ===
+=== Bundle summary (should show job from WAL with id) ===
+
+>>> [CLI] bundle summary -o json
+Warning: Single node cluster is not correctly configured
+  at resources.jobs.test_job.tasks[0].new_cluster
+  in databricks.yml:13:13
+
+num_workers should be 0 only for single-node clusters. To create a
+valid single node cluster please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+
+
+{
+  "job_id": "[ID]",
+  "modified_status": null
+}
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/script b/acceptance/bundle/deploy/wal/summary-after-crash/script
new file mode 100644
index 0000000000..d2017c6590
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/script
@@ -0,0 +1,11 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating WAL file (simulating crash after job create) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"summary-test-lineage","serial":1}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
+EOF
+
+echo "=== Bundle summary (should show job from WAL with id) ==="
+trace $CLI bundle summary -o json | jq '{job_id: .resources.jobs.test_job.id, modified_status: .resources.jobs.test_job.modified_status}'
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/test.py b/acceptance/bundle/deploy/wal/summary-after-crash/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/test.toml b/acceptance/bundle/deploy/wal/summary-after-crash/test.toml
new file mode 100644
index 0000000000..3363a1c516
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/summary-after-crash/test.toml
@@ -0,0 +1,3 @@
+# Bundle summary should show resources recovered from WAL.
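+# The __id__ recovered from the WAL entry becomes the job id reported by summary.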
+# No server stubs needed - we just run bundle summary which reads state.
diff --git a/acceptance/bundle/deploy/wal/test.toml b/acceptance/bundle/deploy/wal/test.toml
new file mode 100644
index 0000000000..7fd1daf93b
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/test.toml
@@ -0,0 +1,43 @@
+# WAL (Write-Ahead Log) tests verify crash recovery during bundle deployment.
+# These tests simulate process crashes using KillCaller and verify state recovery.
+# Only runs with direct engine since WAL is a direct-engine feature.
+
+Local = true
+Env.DATABRICKS_CLI_TEST_PID = "1"
+
+[EnvMatrix]
+DATABRICKS_BUNDLE_ENGINE = ["direct"]
+
+[[Repls]]
+Old = 'script: line \d+:\s+\d+ Killed(: 9)?\s+"\$@"'
+New = '[PROCESS_KILLED]'
+
+[[Repls]]
+Old = '(\n>>> errcode [^\n]+\n)\nExit code:'
+New = """${1}[PROCESS_KILLED]
+
+Exit code:"""
+
+[[Repls]]
+Old = 'Exit code: (137|1)'
+New = 'Exit code: [KILLED]'
+
+[[Repls]]
+Old = "\r"
+New = ''
+
+[[Repls]]
+Old = '"lineage":\s*"[0-9a-f-]+"'
+New = '"lineage": "[UUID]"'
+
+[[Repls]]
+Old = '"serial":\s*\d+'
+New = '"serial": [SERIAL]'
+
+[[Repls]]
+Old = '"__id__":\s*"\d+"'
+New = '"__id__": "[ID]"'
+
+[[Repls]]
+Old = '"job_id":\s*"\d+"'
+New = '"job_id": "[ID]"'
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
new file mode 100644
index 0000000000..457a2d3e96
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-delete-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/output.txt b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
new file mode 100644
index 0000000000..8f52732d3e
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
@@ -0,0 +1,21 @@
+=== Creating state directory ===
+=== Creating state file (job exists) ===
+=== Creating WAL with delete entry (simulating crash during delete) ===
+=== WAL content ===
+{"lineage":"delete-test-lineage","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":null}
+=== Updating config to remove job ===
+=== Deploy (should recover delete from WAL) ===
+
+>>> [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-delete-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Final state (should have no jobs) ===
+{
+  "serial": [SERIAL],
+  "state_keys": []
+}
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/script b/acceptance/bundle/deploy/wal/wal-with-delete/script
new file mode 100644
index 0000000000..f840355267
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/script
@@ -0,0 +1,48 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating state file (job exists) ==="
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "delete-test-lineage",
+  "serial": 1,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
+EOF
+
+echo "=== Creating WAL with delete entry (simulating crash during delete) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"delete-test-lineage","serial":2}
+{"k":"resources.jobs.test_job","v":null}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Updating config to remove job ==="
+cat > databricks.yml << 'EOF'
+bundle:
+  name: wal-delete-test
+
+resources: {}
+EOF
+
+echo "=== Deploy (should recover delete from WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== Final state (should have no jobs) ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
+
+echo "=== WAL after successful deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+    echo "WAL exists (unexpected)"
+else
+    echo "WAL deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/test.py b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/test.toml b/acceptance/bundle/deploy/wal/wal-with-delete/test.toml
new file mode 100644
index 0000000000..27045f8885
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/test.toml
@@ -0,0 +1,5 @@
+# WAL recovery after crash during delete operation.
+# Delete was recorded in WAL but not finalized. Deploy should complete the delete.
+
+# No server stubs needed - the delete was already performed (and recorded in the WAL),
+# so the job requires no further API calls.
diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go
index 993eb7238a..14161de32e 100644
--- a/bundle/direct/bundle_apply.go
+++ b/bundle/direct/bundle_apply.go
@@ -22,7 +22,12 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa
 	}
 
 	if len(plan.Plan) == 0 {
-		// Avoid creating state file if nothing to deploy
+		// If WAL recovery happened, we still need to finalize to commit the recovered state
+		if b.StateDB.RecoveredFromWAL() {
+			if err := b.StateDB.Finalize(); err != nil {
+				logdiag.LogError(ctx, err)
+			}
+		}
 		return
 	}
 
diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go
index bd8cfa24c3..e72ca4baf6 100644
--- a/bundle/direct/bundle_plan.go
+++ b/bundle/direct/bundle_plan.go
@@ -41,7 +41,7 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error {
 
 // ValidatePlanAgainstState validates that a plan's lineage and serial match the current state.
 // This should be called early in the deployment process, before any file operations.
+// If the plan has no lineage (first deployment), validation is skipped.
-func ValidatePlanAgainstState(statePath string, plan *deployplan.Plan) error {
+func ValidatePlanAgainstState(ctx context.Context, statePath string, plan *deployplan.Plan) error {
 	// If plan has no lineage, this is a first deployment before any state exists
 	// No validation needed
 	if plan.Lineage == "" {
@@ -49,7 +49,7 @@
 	}
 
 	var stateDB dstate.DeploymentState
-	err := stateDB.Open(statePath)
+	err := stateDB.Open(ctx, statePath)
 	if err != nil {
 		// If state file doesn't exist but plan has lineage, something is wrong
 		if os.IsNotExist(err) {
@@ -74,7 +74,7 @@
 // InitForApply initializes the DeploymentBundle for applying a pre-computed plan.
 // This is used when --plan is specified to skip the planning phase.
 func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, statePath string, plan *deployplan.Plan) error {
-	err := b.StateDB.Open(statePath)
+	err := b.StateDB.Open(ctx, statePath)
 	if err != nil {
 		return fmt.Errorf("reading state from %s: %w", statePath, err)
 	}
@@ -110,7 +110,7 @@ func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.
 }
 
 func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath string) (*deployplan.Plan, error) {
-	err := b.StateDB.Open(statePath)
+	err := b.StateDB.Open(ctx, statePath)
 	if err != nil {
 		return nil, fmt.Errorf("reading state from %s: %w", statePath, err)
 	}
diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go
index c369195cf2..102659eb34 100644
--- a/bundle/direct/dstate/state.go
+++ b/bundle/direct/dstate/state.go
@@ -5,21 +5,25 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 
 	"github.com/databricks/cli/bundle/deployplan"
 	"github.com/databricks/cli/bundle/statemgmt/resourcestate"
 	"github.com/databricks/cli/internal/build"
+	"github.com/databricks/cli/libs/log"
 	"github.com/google/uuid"
 )
 
 const currentStateVersion = 1
 
 type DeploymentState struct {
-	Path string
-	Data Database
-	mu   sync.Mutex
+	Path             string
+	Data             Database
+	mu               sync.Mutex
+	wal              *WAL
+	recoveredFromWAL bool
 }
 
 type Database struct {
@@ -60,12 +64,22 @@ func (db *DeploymentState) SaveState(key, newID string, state any, dependsOn []d
 		return err
 	}
 
-	db.Data.State[key] = ResourceEntry{
+	entry := ResourceEntry{
 		ID:        newID,
 		State:     json.RawMessage(jsonMessage),
 		DependsOn: dependsOn,
 	}
 
+	// Write to WAL before updating memory
+	if err := db.ensureWALOpen(); err != nil {
+		return fmt.Errorf("failed to open WAL: %w", err)
+	}
+	if err := db.wal.writeEntry(key, &entry); err != nil {
+		return fmt.Errorf("failed to write WAL entry: %w", err)
+	}
+
+	db.Data.State[key] = entry
+
 	return nil
 }
 
@@ -78,11 +92,50 @@ func (db *DeploymentState) DeleteState(key string) error {
 		return nil
 	}
 
+	// Write to WAL before updating memory (nil entry means delete)
+	if err := db.ensureWALOpen(); err != nil {
+		return fmt.Errorf("failed to open WAL: %w", err)
+	}
+	if err := db.wal.writeEntry(key, nil); err != nil {
+		return fmt.Errorf("failed to write WAL entry: %w", err)
+	}
+
 	delete(db.Data.State, key)
 
 	return nil
 }
 
+// ensureWALOpen opens the WAL file and writes the header if not already done.
+// Must be called while holding db.mu.
+func (db *DeploymentState) ensureWALOpen() error {
+	if db.wal != nil {
+		return nil
+	}
+
+	wal, err := openWAL(db.Path)
+	if err != nil {
+		return err
+	}
+
+	// Generate lineage if this is a fresh deployment
+	lineage := db.Data.Lineage
+	if lineage == "" {
+		lineage = uuid.New().String()
+		db.Data.Lineage = lineage
+	}
+
+	// WAL serial is the NEXT serial (current + 1)
+	walSerial := db.Data.Serial + 1
+
+	if err := wal.writeHeader(lineage, walSerial); err != nil {
+		wal.close()
+		return err
+	}
+
+	db.wal = wal
+	return nil
+}
+
 func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) {
 	db.AssertOpened()
 	db.mu.Lock()
@@ -96,7 +149,7 @@ func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) {
 	return result, ok
 }
 
-func (db *DeploymentState) Open(path string) error {
+func (db *DeploymentState) Open(ctx context.Context, path string) error {
 	db.mu.Lock()
 	defer db.mu.Unlock()
 
@@ -113,17 +166,36 @@
 			// Create new database with serial=0, will be incremented to 1 in Finalize()
 			db.Data = NewDatabase("", 0)
 			db.Path = path
-			return nil
+
+			// Write state file immediately to ensure it exists before any WAL operations.
+			// This guarantees we have a base state file for recovery validation.
+			if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
+				return fmt.Errorf("failed to create state directory: %w", err)
+			}
+			if err := db.unlockedSave(); err != nil {
+				return err
+			}
+		} else {
+			return err
 		}
-		return err
+	} else {
+		err = json.Unmarshal(data, &db.Data)
+		if err != nil {
+			return err
+		}
+		db.Path = path
 	}
 
-	err = json.Unmarshal(data, &db.Data)
+	// Attempt WAL recovery
+	recovered, err := recoverFromWAL(path, &db.Data)
 	if err != nil {
-		return err
+		return fmt.Errorf("WAL recovery failed: %w", err)
+	}
+	if recovered {
+		log.Infof(ctx, "Recovered deployment state from WAL")
+		db.recoveredFromWAL = true
 	}
 
-	db.Path = path
 	return nil
 }
 
@@ -131,14 +203,33 @@
 	db.mu.Lock()
 	defer db.mu.Unlock()
 
-	// Generate lineage on first save
+	// Generate lineage on first save (if WAL wasn't opened)
 	if db.Data.Lineage == "" {
 		db.Data.Lineage = uuid.New().String()
 	}
 
 	db.Data.Serial++
 
-	return db.unlockedSave()
+	err := db.unlockedSave()
+	if err != nil {
+		return err
+	}
+
+	// Truncate WAL after successful state file write
+	if db.wal != nil {
+		if err := db.wal.truncate(); err != nil {
+			return fmt.Errorf("failed to truncate WAL: %w", err)
+		}
+		db.wal = nil
+	} else {
+		// No WAL was opened, but we should still clean up any stale WAL file
+		wp := walPath(db.Path)
+		if err := os.Remove(wp); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("failed to remove stale WAL file: %w", err)
+		}
+	}
+
+	return nil
 }
 
 func (db *DeploymentState) AssertOpened() {
@@ -147,6 +238,13 @@
 	}
 }
 
+// RecoveredFromWAL returns true if state was recovered from WAL during Open().
+// This is used to determine if Finalize() should be called even with an empty plan.
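+// The flag is set only by Open() and persists for the lifetime of the DeploymentState.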
+func (db *DeploymentState) RecoveredFromWAL() bool {
+	return db.recoveredFromWAL
+}
+
 func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap {
 	result := make(resourcestate.ExportedResourcesMap)
 	for key, entry := range db.Data.State {
diff --git a/bundle/direct/dstate/wal.go b/bundle/direct/dstate/wal.go
new file mode 100644
index 0000000000..700bfa24e2
--- /dev/null
+++ b/bundle/direct/dstate/wal.go
@@ -0,0 +1,219 @@
+package dstate
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"os"
+)
+
+// WALHeader is the first entry in the WAL file, containing metadata for validation.
+type WALHeader struct {
+	Lineage string `json:"lineage"`
+	Serial  int    `json:"serial"`
+}
+
+// WALEntry represents a single state mutation in the WAL.
+// For set operations, V is populated. For delete operations, V is nil.
+type WALEntry struct {
+	K string         `json:"k"`
+	V *ResourceEntry `json:"v,omitempty"`
+}
+
+// WAL manages the Write-Ahead Log for deployment state recovery.
+type WAL struct {
+	path string
+	file *os.File
+}
+
+// walPath returns the WAL file path for a given state file path.
+func walPath(statePath string) string {
+	return statePath + ".wal"
+}
+
+// openWAL opens or creates a WAL file for writing.
+func openWAL(statePath string) (*WAL, error) {
+	wp := walPath(statePath)
+	f, err := os.OpenFile(wp, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open WAL file %q: %w", wp, err)
+	}
+	return &WAL{path: wp, file: f}, nil
+}
+
+// writeHeader writes the WAL header (lineage and serial) as the first entry.
+func (w *WAL) writeHeader(lineage string, serial int) error {
+	header := WALHeader{
+		Lineage: lineage,
+		Serial:  serial,
+	}
+	return w.writeJSON(header)
+}
+
+// writeEntry appends a state mutation entry to the WAL.
+func (w *WAL) writeEntry(key string, entry *ResourceEntry) error {
+	walEntry := WALEntry{
+		K: key,
+		V: entry,
+	}
+	return w.writeJSON(walEntry)
+}
+
+// writeJSON marshals and writes a JSON object as a single line, then syncs to disk.
+func (w *WAL) writeJSON(v any) error {
+	data, err := json.Marshal(v)
+	if err != nil {
+		return fmt.Errorf("failed to marshal WAL entry: %w", err)
+	}
+	data = append(data, '\n')
+
+	_, err = w.file.Write(data)
+	if err != nil {
+		return fmt.Errorf("failed to write WAL entry: %w", err)
+	}
+
+	err = w.file.Sync()
+	if err != nil {
+		return fmt.Errorf("failed to sync WAL file: %w", err)
+	}
+
+	return nil
+}
+
+// close closes the WAL file handle.
+func (w *WAL) close() error {
+	if w.file != nil {
+		return w.file.Close()
+	}
+	return nil
+}
+
+// truncate deletes the WAL file after successful finalization.
+func (w *WAL) truncate() error {
+	if w.file != nil {
+		w.file.Close()
+		w.file = nil
+	}
+	err := os.Remove(w.path)
+	if err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove WAL file %q: %w", w.path, err)
+	}
+	return nil
+}
+
+// readWAL reads and parses an existing WAL file for recovery.
+// Returns the header and entries, or an error if the WAL is invalid.
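+// Unparsable entry lines are skipped (expected after a torn write); a missing header is an error.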
+func readWAL(statePath string) (*WALHeader, []WALEntry, error) {
+	wp := walPath(statePath)
+	f, err := os.Open(wp)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer f.Close()
+
+	scanner := bufio.NewScanner(f)
+	var header *WALHeader
+	var entries []WALEntry
+	lineNum := 0
+
+	for scanner.Scan() {
+		lineNum++
+		line := scanner.Bytes()
+		if len(line) == 0 {
+			continue
+		}
+
+		if header == nil {
+			// First line must be the header
+			var h WALHeader
+			if err := json.Unmarshal(line, &h); err != nil {
+				return nil, nil, fmt.Errorf("WAL line %d: failed to parse header: %w", lineNum, err)
+			}
+			header = &h
+		} else {
+			// Subsequent lines are entries
+			var e WALEntry
+			if err := json.Unmarshal(line, &e); err != nil {
+				// Skip corrupted lines silently - this is expected for partial writes
+				continue
+			}
+			if e.K == "" {
+				// Skip entries with empty keys
+				continue
+			}
+			entries = append(entries, e)
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		return nil, nil, fmt.Errorf("failed to read WAL file: %w", err)
+	}
+
+	if header == nil {
+		return nil, nil, errors.New("WAL file is empty or missing header")
+	}
+
+	return header, entries, nil
+}
+
+// recoverFromWAL attempts to recover state from an existing WAL file.
+// It validates the WAL against the current state and replays valid entries.
+// Returns true if recovery was performed, false if no recovery needed.
+func recoverFromWAL(statePath string, db *Database) (bool, error) {
+	wp := walPath(statePath)
+
+	// Check if WAL exists
+	if _, err := os.Stat(wp); os.IsNotExist(err) {
+		return false, nil
+	}
+
+	header, entries, err := readWAL(statePath)
+	if err != nil {
+		// If we can't read the WAL at all, delete it (best-effort) and proceed
+		os.Remove(wp)
+		return false, nil
+	}
+
+	// Validate WAL serial against state serial
+	expectedSerial := db.Serial + 1
+	if header.Serial < expectedSerial {
+		// Stale WAL - delete and proceed without recovery
+		os.Remove(wp)
+		return false, nil
+	}
+
+	if header.Serial > expectedSerial {
+		// WAL is ahead of state - this indicates corruption
+		return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expectedSerial)
+	}
+
+	// Validate lineage if both exist
+	if db.Lineage != "" && header.Lineage != "" && db.Lineage != header.Lineage {
+		return false, fmt.Errorf("WAL lineage (%s) does not match state lineage (%s)", header.Lineage, db.Lineage)
+	}
+
+	// Adopt lineage from WAL if state doesn't have one
+	if db.Lineage == "" && header.Lineage != "" {
+		db.Lineage = header.Lineage
+	}
+
+	// Initialize state map if needed
+	if db.State == nil {
+		db.State = make(map[string]ResourceEntry)
+	}
+
+	// Replay entries
+	for _, entry := range entries {
+		if entry.V != nil {
+			// Set operation
+			db.State[entry.K] = *entry.V
+		} else {
+			// Delete operation
+			delete(db.State, entry.K)
+		}
+	}
+
+	return true, nil
+}
diff --git a/bundle/direct/dstate/wal_test.go b/bundle/direct/dstate/wal_test.go
new file mode 100644
index 0000000000..e475a92e9d
--- /dev/null
+++ b/bundle/direct/dstate/wal_test.go
@@ -0,0 +1,419 @@
+package dstate
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/databricks/cli/bundle/deployplan"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestWALPath(t *testing.T) {
+	assert.Equal(t, "/path/to/state.json.wal", walPath("/path/to/state.json"))
+}
+
+func TestWALWriteAndRead(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// Open WAL for writing
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+
+	// Write header
+	err = wal.writeHeader("test-lineage", 1)
+	require.NoError(t, err)
+
+	// Write entries
+	entry1 := &ResourceEntry{
+		ID:    "12345",
+		State: json.RawMessage(`{"name":"job1"}`),
+	}
+	err = wal.writeEntry("resources.jobs.job1", entry1)
+	require.NoError(t, err)
+
+	entry2 := &ResourceEntry{
+		ID:    "67890",
+		State: json.RawMessage(`{"name":"job2"}`),
+	}
+	err = wal.writeEntry("resources.jobs.job2", entry2)
+	require.NoError(t, err)
+
+	// Write a delete entry (nil value)
+	err = wal.writeEntry("resources.jobs.old_job", nil)
+	require.NoError(t, err)
+
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Read WAL back
+	header, entries, err := readWAL(statePath)
+	require.NoError(t, err)
+
+	assert.Equal(t, "test-lineage", header.Lineage)
+	assert.Equal(t, 1, header.Serial)
+
+	require.Len(t, entries, 3)
+
+	assert.Equal(t, "resources.jobs.job1", entries[0].K)
+	require.NotNil(t, entries[0].V)
+	assert.Equal(t, "12345", entries[0].V.ID)
+
+	assert.Equal(t, "resources.jobs.job2", entries[1].K)
+	require.NotNil(t, entries[1].V)
+	assert.Equal(t, "67890", entries[1].V.ID)
+
+	assert.Equal(t, "resources.jobs.old_job", entries[2].K)
+	assert.Nil(t, entries[2].V)
+}
+
+func TestWALTruncate(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+	walFilePath := walPath(statePath)
+
+	// Create WAL file
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("test-lineage", 1)
+	require.NoError(t, err)
+
+	// Verify file exists
+	_, err = os.Stat(walFilePath)
+	require.NoError(t, err)
+
+	// Truncate
+	err = wal.truncate()
+	require.NoError(t, err)
+
+	// Verify file is removed
+	_, err = os.Stat(walFilePath)
+	assert.True(t, os.IsNotExist(err))
+}
+
+func TestRecoverFromWAL_NoWAL(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	db := NewDatabase("", 0)
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.False(t, recovered)
+}
+
+func TestRecoverFromWAL_ValidWAL(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// Create WAL with serial = 1 (expecting state serial 0 + 1)
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("test-lineage", 1)
+	require.NoError(t, err)
+
+	entry := &ResourceEntry{
+		ID:    "12345",
+		State: json.RawMessage(`{"name":"job1"}`),
+	}
+	err = wal.writeEntry("resources.jobs.job1", entry)
+	require.NoError(t, err)
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Create database with serial 0
+	db := NewDatabase("", 0)
+
+	// Recover
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.True(t, recovered)
+
+	// Verify state was recovered
+	assert.Equal(t, "test-lineage", db.Lineage)
+	require.Contains(t, db.State, "resources.jobs.job1")
+	assert.Equal(t, "12345", db.State["resources.jobs.job1"].ID)
+}
+
+func TestRecoverFromWAL_StaleWAL(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+	walFilePath := walPath(statePath)
+
+	// Create WAL with serial = 1
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("test-lineage", 1)
+	require.NoError(t, err)
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Create database with serial 2 (WAL is stale)
+	db := NewDatabase("test-lineage", 2)
+
+	// Recover - should skip and delete WAL
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.False(t, recovered)
+
+	// WAL should be deleted
+	_, err = os.Stat(walFilePath)
+	assert.True(t, os.IsNotExist(err))
+}
+
+func TestRecoverFromWAL_FutureWAL(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// Create WAL with serial = 5
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("test-lineage", 5)
+	require.NoError(t, err)
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Create database with serial 0 (WAL is from future - corrupted state)
+	db := NewDatabase("test-lineage", 0)
+
+	// Recover - should fail
+	_, err = recoverFromWAL(statePath, &db)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "WAL serial (5) is ahead of expected (1)")
+}
+
+func TestRecoverFromWAL_LineageMismatch(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// Create WAL with lineage A
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("lineage-A", 1)
+	require.NoError(t, err)
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Create database with lineage B
+	db := NewDatabase("lineage-B", 0)
+
+	// Recover - should fail
+	_, err = recoverFromWAL(statePath, &db)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "lineage")
+}
+
+func TestRecoverFromWAL_DeleteOperation(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// Create WAL with delete operation
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	err = wal.writeHeader("test-lineage", 1)
+	require.NoError(t, err)
+
+	// Add an entry
+	entry := &ResourceEntry{
+		ID:    "12345",
+		State: json.RawMessage(`{"name":"job1"}`),
+	}
+	err = wal.writeEntry("resources.jobs.job1", entry)
+	require.NoError(t, err)
+
+	// Delete the entry
+	err = wal.writeEntry("resources.jobs.job1", nil)
+	require.NoError(t, err)
+
+	err = wal.close()
+	require.NoError(t, err)
+
+	// Create database
+	db := NewDatabase("", 0)
+
+	// Recover
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.True(t, recovered)
+
+	// Entry should NOT be present (deleted)
+	assert.NotContains(t, db.State, "resources.jobs.job1")
+}
+
+func TestDeploymentState_WALIntegration(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+	walFilePath := walPath(statePath)
+
+	// Create deployment state
+	var db DeploymentState
+	err := db.Open(ctx, statePath)
+	require.NoError(t, err)
+
+	// Save some state
+	err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, nil)
+	require.NoError(t, err)
+
+	// WAL should exist
+	_, err = os.Stat(walFilePath)
+	require.NoError(t, err)
+
+	// Read WAL to verify content
+	header, entries, err := readWAL(statePath)
+	require.NoError(t, err)
+	assert.Equal(t, 1, header.Serial) // serial + 1
+	require.Len(t, entries, 1)
+	assert.Equal(t, "resources.jobs.job1", entries[0].K)
+	assert.Equal(t, "12345", entries[0].V.ID)
+
+	// Finalize
+	err = db.Finalize()
+	require.NoError(t, err)
+
+	// WAL should be deleted
+	_, err = os.Stat(walFilePath)
+	assert.True(t, os.IsNotExist(err))
+
+	// State file should exist with correct serial
+	data, err := os.ReadFile(statePath)
+	require.NoError(t, err)
+	var savedDB Database
+	err = json.Unmarshal(data, &savedDB)
+	require.NoError(t, err)
+	assert.Equal(t, 1, savedDB.Serial)
+	assert.Contains(t, savedDB.State, "resources.jobs.job1")
+}
"resources.jobs.job1") +} + +func TestDeploymentState_WALRecoveryOnOpen(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + // Create initial state file + initialDB := NewDatabase("test-lineage", 5) + initialDB.State["resources.jobs.existing"] = ResourceEntry{ + ID: "existing-id", + State: json.RawMessage(`{"name":"existing"}`), + } + data, err := json.Marshal(initialDB) + require.NoError(t, err) + err = os.WriteFile(statePath, data, 0o600) + require.NoError(t, err) + + // Create WAL with serial 6 (5 + 1) + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 6) + require.NoError(t, err) + entry := &ResourceEntry{ + ID: "new-id", + State: json.RawMessage(`{"name":"new"}`), + } + err = wal.writeEntry("resources.jobs.new", entry) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + // Open should recover from WAL + var db DeploymentState + err = db.Open(ctx, statePath) + require.NoError(t, err) + + // Both existing and new resources should be present + assert.Contains(t, db.Data.State, "resources.jobs.existing") + assert.Contains(t, db.Data.State, "resources.jobs.new") + assert.Equal(t, "new-id", db.Data.State["resources.jobs.new"].ID) +} + +func TestDeploymentState_DeleteStateWritesWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + // Add a resource + err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, nil) + require.NoError(t, err) + + // Delete the resource + err = db.DeleteState("resources.jobs.job1") + require.NoError(t, err) + + // Read WAL to verify delete entry + _, entries, err := readWAL(statePath) + require.NoError(t, err) + + require.Len(t, entries, 2) + assert.Equal(t, "resources.jobs.job1", entries[1].K) + assert.Nil(t, entries[1].V) // nil means delete + + // Finalize + err = db.Finalize() + require.NoError(t, err) + + // State file should NOT contain the deleted resource + data, err := os.ReadFile(statePath) + require.NoError(t, err) + var savedDB Database + err = json.Unmarshal(data, &savedDB) + require.NoError(t, err) + assert.NotContains(t, savedDB.State, "resources.jobs.job1") +} + +func TestDeploymentState_WALWithDependsOn(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + dependsOn := []deployplan.DependsOnEntry{ + {Node: "resources.clusters.cluster1", Label: "${resources.clusters.cluster1.id}"}, + } + + err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, dependsOn) + require.NoError(t, err) + + // Read WAL + _, entries, err := readWAL(statePath) + require.NoError(t, err) + + require.Len(t, entries, 1) + require.NotNil(t, entries[0].V) + require.Len(t, entries[0].V.DependsOn, 1) + assert.Equal(t, "resources.clusters.cluster1", entries[0].V.DependsOn[0].Node) +} + +func TestRecoverFromWAL_CorruptedLine(t *testing.T) { + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + // Manually write WAL with corrupted line + content := `{"lineage":"test","serial":1} +{"k":"resources.jobs.job1","v":{"__id__":"12345","state":{}}} +not valid json +{"k":"resources.jobs.job2","v":{"__id__":"67890","state":{}}} +` 
+	err := os.WriteFile(walFilePath, []byte(content), 0o600)
+	require.NoError(t, err)
+
+	db := NewDatabase("", 0)
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.True(t, recovered)
+
+	// Should have recovered job1 and job2, skipping corrupted line
+	assert.Contains(t, db.State, "resources.jobs.job1")
+	assert.Contains(t, db.State, "resources.jobs.job2")
+}
diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go
index c30b133d97..c73aa672f7 100644
--- a/bundle/direct/pkg.go
+++ b/bundle/direct/pkg.go
@@ -65,7 +65,7 @@ func (d *DeploymentUnit) SetRemoteState(remoteState any) error {
 }
 
 func (b *DeploymentBundle) ExportState(ctx context.Context, path string) (resourcestate.ExportedResourcesMap, error) {
-	err := b.StateDB.Open(path)
+	err := b.StateDB.Open(ctx, path)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go
index 5a0f76ddbf..99002edb0a 100644
--- a/cmd/bundle/utils/process.go
+++ b/cmd/bundle/utils/process.go
@@ -210,7 +210,7 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (*bundle.Bundle,
 	// Validate that the plan's lineage and serial match the current state
 	// This must happen before any file operations
 	_, localPath := b.StateFilenameDirect(ctx)
-	err = direct.ValidatePlanAgainstState(localPath, plan)
+	err = direct.ValidatePlanAgainstState(ctx, localPath, plan)
 	if err != nil {
 		logdiag.LogError(ctx, err)
 		return b, stateDesc, root.ErrAlreadyPrinted
diff --git a/wal.txt b/wal.txt
new file mode 100644
index 0000000000..d365ed56d8
--- /dev/null
+++ b/wal.txt
@@ -0,0 +1,205 @@
+Design Document: Write-Ahead Log (WAL) for Bundle Deployment State Recovery
+
+1. Problem Statement
+When databricks bundle deploy is interrupted, resources created before the interruption become orphaned. The CLI only writes the state file at the end of deployment via Finalize(). Any resources created mid-deployment are lost from tracking.
+
+Current behavior:
+Deploy starts → Create Job A → Create Job B → [CRASH] → State file empty → Jobs A, B orphaned
+
+Impact: Orphaned resources exist in Databricks but are unknown to future deployments. Users accumulate duplicate resources, leading to confusion and unexpected costs.
+
+Scope: Direct deployment engine only. Terraform has its own state management.
+
+2. Solution Overview
+Implement a Write-Ahead Log (WAL) that records each state mutation to disk immediately after the corresponding API call succeeds.
+On recovery, replay the WAL to restore partial deployment state.
+
+Proposed behavior:
+Deploy starts → Create Job A → [WAL: A] → Create Job B → [WAL: A,B] → [CRASH]
+Next deploy → Load state → Replay WAL → State has A,B → No duplicates
+
+3. Detailed Design
+
+3.1 File Structure
+The WAL is stored locally alongside the existing state file.
+
+| File Path | Description |
+|-----------|-------------|
+| ~/.databricks/bundle/<bundle>/<target>/ | Root directory for the bundle's state data. |
+| ~/.databricks/bundle/<bundle>/<target>/resources.json | The committed state file (existing). |
+| ~/.databricks/bundle/<bundle>/<target>/resources.json.wal | The Write-Ahead Log file (new). |
+
+3.2 WAL Entry Format
+Each entry is a JSON object written as a single line (NDJSON format). The entry embeds the existing ResourceEntry structure for consistency with the state file.
+
+| Field | Type | Description |
+|-------|------|-------------|
+| lineage (first entry only) | string | UUID matching the state file's lineage (for validation). |
+| serial (first entry only) | integer | Deployment serial number (for validation). |
+| k (second entry onwards) | string | Resource key (e.g., resources.jobs.my_job). |
+| v (second entry onwards) | ResourceEntry | The state entry. Omitted for delete operations. |
+
+ResourceEntry structure (existing, reused):
+
+| Field | Type | Description |
+|-------|------|-------------|
+| __id__ | string | The unique ID assigned by the Databricks API. |
+| state | object | Full snapshot of the resource configuration. |
+
+Example WAL:
+{"lineage":"abc-123","serial":1}
+{"k":"resources.jobs.my_job","v":{"__id__":"1234567","state":{...}}}
+{"k":"resources.jobs.old_job"}    <- no "v" means a delete operation
+
+3.3 WAL Lifecycle
+
+| Phase | Action |
+|-------|--------|
+| Open | Create or open resources.json.wal. |
+| Write | Append an entry after each successful API call. |
+| Truncate | Delete resources.json.wal after successful Finalize(). |
+
+Durability: Each entry must be flushed to disk (fsync) immediately after the successful API response, before proceeding to the next operation.
+
+Known limitation: There is a small window (~microseconds) between API success and the WAL write in which a crash would still orphan the resource. This is unavoidable and considered acceptable.
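+
+For reference, a condensed sketch of the append-and-sync write path; this mirrors
+writeJSON in wal.go from this change, with error wrapping elided (the helper name
+appendLine is illustrative, not part of the implementation):
+
+```go
+package walsketch
+
+import (
+	"encoding/json"
+	"os"
+)
+
+// appendLine marshals v as one NDJSON line and fsyncs it before returning,
+// so a crash after this call cannot lose the entry.
+func appendLine(f *os.File, v any) error {
+	data, err := json.Marshal(v)
+	if err != nil {
+		return err
+	}
+	if _, err := f.Write(append(data, '\n')); err != nil {
+		return err
+	}
+	// The entry is only durable once Sync (fsync) returns.
+	return f.Sync()
+}
+```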
+
+3.4 Recovery Mechanism
+Recovery occurs at the start of deployment if the WAL file exists.
+
+1. Check: if resources.json.wal exists, initiate recovery.
+2. Load base state:
+   - If resources.json exists, load it (it provides the lineage and serial). Open() writes the state file immediately when it creates a fresh one, so a base state file is guaranteed to exist for validation.
+   - Otherwise, create a fresh state with a new lineage.
+3. Read WAL: parse all entries from resources.json.wal (already in chronological order).
+4. Validate the serial (see the sketch after this list):
+   - WAL serial == state serial + 1: valid, replay entries.
+   - WAL serial < state serial + 1: stale WAL; delete the WAL file and proceed without recovery.
+   - WAL serial > state serial + 1: corrupted state; return an error.
+5. Replay each valid entry:
+   - set: add or overwrite the resource in memory.
+   - delete: remove the resource from memory.
+6. Proceed: use the resulting state as the starting point for deployment.
+7. Finalize: on success, write resources.json and delete resources.json.wal.
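+
+A condensed sketch of the serial check in step 4, mirroring recoverFromWAL in
+wal.go from this change (validateSerial is an illustrative name, not part of
+the implementation):
+
+```go
+package walsketch
+
+import "fmt"
+
+// validateSerial applies the recovery rules from step 4: the WAL must describe
+// exactly the deployment that follows the committed state.
+func validateSerial(stateSerial, walSerial int) (replay bool, err error) {
+	expected := stateSerial + 1
+	switch {
+	case walSerial < expected:
+		// Stale WAL: the caller deletes it and proceeds without recovery.
+		return false, nil
+	case walSerial > expected:
+		// WAL is ahead of state: the state file is likely corrupted.
+		return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d)", walSerial, expected)
+	default:
+		return true, nil
+	}
+}
+```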
+
+3.5 Integration Points
+
+| Action | Location | Detail |
+|--------|----------|--------|
+| Recovery check | Open() in dstate/state.go | Check for the WAL file and replay before proceeding. |
+| Write WAL entry | SaveState() / DeleteState() | Append an entry before updating memory. |
+| Truncation | Finalize() | Delete the WAL after a successful state file write. |
+
+3.6 Error Handling
+
+| Scenario | Behavior |
+|----------|----------|
+| WAL write fails | Return error, abort deployment. |
+| Corrupted WAL line | Log warning, skip line, continue replay. |
+| Lineage mismatch | Return error, abort deployment. |
+| Stale serial | Delete WAL, proceed without recovery. |
+
+4. Testing Plan (Overview)
+Use acceptance tests which compile and run the real binary against the testserver; this requires adding support for crashing the caller (CLI) process from the test server. Key test cases:
+- Normal deploy: WAL created, used, deleted.
+- Crash after 1 resource: recovery works.
+- Fresh deploy with an existing WAL: lineage adopted.
+- Stale WAL (old serial): entries skipped.
+- Corrupted WAL line: skipped, rest recovered.
+- Bundle summary works after an interrupted deploy and sees the IDs stored in the WAL.
+
+5. Open Questions
+
+| # | Question | Proposed Answer |
+|---|----------|-----------------|
+| 1 | Should the WAL be pushed to remote? | Never. |
+
+6. Test Plan (Detailed)
+
+6.1 Unit Tests - WAL File Operations
+| Test ID | Description | Expected Behavior |
+|---------|-------------|-------------------|
+| U01 | WAL path generation | walPath("resources.json") returns "resources.json.wal" |
+| U02 | Write and read WAL | Header + entries written and read back correctly |
+| U03 | Truncate WAL | File deleted from disk |
+| U04 | Truncate non-existent WAL | No error returned |
+| U05 | Read empty WAL | Returns error "WAL file is empty or missing header" |
+
+6.2 Unit Tests - WAL Recovery Logic
+| Test ID | Description | Expected Behavior |
+|---------|-------------|-------------------|
+| R01 | No WAL exists | recoverFromWAL returns (false, nil) |
+| R02 | Valid WAL (serial = state+1) | Entries replayed, returns (true, nil) |
+| R03 | Stale WAL (serial < state+1) | WAL deleted, returns (false, nil) |
+| R04 | Future WAL (serial > state+1) | Returns error about corruption |
+| R05 | Lineage mismatch | Returns error about lineage mismatch |
+| R06 | Lineage adopted from WAL | If state has no lineage, WAL lineage is used |
+| R07 | Delete operation replay | Entry removed from state map |
+| R08 | Corrupted entry line | Skipped, other entries recovered |
+
+6.3 Unit Tests - Integration with DeploymentState
+| Test ID | Description | Expected Behavior |
+|---------|-------------|-------------------|
+| I01 | SaveState/DeleteState/Finalize flow | WAL created on first SaveState, entries written, truncated on Finalize, serial incremented |
+| I02 | Finalize cleans stale WAL | If WAL file exists but wasn't opened this session, delete it |
+| I03 | Open with existing WAL | Recovery performed before return |
+| I04 | SaveState with DependsOn | DependsOn preserved in WAL entry |
+
+6.4 Acceptance Tests
+| Test ID | Description | Steps | Expected Behavior |
+|---------|-------------|-------|-------------------|
+| A01 | Normal deploy | Deploy bundle with 2 resources | WAL created during deploy, deleted after Finalize |
+| A02 | Crash recovery | 1. Deploy, crash after resource A created 2. Redeploy | Resource A recovered from WAL, resource B created, no duplicates |
+| A03 | Bundle summary after crash | 1. Deploy, crash mid-deploy 2. Run bundle summary | Shows resources from WAL with correct IDs |
+
+6.5 Tests Implemented in wal_test.go
+- TestWALPath (U01)
+- TestWALWriteAndRead (U02)
+- TestWALTruncate (U03, U04)
+- TestRecoverFromWAL_NoWAL (R01)
+- TestRecoverFromWAL_ValidWAL (R02)
+- TestRecoverFromWAL_StaleWAL (R03)
+- TestRecoverFromWAL_FutureWAL (R04)
+- TestRecoverFromWAL_LineageMismatch (R05)
+- TestRecoverFromWAL_DeleteOperation (R07)
+- TestRecoverFromWAL_CorruptedLine (R08)
+- TestDeploymentState_WALIntegration (I01)
+- TestDeploymentState_WALRecoveryOnOpen (I03)
+- TestDeploymentState_DeleteStateWritesWAL (I01)
+- TestDeploymentState_WALWithDependsOn (I04)
+
+6.6 Tests Still Needed
+| Test ID | Description | Priority |
+|---------|-------------|----------|
+| R06 | TestRecoverFromWAL_LineageAdoption (fresh state adopts WAL lineage) | High |
+| I02 | TestDeploymentState_FinalizeCleansStaleWAL | Medium |
+| U05 | TestReadEmptyWAL | Low |
+| A01-A03 | Acceptance tests (require crash simulation infrastructure) | High |
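+
+As a starting point for R06, a sketch in the style of the existing wal_test.go
+tests; it would live in wal_test.go (which already provides the imports and
+helpers used here) and has not yet been verified against the suite:
+
+```go
+func TestRecoverFromWAL_LineageAdoption(t *testing.T) {
+	dir := t.TempDir()
+	statePath := filepath.Join(dir, "resources.json")
+
+	// WAL carries a lineage; the fresh state has none.
+	wal, err := openWAL(statePath)
+	require.NoError(t, err)
+	require.NoError(t, wal.writeHeader("adopted-lineage", 1))
+	require.NoError(t, wal.close())
+
+	db := NewDatabase("", 0)
+	recovered, err := recoverFromWAL(statePath, &db)
+	require.NoError(t, err)
+	assert.True(t, recovered)
+
+	// The fresh state should adopt the WAL's lineage.
+	assert.Equal(t, "adopted-lineage", db.Lineage)
+}
+```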