diff --git a/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiter.java b/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiter.java new file mode 100644 index 0000000000..31acd39291 --- /dev/null +++ b/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.jdbc; + +import java.sql.ResultSet; +import java.util.Optional; + +/** + * Limits the size of single {@link JdbcMeta.Frame} during its construction. + */ +interface FrameLimiter { + + /** + * Create a new {@link Context} to start limiting the size of a single {@link JdbcMeta.Frame} + * + * @param resultSet the {@link ResultSet} being used to construct the frame + * @return the context, configured according to the specification of this FrameLimiter instance + */ + Context start(ResultSet resultSet); + + /** + * Return the optional row count limit for this FrameLimiter. + */ + default Optional getRowCountLimit() { + return Optional.empty(); + } + + /** + * A context configured by its creating {@link FrameLimiter}, limits a frame within a + * single fetch operation. + */ + interface Context { + + /** + * Determine if the limit configured in the FrameLimiter has been reached in the + * current Frame creation operation. + * + * @return true if the limit has been reached, otherwise false + */ + boolean limitReached(); + + /** + * Add a single row that will be included in the Frame + * + * @param row the row being added + */ + void addRow(Object[] row); + } +} +// End FrameLimiter.java diff --git a/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiters.java b/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiters.java new file mode 100644 index 0000000000..65b55c3e1f --- /dev/null +++ b/server/src/main/java/org/apache/calcite/avatica/jdbc/FrameLimiters.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.jdbc; + +import org.apache.calcite.avatica.AvaticaUtils; + +import java.sql.ResultSet; +import java.time.Clock; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Collection of factory methods for creation of {@link FrameLimiter} instances. + */ +class FrameLimiters { + + private FrameLimiters() {} + + /** + * Returns an unlimited {@link FrameLimiter}, which will always provide the + * full contents of a ResultSet + */ + static FrameLimiter unlimited() { + return resultSet -> new FrameLimiter.Context() { + @Override public boolean limitReached() { + return false; + } + + @Override public void addRow(Object[] row) { + AvaticaUtils.discard(row); + } + }; + } + + /** + * Returns a FrameLimiter which limits the frame size to a given number of rows. + * + * @param maxRowCount maximum number of rows to be included in the frame + */ + static FrameLimiter rowCountLimited(int maxRowCount) { + return new FrameLimiter() { + + @Override public Optional getRowCountLimit() { + return Optional.of(maxRowCount); + } + + @Override public Context start(ResultSet resultSet) { + return new FrameLimiter.Context() { + + int rowCount = 0; + + @Override public boolean limitReached() { + return rowCount >= maxRowCount; + } + + @Override public void addRow(Object[] row) { + rowCount++; + } + }; + } + }; + } + + /** + * Returns a FrameLimiter which will stop frame creation after a given amount of time. + * + * @param maxFrameMillis maximum number of milliseconds to be used while constructing the frame + */ + static FrameLimiter timeLimited(long maxFrameMillis) { + return timeLimited(maxFrameMillis, Clock.systemUTC()); + } + + /** + * Same as {@link #timeLimited(long)}, but allows specifying a custom Clock (for testing) + * + * @param maxFrameMillis maximum number of milliseconds to be used while constructing the frame + * @param clock clock to be used for measuring frame construction duration + */ + static FrameLimiter timeLimited(long maxFrameMillis, Clock clock) { + return resultSet -> new FrameLimiter.Context() { + + long timeLimitMillis = clock.millis() + maxFrameMillis; + + @Override public boolean limitReached() { + return clock.millis() > timeLimitMillis; + } + + @Override public void addRow(Object[] row) { + AvaticaUtils.discard(row); + } + }; + } + + /** + * Returns a FrameLimiter that is the combination of a sequency of underlying FrameLimiters. + * + * The returned FrameLimiter will return true for + * {@link FrameLimiter.Context#limitReached()} once any one of the underlying FrameLimiters limits + * have been reached. + * + * @param frameLimiters underlying limiters upon which the combined limiter is to be created. + */ + static FrameLimiter combined(FrameLimiter...frameLimiters) { + if (frameLimiters.length == 0) { + throw new IllegalArgumentException("No frame limiters supplied"); + } else if (frameLimiters.length == 1) { + return frameLimiters[0]; + } else { + List frameLimiterList = Arrays.asList(frameLimiters); + return new FrameLimiter() { + @Override public Optional getRowCountLimit() { + return frameLimiterList.stream() + .filter(limiter -> limiter.getRowCountLimit().isPresent()) + .findFirst().flatMap(FrameLimiter::getRowCountLimit); + } + + @Override public Context start(ResultSet resultSet) { + + List contexts = frameLimiterList.stream() + .map(frameLimiter -> frameLimiter.start(resultSet)) + .collect(Collectors.toList()); + + return new Context() { + @Override public boolean limitReached() { + return contexts.stream().anyMatch(Context::limitReached); + } + + @Override public void addRow(Object[] row) { + contexts.forEach(c -> c.addRow(row)); + } + }; + } + }; + } + } +} +// End FrameLimiters.java diff --git a/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java index 7f19b11a3b..4e2ab243a6 100644 --- a/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java +++ b/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java @@ -18,6 +18,7 @@ import org.apache.calcite.avatica.AvaticaParameter; import org.apache.calcite.avatica.AvaticaPreparedStatement; +import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ConnectionPropertiesImpl; @@ -756,7 +757,8 @@ public ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRo AvaticaUtils.getLargeUpdateCount(statement))); } else { resultSets.add( - JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), maxRowsInFirstFrame)); + JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), + createFrameLimiter(maxRowsInFirstFrame))); } LOG.trace("prepAndExec statement {}", h); // TODO: review client to ensure statementId is updated when appropriate @@ -823,7 +825,8 @@ public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws return Frame.EMPTY; } else { return JdbcResultSet.frame(statementInfo, statementInfo.getResultSet(), offset, - fetchMaxRowCount, calendar, Optional.absent()); + createFrameLimiter(fetchMaxRowCount), + calendar, Optional.absent()); } } catch (SQLException e) { throw propagate(e); @@ -885,7 +888,7 @@ private static String[] toArray(List typeList) { } else { resultSets = Collections.singletonList( JdbcResultSet.create(h.connectionId, h.id, statementInfo.getResultSet(), - maxRowsInFirstFrame, signature2)); + createFrameLimiter(maxRowsInFirstFrame), signature2)); } } else { resultSets = Collections.singletonList( @@ -993,6 +996,18 @@ private static String[] toArray(List typeList) { } } + protected FrameLimiter createFrameLimiter(int maxRowCount) { + if (maxRowCount == JdbcMeta.UNLIMITED_COUNT) { + return FrameLimiters.unlimited(); + } else if (maxRowCount < 0L) { + return FrameLimiters.rowCountLimited(AvaticaStatement.DEFAULT_FETCH_SIZE); + } else if (maxRowCount > AvaticaStatement.DEFAULT_FETCH_SIZE) { + return FrameLimiters.rowCountLimited(AvaticaStatement.DEFAULT_FETCH_SIZE); + } else { + return FrameLimiters.rowCountLimited(maxRowCount); + } + } + /** Configurable statement cache settings. */ public enum StatementCacheSettings { /** JDBC connection property for setting connection cache concurrency level. */ diff --git a/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java index 7aa7a731db..af20142625 100644 --- a/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java +++ b/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java @@ -16,7 +16,6 @@ */ package org.apache.calcite.avatica.jdbc; -import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ColumnMetaData.ArrayType; @@ -65,41 +64,28 @@ public static JdbcResultSet create(String connectionId, int statementId, ResultSet resultSet) { // -1 still limits to 100 but -2 does not limit to any number return create(connectionId, statementId, resultSet, - JdbcMeta.UNLIMITED_COUNT); + FrameLimiters.unlimited()); } - /** Creates a result set with maxRowCount. - * - *

