Skip to content

Conversation

@Dioneya
Copy link

@Dioneya Dioneya commented Jan 9, 2026

  • Implement OutboxModelMixin.
  • Implement BinaryUUID for PostgreSQL and MySQL compatibility.
  • Switch ID generation to uuid7.
  • Make sqlalchemy an optional dependency.
  • Update tests (using testcontainers or mocks).
  • Update documentation (README).

Summary by CodeRabbit

  • New Features

    • Event IDs now use UUID v7 for improved timestamp-based ordering and reduced collisions.
    • Added support for flexible Transactional Outbox model definitions using SQLAlchemy mixins.
    • Enhanced SQLAlchemy integration with dialect-aware UUID storage across PostgreSQL and MySQL.
  • Documentation

    • Expanded documentation with examples for integrating the Transactional Outbox pattern with SQLAlchemy.
  • Chores

    • Version bumped to 4.4.6; new runtime dependencies added: uuid6, aio-pika, confluent-kafka.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 30, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

  • ✅ Full review completed - (🔄 Check again to review again)
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

🧪 Unit Test Generation v2 is now available!

We have significantly improved our unit test generation capabilities.

To enable: Add this to your .coderabbit.yaml configuration:

reviews:
  finishing_touches:
    unit_tests:
      enabled: true

Try it out by using the @coderabbitai generate unit tests command on your code files or under ✨ Finishing Touches on the walkthrough!

Have feedback? Share your thoughts on our Discord thread!


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/cqrs/__init__.py (1)

16-61: ⚠️ Potential issue | 🟠 Major

Guard SQLAlchemy import to keep base package usable without the optional dependency.

Line 16 (and line 50 in __all__): Importing cqrs.outbox.sqlalchemy unconditionally breaks when SQLAlchemy isn't installed. Since sqlalchemy is an optional dependency (defined in pyproject.toml lines 71-73, not in main dependencies), users installing without the [sqlalchemy] extra will fail on import cqrs. Use the same conditional import pattern already established in cqrs/outbox/__init__.py (lines 10-19).

🛠️ Suggested fix
-from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository
+try:
+    from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository
+except ImportError:
+    SqlAlchemyOutboxedEventRepository = None
@@
-__all__ = (
+__all__ = [
@@
-    "OutboxedEventRepository",
-    "SqlAlchemyOutboxedEventRepository",
+    "OutboxedEventRepository",
@@
-)
+]
+if SqlAlchemyOutboxedEventRepository is not None:
+    __all__.append("SqlAlchemyOutboxedEventRepository")
README.md (1)

470-528: ⚠️ Potential issue | 🟡 Minor

Fix the outbox example to match the actual repository API.

The code example incorrectly passes session to self.outbox.add() and self.outbox.commit(). The SqlAlchemyOutboxedEventRepository stores the session at initialization and binds both methods to that session—neither method accepts a session parameter.

