Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions splitio/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory
from splitio.engine.splitters import Splitter
from splitio.models.impressions import Impression, Label, ImpressionDecorated
from splitio.models.events import Event, EventWrapper
from splitio.models.events import Event, EventWrapper, SdkEvent
from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies
from splitio.client import input_validator
from splitio.util.time import get_current_epoch_time_ms, utctime_ms
Expand Down Expand Up @@ -224,7 +224,7 @@ def _check_impression_label(self, result):
class Client(ClientBase): # pylint: disable=too-many-instance-attributes
"""Entry point for the split sdk."""

def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None):
def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None):
"""
Construct a Client instance.

Expand All @@ -240,6 +240,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca
:rtype: Client
"""
ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator)
self._events_manager = events_manager
self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))

def destroy(self):
Expand All @@ -249,7 +250,24 @@ def destroy(self):
Only applicable when using in-memory operation mode.
"""
self._factory.destroy()

def on(self, sdk_event, callback_handle):
    """Subscribe *callback_handle* to be invoked when *sdk_event* fires.

    Invalid arguments are logged by the validator and the subscription
    request is silently dropped.

    :param sdk_event: event to listen for (an SdkEvent member).
    :param callback_handle: callable executed when the event triggers.
    """
    if self._validate_sdk_event_info(sdk_event, callback_handle):
        self._events_manager.register(sdk_event, callback_handle)

def _validate_sdk_event_info(self, sdk_event, callback_handle):
    """Validate an event-subscription request.

    :param sdk_event: event to subscribe to; must be an SdkEvent member.
    :param callback_handle: object invoked when the event fires; must be callable.
    :returns: True when both arguments are valid, False otherwise.
    :rtype: bool
    """
    if not isinstance(sdk_event, SdkEvent):
        _LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.")
        return False

    # callable() is the idiomatic check: it covers functions, bound methods,
    # lambdas and any object defining __call__ (hasattr(x, '__call__') is the
    # non-idiomatic spelling of the same test).
    if not callable(callback_handle):
        _LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.")
        return False

    return True

def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None):
"""
Get the treatment for a feature flag and key, with an optional dictionary of attributes.
Expand Down
43 changes: 30 additions & 13 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
"""A module for Split.io Factories."""
import logging
import threading
Expand All @@ -19,8 +20,11 @@
TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
from splitio.events.events_manager import EventsManager
from splitio.events.events_manager_config import EventsManagerConfig
from splitio.events.events_task import EventsTask
from splitio.events.events_delivery import EventsDelivery
from splitio.models.fallback_config import FallbackTreatmentCalculator
from splitio.events.events_metadata import EventsMetadata, SdkEventType
from splitio.models.notification import SdkInternalEventNotification
from splitio.models.events import SdkInternalEvent

