diff --git a/driver/src/action/session.rs b/driver/src/action/session.rs index f40bd19e5..551235715 100644 --- a/driver/src/action/session.rs +++ b/driver/src/action/session.rs @@ -1,4 +1,5 @@ use crate::{ + bson::Timestamp, client::options::{SessionOptions, TransactionOptions}, error::Result, Client, diff --git a/driver/src/client/options.rs b/driver/src/client/options.rs index b2c6cdcaf..e5397c28c 100644 --- a/driver/src/client/options.rs +++ b/driver/src/client/options.rs @@ -17,7 +17,6 @@ use std::{ time::Duration, }; -use crate::bson::UuidRepresentation; use derive_where::derive_where; use macro_magic::export_tokens; use serde::{de::Unexpected, Deserialize, Deserializer, Serialize, Serializer}; @@ -35,7 +34,7 @@ use crate::options::Compressor; #[cfg(test)] use crate::srv::LookupHosts; use crate::{ - bson::{doc, Bson, Document}, + bson::{doc, Bson, Document, Timestamp, UuidRepresentation}, client::auth::{AuthMechanism, Credential}, concern::{Acknowledgment, ReadConcern, WriteConcern}, error::{Error, ErrorKind, Result}, @@ -3044,6 +3043,10 @@ pub struct SessionOptions { /// If true, all read operations performed using this client session will share the same /// snapshot. Defaults to false. pub snapshot: Option, + + /// The snapshot time to use for a snapshot session. This option can only be set if `snapshot` + /// is set to true. + pub snapshot_time: Option, } impl SessionOptions { @@ -3051,12 +3054,16 @@ impl SessionOptions { if let (Some(causal_consistency), Some(snapshot)) = (self.causal_consistency, self.snapshot) { if causal_consistency && snapshot { - return Err(ErrorKind::InvalidArgument { - message: "snapshot and causal consistency are mutually exclusive".to_string(), - } - .into()); + return Err(Error::invalid_argument( + "snapshot and causal consistency are mutually exclusive", + )); } } + if self.snapshot_time.is_some() && self.snapshot != Some(true) { + return Err(Error::invalid_argument( + "cannot set snapshot_time without setting snapshot to true", + )); + } Ok(()) } } diff --git a/driver/src/client/session.rs b/driver/src/client/session.rs index b14360a0b..cd62eaf51 100644 --- a/driver/src/client/session.rs +++ b/driver/src/client/session.rs @@ -16,6 +16,7 @@ use uuid::Uuid; use crate::{ bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp}, cmap::conn::PinnedConnectionHandle, + error::{Error, Result}, operation::Retryability, options::{SessionOptions, TransactionOptions}, sdam::ServerInfo, @@ -236,6 +237,7 @@ impl ClientSession { ) -> Self { let timeout = client.inner.topology.watcher().logical_session_timeout(); let server_session = client.inner.session_pool.check_out(timeout).await; + let snapshot_time = options.as_ref().and_then(|o| o.snapshot_time); Self { drop_token: client.register_async_drop(), client, @@ -244,7 +246,7 @@ impl ClientSession { is_implicit, options, transaction: Default::default(), - snapshot_time: None, + snapshot_time, operation_time: None, #[cfg(test)] convenient_transaction_timeout: None, @@ -306,6 +308,23 @@ impl ClientSession { self.operation_time } + /// The snapshot time for a snapshot session. This will return `None` if `snapshot_time` was not + /// provided and the server has not yet responded with a snapshot time. It is an error to call + /// this method on a non-snapshot session. + pub fn snapshot_time(&self) -> Result> { + if !self + .options + .as_ref() + .and_then(|o| o.snapshot) + .unwrap_or(false) + { + return Err(Error::invalid_argument( + "cannot access snapshot time on a non-snapshot session", + )); + } + Ok(self.snapshot_time) + } + pub(crate) fn causal_consistency(&self) -> bool { self.options() .and_then(|opts| opts.causal_consistency) diff --git a/driver/src/test/spec/sessions.rs b/driver/src/test/spec/sessions.rs index 96ab8359f..7a59c7d26 100644 --- a/driver/src/test/spec/sessions.rs +++ b/driver/src/test/spec/sessions.rs @@ -11,7 +11,7 @@ use futures::TryStreamExt; use futures_util::{future::try_join_all, FutureExt}; use crate::{ - bson::{doc, Document}, + bson::{doc, Document, Timestamp}, error::{ErrorKind, Result}, event::{ command::{CommandEvent, CommandStartedEvent}, @@ -21,8 +21,10 @@ use crate::{ get_client_options, log_uncaptured, server_version_gte, + server_version_lt, spec::unified_runner::run_unified_tests, topology_is_load_balanced, + topology_is_replica_set, topology_is_sharded, Event, }, @@ -281,3 +283,48 @@ async fn no_cluster_time_in_sdam() { // Assert that the cluster time hasn't changed assert_eq!(cluster_time.as_ref(), start.command.get("$clusterTime")); } + +// Sessions prose test 21 +#[tokio::test] +async fn snapshot_time_and_snapshot_false_disallowed() { + if server_version_lt(5, 0).await + || !(topology_is_replica_set().await || topology_is_sharded().await) + { + log_uncaptured( + "skipping snapshot_time_and_snapshot_false_disallowed: requires 5.0+ replica set or \ + sharded cluster", + ); + return; + } + + let client = Client::for_test().await; + let error = client + .start_session() + .snapshot(false) + .snapshot_time(Timestamp { + time: 0, + increment: 0, + }) + .await + .unwrap_err(); + assert!(matches!(*error.kind, ErrorKind::InvalidArgument { .. })); +} + +// Sessions prose test 22 +#[tokio::test] +async fn cannot_call_snapshot_time_on_non_snapshot_session() { + if server_version_lt(5, 0).await + || !(topology_is_replica_set().await || topology_is_sharded().await) + { + log_uncaptured( + "skipping cannot_call_snapshot_time_on_non_snapshot_session: requires 5.0+ replica \ + set or sharded cluster", + ); + return; + } + + let client = Client::for_test().await; + let session = client.start_session().snapshot(false).await.unwrap(); + let error = session.snapshot_time().unwrap_err(); + assert!(matches!(*error.kind, ErrorKind::InvalidArgument { .. })); +} diff --git a/driver/src/test/spec/unified_runner/operation.rs b/driver/src/test/spec/unified_runner/operation.rs index 2147a0be5..c459f6b57 100644 --- a/driver/src/test/spec/unified_runner/operation.rs +++ b/driver/src/test/spec/unified_runner/operation.rs @@ -25,63 +25,12 @@ mod wait; use std::{fmt::Debug, ops::Deref}; -use collection::{ - Aggregate, - AssertCollectionExists, - AssertCollectionNotExists, - CreateCollection, - DropCollection, -}; -use command::{CreateCommandCursor, RunCommand, RunCursorCommand}; -use connection::{AssertNumberConnectionsCheckedOut, Close}; -use count::{AssertEventCount, CountDocuments, Distinct, EstimatedDocumentCount}; -use delete::{DeleteMany, DeleteOne}; -use failpoint::{FailPointCommand, TargetedFailPoint}; -use find::{ - CreateFindCursor, - Find, - FindOne, - FindOneAndDelete, - FindOneAndReplace, - FindOneAndUpdate, -}; use futures::{future::BoxFuture, FutureExt}; -use gridfs::{Delete, DeleteByName, Download, DownloadByName, RenameByName, Upload}; -use index::{ - AssertIndexExists, - AssertIndexNotExists, - CreateIndex, - DropIndex, - DropIndexes, - ListIndexNames, - ListIndexes, -}; -use insert::{InsertMany, InsertOne}; -use iteration::{IterateOnce, IterateUntilDocumentOrError}; -use list::{ListCollectionNames, ListCollections, ListDatabaseNames, ListDatabases}; -use rename::Rename; use serde::{ de::{DeserializeOwned, Deserializer}, Deserialize, }; -use session::{ - AssertDifferentLsidOnLastTwoCommands, - AssertSameLsidOnLastTwoCommands, - AssertSessionDirty, - AssertSessionNotDirty, - AssertSessionPinned, - AssertSessionTransactionState, - AssertSessionUnpinned, - EndSession, -}; -use thread::{RunOnThread, WaitForThread}; use tokio::sync::Mutex; -use topology::{AssertTopologyType, RecordTopologyDescription}; -use transaction::{AbortTransaction, CommitTransaction, StartTransaction, WithTransaction}; -use update::{ReplaceOne, UpdateMany, UpdateOne}; -use wait::{Wait, WaitForEvent, WaitForPrimaryChange}; - -use super::{results_match, Entity, ExpectError, TestCursor, TestFileEntity, TestRunner}; use crate::{ bson::{doc, Bson, Document}, @@ -89,10 +38,31 @@ use crate::{ options::ChangeStreamOptions, }; +use super::{results_match, Entity, ExpectError, TestCursor, TestFileEntity, TestRunner}; + use bulk_write::*; +use collection::*; +use command::*; +use connection::*; +use count::*; #[cfg(feature = "in-use-encryption")] use csfle::*; +use delete::*; +use failpoint::*; +use find::*; +use gridfs::*; +use index::*; +use insert::*; +use iteration::*; +use list::*; +use rename::*; use search_index::*; +use session::*; +use thread::*; +use topology::*; +use transaction::*; +use update::*; +use wait::*; pub(crate) trait TestOperation: Debug + Send + Sync { fn execute_test_runner_operation<'a>( @@ -439,6 +409,7 @@ impl<'de> Deserialize<'de> for Operation { "decrypt" => deserialize_op::(definition.arguments), "dropIndex" => deserialize_op::(definition.arguments), "dropIndexes" => deserialize_op::(definition.arguments), + "getSnapshotTime" => deserialize_op::(definition.arguments), s => Ok(Box::new(UnimplementedOperation { _name: s.to_string(), }) as Box), diff --git a/driver/src/test/spec/unified_runner/operation/session.rs b/driver/src/test/spec/unified_runner/operation/session.rs index 43b643d0c..a416ede84 100644 --- a/driver/src/test/spec/unified_runner/operation/session.rs +++ b/driver/src/test/spec/unified_runner/operation/session.rs @@ -1,4 +1,5 @@ use crate::{ + bson::Bson, client::session::TransactionState, error::Result, test::spec::unified_runner::{ @@ -201,3 +202,27 @@ impl TestOperation for AssertSessionNotDirty { .boxed() } } + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(super) struct GetSnapshotTime {} + +impl TestOperation for GetSnapshotTime { + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + with_mut_session!(test_runner, id, |session| { + async move { + session + .snapshot_time() + .map(|option| option.map(|ts| Bson::Timestamp(ts).into())) + } + }) + .await + } + .boxed() + } +} diff --git a/driver/src/test/spec/unified_runner/test_file.rs b/driver/src/test/spec/unified_runner/test_file.rs index 2c4fbc214..85c67f8b7 100644 --- a/driver/src/test/spec/unified_runner/test_file.rs +++ b/driver/src/test/spec/unified_runner/test_file.rs @@ -6,7 +6,7 @@ use semver::Version; use serde::{Deserialize, Deserializer}; use tokio::sync::oneshot; -use super::{results_match, ExpectedEvent, ObserveEvent, Operation}; +use super::{results_match, EntityMap, ExpectedEvent, ObserveEvent, Operation}; #[cfg(feature = "bson-3")] use crate::bson_compat::RawDocumentBufExt; @@ -28,6 +28,8 @@ use crate::{ ReadConcern, ReadPreference, SelectionCriteria, + SessionOptions, + TransactionOptions, WriteConcern, }, serde_util, @@ -375,7 +377,40 @@ pub(crate) struct Collection { pub(crate) struct Session { pub(crate) id: String, pub(crate) client: String, - pub(crate) session_options: Option, + pub(crate) session_options: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(crate) struct TestFileSessionOptions { + default_transaction_options: Option, + causal_consistency: Option, + snapshot: Option, + snapshot_time: Option, // the id of the entity to be retrieved from the map +} + +impl TestFileSessionOptions { + pub(crate) fn as_session_options(&self, entities: &EntityMap) -> SessionOptions { + let snapshot_time = match self.snapshot_time { + Some(ref id) => { + let entity = entities + .get(id) + .unwrap_or_else(|| panic!("missing entity for id {id}")); + let bson = entity.as_bson(); + let timestamp = bson + .as_timestamp() + .unwrap_or_else(|| panic!("expected timestamp for id {id}, got {bson}")); + Some(timestamp) + } + None => None, + }; + SessionOptions { + default_transaction_options: self.default_transaction_options.clone(), + causal_consistency: self.causal_consistency, + snapshot: self.snapshot, + snapshot_time, + } + } } #[derive(Debug, Deserialize)] diff --git a/driver/src/test/spec/unified_runner/test_runner.rs b/driver/src/test/spec/unified_runner/test_runner.rs index b0b6eb6c9..271cebf7d 100644 --- a/driver/src/test/spec/unified_runner/test_runner.rs +++ b/driver/src/test/spec/unified_runner/test_runner.rs @@ -605,11 +605,15 @@ impl TestRunner { TestFileEntity::Session(session) => { let id = session.id.clone(); let client = self.get_client(&session.client).await; - let mut client_session = client - .start_session() - .with_options(session.session_options.clone()) - .await - .unwrap(); + let options = match session.session_options { + Some(ref options) => { + let entities = self.entities.read().await; + Some(options.as_session_options(&entities)) + } + None => None, + }; + let mut client_session = + client.start_session().with_options(options).await.unwrap(); if let Some(time) = &*self.cluster_time.read().await { client_session.advance_cluster_time(time); } diff --git a/spec/sessions/README.md b/spec/sessions/README.md index 8d817a59f..0a94b9639 100644 --- a/spec/sessions/README.md +++ b/spec/sessions/README.md @@ -29,6 +29,15 @@ As part of the test setup for these cases, create a `MongoClient` pointed at the in the test case and verify that the test server does NOT define a value for `logicalSessionTimeoutMinutes` by sending a hello command and checking the response. +## Specific operations and behaviour for unified tests + +An operation on sessions called `getSnapshotTime` must be supported for unified tests. This operation returns the value +of `snapshotTime` on the session, so that it can be used in subsequent operations. + +When parsing `snapshotTime` from `sessionOptions` for unified tests, the parsed string is the name of the key for the +actual value of `snapshotTime` to be found in the +[entity map](https://github.com/mongodb/specifications/blob/master/source/unified-test-format/unified-test-format.md#entity-map). + ## Prose tests ### 1. Setting both `snapshot` and `causalConsistency` to true is not allowed @@ -250,8 +259,24 @@ and configure a `MongoClient` with default options. - Run a ping command using C1 and assert that `$clusterTime` sent is the same as the `clusterTime` recorded earlier. This assertion proves that C1's `$clusterTime` was not advanced by gossiping through SDAM. +### 21. Having `snapshotTime` set and `snapshot` set to false is not allowed + +Snapshot sessions tests require server of version 5.0 or higher and replica set or a sharded cluster deployment. + +- Start a session by calling `startSession` with `snapshot = false` and `snapshotTime = new Timestamp(1)`. +- Assert that a client side error was raised. + +### 22. Retrieving `snapshotTime` on a non-snapshot session raises an error + +Snapshot sessions tests require server of version 5.0 or higher and replica set or a sharded cluster deployment. + +- Start a session by calling `startSession` on with `snapshot = false`. +- Try to access the session's snapshot time. +- Assert that a client side error was raised. + ## Changelog +- 2025-09-25: Added tests for snapshotTime. - 2025-02-24: Test drivers do not gossip $clusterTime on SDAM. - 2024-05-08: Migrated from reStructuredText to Markdown. - 2019-05-15: Initial version. diff --git a/spec/sessions/snapshot-sessions.json b/spec/sessions/snapshot-sessions.json index 260f8b6f4..8f806ea75 100644 --- a/spec/sessions/snapshot-sessions.json +++ b/spec/sessions/snapshot-sessions.json @@ -988,6 +988,810 @@ } } ] + }, + { + "description": "Find operation with snapshot and snapshot time", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + } + ] + }, + { + "name": "getSnapshotTime", + "object": "session0", + "saveResultAsEntity": "savedSnapshotTime" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "session": { + "id": "session2", + "client": "client0", + "sessionOptions": { + "snapshot": true, + "snapshotTime": "savedSnapshotTime" + } + } + } + ] + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + }, + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "Distinct operation with snapshot and snapshot time", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {}, + "fieldName": "x" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "getSnapshotTime", + "object": "session0", + "saveResultAsEntity": "savedSnapshotTime" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "session": { + "id": "session2", + "client": "client0", + "sessionOptions": { + "snapshot": true, + "snapshotTime": "savedSnapshotTime" + } + } + } + ] + } + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {}, + "fieldName": "x" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {}, + "fieldName": "x" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "filter": {}, + "fieldName": "x" + }, + "expectResult": [ + 11, + 33 + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "$$exists": false + } + }, + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "Aggregate operation with snapshot and snapshot time", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "getSnapshotTime", + "object": "session0", + "saveResultAsEntity": "savedSnapshotTime" + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "session": { + "id": "session2", + "client": "client0", + "sessionOptions": { + "snapshot": true, + "snapshotTime": "savedSnapshotTime" + } + } + } + ] + } + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session2", + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session2", + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "$$exists": false + } + }, + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "countDocuments operation with snapshot and snapshot time", + "operations": [ + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectResult": 2 + }, + { + "name": "getSnapshotTime", + "object": "session0", + "saveResultAsEntity": "savedSnapshotTime" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 3, + "x": 33 + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "session": { + "id": "session2", + "client": "client0", + "sessionOptions": { + "snapshot": true, + "snapshotTime": "savedSnapshotTime" + } + } + } + ] + } + }, + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {} + }, + "expectResult": 2 + }, + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "session": "session2", + "filter": {} + }, + "expectResult": 2 + }, + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "filter": {} + }, + "expectResult": 3 + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + }, + "databaseName": "database0" + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "$$exists": false + } + }, + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "Mixed operation with snapshot and snapshotTime", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "getSnapshotTime", + "object": "session0", + "saveResultAsEntity": "savedSnapshotTime" + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "session": { + "id": "session2", + "client": "client0", + "sessionOptions": { + "snapshot": true, + "snapshotTime": "savedSnapshotTime" + } + } + } + ] + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session2" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session2" + }, + "expectResult": [ + 11 + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$matchesEntity": "savedSnapshotTime" + } + } + } + } + } + ] + } + ] } ] } diff --git a/spec/sessions/snapshot-sessions.yml b/spec/sessions/snapshot-sessions.yml index bcf0f7eec..90d448199 100644 --- a/spec/sessions/snapshot-sessions.yml +++ b/spec/sessions/snapshot-sessions.yml @@ -378,7 +378,7 @@ tests: fieldName: x filter: {} session: session0 - expectResult: [ 11 ] + expectResult: [ 11 ] expectEvents: - client: client0 events: @@ -480,3 +480,406 @@ tests: isError: true isClientError: true errorContains: Transactions are not supported in snapshot sessions + +- description: Find operation with snapshot and snapshot time + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: {} + expectResult: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + - name: getSnapshotTime + object: session0 + saveResultAsEntity: &savedSnapshotTime savedSnapshotTime + - name: insertOne + object: collection0 + arguments: + document: { _id: 3, x: 33 } + - name: createEntities + object: testRunner + arguments: + entities: + - session: + id: session2 + client: client0 + sessionOptions: + snapshot: true + snapshotTime: *savedSnapshotTime + - name: find + object: collection0 + arguments: + session: session2 + filter: {} + expectResult: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + ## Calling the operation again to verify that atClusterTime/snapshotTime has not been modified after the first query + - name: find + object: collection0 + arguments: + session: session2 + filter: {} + expectResult: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + - name: find + object: collection0 + arguments: + filter: {} + expectResult: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + - { _id: 3, x: 33 } + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + databaseName: database0 + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + databaseName: database0 + +- description: Distinct operation with snapshot and snapshot time + operations: + - name: distinct + object: collection0 + arguments: + session: session0 + filter: {} + fieldName: x + expectResult: [11] + - name: getSnapshotTime + object: session0 + saveResultAsEntity: &savedSnapshotTime savedSnapshotTime + - name: insertOne + object: collection0 + arguments: + document: { _id: 3, x: 33 } + - name: createEntities + object: testRunner + arguments: + entities: + - session: + id: session2 + client: client0 + sessionOptions: + snapshot: true + snapshotTime: *savedSnapshotTime + - name: distinct + object: collection0 + arguments: + session: session2 + filter: {} + fieldName: x + expectResult: [11] + ## Calling the operation again to verify that atClusterTime/snapshotTime has not been modified after the first query + - name: distinct + object: collection0 + arguments: + session: session2 + filter: {} + fieldName: x + expectResult: [11] + - name: distinct + object: collection0 + arguments: + filter: {} + fieldName: x + expectResult: [11, 33] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + databaseName: database0 + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + "$$exists": false + databaseName: database0 + +- description: Aggregate operation with snapshot and snapshot time + operations: + - name: aggregate + object: collection0 + arguments: + session: session0 + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + - name: getSnapshotTime + object: session0 + saveResultAsEntity: &savedSnapshotTime savedSnapshotTime + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: createEntities + object: testRunner + arguments: + entities: + - session: + id: session2 + client: client0 + sessionOptions: + snapshot: true + snapshotTime: *savedSnapshotTime + - name: aggregate + object: collection0 + arguments: + session: session2 + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + ## Calling the operation again to verify that atClusterTime/snapshotTime has not been modified after the first query + - name: aggregate + object: collection0 + arguments: + session: session2 + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + "$$exists": false + databaseName: database0 + +- description: countDocuments operation with snapshot and snapshot time + operations: + - name: countDocuments + object: collection0 + arguments: + session: session0 + filter: {} + expectResult: 2 + - name: getSnapshotTime + object: session0 + saveResultAsEntity: &savedSnapshotTime savedSnapshotTime + - name: insertOne + object: collection0 + arguments: + document: { _id: 3, x: 33 } + - name: createEntities + object: testRunner + arguments: + entities: + - session: + id: session2 + client: client0 + sessionOptions: + snapshot: true + snapshotTime: *savedSnapshotTime + - name: countDocuments + object: collection0 + arguments: + session: session2 + filter: {} + expectResult: 2 + ## Calling the operation again to verify that atClusterTime/snapshotTime has not been modified after the first query + - name: countDocuments + object: collection0 + arguments: + session: session2 + filter: {} + expectResult: 2 + - name: countDocuments + object: collection0 + arguments: + filter: {} + expectResult: 3 + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + databaseName: database0 + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + "$$exists": false + databaseName: database0 + +- description: Mixed operation with snapshot and snapshotTime + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + - name: getSnapshotTime + object: session0 + saveResultAsEntity: &savedSnapshotTime savedSnapshotTime + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: createEntities + object: testRunner + arguments: + entities: + - session: + id: session2 + client: client0 + sessionOptions: + snapshot: true + snapshotTime: *savedSnapshotTime + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session2 + expectResult: + - { _id: 1, x: 11 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session2 + expectResult: [ 11 ] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: { $$matchesEntity: *savedSnapshotTime } \ No newline at end of file