Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 75 additions & 35 deletions src/main/java/com/upsolver/datasources/jdbc/JDBCDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
private static final String timestampColumnsProp = "Timestamp Columns";
private static final String readDelayProp = "Read Delay";
private static final String fullLoadIntervalProp = "Full Load Interval";
private static final String loadIntervalProp = "Incremental Load Interval";
private static final String userNameProp = "User Name";
private static final String passwordProp = "Password";
private static final String keepSourceTypes = "Keep JDBC source types";
Expand All @@ -83,10 +84,12 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
new SimplePropertyDescription(timestampColumnsProp, "Comma separated list of timestamp columns to use for loading new rows. The fist non-null value will be used. At least one of the values must not be null for each row", true),
new SimplePropertyDescription(readDelayProp, "How long (in seconds) to wait before reading rows based on their timestamp. This allows waiting for all transactions of a certain timestamp to complete to avoid loading partial data. Default value is 0", true),
new SimplePropertyDescription(fullLoadIntervalProp, "If set the full table will be read every configured interval (in minutes). When this is configured the update time and incrementing columns are not used.", true),
new SimplePropertyDescription(keepSourceTypes, "Keep original data types from source to use string representation", true, false, null, null, null, true, Optional.of("true")));
new SimplePropertyDescription(keepSourceTypes, "Keep original data types from source to use string representation", true, false, null, null, null, true, Optional.of("true")),
new SimplePropertyDescription(loadIntervalProp, "Configures how often (in minutes) the data source will poll the database for new changes", true));

private long readDelay;
private long fullLoadIntervalMinutes;
private long loadIntervalMinutes;
private TableInfo tableInfo;
private QueryDialect queryDialect;
private long dbTimezoneOffset;
Expand All @@ -103,6 +106,10 @@ private boolean isFullLoad() {
return fullLoadIntervalMinutes > 0;
}

/**
 * Tells whether an incremental load interval other than the default was configured.
 *
 * @return {@code true} when {@code loadIntervalMinutes} is greater than one minute;
 *         {@code false} for one minute and for any zero or negative value
 */
private boolean hasCustomLoadInterval() {
    // Anything at or below one minute is treated as "poll every minute".
    final boolean pollsEveryMinute = loadIntervalMinutes <= 1;
    return !pollsEveryMinute;
}

