-
Notifications
You must be signed in to change notification settings - Fork 5
[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7 #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the
✨ Finishing touches🧪 Generate unit tests (beta)
Tip 🧪 Unit Test Generation v2 is now available!We have significantly improved our unit test generation capabilities. To enable: Add this to your reviews:
finishing_touches:
unit_tests:
enabled: trueTry it out by using the Have feedback? Share your thoughts on our Discord thread! Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/cqrs/__init__.py (1)
16-61:⚠️ Potential issue | 🟠 MajorGuard SQLAlchemy import to keep base package usable without the optional dependency.
Line 16 (and line 50 in
__all__): Importingcqrs.outbox.sqlalchemyunconditionally breaks when SQLAlchemy isn't installed. Sincesqlalchemyis an optional dependency (defined inpyproject.tomllines 71-73, not in main dependencies), users installing without the[sqlalchemy]extra will fail onimport cqrs. Use the same conditional import pattern already established incqrs/outbox/__init__.py(lines 10-19).🛠️ Suggested fix
-from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository +try: + from cqrs.outbox.sqlalchemy import SqlAlchemyOutboxedEventRepository +except ImportError: + SqlAlchemyOutboxedEventRepository = None @@ -__all__ = ( +__all__ = [ @@ - "OutboxedEventRepository", - "SqlAlchemyOutboxedEventRepository", + "OutboxedEventRepository", @@ -) +] +if SqlAlchemyOutboxedEventRepository is not None: + __all__.append("SqlAlchemyOutboxedEventRepository")README.md (1)
470-528:⚠️ Potential issue | 🟡 MinorFix the outbox example to match the actual repository API.
The code example incorrectly passes
sessiontoself.outbox.add()andself.outbox.commit(). TheSqlAlchemyOutboxedEventRepositorystores the session at initialization and binds both methods to that session—neither method accepts a session parameter.📌 Required doc update
- self.outbox.add( - session, + self.outbox.add( cqrs.NotificationEvent[UserJoinedNotificationPayload]( event_name="UserJoined", topic="user_notification_events", payload=UserJoinedNotificationPayload( user_id=request.user_id, meeting_id=request.meeting_id, ), ), ) - self.outbox.add( - session, + self.outbox.add( cqrs.NotificationEvent[UserJoinedECSTPayload]( event_name="UserJoined", topic="user_ecst_events", payload=UserJoinedECSTPayload( user_id=request.user_id, meeting_id=request.meeting_id, ), ), ) - await self.outbox.commit(session) + await self.outbox.commit()
🤖 Fix all issues with AI agents
In `@tests/integration/sqlalchemy/conftest.py`:
- Around line 49-50: The fixture definition saga_session_factory currently
declares a non-existent parameter saga_engine and uses init_saga_orm as a used
name; change the parameter to engine (the real fixture) and rename init_saga_orm
to _init_saga_orm to mark it intentionally unused while keeping the dependency
so saga tables are initialized before saga_session_factory runs.
- Around line 7-9: The saga_session_factory fixture references an undefined
parameter name: replace the incorrect saga_engine parameter with engine in the
saga_session_factory function signature so pytest can resolve the fixture
(search for saga_session_factory and change its parameter to engine); also
update the DATABASE_URL default from the in-memory URI
("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI (e.g.,
"sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables are
visible across connections during tests.
- Around line 24-36: The fixture must use a nested SAVEPOINT so tests can call
session.commit() without ending the outer fixture transaction; change the
session fixture to (1) start the outer connection transaction as you already do,
(2) create the async session via async_sessionmaker(bind=connection,...), (3)
start a nested transaction on the session with await session.begin_nested(), and
(4) register an after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.
In `@tests/integration/sqlalchemy/test_event_outbox.py`:
- Around line 147-169: The test function test_get_new_event_positive contains a
redundant inline comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.
- Around line 79-83: The cleanup_table fixture uses a non-portable "TRUNCATE
TABLE" SQL and an unused parameter; change the fixture named cleanup_table to
use SQLAlchemy's delete operation against TestOutboxModel (e.g., execute a
sqla.delete(TestOutboxModel) via the session and commit) so it works with
SQLite, and rename the unused setup_db parameter to _setup_db to silence linter
warnings while keeping session and TestOutboxModel references intact.
🧹 Nitpick comments (6)
src/cqrs/saga/storage/sqlalchemy.py (1)
10-21: Chain the ImportError for clearer diagnostics.Line 10-21: Re-raising without exception chaining drops the original ImportError context; consider
from err(orfrom None) to improve debugging and satisfy Ruff B904.💡 Suggested change
-except ImportError: - raise ImportError( +except ImportError as err: + raise ImportError( "You are trying to use SQLAlchemy saga storage implementation, " "but 'sqlalchemy' is not installed. " "Please install it using: pip install python-cqrs[sqlalchemy]" - ) + ) from errpyproject.toml (1)
24-27: Confirm whether broker clients should be core deps.Line 24-27 and Line 49-54: Adding
aio-pikaandconfluent-kafkato core dependencies forces native client libs for all installs. If these transports are meant to stay optional (extras already exist), consider keeping them only in extras.♻️ Optional change (if brokers should remain optional)
- "aio-pika>=9.3.0", - "confluent-kafka>=2.6.0",Also applies to: 49-54
src/cqrs/outbox/sqlalchemy.py (4)
12-23: Chain the ImportError in the optional SQLAlchemy guard.Line 12-23: Re-raising without exception chaining drops the original ImportError context; consider
from err(orfrom None) to improve diagnostics.💡 Suggested change
-except ImportError: - raise ImportError( +except ImportError as err: + raise ImportError( "You are trying to use SQLAlchemy outbox implementation, " "but 'sqlalchemy' is not installed. " "Please install it using: pip install python-cqrs[sqlalchemy]" - ) + ) from err
31-58: Normalize UUID types across dialects.Line 31-58: Depending on SQLAlchemy/driver settings, Postgres UUIDs can come back as strings and MySQL can return bytearray/memoryview. Consider explicitly requesting
as_uuid=Trueand normalizing bytes-like values to avoid type drift.💡 Suggested change
- if dialect.name == "postgresql": - return dialect.type_descriptor(postgresql.UUID()) + if dialect.name == "postgresql": + return dialect.type_descriptor(postgresql.UUID(as_uuid=True)) @@ - if isinstance(value, uuid.UUID): - return value.bytes # For MySQL return 16 bytes + if isinstance(value, str): + value = uuid.UUID(value) + if isinstance(value, uuid.UUID): + return value.bytes # For MySQL return 16 bytes @@ - if isinstance(value, bytes): - return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID + if isinstance(value, (bytes, bytearray, memoryview)): + return uuid.UUID(bytes=bytes(value)) # Normalize bytes-like
113-121: Avoid unique-constraint name collisions.Line 113-121: The fixed constraint name can collide if multiple
OutboxModelMixinsubclasses share the same schema. Consider incorporating the table name (or omit the explicit name and rely on naming conventions).💡 Suggested change
- def __table_args__(self): + def __table_args__(cls): return ( sqlalchemy.UniqueConstraint( "event_id", "event_name", - name="event_id_unique_index", + name=f"{cls.__tablename__}_event_id_unique", ), )
222-227: Confirm behavior for unknown event names.Line 222-227: Returning
Nonemeans rows with unknownevent_namewill be skipped and remain in the outbox. If you want to avoid repeated warnings or stuck rows, consider marking them or surfacing an error.
| dotenv.load_dotenv() | ||
| DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///:memory:") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n tests/integration/sqlalchemy/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 1776
🏁 Script executed:
# Search for saga_engine definition
rg "saga_engine" --type py tests/Repository: vadikko2/python-cqrs
Length of output: 277
🏁 Script executed:
# Check for parent conftest files
find tests/ -name "conftest.py" -type fRepository: vadikko2/python-cqrs
Length of output: 155
🏁 Script executed:
# Check if there are imports at the top or if this is a missing fixture
rg "def saga_engine" --type pyRepository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
cat -n tests/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 446
🏁 Script executed:
cat -n tests/unit/test_saga/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 10078
🏁 Script executed:
# Check the integration fixtures referenced in tests/conftest.py
cat -n tests/integration/fixtures.py 2>/dev/null || echo "File not found"Repository: vadikko2/python-cqrs
Length of output: 78
🏁 Script executed:
# Search for where saga_engine might be defined
rg "saga_engine" --type py -A 2 -B 2Repository: vadikko2/python-cqrs
Length of output: 393
🏁 Script executed:
# Check if any tests actually use these fixtures
rg "saga_session_factory|init_saga_orm" --type py tests/Repository: vadikko2/python-cqrs
Length of output: 1335
🏁 Script executed:
# Check if there are any tests in the sqlalchemy integration directory
find tests/integration/sqlalchemy -name "*.py" -type fRepository: vadikko2/python-cqrs
Length of output: 318
🏁 Script executed:
# Check the test file to understand the fixture signature
head -n 50 tests/integration/sqlalchemy/test_saga_storage_sqlalchemy.pyRepository: vadikko2/python-cqrs
Length of output: 1459
🏁 Script executed:
# Look for any fixture definitions that might provide saga_engine
rg "def saga_engine|@pytest.fixture.*saga" --type py -B 2Repository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
# Check if saga_engine is meant to be engine or if it's missing
cat -n tests/integration/sqlalchemy/conftest.py | grep -A 5 "def saga_session_factory"Repository: vadikko2/python-cqrs
Length of output: 209
🏁 Script executed:
# Look at how the session fixture works - does it actually share the schema?
cat -n tests/integration/sqlalchemy/conftest.py | head -40Repository: vadikko2/python-cqrs
Length of output: 1285
🏁 Script executed:
# Check if tests are actually running successfully with the current setup
cd tests/integration/sqlalchemy && python -m pytest --collect-only test_saga_storage_sqlalchemy.py 2>&1 | head -30Repository: vadikko2/python-cqrs
Length of output: 255
🏁 Script executed:
# Check if there's a fixtures module that should be loaded
find tests -name "fixtures.py" -o -name "*fixture*" -type f | head -20Repository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
# Look at the pytest_plugins reference in tests/conftest.py
grep -n "pytest_plugins\|import.*fixtures" tests/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 113
🏁 Script executed:
# Check git history to see if saga_engine was recently removed
git log --oneline --all -- "*sagaengine*" "*saga_engine*" 2>/dev/null | head -10Repository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
# List all files in tests/integration
find tests/integration -type f -name "*.py"Repository: vadikko2/python-cqrs
Length of output: 616
🏁 Script executed:
# Check if tests/integration/fixtures exists or was misspelled
ls -la tests/integration/ | grep -i fixtureRepository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
# Check if saga_engine might be a typo and should be engine
cat tests/integration/sqlalchemy/conftest.py | grep -E "(engine|saga_engine)"Repository: vadikko2/python-cqrs
Length of output: 480
🏁 Script executed:
# Check all fixture names in conftest
grep "^def " tests/integration/sqlalchemy/conftest.py | grep -v "__"Repository: vadikko2/python-cqrs
Length of output: 135
🏁 Script executed:
# Try to understand the intent - check if engine is used elsewhere
rg "engine" tests/integration/sqlalchemy/conftest.py -nRepository: vadikko2/python-cqrs
Length of output: 510
Add missing saga_engine fixture and reconsider SQLite in-memory default.
The saga_session_factory fixture on line 50 references an undefined saga_engine parameter, causing fixture resolution failures in tests that depend on it. This appears to be a typo for engine. Additionally, the default SQLite in-memory database (sqlite+aiosqlite:///:memory:) isolates schema across connections, which may cause issues when init_saga_orm creates tables in one connection context and test sessions use separate connections.
Recommended fixes:
- Rename
saga_engineparameter toenginein line 50 - Change the default DATABASE_URL to a file-based or shared-cache SQLite URI:
sqlite+aiosqlite:///./.cqrs_test.db
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 7 - 9, The
saga_session_factory fixture references an undefined parameter name: replace the
incorrect saga_engine parameter with engine in the saga_session_factory function
signature so pytest can resolve the fixture (search for saga_session_factory and
change its parameter to engine); also update the DATABASE_URL default from the
in-memory URI ("sqlite+aiosqlite:///:memory:") to a file-based/shared-cache URI
(e.g., "sqlite+aiosqlite:///./.cqrs_test.db") so init_saga_orm-created tables
are visible across connections during tests.
| @pytest.fixture(scope="function") | ||
| async def session(engine): | ||
| connection = await engine.connect() | ||
| transaction = await connection.begin() | ||
|
|
||
| session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False) | ||
| session = session_maker() | ||
|
|
||
| yield session | ||
|
|
||
| await session.close() | ||
| await transaction.rollback() | ||
| await connection.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
rg -n --type=py "commit\(" tests -g '!**/site-packages/**'Repository: vadikko2/python-cqrs
Length of output: 1268
🏁 Script executed:
cat -n tests/integration/sqlalchemy/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 1776
🏁 Script executed:
head -150 tests/integration/sqlalchemy/test_event_outbox.pyRepository: vadikko2/python-cqrs
Length of output: 4239
🌐 Web query:
SQLAlchemy async nested transaction SAVEPOINT pattern async_sessionmaker
💡 Result:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.exc import IntegrityError
engine = create_async_engine(DB_URL, echo=False)
Session = async_sessionmaker(engine, expire_on_commit=False)
async def do_work(records):
async with Session() as session:
# outer transaction (COMMIT/ROLLBACK)
async with session.begin():
for rec in records:
try:
# nested transaction == SAVEPOINT / ROLLBACK TO SAVEPOINT
async with session.begin_nested():
session.add(rec)
# optional: await session.flush() # force constraint checks here
except IntegrityError:
# the SAVEPOINT block is rolled back automatically on exception;
# outer transaction stays active
pass
# exiting session.begin() commits the outer transactionNotes:
AsyncSession.begin_nested()starts a SAVEPOINT (nested transaction) and can be used as an async context manager. (docs.sqlalchemy.org)- Semantics match the synchronous
Session.begin_nested()SAVEPOINT pattern: inner failures roll back to the savepoint without aborting the outer transaction. (docs.sqlalchemy.org)
Citations:
- 1: https://docs.sqlalchemy.org/en/21/orm/extensions/asyncio.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/en/14/orm/session_transaction.html?utm_source=openai
🏁 Script executed:
# Check SQLAlchemy version and async session capabilities
rg -n "sqlalchemy" pyproject.toml setup.py setup.cfg requirements.txt 2>/dev/null | head -20Repository: vadikko2/python-cqrs
Length of output: 195
🌐 Web query:
SQLAlchemy 2.0 async session after_transaction_end event listener restart_savepoint
💡 Result:
To “restart” a SAVEPOINT whenever the inner (nested) transaction ends (common in test fixtures), you still use the ORM Session event after_transaction_end—but with AsyncSession you attach the listener to the underlying sync session (async_session.sync_session). SQLAlchemy explicitly documents that events for AsyncSession are registered against the sync-style Session / sync_session. [1]
AsyncSession SAVEPOINT restart pattern
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncSession
async def setup_nested_tx(async_session: AsyncSession):
# establish the first SAVEPOINT
await async_session.begin_nested()
`@event.listens_for`(async_session.sync_session, "after_transaction_end")
def restart_savepoint(session, transaction):
# fired for *all* SessionTransaction objects, including nested ones [2]
if transaction.nested and transaction.parent and not transaction.parent.nested:
# optional: mimic outer commit behavior used in some recipes
# session.expire_all()
session.begin_nested() # re-open SAVEPOINT (sync call inside event)
return restart_savepoint # keep ref if you want to remove it laterWhy the condition? after_transaction_end is invoked for nested (SAVEPOINT) transactions as well as outer transactions. SQLAlchemy documents using transaction.nested to detect SAVEPOINTs and transaction.parent is None to detect outermost transactions. [2] The “restart when a SAVEPOINT ends” recipe checks for “nested ended, but parent is not nested”, then opens a new SAVEPOINT. [3]
Key async-specific detail: inside these event handlers you use the sync-style Session/Connection APIs (no await), because the handler is invoked in the synchronous event system even in asyncio usage. [1]
Sources:
[1] SQLAlchemy asyncio docs on AsyncSession.sync_session + how to register events for async sessions. (docs.sqlalchemy.org)
[2] SQLAlchemy ORM event docs for after_transaction_end / nested / parent. (docs.sqlalchemy.org)
[3] SQLAlchemy docs recipe demonstrating after_transaction_end “restart_savepoint”. (docs.sqlalchemy.org)
Citations:
- 1: https://docs.sqlalchemy.org/20/orm/extensions/asyncio.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/20/orm/events.html?utm_source=openai
- 3: https://docs.sqlalchemy.org/13/orm/session_transaction.html?utm_source=openai
🏁 Script executed:
# Look for any existing uses of event listeners in the codebase
rg -n "event.listens_for\|from sqlalchemy import event" tests/ src/ --type=py 2>/dev/nullRepository: vadikko2/python-cqrs
Length of output: 46
🌐 Web query:
SQLAlchemy transaction._parent vs transaction.parent attribute
💡 Result:
In SQLAlchemy ORM, transaction._parent is the internal/backwards-compat attribute, while transaction.parent is the public API.
SessionTransaction.parent(public, supported): points to the enclosingSessionTransaction, orNoneif it’s the top-level transaction. It’s the attribute you should use in application/event code. [1]SessionTransaction._parent(private, internal): older/legacy/internal name for the same concept; SQLAlchemy docs note thatparentwas added and that you’d use._parentonly on older versions that didn’t haveparent. [2]
That’s why older recipes/event handlers often reference _parent (e.g. checking transaction.nested vs transaction._parent.nested). [3]
Recommendation: use transaction.parent on supported SQLAlchemy versions; only fall back to ._parent if you’re maintaining compatibility with very old SQLAlchemy releases. [1][2]
Sources:
[1] (docs.sqlalchemy.org)
[2] (docs.sqlalchemy.org)
[3] (docs.sqlalchemy.org)
Citations:
- 1: https://docs.sqlalchemy.org/en/21/orm/session_api.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/en/13/orm/session_api.html?highlight=commit&utm_source=openai
- 3: https://docs.sqlalchemy.org/13/orm/session_transaction.html?utm_source=openai
🏁 Script executed:
# Check if there are any existing transaction handling patterns in the codebase
rg -n "transaction\." tests/ src/ --type=py -A 2 | head -40Repository: vadikko2/python-cqrs
Length of output: 781
Use nested transactions (SAVEPOINT) with event listener to allow test commits while rolling back fixture transaction.
Per-test transaction will fail on teardown rollback once tests call session.commit() via the repository. The outer transaction closes, preventing the fixture's teardown rollback. Use a nested transaction pattern with begin_nested() and after_transaction_end event listener to restart the SAVEPOINT after each test commit.
🛠️ Proposed fix (nested transaction pattern)
+from sqlalchemy import event
-import asyncio
+import asyncio
from sqlalchemy.ext import asyncio as sqla_async
@@
session_maker = sqla_async.async_sessionmaker(bind=connection, expire_on_commit=False)
session = session_maker()
+ await session.begin_nested()
+
+ `@event.listens_for`(session.sync_session, "after_transaction_end")
+ def restart_savepoint(sess, trans):
+ if trans.nested and trans.parent and not trans.parent.nested:
+ sess.begin_nested()
+
yield sessionNote: For SQLAlchemy 2.0+, use the public parent attribute instead of the private _parent.
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 24 - 36, The fixture
must use a nested SAVEPOINT so tests can call session.commit() without ending
the outer fixture transaction; change the session fixture to (1) start the outer
connection transaction as you already do, (2) create the async session via
async_sessionmaker(bind=connection,...), (3) start a nested transaction on the
session with await session.begin_nested(), and (4) register an
after-transaction-end event listener on the session (use the
session.sync_session for event.listen) that, when a subtransaction ends and the
transaction has a parent (use the public parent attribute), restarts a new
nested transaction by calling session.begin_nested(); keep yielding the session,
and on teardown close the session and rollback/close the outer connection
transaction as before.
| @pytest.fixture(scope="session") | ||
| def saga_session_factory(saga_engine, init_saga_orm): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the full fixture in conftest.py to see the complete context
cat -n tests/integration/sqlalchemy/conftest.py | sed -n '45,55p'Repository: vadikko2/python-cqrs
Length of output: 343
🏁 Script executed:
# Check if saga_engine fixture is defined anywhere in the tests directory
rg -n "def saga_engine\b" tests/Repository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
# Check for any saga_engine references in the entire tests directory
rg -n "saga_engine" tests/ | head -20Repository: vadikko2/python-cqrs
Length of output: 283
🏁 Script executed:
# Check what other fixtures are defined in the same conftest.py file
rg -n "@pytest.fixture" tests/integration/sqlalchemy/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 244
🏁 Script executed:
cat -n tests/integration/sqlalchemy/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 1776
Fix fixture parameter saga_engine to engine and mark init_saga_orm unused.
The saga_engine fixture doesn't exist in the test suite; only engine is defined. The init_saga_orm parameter is correctly kept as a dependency to ensure saga tables are initialized before this factory, but should be renamed to _init_saga_orm to indicate it's intentionally unused in the function body.
Required fix
`@pytest.fixture`(scope="session")
-def saga_session_factory(saga_engine, init_saga_orm):
- return sqla_async.async_sessionmaker(saga_engine, expire_on_commit=False)
+def saga_session_factory(engine, _init_saga_orm):
+ return sqla_async.async_sessionmaker(engine, expire_on_commit=False)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @pytest.fixture(scope="session") | |
| def saga_session_factory(saga_engine, init_saga_orm): | |
| `@pytest.fixture`(scope="session") | |
| def saga_session_factory(engine, _init_saga_orm): | |
| return sqla_async.async_sessionmaker(engine, expire_on_commit=False) |
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 50-50: Unused function argument: init_saga_orm
(ARG001)
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/conftest.py` around lines 49 - 50, The fixture
definition saga_session_factory currently declares a non-existent parameter
saga_engine and uses init_saga_orm as a used name; change the parameter to
engine (the real fixture) and rename init_saga_orm to _init_saga_orm to mark it
intentionally unused while keeping the dependency so saga tables are initialized
before saga_session_factory runs.
| @pytest.fixture(autouse=True) | ||
| async def cleanup_table(self, session, setup_db): | ||
| await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}")) | ||
| await session.commit() | ||
| yield |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
find . -name "test_event_outbox.py" -type fRepository: vadikko2/python-cqrs
Length of output: 115
🏁 Script executed:
cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '70,95p'Repository: vadikko2/python-cqrs
Length of output: 1121
🏁 Script executed:
cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '1,40p'Repository: vadikko2/python-cqrs
Length of output: 1090
🏁 Script executed:
rg -n "setup_db|TRUNCATE|DELETE" tests/integration/sqlalchemy/test_event_outbox.pyRepository: vadikko2/python-cqrs
Length of output: 256
🏁 Script executed:
rg -n "DATABASE_URL|test.*url|sqlite" tests/integration/ --max-count=20Repository: vadikko2/python-cqrs
Length of output: 305
🏁 Script executed:
find tests -name "conftest.py" -type fRepository: vadikko2/python-cqrs
Length of output: 155
🏁 Script executed:
cat tests/integration/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 125
🏁 Script executed:
rg -n "sqla\.delete|delete\(" tests/ --type py | head -20Repository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
cat -n tests/integration/sqlalchemy/conftest.pyRepository: vadikko2/python-cqrs
Length of output: 1776
🏁 Script executed:
web_search
SQLite TRUNCATE TABLE support compatibilityRepository: vadikko2/python-cqrs
Length of output: 159
🌐 Web query:
SQLite TRUNCATE TABLE support compatibility
💡 Result:
SQLite does not support the TRUNCATE TABLE statement/syntax. If you run TRUNCATE TABLE t; you’ll get a syntax error, so SQL written for MySQL/Postgres/SQL Server needs a compatibility change. [1]
SQLite-compatible equivalents
-
Delete all rows (portable replacement):
DELETE FROM t;
When a
DELETEhas noWHEREand noRETURNING, and the table has no triggers, SQLite may apply its “truncate optimization” to erase the table contents without visiting rows one-by-one (i.e., closer to what other DBs mean by “truncate”). [2] -
If you also need to reset an
AUTOINCREMENTcounter:
SQLite tracksAUTOINCREMENTusing the internalsqlite_sequencetable. [3] A common pattern is:DELETE FROM t; DELETE FROM sqlite_sequence WHERE name='t';
(Only relevant if the table was created with
INTEGER PRIMARY KEY AUTOINCREMENT.) [3]
Sources:
[1] https://www.sqlitetutor.com/truncate-table/ (sqlitetutor.com)
[2] SQLite DELETE docs (“Truncate Optimization”) (sqlite.org)
[3] SQLite AUTOINCREMENT / sqlite_sequence docs (sqlite.org)
Citations:
- 1: https://www.sqlitetutor.com/truncate-table/?utm_source=openai
- 2: https://sqlite.org/draft/lang_delete.html?utm_source=openai
- 3: https://www.sqlite.org/autoinc.html?utm_source=openai
🏁 Script executed:
cat -n tests/integration/sqlalchemy/test_event_outbox.py | sed -n '70,100p'Repository: vadikko2/python-cqrs
Length of output: 1331
🏁 Script executed:
rg -n "sqla.delete\|from_statement\|DELETE FROM" . --type pyRepository: vadikko2/python-cqrs
Length of output: 46
🏁 Script executed:
rg -B5 -A10 "class TestOutboxModel" tests/integration/sqlalchemy/test_event_outbox.pyRepository: vadikko2/python-cqrs
Length of output: 343
🏁 Script executed:
web_search
SQLAlchemy delete statement syntax sqla.delete()Repository: vadikko2/python-cqrs
Length of output: 248
🌐 Web query:
SQLAlchemy delete statement syntax sqla.delete
💡 Result:
SQLAlchemy delete() / sqla.delete syntax (Core + ORM)
1) Core (table-based) DELETE
from sqlalchemy import delete
stmt = delete(user_table).where(user_table.c.name == "patrick")
result = conn.execute(stmt) # or: await conn.execute(stmt)
conn.commit()
print(result.rowcount)This compiles to DELETE FROM user_account WHERE .... [1]
2) DELETE with RETURNING (if the DB supports it)
stmt = (
delete(user_table)
.where(user_table.c.name == "patrick")
.returning(user_table.c.id, user_table.c.name)
)
rows = conn.execute(stmt).all()
conn.commit()Delete.returning() returns deleted rows/columns. [1]
3) ORM-enabled DELETE (class-based) via Session.execute
from sqlalchemy import delete
stmt = delete(User).where(User.name.in_(["squidward", "sandy"]))
session.execute(stmt)
session.commit()This emits a single SQL DELETE affecting all matching rows. [2]
4) Notes (ORM)
- ORM-enabled bulk
delete(User)...bypasses normal unit-of-work behavior (no in-Python relationship cascading); rely on DB cascades / constraints as appropriate. [2]
Source of truth: SQLAlchemy docs for delete() and ORM-enabled DELETE. [1] [2] [3]
Replace TRUNCATE TABLE with DELETE for SQLite compatibility; rename unused fixture parameter.
TRUNCATE TABLE is not supported by SQLite (the default test database), causing test failures in local runs. Use SQLAlchemy's delete() method instead, which is portable across all databases. Additionally, the setup_db parameter is unused and should be renamed to _setup_db to suppress linting warnings.
🛠️ Proposed fix
- async def cleanup_table(self, session, setup_db):
- await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}"))
+ async def cleanup_table(self, session, _setup_db):
+ await session.execute(sqla.delete(TestOutboxModel))
await session.commit()
yield📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @pytest.fixture(autouse=True) | |
| async def cleanup_table(self, session, setup_db): | |
| await session.execute(sqla.text(f"TRUNCATE TABLE {TestOutboxModel.__tablename__}")) | |
| await session.commit() | |
| yield | |
| `@pytest.fixture`(autouse=True) | |
| async def cleanup_table(self, session, _setup_db): | |
| await session.execute(sqla.delete(TestOutboxModel)) | |
| await session.commit() | |
| yield |
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 80-80: Unused method argument: setup_db
(ARG002)
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/test_event_outbox.py` around lines 79 - 83, The
cleanup_table fixture uses a non-portable "TRUNCATE TABLE" SQL and an unused
parameter; change the fixture named cleanup_table to use SQLAlchemy's delete
operation against TestOutboxModel (e.g., execute a sqla.delete(TestOutboxModel)
via the session and commit) so it works with SQLite, and rename the unused
setup_db parameter to _setup_db to silence linter warnings while keeping session
and TestOutboxModel references intact.
| async def test_get_new_event_positive(self, outbox_repo): | ||
| """ | ||
| checks getting one event positive | ||
| """ | ||
| repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) | ||
| request = OutboxRequest( | ||
| message="test_outbox_mark_event_as_produced_positive", | ||
| count=1, | ||
| ) | ||
| await OutboxRequestHandler(repository).handle(request) | ||
| [event_over_get_all_events_method] = await repository.get_many(1) | ||
| await OutboxRequestHandler(outbox_repo).handle(request) | ||
| [event_over_get_all_events_method] = await outbox_repo.get_many(1) | ||
|
|
||
| event: outbox_repository.OutboxedEvent | None = next( | ||
| iter( | ||
| await repository.get_many( | ||
| await outbox_repo.get_many( | ||
| batch_size=1, | ||
| ), | ||
| ), | ||
| None, | ||
| ) | ||
| await session.commit() | ||
| await outbox_repo.commit() | ||
|
|
||
| assert event | ||
| assert event.id == event_over_get_all_events_method.id # noqa |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the unused noqa.
The # noqa is now redundant and flagged by Ruff.
✂️ Proposed fix
- assert event.id == event_over_get_all_events_method.id # noqa
+ assert event.id == event_over_get_all_events_method.id📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def test_get_new_event_positive(self, outbox_repo): | |
| """ | |
| checks getting one event positive | |
| """ | |
| repository = sqlalchemy.SqlAlchemyOutboxedEventRepository(session) | |
| request = OutboxRequest( | |
| message="test_outbox_mark_event_as_produced_positive", | |
| count=1, | |
| ) | |
| await OutboxRequestHandler(repository).handle(request) | |
| [event_over_get_all_events_method] = await repository.get_many(1) | |
| await OutboxRequestHandler(outbox_repo).handle(request) | |
| [event_over_get_all_events_method] = await outbox_repo.get_many(1) | |
| event: outbox_repository.OutboxedEvent | None = next( | |
| iter( | |
| await repository.get_many( | |
| await outbox_repo.get_many( | |
| batch_size=1, | |
| ), | |
| ), | |
| None, | |
| ) | |
| await session.commit() | |
| await outbox_repo.commit() | |
| assert event | |
| assert event.id == event_over_get_all_events_method.id # noqa | |
| async def test_get_new_event_positive(self, outbox_repo): | |
| """ | |
| checks getting one event positive | |
| """ | |
| request = OutboxRequest( | |
| message="test_outbox_mark_event_as_produced_positive", | |
| count=1, | |
| ) | |
| await OutboxRequestHandler(outbox_repo).handle(request) | |
| [event_over_get_all_events_method] = await outbox_repo.get_many(1) | |
| event: outbox_repository.OutboxedEvent | None = next( | |
| iter( | |
| await outbox_repo.get_many( | |
| batch_size=1, | |
| ), | |
| ), | |
| None, | |
| ) | |
| await outbox_repo.commit() | |
| assert event | |
| assert event.id == event_over_get_all_events_method.id |
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 169-169: Unused blanket noqa directive
Remove unused noqa directive
(RUF100)
🤖 Prompt for AI Agents
In `@tests/integration/sqlalchemy/test_event_outbox.py` around lines 147 - 169,
The test function test_get_new_event_positive contains a redundant inline
comment "# noqa" on the assert comparing event.id and
event_over_get_all_events_method.id; remove the "# noqa" token from that
assertion line (in the test_get_new_event_positive body where event and
event_over_get_all_events_method are compared) so the linter (Ruff) no longer
flags it as unused, then run the test suite/linter to confirm the warning is
gone.
Summary by CodeRabbit
New Features
Documentation
Chores
✏️ Tip: You can customize this high-level summary in your review settings.