📌 Required doc update
-            self.outbox.add(
-                session,
+            self.outbox.add(
                 cqrs.NotificationEvent[UserJoinedNotificationPayload](
                     event_name="UserJoined",
                     topic="user_notification_events",
                     payload=UserJoinedNotificationPayload(
                         user_id=request.user_id,
                         meeting_id=request.meeting_id,
                     ),
                 ),
             )
-            self.outbox.add(
-                session,
+            self.outbox.add(
                 cqrs.NotificationEvent[UserJoinedECSTPayload](
                     event_name="UserJoined",
                     topic="user_ecst_events",
                     payload=UserJoinedECSTPayload(
                         user_id=request.user_id,
                         meeting_id=request.meeting_id,
                     ),
                 ),
             )
-            await self.outbox.commit(session)
+            await self.outbox.commit()
🤖 Fix all issues with AI agents
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-50: The fixture definition saga_session_factory currently
declares a non-existent parameter saga_engine and uses init_saga_orm as a used
name; change the parameter to engine (the real fixture) and rename init_saga_orm
to _init_saga_orm to mark it intentionally unused while keeping the dependency
so saga tables are initialized before saga_session_factory runs.
- Around line 7-9: The saga_session_factory fixture references an undefined
parameter name: replace the incorrect saga_engine parameter with engine in the
saga_session_factory function signature so pytest can resolve the fixture
(search for saga_session_factory and change its parameter to engine); also
update the DATABASE_URL default from the in-memory URI
("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI (e.g.,
"sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables are
visible across connections during tests.
- Around line 24-36: The fixture must use a nested SAVEPOINT so tests can call
session.commit() without ending the outer fixture transaction; change the
session fixture to (1) start the outer connection transaction as you already do,
(2) create the async session via async_sessionmaker(bind=connection,...), (3)
start a nested transaction on the session with await session.begin_nested(), and
(4) register an after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.

In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Around line 147-169: The test function test_get_new_event_positive contains a
redundant inline comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.
- Around line 79-83: The cleanup_table fixture uses a non-portable "TRUNCATE
TABLE" SQL and an unused parameter; change the fixture named cleanup_table to
use SQLAlchemy's delete operation against TestOutboxModel (e.g., execute a
sqla.delete(TestOutboxModel) via the session and commit) so it works with
SQLite, and rename the unused setup_db parameter to _setup_db to silence linter
warnings while keeping session and TestOutboxModel references intact.
🧹 Nitpick comments (6)
src/cqrs/saga/storage/sqlalchemy.py (1)

10-21: Chain the ImportError for clearer diagnostics.

Line 10-21: Re-raising without exception chaining drops the original ImportError context; consider from err (or from None) to improve debugging and satisfy Ruff B904.

💡 Suggested change
-except ImportError:
-    raise ImportError(
+except ImportError as err:
+    raise ImportError(
         "You are trying to use SQLAlchemy saga storage implementation, "
         "but 'sqlalchemy' is not installed. "
         "Please install it using: pip install python-cqrs[sqlalchemy]"
-    )
+    ) from err
pyproject.toml (1)

24-27: Confirm whether broker clients should be core deps.

Line 24-27 and Line 49-54: Adding aio-pika and confluent-kafka to core dependencies forces native client libs for all installs. If these transports are meant to stay optional (extras already exist), consider keeping them only in extras.

♻️ Optional change (if brokers should remain optional)
-  "aio-pika>=9.3.0",
-  "confluent-kafka>=2.6.0",

Also applies to: 49-54

src/cqrs/outbox/sqlalchemy.py (4)

12-23: Chain the ImportError in the optional SQLAlchemy guard.

Line 12-23: Re-raising without exception chaining drops the original ImportError context; consider from err (or from None) to improve diagnostics.

💡 Suggested change
-except ImportError:
-    raise ImportError(
+except ImportError as err:
+    raise ImportError(
         "You are trying to use SQLAlchemy outbox implementation, "
         "but 'sqlalchemy' is not installed. "
         "Please install it using: pip install python-cqrs[sqlalchemy]"
-    )
+    ) from err

31-58: Normalize UUID types across dialects.

Line 31-58: Depending on SQLAlchemy/driver settings, Postgres UUIDs can come back as strings and MySQL can return bytearray/memoryview. Consider explicitly requesting as_uuid=True and normalizing bytes-like values to avoid type drift.

💡 Suggested change
-        if dialect.name == "postgresql":
-            return dialect.type_descriptor(postgresql.UUID())
+        if dialect.name == "postgresql":
+            return dialect.type_descriptor(postgresql.UUID(as_uuid=True))
@@
-        if isinstance(value, uuid.UUID):
-            return value.bytes # For MySQL return 16 bytes
+        if isinstance(value, str):
+            value = uuid.UUID(value)
+        if isinstance(value, uuid.UUID):
+            return value.bytes  # For MySQL return 16 bytes
@@
-        if isinstance(value, bytes):
-            return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
+        if isinstance(value, (bytes, bytearray, memoryview)):
+            return uuid.UUID(bytes=bytes(value))  # Normalize bytes-like

113-121: Avoid unique-constraint name collisions.

Line 113-121: The fixed constraint name can collide if multiple OutboxModelMixin subclasses share the same schema. Consider incorporating the table name (or omit the explicit name and rely on naming conventions).

💡 Suggested change
-    def __table_args__(self):
+    def __table_args__(cls):
         return (
             sqlalchemy.UniqueConstraint(
                 "event_id",
                 "event_name",
-                name="event_id_unique_index",
+                name=f"{cls.__tablename__}_event_id_unique",
             ),
         )

222-227: Confirm behavior for unknown event names.

Line 222-227: Returning None means rows with unknown event_name will be skipped and remain in the outbox. If you want to avoid repeated warnings or stuck rows, consider marking them or surfacing an error.

Comment on lines +7 to +9
dotenv.load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cat -n tests/integration/sqlalchemy/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 1776


🏁 Script executed:

# Search for saga_engine definition
rg "saga_engine" --type py tests/

Repository: vadikko2/python-cqrs

Length of output: 277


🏁 Script executed:

# Check for parent conftest files
find tests/ -name "conftest.py" -type f

Repository: vadikko2/python-cqrs

Length of output: 155


🏁 Script executed:

# Check if there are imports at the top or if this is a missing fixture
rg "def saga_engine" --type py

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

cat -n tests/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 446


🏁 Script executed:

cat -n tests/unit/test_saga/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 10078


🏁 Script executed:

# Check the integration fixtures referenced in tests/conftest.py
cat -n tests/integration/fixtures.py 2>/dev/null || echo "File not found"

Repository: vadikko2/python-cqrs

Length of output: 78


🏁 Script executed:

# Search for where saga_engine might be defined
rg "saga_engine" --type py -A 2 -B 2

Repository: vadikko2/python-cqrs

Length of output: 393


🏁 Script executed:

# Check if any tests actually use these fixtures
rg "saga_session_factory|init_saga_orm" --type py tests/

Repository: vadikko2/python-cqrs

Length of output: 1335


🏁 Script executed:

# Check if there are any tests in the sqlalchemy integration directory
find tests/integration/sqlalchemy -name "*.py" -type f

Repository: vadikko2/python-cqrs

Length of output: 318


🏁 Script executed:

# Check the test file to understand the fixture signature
head -n 50 tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.py

Repository: vadikko2/python-cqrs

Length of output: 1459


🏁 Script executed:

# Look for any fixture definitions that might provide saga_engine
rg "def saga_engine|@pytest.fixture.*saga" --type py -B 2

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

# Check if saga_engine is meant to be engine or if it's missing
cat -n tests/integration/sqlalchemy/conftest.py | grep -A 5 "def saga_session_factory"

Repository: vadikko2/python-cqrs

Length of output: 209


🏁 Script executed:

# Look at how the session fixture works - does it actually share the schema?
cat -n tests/integration/sqlalchemy/conftest.py | head -40

Repository: vadikko2/python-cqrs

Length of output: 1285


🏁 Script executed:

# Check if tests are actually running successfully with the current setup
cd tests/integration/sqlalchemy && python -m pytest --collect-only test_saga_storage_sqlalchemy.py 2>&1 | head -30

Repository: vadikko2/python-cqrs

Length of output: 255


🏁 Script executed:

# Check if there's a fixtures module that should be loaded
find tests -name "fixtures.py" -o -name "*fixture*" -type f | head -20

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

# Look at the pytest_plugins reference in tests/conftest.py
grep -n "pytest_plugins\|import.*fixtures" tests/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 113


🏁 Script executed:

# Check git history to see if saga_engine was recently removed
git log --oneline --all -- "*sagaengine*" "*saga_engine*" 2>/dev/null | head -10

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

# List all files in tests/integration
find tests/integration -type f -name "*.py"

Repository: vadikko2/python-cqrs

Length of output: 616


🏁 Script executed:

# Check if tests/integration/fixtures exists or was misspelled
ls -la tests/integration/ | grep -i fixture

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

# Check if saga_engine might be a typo and should be engine
cat tests/integration/sqlalchemy/conftest.py | grep -E "(engine|saga_engine)"

Repository: vadikko2/python-cqrs

Length of output: 480


🏁 Script executed:

# Check all fixture names in conftest
grep "^def " tests/integration/sqlalchemy/conftest.py | grep -v "__"

Repository: vadikko2/python-cqrs

Length of output: 135


🏁 Script executed:

# Try to understand the intent - check if engine is used elsewhere
rg "engine" tests/integration/sqlalchemy/conftest.py -n

Repository: vadikko2/python-cqrs

Length of output: 510


Add missing saga_engine fixture and reconsider SQLite in-memory default.

The saga_session_factory fixture on line 50 references an undefined saga_engine parameter, causing fixture resolution failures in tests that depend on it. This appears to be a typo for engine. Additionally, the default SQLite in-memory database (sqlite+aiosqlite:///:memory:) isolates schema across connections, which may cause issues when init_saga_orm creates tables in one connection context and test sessions use separate connections.

Recommended fixes:

  1. Rename saga_engine parameter to engine in line 50
  2. Change the default DATABASE_URL to a file-based or shared-cache SQLite URI: sqlite+aiosqlite:///./.cqrs_test.db
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 7 - 9, The
saga_session_factory fixture references an undefined parameter name: replace the
incorrect saga_engine parameter with engine in the saga_session_factory function
signature so pytest can resolve the fixture (search for saga_session_factory and
change its parameter to engine); also update the DATABASE_URL default from the
in-memory URI ("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI
(e.g., "sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables
are visible across connections during tests.

Comment on lines +24 to +36
@pytest.fixture(scope="function")
async def session(engine):
connection = await engine.connect()
transaction = await connection.begin()

session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False)
session = session_maker()

yield session

await session.close()
await transaction.rollback()
await connection.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n --type=py "commit\(" tests -g '!**/site-packages/**'

Repository: vadikko2/python-cqrs

Length of output: 1268


🏁 Script executed:

cat -n tests/integration/sqlalchemy/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 1776


🏁 Script executed:

head -150 tests/integration/sqlalchemy/test_event_outbox.py

Repository: vadikko2/python-cqrs

Length of output: 4239


🌐 Web query:

SQLAlchemy async nested transaction SAVEPOINT pattern async_sessionmaker

💡 Result:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.exc import IntegrityError

engine = create_async_engine(DB_URL, echo=False)

Session = async_sessionmaker(engine, expire_on_commit=False)

async def do_work(records):
    async with Session() as session:
        # outer transaction (COMMIT/ROLLBACK)
        async with session.begin():
            for rec in records:
                try:
                    # nested transaction == SAVEPOINT / ROLLBACK TO SAVEPOINT
                    async with session.begin_nested():
                        session.add(rec)
                        # optional: await session.flush()  # force constraint checks here
                except IntegrityError:
                    # the SAVEPOINT block is rolled back automatically on exception;
                    # outer transaction stays active
                    pass
            # exiting session.begin() commits the outer transaction

Notes:

  • AsyncSession.begin_nested() starts a SAVEPOINT (nested transaction) and can be used as an async context manager. (docs.sqlalchemy.org)
  • Semantics match the synchronous Session.begin_nested() SAVEPOINT pattern: inner failures roll back to the savepoint without aborting the outer transaction. (docs.sqlalchemy.org)

Citations:


🏁 Script executed:

# Check SQLAlchemy version and async session capabilities
rg -n "sqlalchemy" pyproject.toml setup.py setup.cfg requirements.txt 2>/dev/null | head -20

Repository: vadikko2/python-cqrs

Length of output: 195


🌐 Web query:

SQLAlchemy 2.0 async session after_transaction_end event listener restart_savepoint

💡 Result:

To “restart” a SAVEPOINT whenever the inner (nested) transaction ends (common in test fixtures), you still use the ORM Session event after_transaction_end—but with AsyncSession you attach the listener to the underlying sync session (async_session.sync_session). SQLAlchemy explicitly documents that events for AsyncSession are registered against the sync-style Session / sync_session. [1]

AsyncSession SAVEPOINT restart pattern

from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncSession

async def setup_nested_tx(async_session: AsyncSession):
    # establish the first SAVEPOINT
    await async_session.begin_nested()

    `@event.listens_for`(async_session.sync_session, "after_transaction_end")
    def restart_savepoint(session, transaction):
        # fired for *all* SessionTransaction objects, including nested ones [2]
        if transaction.nested and transaction.parent and not transaction.parent.nested:
            # optional: mimic outer commit behavior used in some recipes
            # session.expire_all()
            session.begin_nested()  # re-open SAVEPOINT (sync call inside event)

    return restart_savepoint  # keep ref if you want to remove it later

Why the condition? after_transaction_end is invoked for nested (SAVEPOINT) transactions as well as outer transactions. SQLAlchemy documents using transaction.nested to detect SAVEPOINTs and transaction.parent is None to detect outermost transactions. [2] The “restart when a SAVEPOINT ends” recipe checks for “nested ended, but parent is not nested”, then opens a new SAVEPOINT. [3]

Key async-specific detail: inside these event handlers you use the sync-style Session/Connection APIs (no await), because the handler is invoked in the synchronous event system even in asyncio usage. [1]

Sources:
[1] SQLAlchemy asyncio docs on AsyncSession.sync_session + how to register events for async sessions. (docs.sqlalchemy.org)
[2] SQLAlchemy ORM event docs for after_transaction_end / nested / parent. (docs.sqlalchemy.org)
[3] SQLAlchemy docs recipe demonstrating after_transaction_end “restart_savepoint”. (docs.sqlalchemy.org)

Citations:


🏁 Script executed:

# Look for any existing uses of event listeners in the codebase
rg -n "event.listens_for\|from sqlalchemy import event" tests/ src/ --type=py 2>/dev/null

Repository: vadikko2/python-cqrs

Length of output: 46


🌐 Web query:

SQLAlchemy transaction._parent vs transaction.parent attribute

💡 Result:

In SQLAlchemy ORM, transaction._parent is the internal/backwards-compat attribute, while transaction.parent is the public API.

  • SessionTransaction.parent (public, supported): points to the enclosing SessionTransaction, or None if it’s the top-level transaction. It’s the attribute you should use in application/event code. [1]
  • SessionTransaction._parent (private, internal): older/legacy/internal name for the same concept; SQLAlchemy docs note that parent was added and that you’d use ._parent only on older versions that didn’t have parent. [2]

That’s why older recipes/event handlers often reference _parent (e.g. checking transaction.nested vs transaction._parent.nested). [3]

Recommendation: use transaction.parent on supported SQLAlchemy versions; only fall back to ._parent if you’re maintaining compatibility with very old SQLAlchemy releases. [1][2]

Sources:
[1] (docs.sqlalchemy.org)
[2] (docs.sqlalchemy.org)
[3] (docs.sqlalchemy.org)

Citations:


🏁 Script executed:

# Check if there are any existing transaction handling patterns in the codebase
rg -n "transaction\." tests/ src/ --type=py -A 2 | head -40

Repository: vadikko2/python-cqrs

Length of output: 781


Use nested transactions (SAVEPOINT) with event listener to allow test commits while rolling back fixture transaction.

Per-test transaction will fail on teardown rollback once tests call session.commit() via the repository. The outer transaction closes, preventing the fixture's teardown rollback. Use a nested transaction pattern with begin_nested() and after_transaction_end event listener to restart the SAVEPOINT after each test commit.

🛠️ Proposed fix (nested transaction pattern)
+from sqlalchemy import event
-import asyncio
+import asyncio
 from sqlalchemy.ext import asyncio as sqla_async
@@
     session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False)
     session = session_maker()
+    await session.begin_nested()
+
+    `@event.listens_for`(session.sync_session, "after_transaction_end")
+    def restart_savepoint(sess, trans):
+        if trans.nested and trans.parent and not trans.parent.nested:
+            sess.begin_nested()
+
     yield session

Note: For SQLAlchemy 2.0+, use the public parent attribute instead of the private _parent.

🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 24 - 36, The fixture
must use a nested SAVEPOINT so tests can call session.commit() without ending
the outer fixture transaction; change the session fixture to (1) start the outer
connection transaction as you already do, (2) create the async session via
async_sessionmaker(bind=connection,...), (3) start a nested transaction on the
session with await session.begin_nested(), and (4) register an
after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.

Comment on lines +49 to +50
@pytest.fixture(scope="session")
def saga_session_factory(saga_engine, init_saga_orm):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the full fixture in conftest.py to see the complete context
cat -n tests/integration/sqlalchemy/conftest.py | sed -n '45,55p'

Repository: vadikko2/python-cqrs

Length of output: 343


🏁 Script executed:

# Check if saga_engine fixture is defined anywhere in the tests directory
rg -n "def saga_engine\b" tests/

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

# Check for any saga_engine references in the entire tests directory
rg -n "saga_engine" tests/ | head -20

Repository: vadikko2/python-cqrs

Length of output: 283


🏁 Script executed:

# Check what other fixtures are defined in the same conftest.py file
rg -n "@pytest.fixture" tests/integration/sqlalchemy/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 244


🏁 Script executed:

cat -n tests/integration/sqlalchemy/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 1776


Fix fixture parameter saga_engine to engine and mark init_saga_orm unused.

The saga_engine fixture doesn't exist in the test suite; only engine is defined. The init_saga_orm parameter is correctly kept as a dependency to ensure saga tables are initialized before this factory, but should be renamed to _init_saga_orm to indicate it's intentionally unused in the function body.

Required fix
`@pytest.fixture`(scope="session")
-def saga_session_factory(saga_engine, init_saga_orm):
-    return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False)
+def saga_session_factory(engine, _init_saga_orm):
+    return sqla_async.async_sessionmaker(engine, expire_on_commit=False)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@pytest.fixture(scope="session")
def saga_session_factory(saga_engine, init_saga_orm):
`@pytest.fixture`(scope="session")
def saga_session_factory(engine, _init_saga_orm):
return sqla_async.async_sessionmaker(engine, expire_on_commit=False)
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 50-50: Unused function argument: init_saga_orm

(ARG001)

🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 49 - 50, The fixture
definition saga_session_factory currently declares a non-existent parameter
saga_engine and uses init_saga_orm as a used name; change the parameter to
engine (the real fixture) and rename init_saga_orm to _init_saga_orm to mark it
intentionally unused while keeping the dependency so saga tables are initialized
before saga_session_factory runs.

Comment on lines +79 to +83
@pytest.fixture(autouse=True)
async def cleanup_table(self, session, setup_db):
await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}"))
await session.commit()
yield
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -name "test_event_outbox.py" -type f

Repository: vadikko2/python-cqrs

Length of output: 115


🏁 Script executed:

cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '70,95p'

Repository: vadikko2/python-cqrs

Length of output: 1121


🏁 Script executed:

cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '1,40p'

Repository: vadikko2/python-cqrs

Length of output: 1090


🏁 Script executed:

rg -n "setup_db|TRUNCATE|DELETE" tests/integration/sqlalchemy/test_event_outbox.py

Repository: vadikko2/python-cqrs

Length of output: 256


🏁 Script executed:

rg -n "DATABASE_URL|test.*url|sqlite" tests/integration/ --max-count=20

Repository: vadikko2/python-cqrs

Length of output: 305


🏁 Script executed:

find tests -name "conftest.py" -type f

Repository: vadikko2/python-cqrs

Length of output: 155


🏁 Script executed:

cat tests/integration/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 125


🏁 Script executed:

rg -n "sqla\.delete|delete\(" tests/ --type py | head -20

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

cat -n tests/integration/sqlalchemy/conftest.py

Repository: vadikko2/python-cqrs

Length of output: 1776


🏁 Script executed:

web_search
SQLite TRUNCATE TABLE support compatibility

Repository: vadikko2/python-cqrs

Length of output: 159


🌐 Web query:

SQLite TRUNCATE TABLE support compatibility

💡 Result:

SQLite does not support the TRUNCATE TABLE statement/syntax. If you run TRUNCATE TABLE t; you’ll get a syntax error, so SQL written for MySQL/Postgres/SQL Server needs a compatibility change. [1]

SQLite-compatible equivalents

  • Delete all rows (portable replacement):

    DELETE FROM t;

    When a DELETE has no WHERE and no RETURNING, and the table has no triggers, SQLite may apply its “truncate optimization” to erase the table contents without visiting rows one-by-one (i.e., closer to what other DBs mean by “truncate”). [2]

  • If you also need to reset an AUTOINCREMENT counter:
    SQLite tracks AUTOINCREMENT using the internal sqlite_sequence table. [3] A common pattern is:

    DELETE FROM t;
    DELETE FROM sqlite_sequence WHERE name='t';

    (Only relevant if the table was created with INTEGER PRIMARY KEY AUTOINCREMENT.) [3]

Sources:
[1] https://www.sqlitetutor.com/truncate-table/ (sqlitetutor.com)
[2] SQLite DELETE docs (“Truncate Optimization”) (sqlite.org)
[3] SQLite AUTOINCREMENT / sqlite_sequence docs (sqlite.org)

Citations:


🏁 Script executed:

cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '70,100p'

Repository: vadikko2/python-cqrs

Length of output: 1331


🏁 Script executed:

rg -n "sqla.delete\|from_statement\|DELETE FROM" . --type py

Repository: vadikko2/python-cqrs

Length of output: 46


🏁 Script executed:

rg -B5 -A10 "class TestOutboxModel" tests/integration/sqlalchemy/test_event_outbox.py

Repository: vadikko2/python-cqrs

Length of output: 343


🏁 Script executed:

web_search
SQLAlchemy delete statement syntax sqla.delete()

Repository: vadikko2/python-cqrs

Length of output: 248


🌐 Web query:

SQLAlchemy delete statement syntax sqla.delete

💡 Result:

SQLAlchemy delete() / sqla.delete syntax (Core + ORM)

1) Core (table-based) DELETE

