design: large SELECT result size #32211
Conversation
> happening in `environmentd`, but could happen on the cluster side. We think it
> is easier to lift result-size limitations for queries that don't require
> post-processing. We should be able to just "stream them through", without
> materializing in `environmentd`.
FWIW, I think it wouldn't be super hard to support these cases using the Consolidator machinery we have now. Happy to chat about this if you're interested!
Nice! I was thinking this might be possible, glad to know you think the same!
> `environmentd`) stream those results from persist back to the client.
>
> We would use a (configurable) threshold for determining whether to send a
> result inline in the compute protocol or out of band via the blob store.
Sounds like this could use the same machinery that @ParkMyCar worked up for large insert/update queries, but streaming the resulting batch to a client instead of linking it into a shard?
I was thinking the same! I started to build an abstraction for this in #31189: I added a new crate there called row-stash, a response type called PeekResponseUnary::Batches, and finally the rows get returned to the user via ExecuteResponse::SendingRowsStreaming.
FWIW some things in that PR are a bit hacked together so the code might not be perfect 🙈
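For context, here is a rough, hypothetical sketch of how those pieces could fit together. The placeholder types and exact variant payloads are assumptions; the actual definitions in #31189 differ:

```rust
// Placeholder types for illustration only; the real ones live in the adapter
// and persist crates and look different.
struct Row(Vec<u8>);
struct BatchHandle {
    blob_key: String,
    num_rows: u64,
}
struct RowBatchStream {
    handles: Vec<BatchHandle>,
}

/// Sketch: a peek can come back either as inline rows (small results) or as
/// references to batches that were stashed out of band (large results).
enum PeekResponseUnary {
    Rows(Vec<Row>),
    Batches(Vec<BatchHandle>),
    Error(String),
}

/// Sketch: the streaming variant lets environmentd forward rows to the client
/// as they are read back from the stashed batches, instead of materializing
/// the whole result in memory first.
enum ExecuteResponse {
    SendingRows(Vec<Row>),
    SendingRowsStreaming(RowBatchStream),
}
```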
22afd68 to e47b35c (Compare)
d362ff8 to a276bd0 (Compare)
antiguru left a comment
This reads fine, and I think it would be a small incremental step towards efficient handling of large result sets. My main concern is how we can match the current infrastructure with what might be needed for this change to work. At the moment, each cluster replica worker has a portion of the output data, and we send all partitions to envd to merge them. This step would still need to be performed to ensure consistent results. How well can persist read from many blobs and merge the outputs into one? As long as we're reading from arrangements, at least we know that the output is physically monotonic.
> Below, we will use _streamable queries_ for queries that don't require post
> processing, and _non-streamable queries_ for those that do require it.
There is always implicit post-processing, i.e., we sort the data even if the user didn't specify an order. How would this impact your design?
Specifically, we might write down as many blobs as we have workers on the cluster replica. Each is sorted within itself. When sending the data to the client, we need to stream all blobs and merge them on-the-fly. Certainly possible, but at what resource utilization?
The plan is to use the existing Consolidator from persist, which is used to consolidate runs/batches when persist compacts a shard.
It assumes that the batches/runs are sorted and merges updates as it reads, trying to do that within bounded memory: roughly, the required memory is O(num_runs). It's sort-merge machinery, and the good thing is that it already exists and there already is, and will continue to be, engineering effort put into it to make persist compaction work well.
For our purposes here, each worker writes down a batch, and then on envd we use a Consolidator to merge-sort the batches and stream the results back out to the client. We get the implicit sort, and in the future we can also use it to apply an explicit ORDER BY.
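To make the merge step concrete, here is a minimal, self-contained sketch of the sort-merge idea. This is not the persist Consolidator's actual API (the real thing also consolidates duplicate updates and reads incrementally from blob data); it only illustrates why the memory footprint stays roughly proportional to the number of batches:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of already-sorted per-worker batches. Memory usage is roughly
/// proportional to the number of batches, since we only hold one "cursor"
/// (the next element) per batch at any time.
fn merge_sorted_batches<T: Ord>(batches: Vec<Vec<T>>) -> impl Iterator<Item = T> {
    let mut iters: Vec<_> = batches.into_iter().map(|b| b.into_iter()).collect();
    // Min-heap of (next value, index of the batch it came from).
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    std::iter::from_fn(move || {
        let Reverse((v, idx)) = heap.pop()?;
        // Refill the heap from the batch we just consumed from.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        Some(v)
    })
}

fn main() {
    // Each "worker" produced a sorted batch; the merged stream is globally sorted.
    let batches = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let merged: Vec<_> = merge_sorted_batches(batches).collect();
    assert_eq!(merged, (1..=9).collect::<Vec<_>>());
}
```

The heap holds one entry per batch, which is what keeps the merge within bounded memory regardless of the total result size.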
> Purposefully high level, but the rough idea is that we a) need to stop sending
> large amounts of data through the compute protocol, and b) need to stop
> materializing large results in `environmentd` memory.
How much of a problem is it to send large amounts of data through the compute protocol? An alternative design could be implemented entirely within envd: it could receive the data from replicas and, instead of materializing it in memory, write it to files or blob storage and stream the results to the client from there. Just pointing out the option!
I would say it is a problem, and not just for peeks but also for subscribes, which you found in https://github.com/MaterializeInc/database-issues/issues/9264.
An alternative would be to add a protocol between clusterd and envd for sending back these large responses, one that doesn't block the "main" protocol. But I'd say the design proposed here is already kind of that, except that it uses persist blob storage as the intermediary, and I think that's easier to implement than introducing a new protocol.
> We would use a (configurable) threshold for determining whether to send a
> result inline in the compute protocol or out-of-band via the blob store.
Related: moving the task of writing data to envd would make it simple to determine whether a result is small or large.
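For illustration, here is a minimal sketch of the threshold decision quoted above; the type and function names are hypothetical, not part of the actual compute protocol:

```rust
/// Hypothetical result envelope: either the encoded rows travel inline in the
/// compute protocol response, or only a reference to a blob does.
enum PeekResultEnvelope {
    Inline(Vec<u8>),
    OutOfBand { blob_key: String },
}

/// Decide, based on a configurable byte threshold, whether a result should be
/// sent inline or written to the blob store and referenced instead.
fn envelope_for_result(encoded: Vec<u8>, max_inline_bytes: usize) -> PeekResultEnvelope {
    if encoded.len() <= max_inline_bytes {
        PeekResultEnvelope::Inline(encoded)
    } else {
        // In the actual design, the batch would be uploaded to the blob store
        // here and its key (or handle) sent back instead of the data.
        let blob_key = format!("peek-result-{}-bytes", encoded.len());
        PeekResultEnvelope::OutOfBand { blob_key }
    }
}
```

Whether this decision is made on the replica or, as suggested here, in envd after it has written the data is exactly the trade-off raised above.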
a276bd0 to d240488 (Compare)
d240488 to 0ed0895 (Compare)
bkirwi left a comment
Works for me! Thanks for following up.
tyty! 🙇‍♂️
Rendered: https://github.com/aljoscha/materialize/blob/adapter-lift-max-result-size-limitation/doc/developer/design/20250415_large_select_result_size.md
Design doc for https://github.com/MaterializeInc/database-issues/issues/9180