@@ -17,11 +17,12 @@ PhysicalStreamingWindow::PhysicalStreamingWindow(PhysicalPlan &physical_plan, ve
1717
1818class StreamingWindowGlobalState : public GlobalOperatorState {
1919public:
20- StreamingWindowGlobalState () : row_number(1 ) {
21- }
20+ explicit StreamingWindowGlobalState (ClientContext &client);
2221
2322 // ! The next row number.
2423 std::atomic<int64_t > row_number;
24+ // ! The single local state
25+ unique_ptr<OperatorState> local_state;
2526};
2627
2728class StreamingWindowState : public OperatorState {
@@ -340,6 +341,10 @@ class StreamingWindowState : public OperatorState {
340341 DataChunk shifted;
341342};
342343
344+ StreamingWindowGlobalState::StreamingWindowGlobalState (ClientContext &client) : row_number(1 ) {
345+ local_state = make_uniq<StreamingWindowState>(client);
346+ }
347+
343348bool PhysicalStreamingWindow::IsStreamingFunction (ClientContext &context, unique_ptr<Expression> &expr) {
344349 auto &wexpr = expr->Cast <BoundWindowExpression>();
345350 if (!wexpr.partitions .empty () || !wexpr.orders .empty () || wexpr.ignore_nulls || !wexpr.arg_orders .empty () ||
@@ -374,12 +379,8 @@ bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique
374379 }
375380}
376381
377- unique_ptr<GlobalOperatorState> PhysicalStreamingWindow::GetGlobalOperatorState (ClientContext &context) const {
378- return make_uniq<StreamingWindowGlobalState>();
379- }
380-
381- unique_ptr<OperatorState> PhysicalStreamingWindow::GetOperatorState (ExecutionContext &context) const {
382- return make_uniq<StreamingWindowState>(context.client );
382+ unique_ptr<GlobalOperatorState> PhysicalStreamingWindow::GetGlobalOperatorState (ClientContext &client) const {
383+ return make_uniq<StreamingWindowGlobalState>(client);
383384}
384385
385386void StreamingWindowState::AggregateState::Execute (ExecutionContext &context, DataChunk &input, Vector &result) {
@@ -486,9 +487,9 @@ void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, Da
486487}
487488
488489void PhysicalStreamingWindow::ExecuteFunctions (ExecutionContext &context, DataChunk &output, DataChunk &delayed,
489- GlobalOperatorState &gstate_p, OperatorState &state_p ) const {
490+ GlobalOperatorState &gstate_p) const {
490491 auto &gstate = gstate_p.Cast <StreamingWindowGlobalState>();
491- auto &state = state_p. Cast <StreamingWindowState>();
492+ auto &state = gstate. local_state -> Cast <StreamingWindowState>();
492493
493494 // Compute window functions
494495 const idx_t count = output.size ();
@@ -530,9 +531,9 @@ void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataCh
530531}
531532
532533void PhysicalStreamingWindow::ExecuteInput (ExecutionContext &context, DataChunk &delayed, DataChunk &input,
533- DataChunk &output, GlobalOperatorState &gstate_p,
534- OperatorState &state_p) const {
535- auto &state = state_p. Cast <StreamingWindowState>();
534+ DataChunk &output, GlobalOperatorState &gstate_p) const {
535+ auto &gstate = gstate_p. Cast <StreamingWindowGlobalState>();
536+ auto &state = gstate. local_state -> Cast <StreamingWindowState>();
536537
537538 // Put payload columns in place
538539 for (idx_t col_idx = 0 ; col_idx < input.data .size (); col_idx++) {
@@ -548,13 +549,13 @@ void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk
548549 }
549550 output.SetCardinality (count);
550551
551- ExecuteFunctions (context, output, state.delayed , gstate_p, state_p );
552+ ExecuteFunctions (context, output, state.delayed , gstate_p);
552553}
553554
554555void PhysicalStreamingWindow::ExecuteShifted (ExecutionContext &context, DataChunk &delayed, DataChunk &input,
555- DataChunk &output, GlobalOperatorState &gstate_p,
556- OperatorState &state_p) const {
557- auto &state = state_p. Cast <StreamingWindowState>();
556+ DataChunk &output, GlobalOperatorState &gstate_p) const {
557+ auto &gstate = gstate_p. Cast <StreamingWindowGlobalState>();
558+ auto &state = gstate. local_state -> Cast <StreamingWindowState>();
558559 auto &shifted = state.shifted ;
559560
560561 idx_t out = output.size ();
@@ -576,25 +577,25 @@ void PhysicalStreamingWindow::ExecuteShifted(ExecutionContext &context, DataChun
576577 }
577578 delayed.SetCardinality (delay - out + in);
578579
579- ExecuteFunctions (context, output, delayed, gstate_p, state_p );
580+ ExecuteFunctions (context, output, delayed, gstate_p);
580581}
581582
582583void PhysicalStreamingWindow::ExecuteDelayed (ExecutionContext &context, DataChunk &delayed, DataChunk &input,
583- DataChunk &output, GlobalOperatorState &gstate_p,
584- OperatorState &state_p) const {
584+ DataChunk &output, GlobalOperatorState &gstate_p) const {
585585 // Put payload columns in place
586586 for (idx_t col_idx = 0 ; col_idx < delayed.data .size (); col_idx++) {
587587 output.data [col_idx].Reference (delayed.data [col_idx]);
588588 }
589589 idx_t count = delayed.size ();
590590 output.SetCardinality (count);
591591
592- ExecuteFunctions (context, output, input, gstate_p, state_p );
592+ ExecuteFunctions (context, output, input, gstate_p);
593593}
594594
595595OperatorResultType PhysicalStreamingWindow::Execute (ExecutionContext &context, DataChunk &input, DataChunk &output,
596- GlobalOperatorState &gstate_p, OperatorState &state_p) const {
597- auto &state = state_p.Cast <StreamingWindowState>();
596+ GlobalOperatorState &gstate_p, OperatorState &) const {
597+ auto &gstate = gstate_p.Cast <StreamingWindowGlobalState>();
598+ auto &state = gstate.local_state ->Cast <StreamingWindowState>();
598599 if (!state.initialized ) {
599600 state.Initialize (context.client , input, select_list);
600601 }
@@ -615,27 +616,27 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D
615616 // If we can't consume all of the delayed values,
616617 // we need to split them instead of referencing them all
617618 output.SetCardinality (input.size ());
618- ExecuteShifted (context, delayed, input, output, gstate_p, state_p );
619+ ExecuteShifted (context, delayed, input, output, gstate_p);
619620 // We delayed the unused input so ask for more
620621 return OperatorResultType::NEED_MORE_INPUT;
621622 } else if (delayed.size ()) {
622623 // We have enough delayed rows so flush them
623- ExecuteDelayed (context, delayed, input, output, gstate_p, state_p );
624+ ExecuteDelayed (context, delayed, input, output, gstate_p);
624625 // Defer resetting delayed as it may be referenced.
625626 delayed.SetCardinality (0 );
626627 // Come back to process the input
627628 return OperatorResultType::HAVE_MORE_OUTPUT;
628629 } else {
629630 // No delayed rows, so emit what we can and delay the rest.
630- ExecuteInput (context, delayed, input, output, gstate_p, state_p );
631+ ExecuteInput (context, delayed, input, output, gstate_p);
631632 return OperatorResultType::NEED_MORE_INPUT;
632633 }
633634}
634635
635636OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute (ExecutionContext &context, DataChunk &output,
636- GlobalOperatorState &gstate_p,
637- OperatorState &state_p) const {
638- auto &state = state_p. Cast <StreamingWindowState>();
637+ GlobalOperatorState &gstate_p, OperatorState &) const {
638+ auto &gstate = gstate_p. Cast <StreamingWindowGlobalState>();
639+ auto &state = gstate. local_state -> Cast <StreamingWindowState>();
639640
640641 if (state.initialized && state.lead_count ) {
641642 auto &delayed = state.delayed ;
@@ -646,10 +647,10 @@ OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContex
646647 if (output.GetCapacity () < delayed.size ()) {
647648 // More than one output buffer was delayed, so shift in what we can
648649 output.SetCardinality (output.GetCapacity ());
649- ExecuteShifted (context, delayed, input, output, gstate_p, state_p );
650+ ExecuteShifted (context, delayed, input, output, gstate_p);
650651 return OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
651652 }
652- ExecuteDelayed (context, delayed, input, output, gstate_p, state_p );
653+ ExecuteDelayed (context, delayed, input, output, gstate_p);
653654 }
654655
655656 return OperatorFinalizeResultType::FINISHED;
0 commit comments