from sqlalchemy import delete

stmt = delete(user_table).where(user_table.c.name == "patrick")
result = conn.execute(stmt)          # or: await conn.execute(stmt)
conn.commit()

print(result.rowcount)

This compiles to DELETE FROM user_account WHERE .... [1]

2) DELETE with RETURNING (if the DB supports it)

stmt = (
    delete(user_table)
    .where(user_table.c.name == "patrick")
    .returning(user_table.c.id, user_table.c.name)
)

rows = conn.execute(stmt).all()
conn.commit()

Delete.returning() returns deleted rows/columns. [1]

3) ORM-enabled DELETE (class-based) via Session.execute

from sqlalchemy import delete

stmt = delete(User).where(User.name.in_(["squidward", "sandy"]))
session.execute(stmt)
session.commit()

This emits a single SQL DELETE affecting all matching rows. [2]

4) Notes (ORM)

  • ORM-enabled bulk delete(User)... bypasses normal unit-of-work behavior (no in-Python relationship cascading); rely on DB cascades / constraints as appropriate. [2]

Source of truth: SQLAlchemy docs for delete() and ORM-enabled DELETE. [1] [2] [3]


Replace TRUNCATE TABLE with DELETE for SQLite compatibility; rename unused fixture parameter.

TRUNCATE TABLE is not supported by SQLite (the default test database), causing test failures in local runs. Use SQLAlchemy's delete() method instead, which is portable across all databases. Additionally, the setup_db parameter is unused and should be renamed to _setup_db to suppress linting warnings.