Expand Down Expand Up @@ -171,6 +175,7 @@ def __init__( # pylint: disable=too-many-arguments
labels_enabled,
recorder,
internal_events_queue,
events_manager,
sync_manager=None,
sdk_ready_flag=None,
telemetry_producer=None,
Expand Down Expand Up @@ -210,6 +215,7 @@ def __init__( # pylint: disable=too-many-arguments
self._sdk_internal_ready_flag = sdk_ready_flag
self._fallback_treatment_calculator = fallback_treatment_calculator
self._internal_events_queue = internal_events_queue
self._events_manager = events_manager
self._start_status_updater()

def _start_status_updater(self):
Expand Down Expand Up @@ -254,7 +260,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return Client(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator)
return Client(self, self._recorder, self._events_manager, self._labels_enabled, self._fallback_treatment_calculator)

def manager(self):
"""
Expand Down Expand Up @@ -298,6 +304,7 @@ def destroy(self, destroyed_event=None):

try:
_LOGGER.info('Factory destroy called, stopping tasks.')
self._events_manager.destroy()
if self._sync_manager is not None:
if destroyed_event is not None:

Expand Down Expand Up @@ -559,6 +566,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
}

internal_events_queue = queue.Queue()
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTask(events_manager.notify_internal_event, internal_events_queue)
storages = {
'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
'segments': InMemorySegmentStorage(internal_events_queue),
Expand Down Expand Up @@ -608,6 +617,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
TelemetrySyncTask(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
unique_keys_task,
clear_filter_task,
internal_events_task
)

synchronizer = Synchronizer(synchronizers, tasks)
Expand Down Expand Up @@ -639,14 +649,14 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
synchronizer._split_synchronizers._segment_sync.shutdown()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, internal_events_queue, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
recorder, internal_events_queue, events_manager, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, internal_events_queue, manager, sdk_ready_flag,
recorder, internal_events_queue, events_manager, manager, sdk_ready_flag,
telemetry_producer, telemetry_init_producer,
telemetry_submitter, fallback_treatment_calculator = FallbackTreatmentCalculator(cfg['fallbackTreatments']))

Expand Down Expand Up @@ -837,13 +847,15 @@ def _build_redis_factory(api_key, cfg):

telemetry_init_producer.record_config(cfg, {}, 0, 0)
internal_events_queue = queue.Queue()

events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())

split_factory = SplitFactory(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
events_manager,
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
Expand Down Expand Up @@ -1005,13 +1017,15 @@ def _build_pluggable_factory(api_key, cfg):

telemetry_init_producer.record_config(cfg, {}, 0, 0)
internal_events_queue = queue.Queue()
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())

split_factory = SplitFactory(
api_key,
storages,
cfg['labelsEnabled'],
recorder,
internal_events_queue,
events_manager,
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
Expand Down Expand Up @@ -1167,13 +1181,15 @@ def _build_localhost_factory(cfg):
telemetry_runtime_producer
)
internal_events_queue = queue.Queue()

events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())

return SplitFactory(
'localhost',
storages,
False,
recorder,
internal_events_queue,
events_manager,
manager,
ready_event,
telemetry_producer=telemetry_producer,
Expand Down Expand Up @@ -1259,13 +1275,14 @@ def get_factory(api_key, **kwargs):
_INSTANTIATED_FACTORIES_LOCK.acquire()
if _INSTANTIATED_FACTORIES:
if api_key in _INSTANTIATED_FACTORIES:
_LOGGER.warning(
"factory instantiation: You already have %d %s with this SDK Key. "
"We recommend keeping only one instance of the factory at all times "
"(Singleton pattern) and reusing it throughout your application.",
_INSTANTIATED_FACTORIES[api_key],
'factory' if _INSTANTIATED_FACTORIES[api_key] == 1 else 'factories'
)
if _INSTANTIATED_FACTORIES[api_key] > 0:
_LOGGER.warning(
"factory instantiation: You already have %d %s with this SDK Key. "
"We recommend keeping only one instance of the factory at all times "
"(Singleton pattern) and reusing it throughout your application.",
_INSTANTIATED_FACTORIES[api_key],
'factory' if _INSTANTIATED_FACTORIES[api_key] == 1 else 'factories'
)
else:
_LOGGER.warning(
"factory instantiation: You already have an instance of the Split factory. "
Expand Down
5 changes: 5 additions & 0 deletions splitio/events/events_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def notify_internal_event(self, sdk_internal_event, event_metadata):
notify_event.start()
self._set_sdk_event_triggered(sorted_event)

def destroy(self):
    """Release all event-manager state.

    Clears the subscription table and the internal-event status map while
    holding the manager lock, so in-flight notifications see a consistent view.
    """
    with self._lock:
        self._active_subscriptions, self._internal_events_status = {}, {}

def _event_already_triggered(self, sdk_event):
if self._active_subscriptions.get(sdk_event) != None:
return self._active_subscriptions.get(sdk_event).triggered
Expand Down
13 changes: 7 additions & 6 deletions splitio/events/events_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,20 @@ def _run(self):
def start(self):
    """Start the SDK event worker thread, unless one is already running."""
    if not self.is_running():
        self._running = True
        _LOGGER.debug('Starting SDK Event Task worker')
        # Daemon thread: must not keep the interpreter alive on shutdown.
        worker = threading.Thread(target=self._run, name='EventsTaskWorker', daemon=True)
        self._worker = worker
        worker.start()
        return

    _LOGGER.debug('SDK Event Worker is already running')

def stop(self):
def stop(self, stop_flag=None):
    """Signal the SDK event worker thread to shut down.

    :param stop_flag: accepted for interface compatibility; unused here.
    """
    _LOGGER.debug('Stopping SDK Event Task worker')
    if self.is_running():
        self._running = False
        # Wake the worker with the sentinel so it exits its queue loop.
        self._internal_events_queue.put(self._centinel)
    else:
        _LOGGER.debug('SDK Event Worker is not running. Ignoring.')
18 changes: 17 additions & 1 deletion splitio/sync/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SplitTasks(object):
"""SplitTasks."""

def __init__(self, feature_flag_task, segment_task, impressions_task, events_task, # pylint:disable=too-many-arguments
impressions_count_task, telemetry_task=None, unique_keys_task = None, clear_filter_task = None):
impressions_count_task, telemetry_task=None, unique_keys_task = None, clear_filter_task = None, internal_events_task=None):
"""
Class constructor.

Expand All @@ -113,6 +113,7 @@ def __init__(self, feature_flag_task, segment_task, impressions_task, events_tas
self._unique_keys_task = unique_keys_task
self._clear_filter_task = clear_filter_task
self._telemetry_task = telemetry_task
self._internal_events_task = internal_events_task

@property
def split_task(self):
Expand Down Expand Up @@ -154,6 +155,11 @@ def telemetry_task(self):
"""Return clear filter sync task."""
return self._telemetry_task

@property
def internal_events_task(self):
    """Read-only accessor for the internal SDK events task (may be None)."""
    task = self._internal_events_task
    return task

class BaseSynchronizer(object, metaclass=abc.ABCMeta):
"""Synchronizer interface."""

Expand Down Expand Up @@ -323,6 +329,9 @@ def start_periodic_data_recording(self):
for task in self._periodic_data_recording_tasks:
task.start()

if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.start()

def stop_periodic_data_recording(self, blocking):
"""
Stop recorders.
Expand Down Expand Up @@ -477,6 +486,9 @@ def stop_periodic_data_recording(self, blocking):
:type blocking: bool
"""
_LOGGER.debug('Stopping periodic data recording')
if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.stop()

if blocking:
events = []
for task in self._periodic_data_recording_tasks:
Expand Down Expand Up @@ -871,6 +883,8 @@ def start_periodic_fetching(self):
self._split_tasks.split_task.start()
if self._split_tasks.segment_task is not None:
self._split_tasks.segment_task.start()
if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.start()

def stop_periodic_fetching(self):
"""Stop fetchers for feature flags and segments."""
Expand Down Expand Up @@ -948,6 +962,8 @@ def stop_periodic_fetching(self):
self._split_tasks.split_task.stop()
if self._split_tasks.segment_task is not None:
self._split_tasks.segment_task.stop()
if self._split_tasks.internal_events_task:
self._split_tasks.internal_events_task.stop()

def synchronize_splits(self):
"""Synchronize all feature flags."""
Expand Down
Loading