@Override
public DataSourceDescription getDataSourceDescription() {
return new JDBCDataSourceDescription();
Expand Down Expand Up @@ -146,6 +153,7 @@ public void setProperties(Map<String, String> properties) {
try (Connection con = getConnection()) {
readDelay = Long.parseLong(properties.getOrDefault(readDelayProp, "0"));
fullLoadIntervalMinutes = Long.parseLong(properties.getOrDefault(fullLoadIntervalProp, "0"));
loadIntervalMinutes = Long.parseLong(properties.getOrDefault(loadIntervalProp, "1"));
DatabaseMetaData metadata = con.getMetaData();
String userProvidedIncColumn = properties.get(incrementingColumnNameProp);
tableInfo = loadTableInfo(metadata, properties.getOrDefault(schemaPatternProp, null), properties.get(tableNameProp));
Expand Down Expand Up @@ -404,9 +412,10 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta

var taskCount = completedRanges.size() + wantedRanges.size();
var itemsPerTask = (taskInfo.getMetadata().itemsPerTask(taskCount));
var emptyFullLoad = isFullLoad() && wantedRanges.stream().noneMatch(this::matchesLoadInterval);
var skipAll = hasCustomLoadInterval() && wantedRanges.stream().noneMatch(this::matchesLoadInterval);
var emptyFullLoad = isFullLoad() && wantedRanges.stream().noneMatch(this::matchesFullLoadInterval);
var noDataToLoad = !isFullLoad() && !tableInfo.hasTimeColumns() && itemsPerTask == 0;
if (emptyFullLoad || noDataToLoad) {
if (skipAll || emptyFullLoad || noDataToLoad) {
List<DataLoader<JDBCTaskMetadata>> result =
wantedRanges.stream().map(t -> new NoDataLoader(t, taskInfo.getMetadata())).collect(Collectors.toList());
return CompletableFuture.completedFuture(result.iterator());
Expand All @@ -423,8 +432,16 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
}
}

/**
 * Checks whether a task range starts on a full-load boundary.
 *
 * @param x the task range whose inclusive start time is tested
 * @return {@code true} when the start time, expressed in whole minutes since the epoch,
 *         is an exact multiple of {@code fullLoadIntervalMinutes}
 * @throws ArithmeticException if {@code fullLoadIntervalMinutes} is zero; the callers
 *         visible in this file check {@code isFullLoad()} first, which requires a
 *         positive interval
 */
private boolean matchesFullLoadInterval(TaskRange x) {
    final long startMinute = getTimeInMinutes(x.getInclusiveStartTime());
    final long minutesPastBoundary = startMinute % fullLoadIntervalMinutes;
    return minutesPastBoundary == 0;
}

/**
 * Checks whether a task range starts on an incremental-load boundary.
 *
 * <p>When no custom interval is configured (see {@code hasCustomLoadInterval()}, i.e. the
 * interval is one minute or less) every range matches. This also keeps a configured
 * value of {@code 0} from triggering a division by zero in the modulo below.
 *
 * @param x the task range whose inclusive start time is tested
 * @return {@code true} when every range should be loaded, or when the start time in whole
 *         minutes since the epoch is an exact multiple of {@code loadIntervalMinutes}
 */
private boolean matchesLoadInterval(TaskRange x) {
    if (!hasCustomLoadInterval()) {
        // Interval <= 1 minute: poll on every task range.
        return true;
    }
    return getTimeInMinutes(x.getInclusiveStartTime()) % loadIntervalMinutes == 0;
}

/**
 * Converts an instant to whole minutes since the Unix epoch.
 *
 * @param time the instant to convert
 * @return the instant's epoch seconds divided by 60 using integer division
 *         (the fractional minute is discarded, truncating toward zero)
 */
private Long getTimeInMinutes(Instant time) {
    final long epochSeconds = time.getEpochSecond();
    final long wholeMinutes = epochSeconds / 60L;
    return wholeMinutes;
}

private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata> taskInfo,
Expand All @@ -445,10 +462,18 @@ private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata>
// First task does not have lower bound to ensure we don't skip data from the last point we stopped at
var startTime =
firstInBatch ? taskInfo.getMetadata().getStartTime() : wantedRange.getInclusiveStartTime().minusSeconds(readDelay);
var lowerBound = taskInfo.getMetadata().getStartTime().getEpochSecond();
var truncatedStartTime = Math.max(lowerBound, hasCustomLoadInterval() ?
(getTimeInMinutes(startTime) / loadIntervalMinutes * 60 * loadIntervalMinutes)
: startTime.getEpochSecond());
var endTime = wantedRange.getExclusiveEndTime().minusSeconds(readDelay);
var truncatedEndTime = Math.max(lowerBound,hasCustomLoadInterval() ?
getTimeInMinutes(endTime) / loadIntervalMinutes * 60 * loadIntervalMinutes
: endTime.getEpochSecond());
var metadata = new JDBCTaskMetadata(taskInfo.getMetadata().getInclusiveStart(),
taskInfo.getMetadata().getExclusiveEnd(),
startTime,
wantedRange.getExclusiveEndTime().minusSeconds(readDelay));
Instant.ofEpochSecond(truncatedStartTime),
Instant.ofEpochSecond(truncatedEndTime));
result.add(metadata);
}
} else {
Expand Down Expand Up @@ -486,43 +511,58 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result
final var isLast = i == wantedRanges.size() - 1;
final var taskRange = wantedRanges.get(i);
final var metadata = runMetadatas.get(i);
var loader = new DataLoader<JDBCTaskMetadata>() {
@Override
public TaskRange getTaskRange() {
return taskRange;
}

private final RowReader rowReader = new RowReader(tableInfo, valueGetter, metadata, connection, isFullLoad() && matchesLoadInterval(taskRange));

@Override
public Iterator<LoadedData> loadData() {
ResultSetInputStream inputStream = new ResultSetInputStream(rowConverter, rowReader, isLast);
var result = new LoadedData(inputStream, new HashMap<>(), taskRange.getInclusiveStartTime());
return Collections.singleton(result).iterator();
}

@Override
public JDBCTaskMetadata getCompletedMetadata() {
if (tableInfo.hasTimeColumns() && rowReader.readValues()) {
if (rowReader.readValues()) {
// If some data was successfully read then that's our next start point
lastReadTime.set(toUtc(rowReader.getLastTimestampValue().toInstant()));
lastReadIncValue.set(rowReader.getLastIncValue());
}
metadata.setExclusiveEnd(lastReadIncValue.get() + 1);
metadata.setEndTime(lastReadTime.get());
DataLoader<JDBCTaskMetadata> loader = null;
if (matchesLoadInterval(taskRange)){
loader = getLoader(connection,
lastReadIncValue,
lastReadTime,
valueGetter,
isLast,
taskRange,
metadata);
} else {
loader = new NoDataLoader(taskRange, metadata);
}

return metadata;
}
};
result.add(loader);

}
return CompletableFuture.completedFuture(result.iterator());

}

