
Conversation

@aljoscha (Contributor) commented Apr 15, 2025

happening in `environmentd`, but could happen on the cluster side. We think it
is easier to lift result-size limitations for queries that don't require
post-processing. We should be able to just "stream them through", without
materializing in `environmentd`.
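
To make the "stream them through" idea concrete, here is a minimal, hypothetical sketch contrasting today's materialize-then-send path with the proposed streaming path. The `Row` and `Client` types are stand-ins for illustration, not the adapter's actual API:

```rust
// Hypothetical stand-ins for illustration only.
type Row = Vec<u8>;
struct Client;
impl Client {
    fn send(&mut self, _row: Row) { /* write one row to the client connection */ }
}

// Today: the whole result is collected in `environmentd` memory first,
// which is what the max-result-size limit guards against.
fn send_materialized(rows: impl Iterator<Item = Row>, client: &mut Client) {
    let all: Vec<Row> = rows.collect();
    for row in all {
        client.send(row);
    }
}

// Proposed for streamable queries: forward rows as they arrive,
// holding only one row at a time.
fn send_streaming(rows: impl Iterator<Item = Row>, client: &mut Client) {
    for row in rows {
        client.send(row);
    }
}
```
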
Contributor:

FWIW, I think it wouldn't be super hard to support these cases using the Consolidator machinery we have now. Happy to chat about this if you're interested!

Contributor:

Nice! I was thinking this might be possible, glad to know you think the same!

`environmentd`) stream those results from persist back to the client.

We would use a (configurable) threshold for determining whether to send a
result inline in the compute protocol or out of band via the blob store.
Contributor:

Sounds like this could use the same machinery that @ParkMyCar worked up for large insert/update queries, but streaming the resulting batch to the client instead of linking it into a shard?

Contributor:

I was thinking the same! I started to build an abstraction for this in #31189: I added a new crate there called `row-stash`, a response type called `PeekResponseUnary::Batches`, and finally the rows get returned to the user via `ExecuteResponse::SendingRowsStreaming`.

FWIW some things in that PR are a bit hacked together so the code might not be perfect 🙈
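
For orientation, the shape of those pieces might look roughly like this. The type and variant names come from the comment above, but everything else is guessed, not the actual code from #31189:

```rust
// Hypothetical approximations; the real definitions in #31189 differ in detail.
type Row = Vec<u8>;
type BatchRef = String; // reference to a batch written to blob storage
type RowStream = Box<dyn Iterator<Item = Row>>;

// Compute protocol response: small results travel inline,
// large ones as references to batches in blob storage.
enum PeekResponseUnary {
    Rows(Vec<Row>),
    Batches(Vec<BatchRef>),
    Error(String),
}

// Adapter response: the new variant streams rows to the client
// instead of materializing them in `environmentd`.
enum ExecuteResponse {
    SendingRows(Vec<Row>),
    SendingRowsStreaming(RowStream),
}
```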

@aljoscha aljoscha force-pushed the adapter-lift-max-result-size-limitation branch from 22afd68 to e47b35c on April 16, 2025 09:46
@aljoscha aljoscha force-pushed the adapter-lift-max-result-size-limitation branch from d362ff8 to a276bd0 on April 23, 2025 18:12
@aljoscha aljoscha changed the title from "design: lift max result size limitation" to "design: large SELECT result size" on Apr 23, 2025
@antiguru antiguru (Member) left a comment

This reads fine, and I think it would be a small incremental step towards efficient handling of large result sets. My main concern is how well the current infrastructure matches what this change needs. At the moment, each cluster replica worker has a portion of the output data, and we send all partitions to envd to merge them. This step would still need to be performed to ensure consistent results. How well can persist read from many blobs and merge the outputs into one? As long as we're reading from arrangements, at least we know that the output is physically monotonic.

Comment on lines 28 to 29
Below, we will use _streamable queries_ for queries that don't require post
processing, and _non-streamable queries_ for those that do require it.
Member:

There is always implicit post-processing, i.e., we sort the data even if the user didn't specify any particular order. How would this impact your design?

Specifically, we might write down as many blobs as we have workers on the cluster replica. Each is sorted within itself. When sending the data to the client, we need to stream all blobs and merge them on-the-fly. Certainly possible, but at what resource utilization?

Contributor Author:

The plan is to use the existing Consolidator from persist, which is used to consolidate runs/batches when persist compacts a shard.

It assumes that the batches/runs are sorted and merges updates while reading, and it tries to do that within bounded memory: roughly, the required memory is O(num_runs). It's sort-merge machinery, and the good thing is that it already exists, and engineering effort has gone, and will continue to go, into making it work well for persist compaction.

For our purposes here, each worker writes down a batch, and then on envd we use a Consolidator to merge-sort the batches and stream the results back out to the client. We get the implicit sort, and in the future we can also use it to apply an explicit ORDER BY.
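
As a rough illustration of that merge step, here is a minimal sketch of a bounded-memory k-way merge over pre-sorted batches. It ignores consolidation of duplicate updates, and none of the names are the actual persist Consolidator API:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted batches into one sorted stream, keeping only one
/// "head" element per batch in memory, i.e. an O(num_batches) working set.
fn merge_sorted_batches<T: Ord>(batches: Vec<Vec<T>>) -> impl Iterator<Item = T> {
    let mut iters: Vec<_> = batches.into_iter().map(Vec::into_iter).collect();
    // Min-heap (via Reverse) of each batch's current head, tagged with the
    // index of the batch it came from so we can refill from the right one.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(head) = it.next() {
            heap.push(Reverse((head, idx)));
        }
    }
    std::iter::from_fn(move || {
        let Reverse((head, idx)) = heap.pop()?;
        // Pull the next element from the batch we just took the head of.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        Some(head)
    })
}
```

E.g. `merge_sorted_batches(vec![vec![1, 3], vec![2, 4]])` yields `1, 2, 3, 4` while never holding more than one element per input batch, which is the property that would let `environmentd` stream arbitrarily large results.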

Comment on lines 92 to 94
Purposefully high level, but the rough idea is that we a) need to stop sending
large amounts of data through the compute protocol, and b) need to stop
materializing large results in `environmentd` memory.
Member:

How much of a problem is it to send large amounts of data through the compute protocol? An alternative design could be implemented entirely within envd: it could receive the data from replicas and, instead of materializing it in memory, write it to files or blob storage, and stream the results to the client from there. Just pointing out the option!

Contributor Author:

I would say it is a problem, and not just for peeks but also for subscribes, which you found in https://github.com/MaterializeInc/database-issues/issues/9264.

An alternative would be to add a protocol between clusterd and envd for sending back these large responses without blocking the "main" protocol. But I'd say the design proposed here is already kind of that, except it uses persist blob storage as the intermediary, and I think that's easier to implement than introducing a new protocol.

Comment on lines 107 to 108
We would use a (configurable) threshold for determining whether to send a
result inline in the compute protocol or out-of-band via the blob store.
Member:

Related: moving the task of writing data to envd would make it simple to determine whether a result is small or large.
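
Whichever side does the writing, the threshold decision itself is simple. A hypothetical sketch, with all names invented for illustration (none of this is Materialize's actual API):

```rust
// Hypothetical stand-ins for illustration only.
struct Row(Vec<u8>);
struct BlobBatchRef(String); // e.g. a key in the persist blob store

enum PeekResult {
    Inline(Vec<Row>),           // small: send through the compute protocol
    Spilled(Vec<BlobBatchRef>), // large: send references, stream from blob store
}

fn write_batches_to_blob_store(rows: Vec<Row>) -> Vec<BlobBatchRef> {
    // Stand-in for the real upload; pretend everything landed in one batch.
    vec![BlobBatchRef(format!("batch-with-{}-rows", rows.len()))]
}

/// Apply the (configurable) inline-vs-out-of-band threshold.
fn respond(rows: Vec<Row>, inline_threshold_bytes: usize) -> PeekResult {
    let encoded_size: usize = rows.iter().map(|r| r.0.len()).sum();
    if encoded_size <= inline_threshold_bytes {
        PeekResult::Inline(rows)
    } else {
        PeekResult::Spilled(write_batches_to_blob_store(rows))
    }
}
```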

@aljoscha aljoscha force-pushed the adapter-lift-max-result-size-limitation branch from a276bd0 to d240488 on June 4, 2025 17:32
@aljoscha aljoscha force-pushed the adapter-lift-max-result-size-limitation branch from d240488 to 0ed0895 on July 2, 2025 13:45
@bkirwi bkirwi (Contributor) left a comment

Works for me! Thanks for following up.

@aljoscha (Contributor Author) commented Jul 3, 2025

> Works for me! Thanks for following up.

tyty! 🙇‍♂️

@aljoscha aljoscha merged commit 35688f7 into MaterializeInc:main Jul 3, 2025
9 checks passed
@aljoscha aljoscha deleted the adapter-lift-max-result-size-limitation branch July 3, 2025 08:54