feat: OracleDB support for CDC via LogMiner #4036
Conversation
Claude finished @josephwoodward's task in 4m 22s
Review: Well-structured CDC implementation with good architectural separation (LogMiner engine, session manager, transaction buffer, SQL redo parser). The integration tests are thorough and follow project patterns (testcontainers-go, …).
lm.log.Infof("Caught up with redo logs, backing off")
}
time.Sleep(lm.cfg.MiningBackoffInterval)
}
Bug: time.Sleep blocks without checking context cancellation, so a shutdown signal arriving during the backoff interval is not noticed until the sleep completes. Since MiningBackoffInterval is user-configurable (and could be set to minutes), this delays graceful shutdown by up to that duration.
Per godev patterns, check for cancellation in long-running loops using a select on ctx.Done(). Replace the time.Sleep with a context-aware backoff:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(lm.cfg.MiningBackoffInterval):
}This also lets you remove the select/default wrapper above (lines 128-131) since cancellation is now checked at the sleep point.
This was intentional: it's a tight loop, so a timer-based select can result in a lot of unnecessary allocations. I'll need to benchmark to understand the impact versus the additional wait time.
== Permissions
When using the default Oracle based cache, the Connect user requires permission to create tables and stored procedures, and the ` + "rpcn" + ` schema must already exist. Refer to ` + "`" + ociFieldCheckpointCacheTableName + "`" + ` for more information.
We probably should have more actionable information here if possible
I want to go through the docs separately to add more setup information so will make a note to include this.
} else if policy.IsNoop() {
policy.Count = 1
}
That is to say that Count must be > 0?
It's to set a default value, following the same approach as other CDC components (PostgreSQL for instance).
// Reset our stop signal
i.stopSig = shutdown.NewSignaller()

go func() {
Consider extracting this to a function that can return an error, rather than the ad-hoc:
i.stopSig.TriggerHasStopped()
return
You could actually use the wg, _ := errgroup.WithContext(softCtx) defined below
return
}

if err = i.cacheSCN(softCtx, maxSCN); err != nil {
All the error reporting would be easier in a method.
}

var currentSCN SCN
if err := s.db.QueryRowContext(ctx, "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&currentSCN); err != nil {
nit: elsewhere we sometimes use backticks rather than double quotes for SQL queries
}

// StartSession starts a LogMiner session with ONLINE_CATALOG strategy
func (sm *SessionManager) StartSession(ctx context.Context, startSCN, endSCN uint64, committedDataOnly bool) error {
Do we want to add protection against calling it out of order?
I've had a look at this and I don't think it's a concern personally. It's only used in the one place, and I've simplified it so the call sequence is clearer.
This is more complicated than it looks: LOB data is streamed in a separate redo event.
internal/impl/oracledb/batcher.go (Outdated)
msg := service.NewMessage(data)
msg.MetaSet("schema", m.Schema)
Bug: The metadata keys set here (schema, table) don't match the documented metadata field names in the component's Description (in input_oracledb_cdc.go). The documentation promises table_schema and table_name:
- table_schema: Schema of the table that the message originated from.
- table_name: Name of the table that the message originated from.
Users relying on the documented metadata keys in their Bloblang mappings (e.g., meta("table_schema")) will get empty values. Either the code should use table_schema/table_name, or the documentation should be updated to match schema/table.
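One way to prevent this kind of drift is to share the metadata key names as constants between the publisher and its consumers/tests. This is a sketch, not the PR's actual code; the constant and function names are hypothetical:

```go
package main

import "fmt"

// Hypothetical shared constants (names illustrative): keeping the metadata key
// names in one place means the publisher, tests, and docs can't silently diverge.
const (
	metaTableSchema = "table_schema"
	metaTableName   = "table_name"
)

// messageKey builds the schema-qualified key the way a consumer or test would.
func messageKey(meta map[string]string) string {
	return meta[metaTableSchema] + "." + meta[metaTableName]
}

func main() {
	// The publisher side writes under the shared constants...
	meta := map[string]string{
		metaTableSchema: "APP",
		metaTableName:   "ORDERS",
	}
	// ...and readers resolve the same constants.
	fmt.Println(messageKey(meta)) // prints "APP.ORDERS"
}
```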
if err := createUpsertStoredProc(ctx, db, cacheTable); err != nil {
_ = db.Close()
return nil, fmt.Errorf("creating checkpoint cache write stored procedure: %w", err)
}

if created, err := createCacheTable(ctx, db, cacheTable); err != nil {
Bug: createUpsertStoredProc is called before createCacheTable, but the stored procedure body contains static SQL referencing the cache table (SELECT COUNT(*) INTO v_count FROM %s ..., UPDATE %s ..., INSERT INTO %s ...). On first run when neither exists, Oracle will create the procedure in INVALID state because the referenced table doesn't exist yet.
Notably, the MSSQL checkpoint cache in this same PR was fixed to swap the order — table creation first, then stored procedure creation. The same ordering fix should be applied here: move createCacheTable before createUpsertStoredProc.
"github.com/redpanda-data/connect/v4/internal/license"
)

func TestIntegration_OracleDBCDC_SnapshotAndStreaming(t *testing.T) {
Test naming convention violation: Test function names should use camelCase, not underscores per project test patterns. For example, TestIntegration_OracleDBCDC_SnapshotAndStreaming should be TestIntegrationOracleDBCDCSnapshotAndStreaming. This applies to all test functions in this file and in snapshot_test.go (e.g., TestIntegration_Snapshot).
require.Truef(t, ok, "message %d missing 'schema' metadata", i)

table, ok := msg.MetaGet("table")
require.Truef(t, ok, "message %d missing 'table' metadata", i)

key := fmt.Sprintf("%s.%s", schema, table)
Bug: Metadata key mismatch — mustAssertMetadata reads "schema" and "table" metadata keys, but batchPublisher.Publish (in batcher.go) sets "table_schema" and "table_name".
These keys will never be found, so ok will always be false and the require.Truef will fail when the test actually runs.
Change msg.MetaGet("schema") → msg.MetaGet("table_schema") and msg.MetaGet("table") → msg.MetaGet("table_name").
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package oracledb
Bug: Missing Oracle database driver import.
This package calls sql.Open("oracle", ...) (here in Connect(), and also in checkpoint_cache.go and snapshot.go), but the go-ora driver is never imported in any production code file. The side-effect import _ "github.com/sijms/go-ora/v2" only appears in test files (integration_test.go and oracledbtest.go). Note that the goora "github.com/sijms/go-ora/v2/network" import in logminer.go imports a sub-package for the OracleError type — it does not trigger the root package's init() that registers the "oracle" driver with database/sql.
At runtime this will fail with sql: unknown driver "oracle".
Add _ "github.com/sijms/go-ora/v2" to the import block in this file (or in the public wrapper public/components/oracledb/package.go).
}
}
}

// FindStartPos finds the earliest possible SCN that exists within a log that's still available.
func (lm *LogMiner) FindStartPos(ctx context.Context) (replication.SCN, error) {
Context propagation: time.Sleep ignores context cancellation.
Per project Go patterns: "check for cancellation in long-running loops" using a select with ctx.Done().
The default branch sleeps without checking context cancellation. If the context is cancelled during time.Sleep(lm.cfg.MiningBackoffInterval) (default 5s), shutdown is delayed until the sleep completes. Replace the time.Sleep calls with a select on ctx.Done() and a timer, e.g.:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(lm.cfg.MiningBackoffInterval):
}|
for i, t := range userTables {
if i > 0 {
buf.WriteString(" OR ")
}
fmt.Fprintf(&buf, "(SEG_OWNER = '%s' AND TABLE_NAME = '%s')", t.Schema, t.Name)
SQL Injection: Schema and table names from DBA_TABLES are interpolated directly into SQL string literals with single quotes. Oracle quoted identifiers (created with double quotes) can contain single quotes — e.g., CREATE TABLE "it's" (id NUMBER) stores it's in DBA_TABLES. This would produce TABLE_NAME = 'it's', breaking the SQL and enabling injection.
Escape single quotes by doubling them (strings.ReplaceAll(t.Schema, "'", "''")) before interpolation, or use bind variables if the query structure permits it.
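A hedged sketch of the escaping fix (the quoteOracleString helper and table struct are illustrative, not the PR's code):

```go
package main

import (
	"fmt"
	"strings"
)

// quoteOracleString escapes a value for embedding in an Oracle single-quoted
// string literal by doubling any embedded single quotes.
func quoteOracleString(s string) string {
	return strings.ReplaceAll(s, "'", "''")
}

type table struct{ Schema, Name string }

// buildTableFilter builds the SEG_OWNER/TABLE_NAME predicate with escaping applied.
func buildTableFilter(tables []table) string {
	var buf strings.Builder
	for i, t := range tables {
		if i > 0 {
			buf.WriteString(" OR ")
		}
		fmt.Fprintf(&buf, "(SEG_OWNER = '%s' AND TABLE_NAME = '%s')",
			quoteOracleString(t.Schema), quoteOracleString(t.Name))
	}
	return buf.String()
}

func main() {
	// A quoted identifier containing a single quote is rendered safely.
	fmt.Println(buildTableFilter([]table{{"APP", "it's"}}))
	// prints: (SEG_OWNER = 'APP' AND TABLE_NAME = 'it''s')
}
```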
} else {
time.Sleep(lm.cfg.MiningInterval)
}
}
}
}
time.Sleep in this loop blocks context cancellation. When ctx is canceled (e.g., during shutdown), the goroutine won't notice until the sleep completes — up to MiningBackoffInterval (default 5s) in the worst case.
Per the project context propagation pattern, long-running loops should check for cancellation using a select:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(lm.cfg.MiningBackoffInterval):
}This adds up to shutdownTimeout (5s) of unnecessary shutdown latency since Close() also waits 5s before hard-stopping.

This pull request adds support for the CDC pattern via LogMiner. LogMiner is traditionally a tool that provides a relational interface to Oracle's redo log files for tasks such as data recovery and auditing, but it's also commonly used to support the change data capture pattern. As such, there's some additional work involved in enabling CDC via LogMiner, which I'll break down below.
How CDC Works with LogMiner
From a high level, LogMiner works by loading redo logs and then allowing SQL-based access to the change events. Architecturally, the core components are:
LogMiner (logminer/logminer.go)
The continuous streaming engine, which runs repeated mining cycles.
Session Manager (logminer/session.go)
This is responsible for loading log files into LogMiner as they're cycled. It does this in a mining loop that continuously checks for the latest SCN.
Transaction Buffer (logminer/cache.go)
Event rows queried via LogMiner begin with a transaction start and eventually end with a commit or rollback. This means that as we read events we have to buffer them until we reach the associated commit or rollback. A rollback results in the buffer being discarded; a commit results in the buffer being processed by the SQL Redo Parser (discussed below) before being published to the Benthos pipeline. As such, long-running transactions on a high-throughput system can lead to memory exhaustion, so it's important to have controls in place to limit this (max_transaction_events allows users to specify an upper limit). As mentioned, the current implementation is in-memory, but it'd be good to add support for other storage mechanisms (such as Redis, or potentially even OracleDB, similar to what we do with the checkpoint cache).
SQL Redo Parser (logminer/sqlredo/)
Parses the reconstructed SQL redo statements produced by LogMiner back into structured DMLEvent values (table, operation type, column values).
The queries we get back from LogMiner literally look like this:
In some instances they'll have functions in them such as below:
Based on the function name we have to perform conversions in Go (see logminer/sqlredo/valueconverter_test.go for tests).
Things to follow up in a separate PR