/**
 * Builds the {@link DataLoader} that serves a single task range from the shared query result.
 *
 * @param connection       connection handed to the {@code RowReader}
 * @param lastReadIncValue shared holder of the last incrementing value read; updated when
 *                         this loader reads rows
 * @param lastReadTime     shared holder of the last timestamp read (converted with
 *                         {@code toUtc}); updated when this loader reads rows
 * @param valueGetter      accessor handed to the {@code RowReader}
 * @param isLast           passed through to the {@code ResultSetInputStream}
 * @param taskRange        the range this loader reports and whose start time stamps the data
 * @param metadata         metadata handed to the {@code RowReader} and returned, possibly
 *                         updated, from {@code getCompletedMetadata()}
 * @return an anonymous loader bound to the arguments above
 */
private DataLoader<JDBCTaskMetadata> getLoader(Connection connection,
                                               AtomicReference<Long> lastReadIncValue,
                                               AtomicReference<Instant> lastReadTime,
                                               ResultSetValuesGetter valueGetter,
                                               boolean isLast,
                                               TaskRange taskRange,
                                               JDBCTaskMetadata metadata) {
    return new DataLoader<JDBCTaskMetadata>() {
        // One reader per loader, created together with the loader itself.
        // NOTE(review): the last argument presumably switches the reader to full-load
        // mode for ranges on a full-load boundary - confirm against RowReader.
        private final RowReader rowReader = new RowReader(
                tableInfo,
                valueGetter,
                metadata,
                connection,
                isFullLoad() && matchesFullLoadInterval(taskRange));

        @Override
        public TaskRange getTaskRange() {
            return taskRange;
        }

        @Override
        public Iterator<LoadedData> loadData() {
            // A single LoadedData item, stamped with the range's inclusive start time.
            ResultSetInputStream rows = new ResultSetInputStream(rowConverter, rowReader, isLast);
            var loaded = new LoadedData(rows, new HashMap<>(), taskRange.getInclusiveStartTime());
            return Collections.singleton(loaded).iterator();
        }

        @Override
        public JDBCTaskMetadata getCompletedMetadata() {
            // Without time columns, or when the reader reports no values read,
            // the metadata is returned untouched.
            if (!tableInfo.hasTimeColumns() || !rowReader.readValues()) {
                return metadata;
            }
            // NOTE(review): readValues() is queried a second time here, exactly as before;
            // kept so the sequence of calls on the reader is unchanged.
            if (rowReader.readValues()) {
                // If some data was successfully read then that's our next start point
                lastReadTime.set(toUtc(rowReader.getLastTimestampValue().toInstant()));
                lastReadIncValue.set(rowReader.getLastIncValue());
            }
            metadata.setExclusiveEnd(lastReadIncValue.get() + 1);
            metadata.setEndTime(lastReadTime.get());
            return metadata;
        }
    };
}


@Override
public CompletionStage<TaskInformation<JDBCTaskMetadata>> getTaskInfo(JDBCTaskMetadata previousTaskMetadata,
Expand Down