@@ -259,6 +259,11 @@ class ConditionDirective(str, Enum):
259259 ConditionDirective represents the possible directives that can be returned from a condition check.
260260 """
261261
262+ REMOVE = "remove"
263+ """
264+ REMOVE suggests that the current data source should be permanently removed from consideration.
265+ """
266+
262267 FALLBACK = "fallback"
263268 """
264269 FALLBACK suggests that this data source should be abandoned in favor of the next one.
@@ -269,9 +274,9 @@ class ConditionDirective(str, Enum):
269274 RECOVER suggests that we should try to return to the primary data source.
270275 """
271276
272- CONTINUE = "continue "
277+ FDV1 = "fdv1 "
273278 """
274- CONTINUE suggests that no action is needed and the current data source should keep running .
279+ FDV1 suggests that we should immediately revert to the FDv1 fallback synchronizer .
275280 """
276281
277282
@@ -459,27 +464,17 @@ def synchronizer_loop(self: 'FDv2'):
459464 try :
460465 with self ._lock .write ():
461466 synchronizer : Synchronizer = synchronizers_list [current_index ].build (self ._config )
467+ self ._active_synchronizer = synchronizer
462468 if isinstance (synchronizer , DiagnosticSource ) and self ._diagnostic_accumulator is not None :
463469 synchronizer .set_diagnostic_accumulator (self ._diagnostic_accumulator )
464- self ._active_synchronizer = synchronizer
465470
466471 log .info ("Synchronizer %s (index %d) is starting" , synchronizer .name , current_index )
467472
468- # Determine which condition to check based on current position
469- def combined_condition (status : DataSourceStatus ) -> ConditionDirective :
470- # Recovery condition: only applies when not at first synchronizer
471- if current_index > 0 and self ._recovery_condition (status ):
472- return ConditionDirective .RECOVER
473- # Fallback condition: applies at any position
474- if self ._fallback_condition (status ):
475- return ConditionDirective .FALLBACK
476- return ConditionDirective .CONTINUE
477-
478- remove_sync , fallback_v1 , directive = self ._consume_synchronizer_results (
479- synchronizer , set_on_ready , combined_condition
473+ directive = self ._consume_synchronizer_results (
474+ synchronizer , set_on_ready , current_index != 0
480475 )
481476
482- if fallback_v1 :
477+ if directive == ConditionDirective . FDV1 :
483478 # Abandon all synchronizers and use only fdv1 fallback
484479 log .info ("Reverting to FDv1 fallback synchronizer" )
485480 if self ._fdv1_fallback_synchronizer_builder is not None :
@@ -494,8 +489,7 @@ def combined_condition(status: DataSourceStatus) -> ConditionDirective:
494489 )
495490 break
496491 continue
497-
498- if remove_sync :
492+ elif directive == ConditionDirective .REMOVE :
499493 # Permanent failure - remove synchronizer from list
500494 log .warning ("Synchronizer %s permanently failed, removing from list" , synchronizer .name )
501495 del synchronizers_list [current_index ]
@@ -515,9 +509,8 @@ def combined_condition(status: DataSourceStatus) -> ConditionDirective:
515509 # Note: If we deleted a middle element, current_index now points to
516510 # what was the next element (shifted down), which is correct
517511 continue
518-
519512 # Condition was met - determine next synchronizer based on directive
520- if directive == ConditionDirective .RECOVER :
513+ elif directive == ConditionDirective .RECOVER :
521514 log .info ("Recovery condition met, returning to first synchronizer" )
522515 current_index = 0
523516 elif directive == ConditionDirective .FALLBACK :
@@ -552,8 +545,8 @@ def _consume_synchronizer_results(
552545 self ,
553546 synchronizer : Synchronizer ,
554547 set_on_ready : Event ,
555- condition_func : Callable [[ DataSourceStatus ], ConditionDirective ]
556- ) -> tuple [ bool , bool , ConditionDirective ] :
548+ check_recovery : bool ,
549+ ) -> ConditionDirective :
557550 """
558551 Consume results from a synchronizer until a condition is met or it fails.
559552
@@ -594,14 +587,15 @@ def reader(self: 'FDv2'):
594587 if update == "check" :
595588 # Check condition periodically
596589 current_status = self ._data_source_status_provider .status
597- directive = condition_func (current_status )
598- if directive != ConditionDirective .CONTINUE :
599- return False , False , directive
590+ if check_recovery and self ._recovery_condition (current_status ):
591+ return ConditionDirective .RECOVER
592+ if self ._fallback_condition (current_status ):
593+ return ConditionDirective .FALLBACK
600594 continue
601595
602596 log .info ("Synchronizer %s update: %s" , synchronizer .name , update .state )
603597 if self ._stop_event .is_set ():
604- return False , False , ConditionDirective .CONTINUE
598+ return ConditionDirective .FALLBACK
605599
606600 # Handle the update
607601 if update .change_set is not None :
@@ -616,14 +610,14 @@ def reader(self: 'FDv2'):
616610
617611 # Check if we should revert to FDv1 immediately
618612 if update .revert_to_fdv1 :
619- return True , True , ConditionDirective .FALLBACK
613+ return ConditionDirective .FDV1
620614
621615 # Check for OFF state indicating permanent failure
622616 if update .state == DataSourceState .OFF :
623- return True , False , ConditionDirective .FALLBACK
617+ return ConditionDirective .REMOVE
624618 except Exception as e :
625619 log .error ("Error consuming synchronizer results: %s" , e )
626- return True , False , ConditionDirective .FALLBACK
620+ return ConditionDirective .REMOVE
627621 finally :
628622 synchronizer .stop ()
629623 timer .stop ()
@@ -633,7 +627,7 @@ def reader(self: 'FDv2'):
633627 # If we reach here, the synchronizer's iterator completed normally (no more updates)
634628 # For continuous synchronizers (streaming/polling), this is unexpected and indicates
635629 # the synchronizer can't provide more updates, so we should remove it and fall back
636- return True , False , ConditionDirective .FALLBACK
630+ return ConditionDirective .REMOVE
637631
638632 def _fallback_condition (self , status : DataSourceStatus ) -> bool :
639633 """
0 commit comments