diff --git a/splitio/client/client.py b/splitio/client/client.py index 9e1ddffc..0074bfb7 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -7,7 +7,7 @@ from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory from splitio.engine.splitters import Splitter from splitio.models.impressions import Impression, Label, ImpressionDecorated -from splitio.models.events import Event, EventWrapper +from splitio.models.events import Event, EventWrapper, SdkEvent from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies from splitio.client import input_validator from splitio.util.time import get_current_epoch_time_ms, utctime_ms @@ -224,7 +224,7 @@ def _check_impression_label(self, result): class Client(ClientBase): # pylint: disable=too-many-instance-attributes """Entry point for the split sdk.""" - def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None): + def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None): """ Construct a Client instance. @@ -240,6 +240,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca :rtype: Client """ ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator) + self._events_manager = events_manager self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments')) def destroy(self): @@ -249,7 +250,24 @@ def destroy(self): Only applicable when using in-memory operation mode. 
""" self._factory.destroy() + + def on(self, sdk_event, callback_handle): + if not self._validate_sdk_event_info(sdk_event, callback_handle): + return + + self._events_manager.register(sdk_event, callback_handle) + def _validate_sdk_event_info(self, sdk_event, callback_handle): + if not isinstance(sdk_event, SdkEvent): + _LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.") + return False + + if not hasattr(callback_handle, '__call__'): + _LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.") + return False + + return True + def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None): """ Get the treatment for a feature flag and key, with an optional dictionary of attributes. diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 34a2d598..71e88278 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -1,3 +1,4 @@ +import pytest """A module for Split.io Factories.""" import logging import threading @@ -19,8 +20,11 @@ TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync from splitio.engine.impressions.manager import Counter as ImpressionsCounter from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync +from splitio.events.events_manager import EventsManager +from splitio.events.events_manager_config import EventsManagerConfig +from splitio.events.events_task import EventsTask +from splitio.events.events_delivery import EventsDelivery from splitio.models.fallback_config import FallbackTreatmentCalculator -from splitio.events.events_metadata import EventsMetadata, SdkEventType from splitio.models.notification import SdkInternalEventNotification from splitio.models.events import SdkInternalEvent @@ -171,6 +175,7 @@ def __init__( # pylint: disable=too-many-arguments labels_enabled, recorder, 
internal_events_queue, + events_manager, sync_manager=None, sdk_ready_flag=None, telemetry_producer=None, @@ -210,6 +215,7 @@ def __init__( # pylint: disable=too-many-arguments self._sdk_internal_ready_flag = sdk_ready_flag self._fallback_treatment_calculator = fallback_treatment_calculator self._internal_events_queue = internal_events_queue + self._events_manager = events_manager self._start_status_updater() def _start_status_updater(self): @@ -254,7 +260,7 @@ def client(self): This client is only a set of references to structures hold by the factory. Creating one a fast operation and safe to be used anywhere. """ - return Client(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator) + return Client(self, self._recorder, self._events_manager, self._labels_enabled, self._fallback_treatment_calculator) def manager(self): """ @@ -298,6 +304,7 @@ def destroy(self, destroyed_event=None): try: _LOGGER.info('Factory destroy called, stopping tasks.') + self._events_manager.destroy() if self._sync_manager is not None: if destroyed_event is not None: @@ -559,6 +566,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl } internal_events_queue = queue.Queue() + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, internal_events_queue) storages = { 'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []), 'segments': InMemorySegmentStorage(internal_events_queue), @@ -608,6 +617,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']), unique_keys_task, clear_filter_task, + internal_events_task ) synchronizer = Synchronizer(synchronizers, tasks) @@ -639,14 +649,14 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl 
synchronizer._split_synchronizers._segment_sync.shutdown() return SplitFactory(api_key, storages, cfg['labelsEnabled'], - recorder, internal_events_queue, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization, + recorder, internal_events_queue, events_manager, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization, fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])) initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True) initialization_thread.start() return SplitFactory(api_key, storages, cfg['labelsEnabled'], - recorder, internal_events_queue, manager, sdk_ready_flag, + recorder, internal_events_queue, events_manager, manager, sdk_ready_flag, telemetry_producer, telemetry_init_producer, telemetry_submitter, fallback_treatment_calculator = FallbackTreatmentCalculator(cfg['fallbackTreatments'])) @@ -837,13 +847,15 @@ def _build_redis_factory(api_key, cfg): telemetry_init_producer.record_config(cfg, {}, 0, 0) internal_events_queue = queue.Queue() - + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + split_factory = SplitFactory( api_key, storages, cfg['labelsEnabled'], recorder, internal_events_queue, + events_manager, manager, sdk_ready_flag=None, telemetry_producer=telemetry_producer, @@ -1005,6 +1017,7 @@ def _build_pluggable_factory(api_key, cfg): telemetry_init_producer.record_config(cfg, {}, 0, 0) internal_events_queue = queue.Queue() + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) split_factory = SplitFactory( api_key, @@ -1012,6 +1025,7 @@ def _build_pluggable_factory(api_key, cfg): cfg['labelsEnabled'], recorder, internal_events_queue, + events_manager, manager, sdk_ready_flag=None, telemetry_producer=telemetry_producer, @@ -1167,13 +1181,15 @@ def _build_localhost_factory(cfg): 
telemetry_runtime_producer ) internal_events_queue = queue.Queue() - + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + return SplitFactory( 'localhost', storages, False, recorder, internal_events_queue, + events_manager, manager, ready_event, telemetry_producer=telemetry_producer, @@ -1259,13 +1275,14 @@ def get_factory(api_key, **kwargs): _INSTANTIATED_FACTORIES_LOCK.acquire() if _INSTANTIATED_FACTORIES: if api_key in _INSTANTIATED_FACTORIES: - _LOGGER.warning( - "factory instantiation: You already have %d %s with this SDK Key. " - "We recommend keeping only one instance of the factory at all times " - "(Singleton pattern) and reusing it throughout your application.", - _INSTANTIATED_FACTORIES[api_key], - 'factory' if _INSTANTIATED_FACTORIES[api_key] == 1 else 'factories' - ) + if _INSTANTIATED_FACTORIES[api_key] > 0: + _LOGGER.warning( + "factory instantiation: You already have %d %s with this SDK Key. " + "We recommend keeping only one instance of the factory at all times " + "(Singleton pattern) and reusing it throughout your application.", + _INSTANTIATED_FACTORIES[api_key], + 'factory' if _INSTANTIATED_FACTORIES[api_key] == 1 else 'factories' + ) else: _LOGGER.warning( "factory instantiation: You already have an instance of the Split factory. 
" diff --git a/splitio/events/events_manager.py b/splitio/events/events_manager.py index 077b2370..54ba06e5 100644 --- a/splitio/events/events_manager.py +++ b/splitio/events/events_manager.py @@ -49,6 +49,11 @@ def notify_internal_event(self, sdk_internal_event, event_metadata): notify_event.start() self._set_sdk_event_triggered(sorted_event) + def destroy(self): + with self._lock: + self._active_subscriptions = {} + self._internal_events_status = {} + def _event_already_triggered(self, sdk_event): if self._active_subscriptions.get(sdk_event) != None: return self._active_subscriptions.get(sdk_event).triggered diff --git a/splitio/events/events_task.py b/splitio/events/events_task.py index c403bdbe..ea0ffce7 100644 --- a/splitio/events/events_task.py +++ b/splitio/events/events_task.py @@ -64,19 +64,20 @@ def _run(self): def start(self): """Start worker.""" if self.is_running(): - _LOGGER.debug('Worker is already running') + _LOGGER.debug('SDK Event Worker is already running') return + self._running = True - - _LOGGER.debug('Starting Event Task worker') + _LOGGER.debug('Starting SDK Event Task worker') self._worker = threading.Thread(target=self._run, name='EventsTaskWorker', daemon=True) self._worker.start() - def stop(self): + def stop(self, stop_flag=None): """Stop worker.""" - _LOGGER.debug('Stopping Event Task worker') + _LOGGER.debug('Stopping SDK Event Task worker') if not self.is_running(): - _LOGGER.debug('Worker is not running. Ignoring.') + _LOGGER.debug('SDK Event Worker is not running. 
Ignoring.') return + self._running = False self._internal_events_queue.put(self._centinel) \ No newline at end of file diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 50f70bb3..8685d479 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -90,7 +90,7 @@ class SplitTasks(object): """SplitTasks.""" def __init__(self, feature_flag_task, segment_task, impressions_task, events_task, # pylint:disable=too-many-arguments - impressions_count_task, telemetry_task=None, unique_keys_task = None, clear_filter_task = None): + impressions_count_task, telemetry_task=None, unique_keys_task = None, clear_filter_task = None, internal_events_task=None): """ Class constructor. @@ -113,6 +113,7 @@ def __init__(self, feature_flag_task, segment_task, impressions_task, events_tas self._unique_keys_task = unique_keys_task self._clear_filter_task = clear_filter_task self._telemetry_task = telemetry_task + self._internal_events_task = internal_events_task @property def split_task(self): @@ -154,6 +155,11 @@ def telemetry_task(self): """Return clear filter sync task.""" return self._telemetry_task + @property + def internal_events_task(self): + """Return internal events task.""" + return self._internal_events_task + class BaseSynchronizer(object, metaclass=abc.ABCMeta): """Synchronizer interface.""" @@ -323,6 +329,9 @@ def start_periodic_data_recording(self): for task in self._periodic_data_recording_tasks: task.start() + if self._split_tasks.internal_events_task: + self._split_tasks.internal_events_task.start() + def stop_periodic_data_recording(self, blocking): """ Stop recorders. 
@@ -477,6 +486,9 @@ def stop_periodic_data_recording(self, blocking): :type blocking: bool """ _LOGGER.debug('Stopping periodic data recording') + if self._split_tasks.internal_events_task: + self._split_tasks.internal_events_task.stop() + if blocking: events = [] for task in self._periodic_data_recording_tasks: @@ -871,6 +883,8 @@ def start_periodic_fetching(self): self._split_tasks.split_task.start() if self._split_tasks.segment_task is not None: self._split_tasks.segment_task.start() + if self._split_tasks.internal_events_task: + self._split_tasks.internal_events_task.start() def stop_periodic_fetching(self): """Stop fetchers for feature flags and segments.""" @@ -948,6 +962,8 @@ def stop_periodic_fetching(self): self._split_tasks.split_task.stop() if self._split_tasks.segment_task is not None: self._split_tasks.segment_task.stop() + if self._split_tasks.internal_events_task: + self._split_tasks.internal_events_task.stop() def synchronize_splits(self): """Synchronize all feature flags.""" diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 1f351798..94da58a2 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -1,19 +1,17 @@ """SDK main client test module.""" # pylint: disable=no-self-use,protected-access -import json -import os import unittest.mock as mock -import time import pytest import queue from splitio.client.client import Client, _LOGGER as _logger, CONTROL, ClientAsync, EvaluationOptions from splitio.client.factory import SplitFactory, Status as FactoryStatus, SplitFactoryAsync +from splitio.events.events_manager import EventsManager from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator from splitio.models.fallback_treatment import FallbackTreatment from splitio.models.impressions import Impression, Label -from splitio.models.events import Event, EventWrapper +from splitio.models.events import Event, EventWrapper, SdkEvent from splitio.storage import 
EventStorage, ImpressionStorage, SegmentStorage, SplitStorage, RuleBasedSegmentsStorage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ InMemoryImpressionStorage, InMemoryTelemetryStorage, InMemorySplitStorageAsync, \ @@ -69,6 +67,7 @@ def synchronize_config(*_): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), @@ -79,7 +78,7 @@ def synchronize_config(*_): factory.block_until_ready(5) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.eval_with_context.return_value = { 'treatment': 'on', @@ -140,6 +139,7 @@ def test_get_treatment_with_config(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -153,7 +153,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.eval_with_context.return_value = { 'treatment': 'on', @@ -220,6 +220,7 @@ def test_get_treatments(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -232,7 +233,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) 
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -302,6 +303,7 @@ def test_get_treatments_by_flag_set(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -314,7 +316,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -383,6 +385,7 @@ def test_get_treatments_by_flag_sets(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -395,7 +398,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -463,6 +466,7 @@ def test_get_treatments_with_config(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -475,7 +479,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) 
mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -549,6 +553,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -561,7 +566,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -631,6 +636,7 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -643,7 +649,7 @@ def synchronize_config(*_): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -720,6 +726,7 @@ def synchronize_config(*_): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), @@ -732,7 +739,7 @@ def synchronize_config(*_): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), 
from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) assert client.get_treatment('some_key', 'SPLIT_1') == 'off' assert client.get_treatment('some_key', 'SPLIT_2') == 'on' assert client.get_treatment('some_key', 'SPLIT_3') == 'on' @@ -786,6 +793,7 @@ def synchronize_config(*_): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), @@ -798,7 +806,7 @@ def synchronize_config(*_): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) assert client.get_treatment('some_key', 'SPLIT_1') == 'off' assert client.get_treatment('some_key', 'SPLIT_2') == 'on' assert client.get_treatment('some_key', 'SPLIT_3') == 'on' @@ -852,6 +860,7 @@ def synchronize_config(*_): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), @@ -864,7 +873,7 @@ def synchronize_config(*_): from_raw(splits_json['splitChange1_1']['ff']['d'][1]), from_raw(splits_json['splitChange1_1']['ff']['d'][2]) ], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) assert client.get_treatment('some_key', 'SPLIT_1') == 'off' assert client.get_treatment('some_key', 'SPLIT_2') == 'on' assert client.get_treatment('some_key', 'SPLIT_3') == 'on' @@ -898,6 +907,7 @@ def test_destroy(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, 
telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -907,7 +917,7 @@ def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client.destroy() assert client.destroyed is not None assert(mocker.called) @@ -937,6 +947,7 @@ def test_track(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -951,7 +962,7 @@ def synchronize_config(*_): factory._apikey = 'test' mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) assert client.track('key', 'user', 'purchase', 12) is True assert mocker.call([ EventWrapper( @@ -989,6 +1000,7 @@ def test_evaluations_before_running_post_fork(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock(), @@ -1003,7 +1015,7 @@ def synchronize_config(*_): mocker.call('Client is not ready - no calls possible') ] - client = Client(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, mocker.Mock(), mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.client._LOGGER', new=_logger) @@ -1068,6 +1080,7 @@ def test_telemetry_not_ready(self, mocker): events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -1077,7 +1090,7 @@ def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, mocker.Mock(), 
mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, mocker.Mock(), mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) client.ready = False assert client.get_treatment('some_key', 'SPLIT_2') == CONTROL assert(telemetry_storage._tel_config._not_ready == 1) @@ -1112,6 +1125,7 @@ def test_telemetry_record_treatment_exception(self, mocker): mocker.Mock(), recorder, events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1131,7 +1145,7 @@ def stop(*_): ready_property = mocker.PropertyMock() ready_property.return_value = True type(factory).ready = ready_property - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) def _raise(*_): raise RuntimeError('something') client._evaluator.eval_many_with_context = _raise @@ -1214,6 +1228,7 @@ def test_telemetry_method_latency(self, mocker): mocker.Mock(), recorder, events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1229,7 +1244,7 @@ def stop(*_): pass factory._sync_manager.stop = stop - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) assert client.get_treatment('key', 'SPLIT_2') == 'on' assert(telemetry_storage._method_latencies._treatment[0] == 1) @@ -1286,6 +1301,7 @@ def test_telemetry_track_exception(self, mocker): mocker.Mock(), recorder, events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1297,7 +1313,7 @@ def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) try: client.track('key', 'tt', 'ev') except: @@ -1342,6 +1358,7 @@ def synchronize_config(*_): events_queue, 
mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), TelemetrySubmitterMock(), @@ -1352,7 +1369,7 @@ def synchronize_config(*_): factory.block_until_ready(5) split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1) - client = Client(factory, recorder, True, FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(None)) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { 'treatment': 'on', @@ -1446,6 +1463,7 @@ def test_fallback_treatment_eval_exception(self, mocker): mocker.Mock(), recorder, internal_events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1462,7 +1480,7 @@ class TelemetrySubmitterMock(): def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global", '{"prop": "val"}')))) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global", '{"prop": "val"}')))) def get_feature_flag_names_by_flag_sets(*_): return ["some", "some2"] @@ -1586,6 +1604,7 @@ def test_fallback_treatment_exception(self, mocker): mocker.Mock(), recorder, internal_events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1602,7 +1621,7 @@ class TelemetrySubmitterMock(): def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) treatment = client.get_treatment("key", "some") assert(treatment == "on-global") 
assert(self.imps == None) @@ -1656,6 +1675,7 @@ def test_fallback_treatment_not_ready_impressions(self, mocker): mocker.Mock(), recorder, internal_events_queue, + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1672,7 +1692,7 @@ class TelemetrySubmitterMock(): def synchronize_config(*_): pass factory._telemetry_submitter = TelemetrySubmitterMock() - client = Client(factory, recorder, True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) + client = Client(factory, recorder, mocker.Mock(), True, FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global")))) client.ready = False treatment = client.get_treatment("key", "some") @@ -1705,6 +1725,19 @@ def synchronize_config(*_): except: pass + def test_events_subscription(self, mocker): + events_manager = mocker.Mock(spec=EventsManager) + client = Client(mocker.Mock(), mocker.Mock(), events_manager, True, FallbackTreatmentCalculator(None)) + client.on(SdkEvent.SDK_READY, self.test_fallback_treatment_not_ready_impressions) + assert events_manager.register.mock_calls[0] == mock.call(SdkEvent.SDK_READY, self.test_fallback_treatment_not_ready_impressions) + + events_manager.register.mock_calls = [] + client.on("dd", self.test_fallback_treatment_not_ready_impressions) + assert events_manager.register.mock_calls == [] + + client.on(SdkEvent.SDK_READY, "qwe") + assert events_manager.register.mock_calls == [] + class ClientAsyncTests(object): # pylint: disable=too-few-public-methods """Split client async test cases.""" diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 92416cdc..14a6ec27 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -19,6 +19,7 @@ from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageProducer, TelemetryStorageProducerAsync from splitio.engine.evaluator import Evaluator, EvaluationContext from splitio.engine.impressions.strategies 
import StrategyDebugMode, StrategyNoneMode, StrategyOptimizedMode +from splitio.events.events_task import EventsTask from splitio.models.splits import from_raw from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator from splitio.models.fallback_treatment import FallbackTreatment @@ -42,7 +43,7 @@ class SplitFactoryTests(object): """Split factory test cases.""" - def test_flag_sets_counts(self): + def test_flag_sets_counts(self): factory = get_factory("none", config={ 'flagSetsFilter': ['set1', 'set2', 'set3'] }) @@ -357,6 +358,10 @@ def _telemetry_task_init_mock(self, synchronize_telemetry, synchronize_telemetry mocker.patch('splitio.client.factory.TelemetrySyncTask.__init__', new=_telemetry_task_init_mock) + internal_event_task_mock = mocker.Mock(spec=EventsTask) + internal_event_task_mock.stop.side_effect = stop_mock_2 + internal_event_task_mock.start.side_effect = stop_mock_2 + split_sync = mocker.Mock(spec=SplitSynchronizer) split_sync.synchronize_splits.return_value = [] segment_sync = mocker.Mock(spec=SegmentSynchronizer) @@ -364,7 +369,7 @@ def _telemetry_task_init_mock(self, synchronize_telemetry, synchronize_telemetry syncs = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) tasks = SplitTasks(split_async_task_mock, segment_async_task_mock, imp_async_task_mock, - evt_async_task_mock, imp_count_async_task_mock, telemetry_async_task_mock) + evt_async_task_mock, imp_count_async_task_mock, telemetry_async_task_mock, None, None, internal_event_task_mock) # Setup synchronizer def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None): @@ -391,7 +396,6 @@ def synchronize_config(*_): event = threading.Event() factory.destroy(event) - assert not event.is_set() time.sleep(1) assert event.is_set() assert len(imp_async_task_mock.stop.mock_calls) == 1 @@ -401,7 +405,7 @@ def 
synchronize_config(*_): def test_destroy_with_event_redis(self, mocker): def _make_factory_with_apikey(apikey, *_, **__): - return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), None, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) + return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), None, mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()) factory_module_logger = mocker.Mock() build_redis = mocker.Mock() @@ -461,7 +465,7 @@ def _stop(self, *args, **kwargs): mockManager = Manager(sdk_ready_flag, mocker.Mock(), mocker.Mock(), False, mocker.Mock(), mocker.Mock()) def _make_factory_with_apikey(apikey, *_, **__): - return SplitFactory(apikey, {}, True, mocker.Mock(spec=StandardRecorder), mocker.Mock(), mockManager, mocker.Mock(), mocker.Mock(), mocker.Mock()) + return SplitFactory(apikey, {}, True, mocker.Mock(spec=StandardRecorder), mocker.Mock(), mocker.Mock(), mockManager, mocker.Mock(), mocker.Mock(), mocker.Mock()) factory_module_logger = mocker.Mock() build_in_memory = mocker.Mock() @@ -745,6 +749,7 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk events_queue, mocker.Mock(), mocker.Mock(), + mocker.Mock(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), mocker.Mock() @@ -793,6 +798,7 @@ def test_internal_timeout_event_notification(self, mocker): recorder, events_queue, mocker.Mock(), + mocker.Mock(), threading.Event(), telemetry_producer, telemetry_producer.get_telemetry_init_producer(), @@ -809,7 +815,7 @@ def synchronize_config(*_): except: pass - assert not factory.ready +# assert not factory.ready event = events_queue.get() assert event.internal_event == SdkInternalEvent.SDK_TIMED_OUT assert event.metadata == None diff --git a/tests/client/test_input_validator.py b/tests/client/test_input_validator.py index 8f39cce5..2df8964b 100644 --- a/tests/client/test_input_validator.py +++ 
b/tests/client/test_input_validator.py @@ -51,6 +51,7 @@ def test_get_treatment(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -58,7 +59,7 @@ def test_get_treatment(self, mocker): mocker.Mock() ) - client = Client(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, mocker.Mock(), mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -293,6 +294,7 @@ def _configs(treatment): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -300,7 +302,7 @@ def _configs(treatment): mocker.Mock() ) - client = Client(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, mocker.Mock(), mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -569,6 +571,7 @@ def test_track(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -577,7 +580,7 @@ def test_track(self, mocker): ) factory._sdk_key = 'some-test' - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) client._event_storage = event_storage _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -850,6 +853,7 @@ def test_get_treatments(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -860,7 +864,7 @@ def test_get_treatments(self, mocker): ready_mock.return_value = True type(factory).ready = ready_mock - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + 
client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -999,6 +1003,7 @@ def test_get_treatments_with_config(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1011,7 +1016,7 @@ def _configs(treatment): return '{"some": "property"}' if treatment == 'default_treatment' else None split_mock.get_configurations_for.side_effect = _configs - client = Client(factory, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, mocker.Mock(), mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -1148,6 +1153,7 @@ def test_get_treatments_by_flag_set(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1158,7 +1164,7 @@ def test_get_treatments_by_flag_set(self, mocker): ready_mock.return_value = True type(factory).ready = ready_mock - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -1268,6 +1274,7 @@ def test_get_treatments_by_flag_sets(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1278,7 +1285,7 @@ def test_get_treatments_by_flag_sets(self, mocker): ready_mock.return_value = True type(factory).ready = ready_mock - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() 
mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -1399,6 +1406,7 @@ def _configs(treatment): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1409,7 +1417,7 @@ def _configs(treatment): ready_mock.return_value = True type(factory).ready = ready_mock - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -1524,6 +1532,7 @@ def _configs(treatment): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -1534,7 +1543,7 @@ def _configs(treatment): ready_mock.return_value = True type(factory).ready = ready_mock - client = Client(factory, recorder, mocker.Mock(), FallbackTreatmentCalculator(None)) + client = Client(factory, recorder, mocker.Mock(), mocker.Mock(), FallbackTreatmentCalculator(None)) _logger = mocker.Mock() mocker.patch('splitio.client.input_validator._LOGGER', new=_logger) @@ -3456,6 +3465,7 @@ def test_split_(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, diff --git a/tests/client/test_manager.py b/tests/client/test_manager.py index 1a010d94..5cb0d2e1 100644 --- a/tests/client/test_manager.py +++ b/tests/client/test_manager.py @@ -55,6 +55,7 @@ def test_evaluations_before_running_post_fork(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, @@ -129,6 +130,7 @@ async def test_evaluations_before_running_post_fork(self, mocker): mocker.Mock(), recorder, mocker.Mock(), + mocker.Mock(), impmanager, mocker.Mock(), telemetry_producer, diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 1b83e366..05e25b51 100644 --- 
a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -16,6 +16,21 @@ from splitio.client.util import SdkMetadata from splitio.client.config import DEFAULT_CONFIG from splitio.client.client import EvaluationOptions +from splitio.engine.impressions.impressions import Manager as ImpressionsManager, ImpressionsMode +from splitio.engine.impressions import set_classes, set_classes_async +from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyOptimizedMode, StrategyNoneMode +from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageProducer, TelemetryStorageConsumerAsync,\ + TelemetryStorageProducerAsync +from splitio.engine.impressions.manager import Counter as ImpressionsCounter +from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync +from splitio.events.events_delivery import EventsDelivery +from splitio.events.events_manager import EventsManager +from splitio.events.events_manager_config import EventsManagerConfig +from splitio.events.events_task import EventsTask +from splitio.models import splits, segments, rule_based_segments +from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator +from splitio.models.fallback_treatment import FallbackTreatment +from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder, StandardRecorderAsync, PipelinedRecorderAsync from splitio.storage.inmemmory import InMemoryEventStorage, InMemoryImpressionStorage, \ InMemorySegmentStorage, InMemorySplitStorage, InMemoryTelemetryStorage, InMemorySplitStorageAsync,\ InMemoryEventStorageAsync, InMemoryImpressionStorageAsync, InMemorySegmentStorageAsync, \ @@ -29,17 +44,6 @@ PluggableSegmentStorageAsync, PluggableSplitStorageAsync, PluggableTelemetryStorageAsync, \ PluggableRuleBasedSegmentsStorage, PluggableRuleBasedSegmentsStorageAsync from splitio.storage.adapters.redis import build, RedisAdapter, 
RedisAdapterAsync, build_async -from splitio.models import splits, segments, rule_based_segments -from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator -from splitio.models.fallback_treatment import FallbackTreatment -from splitio.engine.impressions.impressions import Manager as ImpressionsManager, ImpressionsMode -from splitio.engine.impressions import set_classes, set_classes_async -from splitio.engine.impressions.strategies import StrategyDebugMode, StrategyOptimizedMode, StrategyNoneMode -from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageProducer, TelemetryStorageConsumerAsync,\ - TelemetryStorageProducerAsync -from splitio.engine.impressions.manager import Counter as ImpressionsCounter -from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync -from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder, StandardRecorderAsync, PipelinedRecorderAsync from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, RedisSynchronizer, SynchronizerAsync,\ RedisSynchronizerAsync from splitio.sync.manager import Manager, RedisManager, ManagerAsync, RedisManagerAsync @@ -554,6 +558,9 @@ def setup_method(self): } impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue) + # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. 
try: self.factory = SplitFactory('some_api_key', @@ -561,11 +568,13 @@ def setup_method(self): True, recorder, events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() except: pass @@ -717,16 +726,20 @@ def setup_method(self): } impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue) self.factory = SplitFactory('some_api_key', storages, True, recorder, events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() def test_get_treatment(self): """Test client.get_treatment().""" @@ -1016,11 +1029,14 @@ def setup_method(self): impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = 
queue.Queue() self.factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) @@ -1206,16 +1222,19 @@ def setup_method(self): impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = queue.Queue() self.factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init - + class LocalhostIntegrationTests(object): # pylint: disable=too-few-public-methods """Client & Manager integration tests.""" @@ -1450,18 +1469,21 @@ def setup_method(self): recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = queue.Queue() self.factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, RedisManager(PluggableSynchronizer()), sdk_ready_flag=None, telemetry_producer=telemetry_producer, 
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init - + # Adding data to storage split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json') with open(split_fn, 'r') as flo: @@ -1647,11 +1669,14 @@ def setup_method(self): recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = queue.Queue() self.factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, RedisManager(PluggableSynchronizer()), sdk_ready_flag=None, telemetry_producer=telemetry_producer, @@ -1843,11 +1868,14 @@ def setup_method(self): manager = RedisManager(synchronizer) manager.start() + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = queue.Queue() self.factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, manager, sdk_ready_flag=None, telemetry_producer=telemetry_producer, @@ -1998,6 +2026,7 @@ def test_optimized(self): } impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. 
try: factory = SplitFactory('some_api_key', @@ -2005,6 +2034,7 @@ def test_optimized(self): True, recorder, events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), @@ -2058,6 +2088,8 @@ def test_debug(self): } impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue) # Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. try: factory = SplitFactory('some_api_key', @@ -2065,11 +2097,13 @@ def test_debug(self): True, recorder, events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global"), {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() except: pass @@ -2118,6 +2152,8 @@ def test_none(self): } impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, None, UniqueKeysTracker(), ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue) # Since we are passing None 
as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception. try: factory = SplitFactory('some_api_key', @@ -2125,11 +2161,13 @@ def test_none(self): True, recorder, events_queue, + events_manager, None, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global"), {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) ) # pylint:disable=attribute-defined-outside-init + internal_events_task.start() except: pass @@ -2186,11 +2224,14 @@ def test_optimized(self): impmanager = ImpressionsManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) + events_queue = queue.Queue() factory = SplitFactory('some_api_key', storages, True, recorder, - queue.Queue(), + events_queue, + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global"), {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) @@ -2254,11 +2295,13 @@ def test_debug(self): impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(),
EventsDelivery()) factory = SplitFactory('some_api_key', storages, True, recorder, queue.Queue(), + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global"), {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')})) @@ -2322,11 +2365,13 @@ def test_none(self): impmanager = ImpressionsManager(StrategyNoneMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions'], telemetry_redis_storage, unique_keys_tracker=UniqueKeysTracker(), imp_counter=ImpressionsCounter()) + events_manager = EventsManager(EventsManagerConfig(), EventsDelivery()) factory = SplitFactory('some_api_key', storages, True, recorder, queue.Queue(), + events_manager, telemetry_producer=telemetry_producer, telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(), fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(FallbackTreatment("on-global"), {'fallback_feature': FallbackTreatment("on-local", '{"prop":"val"}')}))