Add lambda substrait support #21193
👋 Hello from @substrait-io. Great to see the core lambda PR has gotten through! Once this PR is ready for review and rebased onto main, I will be more than happy to help review it 🙂
Thanks @benbellick, I will open this tonight. Besides rebasing, I believe it is missing some tests (I tested with sqllogictests only).
I updated your branch with main now that the fix for the detect-breaking-changes check has landed. Sorry for the trouble.
benbellick left a comment
Overall, it looks good to me! I left a few comments on stylistic things, but the only thing I would particularly like to see is tests for the consumer and the producer independently. Thanks!
/// Default implementation of lambda related methods of the [SubstraitConsumer] trait
///
/// Can be embedded into a custom [SubstraitConsumer] to implement them
pub struct DefaultSubstraitLambdaConsumer {
Is there a reason this is public? This feels like an implementation detail of the default lambda-handling logic. What about:
- pub struct DefaultSubstraitLambdaConsumer {
+ struct LambdaConsumerState {
The existing required methods of the SubstraitConsumer trait are trivial to implement, but that's not the case for the newly added lambda methods. This is just a convenience for custom implementations that don't want to customize the default lambda handling. Should I remove it?
struct CustomSubstraitConsumer {
    extensions: Arc<Extensions>,
    state: Arc<SessionState>,
    // You can reuse existing consumer code related to lambdas
    lambda_consumer: DefaultSubstraitLambdaConsumer,
}

#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
    async fn resolve_table_ref(
        &self,
        table_ref: &TableReference,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        let table = table_ref.table().to_string();
        let schema = self.state.schema_for_ref(table_ref.clone())?;
        let table_provider = schema.table(&table).await?;
        Ok(table_provider)
    }

    fn get_extensions(&self) -> &Extensions {
        self.extensions.as_ref()
    }

    fn get_function_registry(&self) -> &impl FunctionRegistry {
        self.state.as_ref()
    }

    fn with_lambda_parameters(
        &self,
        lambda_parameters: &[Type],
        input_schema: &DFSchema,
    ) -> datafusion::common::Result<(Vec<String>, Self)> {
        let (names, lambda_consumer) = self.lambda_consumer.with_lambda_parameters(
            self,
            lambda_parameters,
            input_schema,
        )?;
        Ok((
            names,
            Self {
                extensions: self.extensions.clone(),
                state: self.state.clone(),
                lambda_consumer,
            },
        ))
    }

    fn lambda_variable(
        &self,
        steps_out: usize,
        field_idx: usize,
    ) -> datafusion::common::Result<Expr> {
        self.lambda_consumer.lambda_variable(steps_out, field_idx)
    }
}
Ah I see now, I appreciate the explanation. That makes a lot of sense then!
AFAICT the strategy for outer schemas is to provide default impls so that consumer implementors who don't care about the behavior can ignore their existence, but then will encounter a runtime error if they are used:
I'm wondering if we should do the same thing for lambdas? Rather than enforce that implementors must implement the lambda-handling fns, they could instead optionally ignore them, resulting in an error if lambda expressions are encountered.
What do you think? I am not so particular here TBH. Ultimately my goal is to validate that the translation itself is correct, and I am happy to leave API concerns to the project maintainers :)
Ah, I agree, they definitely should have a default impl returning an error to avoid a breaking change, thank you (2c23439).
Even then, I still believe a public DefaultSubstraitLambdaConsumer is convenient, but yes, let's leave that to the maintainers, thanks.
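A minimal sketch of the default-impl idea discussed above, using simplified stand-in types rather than the real DataFusion traits. `SketchConsumer`, `SketchError`, and `LegacyConsumer` are hypothetical names introduced only for illustration, and the signatures are deliberately reduced; the point is only that an implementor that overrides nothing keeps compiling and gets a runtime error when a lambda is encountered.

```rust
/// Hypothetical stand-in for DataFusion's error type (not the real `DataFusionError`).
#[derive(Debug, PartialEq)]
enum SketchError {
    NotImplemented(String),
}

/// Hypothetical, heavily simplified stand-in for `SubstraitConsumer`.
trait SketchConsumer: Sized {
    /// A required method, standing in for the existing trivial ones.
    fn name(&self) -> String;

    /// Lambda hook with a default body: existing implementors keep compiling
    /// and only hit this error if a plan actually contains a lambda.
    fn with_lambda_parameters(
        &self,
        _lambda_parameters: &[String],
    ) -> Result<(Vec<String>, Self), SketchError> {
        Err(SketchError::NotImplemented(format!(
            "{} does not support lambda parameters",
            self.name()
        )))
    }

    /// Same idea for resolving a lambda variable reference.
    fn lambda_variable(
        &self,
        _steps_out: usize,
        _field_idx: usize,
    ) -> Result<String, SketchError> {
        Err(SketchError::NotImplemented(format!(
            "{} does not support lambda variables",
            self.name()
        )))
    }
}

/// An implementor written before lambdas existed: it overrides nothing.
struct LegacyConsumer;

impl SketchConsumer for LegacyConsumer {
    fn name(&self) -> String {
        "LegacyConsumer".to_string()
    }
}
```

With this shape, adding the lambda methods is not a breaking change for implementors like `LegacyConsumer`; the trade-off is that missing lambda support surfaces at runtime instead of at compile time.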
/// Default implementation of lambda related methods of the [SubstraitProducer] trait
///
/// Can be embedded into a custom [SubstraitProducer] to implement them
pub struct DefaultSubstraitLambdaProducer {
Same comment as on the consumer side. I wonder if we can just keep this private, since its usage in implementing this producer is an implementation detail.
Same reason as for the consumer. Whatever gets decided there (#21193 (comment)) I'll also apply here.
struct CustomSubstraitProducer {
    extensions: Extensions,
    state: Arc<SessionState>,
    // You can reuse existing producer code related to lambdas
    lambda_producer: DefaultSubstraitLambdaProducer,
}

impl SubstraitProducer for CustomSubstraitProducer {
    fn register_function(&mut self, signature: String) -> u32 {
        self.extensions.register_function(&signature)
    }

    fn register_type(&mut self, type_name: String) -> u32 {
        self.extensions.register_type(&type_name)
    }

    fn get_extensions(self) -> Extensions {
        self.extensions
    }

    fn push_lambda_parameters(
        &mut self,
        lambda_parameters: Vec<FieldRef>,
    ) -> datafusion::common::Result<()> {
        let lambda_parameters_map = lambda_parameters_map(self, lambda_parameters)?;
        self.lambda_producer
            .push_lambda_parameters(lambda_parameters_map);
        Ok(())
    }

    fn pop_lambda_parameters(&mut self) -> datafusion::common::Result<()> {
        self.lambda_producer.pop_lambda_parameters()
    }

    fn lambda_variable(&self, name: &str) -> datafusion::common::Result<(u32, i32)> {
        self.lambda_producer.lambda_variable(name)
    }

    fn lambda_parameter_type(
        &self,
        name: &str,
    ) -> datafusion::common::Result<substrait::proto::Type> {
        self.lambda_producer.lambda_parameter_type(name)
    }
}

/// Returns a new instance of this consumer which includes the given `lambda_parameters` and the names they got assigned
///
/// Note for custom implementations it's possible to embed a [DefaultSubstraitLambdaConsumer] and forward this method to it
fn with_lambda_parameters(
    &self,
    lambda_parameters: &[Type],
    input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)>;
@benbellick I usually follow the pattern of existing methods, like push/pop_outer_schema from #20439. This is my first time dealing with Substrait, so I may be wrong, but I didn't follow this pattern (using push/pop_lambda_parameter) because it mutates state behind &self via a RwLock, and I'm not sure this couldn't lead to conflicts if the same consumer is used to consume two different plans at the same time in different threads. If that's not the case, I can change this to use push/pop_lambda_parameter as well.
Ah, that is an interesting point... Is it expected/supported for the same SubstraitConsumer instance to be used concurrently?
If not, then I think it would be simpler and more consistent to model lambda scope the same way as outer schemas.
If yes, then the scoped-consumer approach here makes sense, but it seems like the existing push_outer_schema / pop_outer_schema stack may have the same interleaving issue and should probably be addressed separately.
I'll confirm with the maintainer who ends up reviewing this, but SubstraitConsumer is both Send + Sync and all its methods take &self. Since the default consumer is cheap to create, and I expect most or all custom ones to be cheap as well, I guess that design is mostly there to accommodate async and to make it easy to embed the consumer in other structures that must also implement Send + Sync, rather than to allow efficient concurrent usage.
I wouldn't expect any consumer to be used concurrently for performance, but since it can be, I think it's possible that it is, or will be, used concurrently by accident, simply because that's the easiest or most natural way within a given codebase.
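To make the concern concrete, here is a minimal sketch contrasting the two designs, assuming simplified stand-in types. `StackConsumer` and `ScopedConsumer` are hypothetical names, not DataFusion types, and lambda parameters are reduced to plain strings. The first keeps lambda scopes in a shared stack behind a `RwLock` (push/pop style); the second returns a new consumer value per scope, in the spirit of `with_lambda_parameters`.

```rust
use std::sync::RwLock;

/// Hypothetical push/pop style: scopes live in shared, lock-protected state,
/// so every user of the same instance sees the same stack.
struct StackConsumer {
    scopes: RwLock<Vec<Vec<String>>>,
}

impl StackConsumer {
    fn new() -> Self {
        Self { scopes: RwLock::new(Vec::new()) }
    }

    fn push_lambda_parameters(&self, names: Vec<String>) {
        self.scopes.write().unwrap().push(names);
    }

    fn pop_lambda_parameters(&self) {
        self.scopes.write().unwrap().pop();
    }

    /// Resolves against whatever scope is currently on top of the shared stack.
    fn innermost(&self) -> Option<Vec<String>> {
        self.scopes.read().unwrap().last().cloned()
    }
}

/// Hypothetical scoped style: entering a lambda yields a new consumer value,
/// and the original is left untouched.
#[derive(Clone, Default)]
struct ScopedConsumer {
    scopes: Vec<Vec<String>>,
}

impl ScopedConsumer {
    fn with_lambda_parameters(&self, names: Vec<String>) -> Self {
        let mut scopes = self.scopes.clone();
        scopes.push(names);
        Self { scopes }
    }

    fn innermost(&self) -> Option<Vec<String>> {
        self.scopes.last().cloned()
    }
}
```

If two plans share one `StackConsumer` and their pushes interleave, the first plan's next lookup resolves against the second plan's parameters. With `ScopedConsumer`, each plan holds its own value, so the same interleaving cannot leak scopes between plans; the cost is cloning the scope list on every lambda entry.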
benbellick left a comment
A few more comments, but on the whole this looks good to me! Thanks
Co-authored-by: Ben Bellick <36523439+benbellick@users.noreply.github.com>
benbellick left a comment
LGTM, great work! Excited to get this in 🚀
Feel free to summon the maintainers now.
Thanks for the review @benbellick
I'm unfamiliar with Substrait beyond the general idea, so I don't know if I can give a meaningful review.
@gstvg according to the breaking-change detector, there are no API-breaking changes in this PR. Is your comment still correct?
@rluvaton, indeed, after applying @benbellick's suggestions there are no breaking changes anymore. Description updated, thanks.
Which issue does this PR close?
Part of #21172
Rationale for this change
Substrait support was left out of the core lambda PR to keep that PR small.
What changes are included in this PR?
Substrait consuming and producing of higher-order functions, lambdas and lambda variables
Are these changes tested?
Unit tests added to datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Are there any user-facing changes?
None