🛠️ Proposed fix
-    async def cleanup_table(self, session, setup_db):
-        await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}"))
+    async def cleanup_table(self, session, _setup_db):
+        await session.execute(sqla.delete(TestOutboxModel))
         await session.commit()
         yield
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@pytest.fixture(autouse=True)
async def cleanup_table(self, session, setup_db):
await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}"))
await session.commit()
yield
`@pytest.fixture`(autouse=True)
async def cleanup_table(self, session, _setup_db):
await session.execute(sqla.delete(TestOutboxModel))
await session.commit()
yield
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 80-80: Unused method argument: setup_db

(ARG002)

🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/test_event_outbox.py` around lines 79 - 83, The
cleanup_table fixture uses a non-portable "TRUNCATE TABLE" SQL and an unused
parameter; change the fixture named cleanup_table to use SQLAlchemy's delete
operation against TestOutboxModel (e.g., execute a sqla.delete(TestOutboxModel)
via the session and commit) so it works with SQLite, and rename the unused
setup_db parameter to _setup_db to silence linter warnings while keeping session
and TestOutboxModel references intact.

Comment on lines +147 to 169
async def test_get_new_event_positive(self, outbox_repo):
"""
checks getting one event positive
"""
repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session)
request = OutboxRequest(
message="test_outbox_mark_event_as_produced_positive",
count=1,
)
await OutboxRequestHandler(repository).handle(request)
[event_over_get_all_events_method] = await repository.get_many(1)
await OutboxRequestHandler(outbox_repo).handle(request)
[event_over_get_all_events_method] = await outbox_repo.get_many(1)

event: outbox_repository.OutboxedEvent | None = next(
iter(
await repository.get_many(
await outbox_repo.get_many(
batch_size=1,
),
),
None,
)
await session.commit()
await outbox_repo.commit()

assert event
assert event.id == event_over_get_all_events_method.id # noqa
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove the unused noqa.

The # noqa is now redundant and flagged by Ruff.

✂️ Proposed fix
-        assert event.id == event_over_get_all_events_method.id  # noqa
+        assert event.id == event_over_get_all_events_method.id
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def test_get_new_event_positive(self, outbox_repo):
"""
checks getting one event positive
"""
repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session)
request = OutboxRequest(
message="test_outbox_mark_event_as_produced_positive",
count=1,
)
await OutboxRequestHandler(repository).handle(request)
[event_over_get_all_events_method] = await repository.get_many(1)
await OutboxRequestHandler(outbox_repo).handle(request)
[event_over_get_all_events_method] = await outbox_repo.get_many(1)
event: outbox_repository.OutboxedEvent | None = next(
iter(
await repository.get_many(
await outbox_repo.get_many(
batch_size=1,
),
),
None,
)
await session.commit()
await outbox_repo.commit()
assert event
assert event.id == event_over_get_all_events_method.id # noqa
async def test_get_new_event_positive(self, outbox_repo):
"""
checks getting one event positive
"""
request = OutboxRequest(
message="test_outbox_mark_event_as_produced_positive",
count=1,
)
await OutboxRequestHandler(outbox_repo).handle(request)
[event_over_get_all_events_method] = await outbox_repo.get_many(1)
event: outbox_repository.OutboxedEvent | None = next(
iter(
await outbox_repo.get_many(
batch_size=1,
),
),
None,
)
await outbox_repo.commit()
assert event
assert event.id == event_over_get_all_events_method.id
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 169-169: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/test_event_outbox.py` around lines 147 - 169,
The test function test_get_new_event_positive contains a redundant inline
comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants