Skip to content

Commit 0989330

Browse files
authored
Merge pull request #612 from splitio/FME-12222-sdk-events-split-storage
Updated split storage
2 parents 27792de + 6e3ea36 commit 0989330

File tree

12 files changed

+163
-52
lines changed

12 files changed

+163
-52
lines changed

splitio/client/factory.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
from collections import Counter
55
from enum import Enum
6+
import queue
67

78
from splitio.optional.loaders import asyncio
89
from splitio.client.client import Client, ClientAsync
@@ -546,9 +547,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
546547
'events': EventsAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
547548
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
548549
}
549-
550+
551+
events_queue = queue.Queue()
550552
storages = {
551-
'splits': InMemorySplitStorage(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
553+
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
552554
'segments': InMemorySegmentStorage(),
553555
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
554556
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
@@ -1096,8 +1098,9 @@ def _build_localhost_factory(cfg):
10961098
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
10971099
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
10981100

1101+
events_queue = queue.Queue()
10991102
storages = {
1100-
'splits': InMemorySplitStorage(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
1103+
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
11011104
'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors.
11021105
'rule_based_segments': InMemoryRuleBasedSegmentStorage(),
11031106
'impressions': LocalhostImpressionsStorage(),

splitio/storage/inmemmory.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
from splitio.models.segments import Segment
88
from splitio.models.telemetry import HTTPErrors, HTTPLatencies, MethodExceptions, MethodLatencies, LastSynchronization, StreamingEvents, TelemetryConfig, TelemetryCounters, CounterConstants, \
99
HTTPErrorsAsync, HTTPLatenciesAsync, MethodExceptionsAsync, MethodLatenciesAsync, LastSynchronizationAsync, StreamingEventsAsync, TelemetryConfigAsync, TelemetryCountersAsync
10+
from splitio.models.events import SdkInternalEvent
11+
from splitio.events.events_metadata import EventsMetadata, SdkEventType
12+
from splitio.models.notification import SdkInternalEventNotification
1013
from splitio.storage import FlagSetsFilter, SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage, RuleBasedSegmentsStorage
1114
from splitio.optional.loaders import asyncio
1215

@@ -479,14 +482,15 @@ def _decrease_traffic_type_count(self, traffic_type_name):
479482
class InMemorySplitStorage(InMemorySplitStorageBase):
480483
"""InMemory implementation of a feature flag storage."""
481484

482-
def __init__(self, flag_sets=[]):
485+
def __init__(self, internal_event_queue, flag_sets=[]):
483486
"""Constructor."""
484487
self._lock = threading.RLock()
485488
self._feature_flags = {}
486489
self._change_number = -1
487490
self._traffic_types = Counter()
488491
self.flag_set = FlagSets(flag_sets)
489492
self.flag_set_filter = FlagSetsFilter(flag_sets)
493+
self._internal_event_queue = internal_event_queue
490494

491495
def clear(self):
492496
"""
@@ -535,6 +539,13 @@ def update(self, to_add, to_delete, new_change_number):
535539
[self._put(add_feature_flag) for add_feature_flag in to_add]
536540
[self._remove(delete_feature_flag) for delete_feature_flag in to_delete]
537541
self._set_change_number(new_change_number)
542+
to_notify = []
543+
[to_notify.append(feature.name) for feature in to_add]
544+
to_notify.extend(to_delete)
545+
self._internal_event_queue.put(
546+
SdkInternalEventNotification(
547+
SdkInternalEvent.FLAGS_UPDATED,
548+
EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
538549

539550
def _put(self, feature_flag):
540551
"""
@@ -680,6 +691,10 @@ def kill_locally(self, feature_flag_name, default_treatment, change_number):
680691
return
681692
feature_flag.local_kill(default_treatment, change_number)
682693
self._put(feature_flag)
694+
self._internal_event_queue.put(
695+
SdkInternalEventNotification(
696+
SdkInternalEvent.FLAG_KILLED_NOTIFICATION,
697+
EventsMetadata(SdkEventType.FLAG_UPDATE, {feature_flag_name})))
683698

684699
def is_flag_set_exist(self, flag_set):
685700
"""

tests/client/test_client.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import unittest.mock as mock
77
import time
88
import pytest
9+
import queue
910

1011
from splitio.client.client import Client, _LOGGER as _logger, CONTROL, ClientAsync, EvaluationOptions
1112
from splitio.client.factory import SplitFactory, Status as FactoryStatus, SplitFactoryAsync
@@ -36,7 +37,8 @@ def test_get_treatment(self, mocker):
3637
"""Test get_treatment execution paths."""
3738
telemetry_storage = InMemoryTelemetryStorage()
3839
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
39-
split_storage = InMemorySplitStorage()
40+
events_queue = queue.Queue()
41+
split_storage = InMemorySplitStorage(events_queue)
4042
segment_storage = InMemorySegmentStorage()
4143
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
4244
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -113,7 +115,8 @@ def test_get_treatment_with_config(self, mocker):
113115
"""Test get_treatment execution paths."""
114116
telemetry_storage = InMemoryTelemetryStorage()
115117
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
116-
split_storage = InMemorySplitStorage()
118+
events_queue = queue.Queue()
119+
split_storage = InMemorySplitStorage(events_queue)
117120
segment_storage = InMemorySegmentStorage()
118121
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
119122
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -190,7 +193,8 @@ def test_get_treatments(self, mocker):
190193
"""Test get_treatment execution paths."""
191194
telemetry_storage = InMemoryTelemetryStorage()
192195
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
193-
split_storage = InMemorySplitStorage()
196+
events_queue = queue.Queue()
197+
split_storage = InMemorySplitStorage(events_queue)
194198
segment_storage = InMemorySegmentStorage()
195199
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
196200
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -270,7 +274,8 @@ def test_get_treatments_by_flag_set(self, mocker):
270274
"""Test get_treatment execution paths."""
271275
telemetry_storage = InMemoryTelemetryStorage()
272276
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
273-
split_storage = InMemorySplitStorage()
277+
events_queue = queue.Queue()
278+
split_storage = InMemorySplitStorage(events_queue)
274279
segment_storage = InMemorySegmentStorage()
275280
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
276281
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -349,7 +354,8 @@ def test_get_treatments_by_flag_sets(self, mocker):
349354
"""Test get_treatment execution paths."""
350355
telemetry_storage = InMemoryTelemetryStorage()
351356
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
352-
split_storage = InMemorySplitStorage()
357+
events_queue = queue.Queue()
358+
split_storage = InMemorySplitStorage(events_queue)
353359
segment_storage = InMemorySegmentStorage()
354360
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
355361
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -428,7 +434,8 @@ def test_get_treatments_with_config(self, mocker):
428434
"""Test get_treatment execution paths."""
429435
telemetry_storage = InMemoryTelemetryStorage()
430436
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
431-
split_storage = InMemorySplitStorage()
437+
events_queue = queue.Queue()
438+
split_storage = InMemorySplitStorage(events_queue)
432439
segment_storage = InMemorySegmentStorage()
433440
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
434441
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -511,7 +518,8 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
511518
"""Test get_treatment execution paths."""
512519
telemetry_storage = InMemoryTelemetryStorage()
513520
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
514-
split_storage = InMemorySplitStorage()
521+
events_queue = queue.Queue()
522+
split_storage = InMemorySplitStorage(events_queue)
515523
segment_storage = InMemorySegmentStorage()
516524
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
517525
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -591,7 +599,8 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker):
591599
"""Test get_treatment execution paths."""
592600
telemetry_storage = InMemoryTelemetryStorage()
593601
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
594-
split_storage = InMemorySplitStorage()
602+
events_queue = queue.Queue()
603+
split_storage = InMemorySplitStorage(events_queue)
595604
segment_storage = InMemorySegmentStorage()
596605
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
597606
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -671,7 +680,8 @@ def test_impression_toggle_optimized(self, mocker):
671680
"""Test get_treatment execution paths."""
672681
telemetry_storage = InMemoryTelemetryStorage()
673682
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
674-
split_storage = InMemorySplitStorage()
683+
events_queue = queue.Queue()
684+
split_storage = InMemorySplitStorage(events_queue)
675685
segment_storage = InMemorySegmentStorage()
676686
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
677687
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -735,7 +745,8 @@ def test_impression_toggle_debug(self, mocker):
735745
"""Test get_treatment execution paths."""
736746
telemetry_storage = InMemoryTelemetryStorage()
737747
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
738-
split_storage = InMemorySplitStorage()
748+
events_queue = queue.Queue()
749+
split_storage = InMemorySplitStorage(events_queue)
739750
segment_storage = InMemorySegmentStorage()
740751
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
741752
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -799,7 +810,8 @@ def test_impression_toggle_none(self, mocker):
799810
"""Test get_treatment execution paths."""
800811
telemetry_storage = InMemoryTelemetryStorage()
801812
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
802-
split_storage = InMemorySplitStorage()
813+
events_queue = queue.Queue()
814+
split_storage = InMemorySplitStorage(events_queue)
803815
segment_storage = InMemorySegmentStorage()
804816
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
805817
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
@@ -939,7 +951,8 @@ def test_evaluations_before_running_post_fork(self, mocker):
939951
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
940952
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
941953
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
942-
split_storage = InMemorySplitStorage()
954+
events_queue = queue.Queue()
955+
split_storage = InMemorySplitStorage(events_queue)
943956
segment_storage = InMemorySegmentStorage()
944957
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
945958
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
@@ -1020,7 +1033,8 @@ def test_telemetry_not_ready(self, mocker):
10201033
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
10211034
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
10221035
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
1023-
split_storage = InMemorySplitStorage()
1036+
events_queue = queue.Queue()
1037+
split_storage = InMemorySplitStorage(events_queue)
10241038
segment_storage = InMemorySegmentStorage()
10251039
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
10261040
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
@@ -1053,7 +1067,8 @@ def synchronize_config(*_):
10531067
factory.destroy()
10541068

10551069
def test_telemetry_record_treatment_exception(self, mocker):
1056-
split_storage = InMemorySplitStorage()
1070+
events_queue = queue.Queue()
1071+
split_storage = InMemorySplitStorage(events_queue)
10571072
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
10581073
segment_storage = mocker.Mock(spec=SegmentStorage)
10591074
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
@@ -1158,7 +1173,8 @@ def test_telemetry_method_latency(self, mocker):
11581173
impression_storage = InMemoryImpressionStorage(10, telemetry_runtime_producer)
11591174
event_storage = mocker.Mock(spec=EventStorage)
11601175
impmanager = ImpressionManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer)
1161-
split_storage = InMemorySplitStorage()
1176+
events_queue = queue.Queue()
1177+
split_storage = InMemorySplitStorage(events_queue)
11621178
segment_storage = InMemorySegmentStorage()
11631179
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
11641180
split_storage.update([from_raw(splits_json['splitChange1_1']['ff']['d'][0])], [], -1)
@@ -1270,7 +1286,8 @@ def test_impressions_properties(self, mocker):
12701286
"""Test get_treatment execution paths."""
12711287
telemetry_storage = InMemoryTelemetryStorage()
12721288
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
1273-
split_storage = InMemorySplitStorage()
1289+
events_queue = queue.Queue()
1290+
split_storage = InMemorySplitStorage(events_queue)
12741291
segment_storage = InMemorySegmentStorage()
12751292
rb_segment_storage = InMemoryRuleBasedSegmentStorage()
12761293
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()

tests/client/test_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""SDK main manager test module."""
22
import pytest
3+
import queue
34

45
from splitio.client.factory import SplitFactory
56
from splitio.client.manager import SplitManager, SplitManagerAsync, _LOGGER as _logger
@@ -16,7 +17,8 @@ class SplitManagerTests(object): # pylint: disable=too-few-public-methods
1617
def test_manager_calls(self, mocker):
1718
telemetry_storage = InMemoryTelemetryStorage()
1819
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
19-
storage = InMemorySplitStorage()
20+
events_queue = queue.Queue()
21+
storage = InMemorySplitStorage(events_queue)
2022

2123
factory = mocker.Mock(spec=SplitFactory)
2224
factory._storages = {'split': storage}

tests/engine/test_evaluator.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import pytest
66
import copy
7+
import queue
78

89
from splitio.models.splits import Split, Status, from_raw, Prerequisites
910
from splitio.models import segments
@@ -261,7 +262,8 @@ def test_evaluate_treatment_with_rule_based_segment(self, mocker):
261262

262263
def test_evaluate_treatment_with_rbs_in_condition(self):
263264
e = evaluator.Evaluator(splitters.Splitter())
264-
splits_storage = InMemorySplitStorage()
265+
events_queue = queue.Queue()
266+
splits_storage = InMemorySplitStorage(events_queue)
265267
rbs_storage = InMemoryRuleBasedSegmentStorage()
266268
segment_storage = InMemorySegmentStorage()
267269
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
@@ -287,7 +289,8 @@ def test_using_segment_in_excluded(self):
287289
with open(rbs_segments, 'r') as flo:
288290
data = json.loads(flo.read())
289291
e = evaluator.Evaluator(splitters.Splitter())
290-
splits_storage = InMemorySplitStorage()
292+
events_queue = queue.Queue()
293+
splits_storage = InMemorySplitStorage(events_queue)
291294
rbs_storage = InMemoryRuleBasedSegmentStorage()
292295
segment_storage = InMemorySegmentStorage()
293296
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
@@ -311,7 +314,8 @@ def test_using_rbs_in_excluded(self):
311314
with open(rbs_segments, 'r') as flo:
312315
data = json.loads(flo.read())
313316
e = evaluator.Evaluator(splitters.Splitter())
314-
splits_storage = InMemorySplitStorage()
317+
events_queue = queue.Queue()
318+
splits_storage = InMemorySplitStorage(events_queue)
315319
rbs_storage = InMemoryRuleBasedSegmentStorage()
316320
segment_storage = InMemorySegmentStorage()
317321
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
@@ -334,7 +338,8 @@ def test_prerequisites(self):
334338
with open(splits_load, 'r') as flo:
335339
data = json.loads(flo.read())
336340
e = evaluator.Evaluator(splitters.Splitter())
337-
splits_storage = InMemorySplitStorage()
341+
events_queue = queue.Queue()
342+
splits_storage = InMemorySplitStorage(events_queue)
338343
rbs_storage = InMemoryRuleBasedSegmentStorage()
339344
segment_storage = InMemorySegmentStorage()
340345
evaluation_facctory = EvaluationDataFactory(splits_storage, segment_storage, rbs_storage)
@@ -542,7 +547,8 @@ def test_get_context(self):
542547
"""Test context."""
543548
mocked_split = Split('some', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [Prerequisites('split2', ['on'])])
544549
split2 = Split('split2', 12345, False, 'off', 'user', Status.ACTIVE, 12, split_conditions, 1.2, 100, 1234, {}, None, False, [])
545-
flag_storage = InMemorySplitStorage([])
550+
events_queue = queue.Queue()
551+
flag_storage = InMemorySplitStorage(events_queue, [])
546552
segment_storage = InMemorySegmentStorage()
547553
rbs_segment_storage = InMemoryRuleBasedSegmentStorage()
548554
flag_storage.update([mocked_split, split2], [], -1)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"ff": {"t": -1, "s": -1, "d": [{"changeNumber": 10, "trafficTypeName": "user", "name": "rbs_feature_flag", "trafficAllocation": 100, "trafficAllocationSeed": 1828377380, "seed": -286617921, "status": "ACTIVE", "killed": false, "defaultTreatment": "off", "algo": 2, "conditions": [{"conditionType": "ROLLOUT", "matcherGroup": {"combiner": "AND", "matchers": [{"keySelector": {"trafficType": "user"}, "matcherType": "IN_RULE_BASED_SEGMENT", "negate": false, "userDefinedSegmentMatcherData": {"segmentName": "sample_rule_based_segment"}}]}, "partitions": [{"treatment": "on", "size": 100}, {"treatment": "off", "size": 0}], "label": "in rule based segment sample_rule_based_segment"}, {"conditionType": "ROLLOUT", "matcherGroup": {"combiner": "AND", "matchers": [{"keySelector": {"trafficType": "user"}, "matcherType": "ALL_KEYS", "negate": false}]}, "partitions": [{"treatment": "on", "size": 0}, {"treatment": "off", "size": 100}], "label": "default rule"}], "configurations": {}, "sets": [], "impressionsDisabled": false}]}, "rbs": {"t": 1675259356568, "s": -1, "d": [{"changeNumber": 5, "name": "sample_rule_based_segment", "status": "ACTIVE", "trafficTypeName": "user", "excluded": {"keys": ["mauro@split.io", "gaston@split.io"], "segments": []}, "conditions": [{"matcherGroup": {"combiner": "AND", "matchers": [{"keySelector": {"trafficType": "user", "attribute": "email"}, "matcherType": "ENDS_WITH", "negate": false, "whitelistMatcherData": {"whitelist": ["@split.io"]}}]}}]}]}}
1+
{"ff": {"t": -1, "s": -1, "d": [{"name": "SPLIT_1", "status": "ACTIVE", "killed": false, "defaultTreatment": "off", "configurations": {}, "conditions": []}]}, "rbs": {"t": -1, "s": -1, "d": [{"changeNumber": 12, "name": "some_segment", "status": "ACTIVE", "trafficTypeName": "user", "excluded": {"keys": [], "segments": []}, "conditions": []}]}}

0 commit comments

Comments
 (0)