diff --git a/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java b/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java index 7b2bb5b..d0502e9 100644 --- a/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java +++ b/src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java @@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource 0; } + private boolean hasCustomLoadInterval() { + return loadIntervalMinutes > 1; + } + @Override public DataSourceDescription getDataSourceDescription() { return new JDBCDataSourceDescription(); @@ -146,6 +153,7 @@ public void setProperties(Map properties) { try (Connection con = getConnection()) { readDelay = Long.parseLong(properties.getOrDefault(readDelayProp, "0")); fullLoadIntervalMinutes = Long.parseLong(properties.getOrDefault(fullLoadIntervalProp, "0")); + loadIntervalMinutes = Long.parseLong(properties.getOrDefault(loadIntervalProp, "1")); DatabaseMetaData metadata = con.getMetaData(); String userProvidedIncColumn = properties.get(incrementingColumnNameProp); tableInfo = loadTableInfo(metadata, properties.getOrDefault(schemaPatternProp, null), properties.get(tableNameProp)); @@ -404,9 +412,10 @@ public CompletionStage>> getDataLoaders(Ta var taskCount = completedRanges.size() + wantedRanges.size(); var itemsPerTask = (taskInfo.getMetadata().itemsPerTask(taskCount)); - var emptyFullLoad = isFullLoad() && wantedRanges.stream().noneMatch(this::matchesLoadInterval); + var skipAll = hasCustomLoadInterval() && wantedRanges.stream().noneMatch(this::matchesLoadInterval); + var emptyFullLoad = isFullLoad() && wantedRanges.stream().noneMatch(this::matchesFullLoadInterval); var noDataToLoad = !isFullLoad() && !tableInfo.hasTimeColumns() && itemsPerTask == 0; - if (emptyFullLoad || noDataToLoad) { + if (skipAll || emptyFullLoad || noDataToLoad) { List> result = wantedRanges.stream().map(t -> new NoDataLoader(t, taskInfo.getMetadata())).collect(Collectors.toList()); return CompletableFuture.completedFuture(result.iterator()); @@ -423,8 +432,16 @@ public CompletionStage>> getDataLoaders(Ta } } + private boolean matchesFullLoadInterval(TaskRange x) { + return getTimeInMinutes(x.getInclusiveStartTime()) % fullLoadIntervalMinutes == 0; + } + private boolean matchesLoadInterval(TaskRange x) { - return x.getInclusiveStartTime().getEpochSecond() / 60 % fullLoadIntervalMinutes == 0; + return getTimeInMinutes(x.getInclusiveStartTime()) % loadIntervalMinutes == 0; + } + + private Long getTimeInMinutes(Instant time) { + return time.getEpochSecond() / 60L; } private List getRunMetadatas(TaskInformation taskInfo, @@ -445,10 +462,18 @@ private List getRunMetadatas(TaskInformation // First task does not have lower bound to ensure we don't skip data from the last point we stopped at var startTime = firstInBatch ? taskInfo.getMetadata().getStartTime() : wantedRange.getInclusiveStartTime().minusSeconds(readDelay); + var lowerBound = taskInfo.getMetadata().getStartTime().getEpochSecond(); + var truncatedStartTime = Math.max(lowerBound, hasCustomLoadInterval() ? + (getTimeInMinutes(startTime) / loadIntervalMinutes * 60 * loadIntervalMinutes) + : startTime.getEpochSecond()); + var endTime = wantedRange.getExclusiveEndTime().minusSeconds(readDelay); + var truncatedEndTime = Math.max(lowerBound,hasCustomLoadInterval() ? + getTimeInMinutes(endTime) / loadIntervalMinutes * 60 * loadIntervalMinutes + : endTime.getEpochSecond()); var metadata = new JDBCTaskMetadata(taskInfo.getMetadata().getInclusiveStart(), taskInfo.getMetadata().getExclusiveEnd(), - startTime, - wantedRange.getExclusiveEndTime().minusSeconds(readDelay)); + Instant.ofEpochSecond(truncatedStartTime), + Instant.ofEpochSecond(truncatedEndTime)); result.add(metadata); } } else { @@ -486,36 +511,18 @@ private CompletionStage>> splitData(Result final var isLast = i == wantedRanges.size() - 1; final var taskRange = wantedRanges.get(i); final var metadata = runMetadatas.get(i); - var loader = new DataLoader() { - @Override - public TaskRange getTaskRange() { - return taskRange; - } - - private final RowReader rowReader = new RowReader(tableInfo, valueGetter, metadata, connection, isFullLoad() && matchesLoadInterval(taskRange)); - - @Override - public Iterator loadData() { - ResultSetInputStream inputStream = new ResultSetInputStream(rowConverter, rowReader, isLast); - var result = new LoadedData(inputStream, new HashMap<>(), taskRange.getInclusiveStartTime()); - return Collections.singleton(result).iterator(); - } - - @Override - public JDBCTaskMetadata getCompletedMetadata() { - if (tableInfo.hasTimeColumns() && rowReader.readValues()) { - if (rowReader.readValues()) { - // If some data was successfully read then that's our next start point - lastReadTime.set(toUtc(rowReader.getLastTimestampValue().toInstant())); - lastReadIncValue.set(rowReader.getLastIncValue()); - } - metadata.setExclusiveEnd(lastReadIncValue.get() + 1); - metadata.setEndTime(lastReadTime.get()); + DataLoader loader = null; + if (matchesLoadInterval(taskRange)){ + loader = getLoader(connection, + lastReadIncValue, + lastReadTime, + valueGetter, + isLast, + taskRange, + metadata); + } else { + loader = new NoDataLoader(taskRange, metadata); } - - return metadata; - } - }; result.add(loader); } @@ -523,6 +530,39 @@ public JDBCTaskMetadata getCompletedMetadata() { } + private DataLoader getLoader(Connection connection, AtomicReference lastReadIncValue, AtomicReference lastReadTime, ResultSetValuesGetter valueGetter, boolean isLast, TaskRange taskRange, JDBCTaskMetadata metadata) { + return new DataLoader() { + @Override + public TaskRange getTaskRange() { + return taskRange; + } + + private final RowReader rowReader = new RowReader(tableInfo, valueGetter, metadata, connection, isFullLoad() && matchesFullLoadInterval(taskRange)); + + @Override + public Iterator loadData() { + ResultSetInputStream inputStream = new ResultSetInputStream(rowConverter, rowReader, isLast); + var result = new LoadedData(inputStream, new HashMap<>(), taskRange.getInclusiveStartTime()); + return Collections.singleton(result).iterator(); + } + + @Override + public JDBCTaskMetadata getCompletedMetadata() { + if (tableInfo.hasTimeColumns() && rowReader.readValues()) { + if (rowReader.readValues()) { + // If some data was successfully read then that's our next start point + lastReadTime.set(toUtc(rowReader.getLastTimestampValue().toInstant())); + lastReadIncValue.set(rowReader.getLastIncValue()); + } + metadata.setExclusiveEnd(lastReadIncValue.get() + 1); + metadata.setEndTime(lastReadTime.get()); + } + + return metadata; + } + }; + } + @Override public CompletionStage> getTaskInfo(JDBCTaskMetadata previousTaskMetadata,