|
| 1 | +import threading |
| 2 | +from distutils.util import strtobool |
| 3 | +from typing import Any, Iterable, Mapping, Optional |
| 4 | + |
| 5 | +from ffcclient.category import FFC_FEATURE_FLAGS, FFC_SEGMENTS |
| 6 | +from ffcclient.common_types import EvalDetail, FFCUser |
| 7 | +from ffcclient.config import Config |
| 8 | +from ffcclient.evaluator import (REASON_CLIENT_NOT_READY, REASON_ERROR, |
| 9 | + REASON_FLAG_NOT_FOUND, |
| 10 | + REASON_USER_NOT_SPECIFIED, Evaluator) |
| 11 | +from ffcclient.event_processor import DefaultEventProcessor, NullEventProcessor |
| 12 | +from ffcclient.event_types import FlagEvent, Metric, MetricEvent |
| 13 | +from ffcclient.interfaces import DataUpdateStatusProvider |
| 14 | +from ffcclient.status import DataUpdateStatusProviderIml |
| 15 | +from ffcclient.streaming import Streaming |
| 16 | +from ffcclient.update_processor import NullUpdateProcessor |
| 17 | +from ffcclient.utils import check_uwsgi, get_feature_flag_id, log |
| 18 | +from ffcclient.utils.repeatable_task import RepeatableTaskSchedule |
| 19 | + |
| 20 | + |
| 21 | +class FFCClient: |
| 22 | + |
| 23 | + def __init__(self, config: Config, start_wait: int = 15): |
| 24 | + |
| 25 | + check_uwsgi() |
| 26 | + |
| 27 | + self._config = config |
| 28 | + self._config.validate() |
| 29 | + |
| 30 | + # init scheduler |
| 31 | + self._scheduler = RepeatableTaskSchedule() |
| 32 | + self._scheduler.start() |
| 33 | + |
| 34 | + # init components |
| 35 | + # event processor |
| 36 | + self._event_processor = self._build_event_processor(config) |
| 37 | + # data storage |
| 38 | + self._data_storage = config.data_storage |
| 39 | + # evaluator |
| 40 | + self._evaluator = Evaluator(lambda key: self._data_storage.get(FFC_FEATURE_FLAGS, key), |
| 41 | + lambda key: self._data_storage.get(FFC_SEGMENTS, key)) |
| 42 | + # data updator and status provider |
| 43 | + self._update_status_provider = DataUpdateStatusProviderIml(config.data_storage) |
| 44 | + # update processor |
| 45 | + update_processor_ready = threading.Event() |
| 46 | + self._update_processor = self._build_update_processor(config, self._update_status_provider, |
| 47 | + update_processor_ready) |
| 48 | + # data sync |
| 49 | + self._update_processor.start() |
| 50 | + if not self._config.is_offline and start_wait > 0: |
| 51 | + log.info("Waiting for Client initialization in %s seconds" % str(start_wait)) |
| 52 | + update_processor_ready.wait(start_wait) |
| 53 | + |
| 54 | + if self._config.is_offline: |
| 55 | + log.info('Python SDK in offline mode') |
| 56 | + elif self._update_processor.initialized: |
| 57 | + log.info('Python SDK Client initialization completed') |
| 58 | + else: |
| 59 | + log.warning('Python SDK Client was not successfully initialized') |
| 60 | + |
| 61 | + def _build_event_processor(self, config: Config): |
| 62 | + if config.event_processor_imp: |
| 63 | + log.info("Using user-specified event processor: %s" % str(config.event_processor_imp)) |
| 64 | + return config.event_processor_imp(config) |
| 65 | + |
| 66 | + if config.is_offline: |
| 67 | + log.info("Offline mode, SDK disable event processing") |
| 68 | + return NullEventProcessor(config) |
| 69 | + |
| 70 | + return DefaultEventProcessor(config) |
| 71 | + |
| 72 | + def _build_update_processor(self, config: Config, update_status_provider, update_processor_event): |
| 73 | + if config.update_processor_imp: |
| 74 | + log.info("Using user-specified update processor: %s" % str(config.update_processor_imp)) |
| 75 | + return config.update_processor_imp(config, update_status_provider, update_processor_event) |
| 76 | + |
| 77 | + if config.is_offline: |
| 78 | + log.info("Offline mode, SDK disable streaming data updating") |
| 79 | + return NullUpdateProcessor(config, update_status_provider, update_processor_event) |
| 80 | + |
| 81 | + return Streaming(config, update_status_provider, update_processor_event) |
| 82 | + |
| 83 | + @property |
| 84 | + def initialize(self) -> bool: |
| 85 | + return self._update_processor.initialized |
| 86 | + |
| 87 | + @property |
| 88 | + def update_status_provider(self) -> DataUpdateStatusProvider: |
| 89 | + return self._update_status_provider |
| 90 | + |
| 91 | + def stop(self): |
| 92 | + log.info("Python SDK client is closing...") |
| 93 | + self._update_processor.stop() |
| 94 | + self._event_processor.stop() |
| 95 | + self._scheduler.stop() |
| 96 | + |
| 97 | + def is_offline(self) -> bool: |
| 98 | + return self._config.is_offline |
| 99 | + |
| 100 | + def _get_flag_internal(self, key: str) -> Optional[dict]: |
| 101 | + flag_id = get_feature_flag_id(self._config.env_secret, key) |
| 102 | + return self._data_storage.get(FFC_FEATURE_FLAGS, flag_id) |
| 103 | + |
| 104 | + def _evaluate_internal(self, key: str, user: dict, default: Any = None) -> EvalDetail: |
| 105 | + default_value = self._config.get_default_value(key, default) |
| 106 | + try: |
| 107 | + if not self.initialize: |
| 108 | + log.warn('Evaluation called before Java SDK client initialized for feature flag, well using the default value') |
| 109 | + return EvalDetail.error(REASON_CLIENT_NOT_READY, default_value, key) |
| 110 | + |
| 111 | + if not key: |
| 112 | + log.info('null feature flag key; returning default value') |
| 113 | + return EvalDetail.error(REASON_FLAG_NOT_FOUND, default_value, key) |
| 114 | + |
| 115 | + flag = self._get_flag_internal(key) |
| 116 | + if not flag: |
| 117 | + log.info('Unknown feature flag %s; returning default value' % key) |
| 118 | + return EvalDetail.error(REASON_FLAG_NOT_FOUND, default_value, key) |
| 119 | + |
| 120 | + try: |
| 121 | + ffc_user = FFCUser.from_dict(user) |
| 122 | + except ValueError as ve: |
| 123 | + log.warn(str(ve)) |
| 124 | + return EvalDetail.error(REASON_USER_NOT_SPECIFIED, default_value, key) |
| 125 | + |
| 126 | + ffc_event = FlagEvent(ffc_user) |
| 127 | + ed = self._evaluator.evaluate(flag, ffc_user, ffc_event) |
| 128 | + self._event_processor.send_event(ffc_event) |
| 129 | + return ed |
| 130 | + |
| 131 | + except Exception as e: |
| 132 | + log.exception('unexpected error in evaluation: %s' % str(e)) |
| 133 | + return EvalDetail.error(REASON_ERROR, default_value, key) |
| 134 | + |
| 135 | + def variation(self, key: str, user: dict, default: Any = None) -> Any: |
| 136 | + return self._evaluate_internal(key, user, default).variation() |
| 137 | + |
| 138 | + def variation_detail(self, key: str, user: dict, default: Any = None) -> EvalDetail: |
| 139 | + return self._evaluate_internal(key, user, default) |
| 140 | + |
| 141 | + def is_enabled(self, key: str, user: dict) -> bool: |
| 142 | + try: |
| 143 | + value = self.variation(key, user, 'off') |
| 144 | + return strtobool(str(value)) |
| 145 | + except ValueError: |
| 146 | + return False |
| 147 | + |
| 148 | + def get_all_latest_flag_variations(self, user: dict) -> Iterable[EvalDetail]: |
| 149 | + all_flag_details = [] |
| 150 | + try: |
| 151 | + if not self.initialize: |
| 152 | + log.warn('Evaluation called before Java SDK client initialized for feature flag') |
| 153 | + all_flag_details.append(EvalDetail.error(REASON_CLIENT_NOT_READY)) |
| 154 | + else: |
| 155 | + try: |
| 156 | + ffc_user = FFCUser.from_dict(user) |
| 157 | + all_flags = self._data_storage.get_all(FFC_FEATURE_FLAGS) |
| 158 | + all_flag_details.extend([self._evaluator.evaluate(flag, ffc_user) for flag in all_flags.values()]) |
| 159 | + except ValueError as ve: |
| 160 | + log.warn(str(ve)) |
| 161 | + all_flag_details.append(EvalDetail.error(REASON_CLIENT_NOT_READY)) |
| 162 | + except: |
| 163 | + raise |
| 164 | + except Exception as e: |
| 165 | + log.exception('unexpected error in evaluation: %s' % str(e)) |
| 166 | + all_flag_details.append(EvalDetail.error(REASON_ERROR)) |
| 167 | + return all_flag_details |
| 168 | + |
| 169 | + def is_flag_known(self, key: str) -> bool: |
| 170 | + try: |
| 171 | + if not self.initialize: |
| 172 | + log.warn('isFlagKnown called before Java SDK client initialized for feature flag') |
| 173 | + return False |
| 174 | + return self._get_flag_internal(key) is not None |
| 175 | + except Exception as e: |
| 176 | + log.exception('unexpected error in isFlagKnown: %s' % str(e)) |
| 177 | + return False |
| 178 | + |
| 179 | + def flush(self): |
| 180 | + self._event_processor.flush() |
| 181 | + |
| 182 | + def track_metric(self, user: dict, event_name: str, metric_value: float = 1.0): |
| 183 | + if not user or not event_name or metric_value <= 0: |
| 184 | + log.warn('event/user/metric invalid') |
| 185 | + return |
| 186 | + try: |
| 187 | + ffc_user = FFCUser.from_dict(user) |
| 188 | + metric_event = MetricEvent(ffc_user).add(Metric(event_name, metric_value)) |
| 189 | + self._event_processor.send_event(metric_event) |
| 190 | + except Exception as e: |
| 191 | + log.exception(str(e)) |
| 192 | + |
| 193 | + def track_metrics(self, user: dict, metrics: Mapping[str, float]): |
| 194 | + if not user or not metrics: |
| 195 | + log.warn('user/metrics invalid') |
| 196 | + return |
| 197 | + try: |
| 198 | + ffc_user = FFCUser.from_dict(user) |
| 199 | + metric_event = MetricEvent(ffc_user) |
| 200 | + for event_name, metric_value in metrics.items(): |
| 201 | + if event_name and metric_value > 0: |
| 202 | + metric_event.add(Metric(event_name, metric_value)) |
| 203 | + self._event_processor.send_event(metric_event) |
| 204 | + except Exception as e: |
| 205 | + log.exception(str(e)) |
0 commit comments