@@ -11,13 +11,23 @@ namespace duckdb {
1111
1212PhysicalNestedLoopJoin::PhysicalNestedLoopJoin (LogicalOperator &op, PhysicalOperator &left, PhysicalOperator &right,
1313 vector<JoinCondition> cond, JoinType join_type,
14- idx_t estimated_cardinality)
14+ idx_t estimated_cardinality,
15+ unique_ptr<JoinFilterPushdownInfo> pushdown_info_p)
1516 : PhysicalComparisonJoin(op, PhysicalOperatorType::NESTED_LOOP_JOIN, std::move(cond), join_type,
1617 estimated_cardinality) {
18+
19+ filter_pushdown = std::move (pushdown_info_p);
20+
1721 children.push_back (left);
1822 children.push_back (right);
1923}
2024
25+ PhysicalNestedLoopJoin::PhysicalNestedLoopJoin (LogicalOperator &op, PhysicalOperator &left, PhysicalOperator &right,
26+ vector<JoinCondition> cond, JoinType join_type,
27+ idx_t estimated_cardinality)
28+ : PhysicalNestedLoopJoin(op, left, right, std::move(cond), join_type, estimated_cardinality, nullptr ) {
29+ }
30+
2131bool PhysicalJoin::HasNullValues (DataChunk &chunk) {
2232 for (idx_t col_idx = 0 ; col_idx < chunk.ColumnCount (); col_idx++) {
2333 UnifiedVectorFormat vdata;
@@ -130,30 +140,16 @@ bool PhysicalNestedLoopJoin::IsSupported(const vector<JoinCondition> &conditions
130140// ===--------------------------------------------------------------------===//
131141// Sink
132142// ===--------------------------------------------------------------------===//
133- class NestedLoopJoinLocalState : public LocalSinkState {
134- public:
135- explicit NestedLoopJoinLocalState (ClientContext &context, const vector<JoinCondition> &conditions)
136- : rhs_executor(context) {
137- vector<LogicalType> condition_types;
138- for (auto &cond : conditions) {
139- rhs_executor.AddExpression (*cond.right );
140- condition_types.push_back (cond.right ->return_type );
141- }
142- right_condition.Initialize (Allocator::Get (context), condition_types);
143- }
144-
145- // ! The chunk holding the right condition
146- DataChunk right_condition;
147- // ! The executor of the RHS condition
148- ExpressionExecutor rhs_executor;
149- };
150-
151143class NestedLoopJoinGlobalState : public GlobalSinkState {
152144public:
153145 explicit NestedLoopJoinGlobalState (ClientContext &context, const PhysicalNestedLoopJoin &op)
154146 : right_payload_data(context, op.children[1 ].get().GetTypes()),
155147 right_condition_data(context, op.GetJoinTypes()), has_null(false ),
156148 right_outer(PropagatesBuildSide(op.join_type)) {
149+ if (op.filter_pushdown ) {
150+ skip_filter_pushdown = op.filter_pushdown ->probe_info .empty ();
151+ global_filter_state = op.filter_pushdown ->GetGlobalState (context, op);
152+ }
157153 }
158154
159155 mutex nj_lock;
@@ -165,6 +161,35 @@ class NestedLoopJoinGlobalState : public GlobalSinkState {
165161 atomic<bool > has_null;
166162 // ! A bool indicating for each tuple in the RHS if they found a match (only used in FULL OUTER JOIN)
167163 OuterJoinMarker right_outer;
164+ // ! Should we not bother pushing down filters?
165+ bool skip_filter_pushdown = false ;
166+ // ! The global filter states to push down (if any)
167+ unique_ptr<JoinFilterGlobalState> global_filter_state;
168+ };
169+
170+ class NestedLoopJoinLocalState : public LocalSinkState {
171+ public:
172+ explicit NestedLoopJoinLocalState (ClientContext &context, const PhysicalNestedLoopJoin &op,
173+ NestedLoopJoinGlobalState &gstate)
174+ : rhs_executor(context) {
175+ vector<LogicalType> condition_types;
176+ for (auto &cond : op.conditions ) {
177+ rhs_executor.AddExpression (*cond.right );
178+ condition_types.push_back (cond.right ->return_type );
179+ }
180+ right_condition.Initialize (Allocator::Get (context), condition_types);
181+
182+ if (op.filter_pushdown ) {
183+ local_filter_state = op.filter_pushdown ->GetLocalState (*gstate.global_filter_state );
184+ }
185+ }
186+
187+ // ! The chunk holding the right condition
188+ DataChunk right_condition;
189+ // ! The executor of the RHS condition
190+ ExpressionExecutor rhs_executor;
191+ // ! Local state for accumulating filter statistics
192+ unique_ptr<JoinFilterLocalState> local_filter_state;
168193};
169194
170195vector<LogicalType> PhysicalNestedLoopJoin::GetJoinTypes () const {
@@ -178,40 +203,54 @@ vector<LogicalType> PhysicalNestedLoopJoin::GetJoinTypes() const {
178203SinkResultType PhysicalNestedLoopJoin::Sink (ExecutionContext &context, DataChunk &chunk,
179204 OperatorSinkInput &input) const {
180205 auto &gstate = input.global_state .Cast <NestedLoopJoinGlobalState>();
181- auto &nlj_state = input.local_state .Cast <NestedLoopJoinLocalState>();
206+ auto &lstate = input.local_state .Cast <NestedLoopJoinLocalState>();
182207
183208 // resolve the join expression of the right side
184- nlj_state.right_condition .Reset ();
185- nlj_state.rhs_executor .Execute (chunk, nlj_state.right_condition );
209+ lstate.right_condition .Reset ();
210+ lstate.rhs_executor .Execute (chunk, lstate.right_condition );
211+
212+ if (filter_pushdown && !gstate.skip_filter_pushdown ) {
213+ filter_pushdown->Sink (lstate.right_condition , *lstate.local_filter_state );
214+ }
186215
187216 // if we have not seen any NULL values yet, and we are performing a MARK join, check if there are NULL values in
188217 // this chunk
189218 if (join_type == JoinType::MARK && !gstate.has_null ) {
190- if (HasNullValues (nlj_state .right_condition )) {
219+ if (HasNullValues (lstate .right_condition )) {
191220 gstate.has_null = true ;
192221 }
193222 }
194223
195224 // append the payload data and the conditions
196225 lock_guard<mutex> nj_guard (gstate.nj_lock );
197226 gstate.right_payload_data .Append (chunk);
198- gstate.right_condition_data .Append (nlj_state .right_condition );
227+ gstate.right_condition_data .Append (lstate .right_condition );
199228 return SinkResultType::NEED_MORE_INPUT;
200229}
201230
202231SinkCombineResultType PhysicalNestedLoopJoin::Combine (ExecutionContext &context,
203232 OperatorSinkCombineInput &input) const {
233+ auto &gstate = input.global_state .Cast <NestedLoopJoinGlobalState>();
234+ auto &lstate = input.local_state .Cast <NestedLoopJoinLocalState>();
235+
204236 auto &client_profiler = QueryProfiler::Get (context.client );
205237 context.thread .profiler .Flush (*this );
206238 client_profiler.Flush (context.thread .profiler );
239+ if (filter_pushdown && !gstate.skip_filter_pushdown ) {
240+ filter_pushdown->Combine (*gstate.global_filter_state , *lstate.local_filter_state );
241+ }
207242 return SinkCombineResultType::FINISHED;
208243}
209244
210245SinkFinalizeType PhysicalNestedLoopJoin::Finalize (Pipeline &pipeline, Event &event, ClientContext &context,
211246 OperatorSinkFinalizeInput &input) const {
212- auto &gstate = input.global_state .Cast <NestedLoopJoinGlobalState>();
213- gstate.right_outer .Initialize (gstate.right_payload_data .Count ());
214- if (gstate.right_payload_data .Count () == 0 && EmptyResultIfRHSIsEmpty ()) {
247+ auto &gsink = input.global_state .Cast <NestedLoopJoinGlobalState>();
248+ if (filter_pushdown && !gsink.skip_filter_pushdown ) {
249+ (void )filter_pushdown->Finalize (context, nullptr , *gsink.global_filter_state , *this );
250+ }
251+
252+ gsink.right_outer .Initialize (gsink.right_payload_data .Count ());
253+ if (gsink.right_payload_data .Count () == 0 && EmptyResultIfRHSIsEmpty ()) {
215254 return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
216255 }
217256 return SinkFinalizeType::READY;
@@ -222,7 +261,8 @@ unique_ptr<GlobalSinkState> PhysicalNestedLoopJoin::GetGlobalSinkState(ClientCon
222261}
223262
224263unique_ptr<LocalSinkState> PhysicalNestedLoopJoin::GetLocalSinkState (ExecutionContext &context) const {
225- return make_uniq<NestedLoopJoinLocalState>(context.client , conditions);
264+ auto &gstate = sink_state->Cast <NestedLoopJoinGlobalState>();
265+ return make_uniq<NestedLoopJoinLocalState>(context.client , *this , gstate);
226266}
227267
228268// ===--------------------------------------------------------------------===//
0 commit comments