If {@code maxRowCount} is -2 ({@link JdbcMeta#UNLIMITED_COUNT}), - * returns an unlimited number of rows in a single frame; any other - * negative value (typically -1) returns an unlimited number of rows - * in frames of the default frame size. */ + /** + * Creates a result set with the given FrameLimiter. + */ public static JdbcResultSet create(String connectionId, int statementId, - ResultSet resultSet, int maxRowCount) { + ResultSet resultSet, FrameLimiter frameLimiter) { try { Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData()); - return create(connectionId, statementId, resultSet, maxRowCount, sig); + return create(connectionId, statementId, resultSet, frameLimiter, sig); } catch (SQLException e) { throw new RuntimeException(e); } } public static JdbcResultSet create(String connectionId, int statementId, - ResultSet resultSet, int maxRowCount, Meta.Signature signature) { + ResultSet resultSet, FrameLimiter frameLimiter, Meta.Signature signature) { try { final Calendar calendar = DateTimeUtils.calendar(); - final int fetchRowCount; - if (maxRowCount == JdbcMeta.UNLIMITED_COUNT) { - fetchRowCount = -1; - } else if (maxRowCount < 0L) { - fetchRowCount = AvaticaStatement.DEFAULT_FETCH_SIZE; - } else if (maxRowCount > AvaticaStatement.DEFAULT_FETCH_SIZE) { - fetchRowCount = AvaticaStatement.DEFAULT_FETCH_SIZE; - } else { - fetchRowCount = maxRowCount; - } - final Meta.Frame firstFrame = frame(null, resultSet, 0, fetchRowCount, calendar, - Optional.of(signature)); + final Meta.Frame firstFrame = frame(null, resultSet, 0, frameLimiter, + calendar, Optional.of(signature)); if (firstFrame.done) { resultSet.close(); } @@ -126,7 +112,8 @@ public static JdbcResultSet count(String connectionId, int statementId, /** Creates a frame containing a given number or unlimited number of rows * from a result set. */ static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset, - int fetchMaxRowCount, Calendar calendar, Optional sig) throws SQLException { + FrameLimiter frameLimiter, Calendar calendar, + Optional sig) throws SQLException { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); final int[] types = new int[columnCount]; @@ -139,8 +126,9 @@ static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset, } final List rows = new ArrayList<>(); // Meta prepare/prepareAndExecute 0 return 0 row and done - boolean done = fetchMaxRowCount == 0; - for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) { + boolean done = frameLimiter.getRowCountLimit().map(limit -> limit == 0).orElse(false); + FrameLimiter.Context frameLimiterContext = frameLimiter.start(resultSet); + while (!frameLimiterContext.limitReached()) { final boolean hasRow; if (null != info) { hasRow = info.next(); @@ -179,6 +167,7 @@ static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset, } } rows.add(columns); + frameLimiterContext.addRow(columns); } return new Meta.Frame(offset, done, rows); } diff --git a/server/src/test/java/org/apache/calcite/avatica/jdbc/FrameLimitersTest.java b/server/src/test/java/org/apache/calcite/avatica/jdbc/FrameLimitersTest.java new file mode 100644 index 0000000000..72ed5d1501 --- /dev/null +++ b/server/src/test/java/org/apache/calcite/avatica/jdbc/FrameLimitersTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.jdbc; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.sql.ResultSet; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link FrameLimiters} + */ +public class FrameLimitersTest { + + private static final ResultSet RESULT_SET = Mockito.mock(ResultSet.class); + private static final Object[] ROW = {1L, "ONE"}; + + @Test public void testUnlimited() { + FrameLimiter unlimited = FrameLimiters.unlimited(); + assertEquals(Optional.empty(), unlimited.getRowCountLimit()); + FrameLimiter.Context limiterContext = unlimited.start(RESULT_SET); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertFalse(limiterContext.limitReached()); + } + + @Test public void testRowCountLimited() { + FrameLimiter rowCountLimited = FrameLimiters.rowCountLimited(2); + assertEquals(Optional.of(2), rowCountLimited.getRowCountLimit()); + FrameLimiter.Context limiterContext = rowCountLimited.start(RESULT_SET); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertTrue(limiterContext.limitReached()); + } + + @Test public void testTimelimited() { + TestClock testClock = new TestClock(); + FrameLimiter timeLimited = FrameLimiters.timeLimited(2, testClock); + assertEquals(Optional.empty(), timeLimited.getRowCountLimit()); + FrameLimiter.Context limiterContext = timeLimited.start(RESULT_SET); + + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + testClock.addMillis(2); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + testClock.addMillis(1); + assertTrue(limiterContext.limitReached()); + } + + @Test + public void testCombined() { + TestClock testClock = new TestClock(); + FrameLimiter timeLimited = FrameLimiters.timeLimited(2, testClock); + FrameLimiter rowCountLimited = FrameLimiters.rowCountLimited(3); + + FrameLimiter combined = FrameLimiters.combined(timeLimited, rowCountLimited); + assertEquals(Optional.of(3), combined.getRowCountLimit()); + + // Test that the clock-based limiting works + FrameLimiter.Context limiterContext = combined.start(RESULT_SET); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + testClock.addMillis(2); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + testClock.addMillis(1); + assertTrue(limiterContext.limitReached()); + + // Test that count-based limiting works + limiterContext = rowCountLimited.start(RESULT_SET); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertFalse(limiterContext.limitReached()); + limiterContext.addRow(ROW); + assertTrue(limiterContext.limitReached()); + } + + /** + * Clock implementation that allows specifying the current time + */ + private static class TestClock extends Clock { + + private Instant currentTime = Instant.now(); + + @Override public ZoneId getZone() { + throw new UnsupportedOperationException(); + } + + @Override public Clock withZone(ZoneId zone) { + throw new UnsupportedOperationException(); + } + + void addMillis(long millis) { + this.currentTime = currentTime.plusMillis(millis); + } + + @Override public Instant instant() { + return this.currentTime; + } + } +} +// End FrameLimitersTest.java