Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
exports org.cloudfoundry.multiapps.controller.persistence.services;
exports org.cloudfoundry.multiapps.controller.persistence.util;
exports org.cloudfoundry.multiapps.controller.persistence.stream;
exports org.cloudfoundry.multiapps.controller.persistence.query.options;

requires transitive io.pivotal.cfenv.core;
requires transitive io.pivotal.cfenv.jdbc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import java.sql.ResultSet;
import java.sql.SQLException;

import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;

public interface DataSourceDialect {

String getSequenceNextValueSyntax(String sequenceName);

InputStream getBinaryStreamFromBlob(ResultSet rs, String columnName, StreamFetchingOptions streamFetchingOptions) throws SQLException;

InputStream getBinaryStreamFromBlob(ResultSet rs, String columnName) throws SQLException;

void setBlobAsBinaryStream(PreparedStatement ps, int index, InputStream is) throws SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;

import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;

public class DefaultDataSourceDialect implements DataSourceDialect {

@Override
Expand All @@ -19,6 +21,17 @@ public InputStream getBinaryStreamFromBlob(ResultSet rs, String columnName) thro
.getBinaryStream();
}

@Override
public InputStream getBinaryStreamFromBlob(ResultSet rs, String columnName, StreamFetchingOptions streamFetchingOptions)
throws SQLException {
return rs.getBlob(columnName)
// + 1 is required as the first position is 1 instead of 0
// pos – the offset to the first byte of the partial value to be retrieved. The first byte in the Blob is at
// position 1.
.getBinaryStream(streamFetchingOptions.startOffset() + 1,
streamFetchingOptions.endOffset() - streamFetchingOptions.startOffset() + 1);
}

@Override
public void setBlobAsBinaryStream(PreparedStatement ps, int index, InputStream is) throws SQLException {
ps.setBlob(index, is);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.cloudfoundry.multiapps.controller.persistence.query.options;

public record StreamFetchingOptions(long startOffset, long endOffset) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.sql.SQLException;

import org.cloudfoundry.multiapps.controller.persistence.dialects.DataSourceDialect;
import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;

public class BlobSqlFileQueryProvider extends SqlFileQueryProvider {

Expand All @@ -23,4 +24,10 @@ protected InputStream getContentBinaryStream(ResultSet resultSet, String columnN
return getDataSourceDialect().getBinaryStreamFromBlob(resultSet, columnName);
}

@Override
protected InputStream getContentBinaryStreamWithOffset(ResultSet resultSet, String columnName,
StreamFetchingOptions streamFetchingOptions)
throws SQLException {
return getDataSourceDialect().getBinaryStreamFromBlob(resultSet, columnName, streamFetchingOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.sql.SQLException;

import org.cloudfoundry.multiapps.controller.persistence.dialects.DataSourceDialect;
import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;

public class ByteArraySqlFileQueryProvider extends SqlFileQueryProvider {

Expand All @@ -30,4 +31,9 @@ protected InputStream getContentBinaryStream(ResultSet resultSet, String columnN
return getDataSourceDialect().getBinaryStreamFromByteArray(resultSet, columnName);
}

@Override
protected InputStream getContentBinaryStreamWithOffset(ResultSet resultSet, String columnName,
StreamFetchingOptions streamFetchingOptions) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.sql.ResultSet;

import org.cloudfoundry.multiapps.controller.persistence.dialects.DataSourceDialect;
import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;

public class ExternalSqlFileQueryProvider extends SqlFileQueryProvider {

Expand All @@ -22,4 +23,9 @@ protected InputStream getContentBinaryStream(ResultSet resultSet, String columnN
throw new UnsupportedOperationException();
}

@Override
protected InputStream getContentBinaryStreamWithOffset(ResultSet resultSet, String columnName,
StreamFetchingOptions streamFetchingOptions) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
import java.util.List;
import java.util.stream.IntStream;

import jakarta.xml.bind.DatatypeConverter;

import org.cloudfoundry.multiapps.controller.persistence.Constants;
import org.cloudfoundry.multiapps.controller.persistence.Messages;
import org.cloudfoundry.multiapps.controller.persistence.dialects.DataSourceDialect;
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry;
import org.cloudfoundry.multiapps.controller.persistence.query.SqlQuery;
import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;
import org.cloudfoundry.multiapps.controller.persistence.services.FileContentProcessor;
import org.cloudfoundry.multiapps.controller.persistence.stream.DBInputStream;
import org.cloudfoundry.multiapps.controller.persistence.util.JdbcUtil;
import org.slf4j.Logger;

import jakarta.xml.bind.DatatypeConverter;

public abstract class SqlFileQueryProvider {

private static final String INSERT_FILE_ATTRIBUTES_AND_CONTENT_WITHOUT_DIGEST = "INSERT INTO %s (FILE_ID, SPACE, FILE_NAME, NAMESPACE, FILE_SIZE, DIGEST_ALGORITHM, MODIFIED, OPERATION_ID, %s) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
Expand Down Expand Up @@ -244,10 +245,7 @@ public SqlQuery<DBInputStream> openFileWithContentQuery(String space, String id)
statement.setString(2, space);
resultSet = statement.executeQuery();
if (resultSet.next()) {
return new DBInputStream(getContentBinaryStream(resultSet, getContentColumnName()),
statement,
resultSet,
connection);
return new DBInputStream(getContentBinaryStream(resultSet, getContentColumnName()), statement, resultSet, connection);
} else {
throw new SQLException(MessageFormat.format(Messages.FILE_NOT_FOUND, id));
}
Expand Down Expand Up @@ -281,6 +279,29 @@ public <T> SqlQuery<T> getProcessFileWithContentQuery(String space, String id, F
};
}

public <T> SqlQuery<T> getProcessFileWithContentQueryWithOffsetQuery(String space, String id,
StreamFetchingOptions streamFetchingOptions,
FileContentProcessor<T> fileContentProcessor) {
return (Connection connection) -> {
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
statement = connection.prepareStatement(getSelectWithContentQuery());
statement.setString(1, id);
statement.setString(2, space);
resultSet = statement.executeQuery();
if (resultSet.next()) {
return processFileContentWithOffset(resultSet, streamFetchingOptions, fileContentProcessor);
} else {
throw new SQLException(MessageFormat.format(Messages.FILE_NOT_FOUND, id));
}
} finally {
JdbcUtil.closeQuietly(resultSet);
JdbcUtil.closeQuietly(statement);
}
};
}

public SqlQuery<Integer> getDeleteBySpaceAndNamespaceQuery(String space, String namespace) {
return (Connection connection) -> {
PreparedStatement statement = null;
Expand Down Expand Up @@ -423,8 +444,31 @@ private <T> T processFileContent(ResultSet resultSet, FileContentProcessor<T> fi
}
}

private <T> T processFileContentWithOffset(ResultSet resultSet, StreamFetchingOptions streamFetchingOptions,
FileContentProcessor<T> fileContentProcessor)
throws SQLException {
InputStream fileStream = getContentBinaryStreamWithOffset(resultSet, getContentColumnName(), streamFetchingOptions);
try {
return fileContentProcessor.process(fileStream);
} catch (Exception e) {
throw new SQLException(e.getMessage(), e);
} finally {
if (fileStream != null) {
try {
fileStream.close();
} catch (IOException e) {
logger.error(Messages.UPLOAD_STREAM_FAILED_TO_CLOSE, e);
}
}
}
}

protected abstract InputStream getContentBinaryStream(ResultSet resultSet, String columnName) throws SQLException;

protected abstract InputStream getContentBinaryStreamWithOffset(ResultSet resultSet, String columnName,
StreamFetchingOptions streamFetchingOptions)
throws SQLException;

private PreparedStatement getFilesStatementBasedOnNamespace(Connection connection, String space, String namespace) throws SQLException {
PreparedStatement statement;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.cloudfoundry.multiapps.controller.persistence.DataSourceWithDialect;
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry;
import org.cloudfoundry.multiapps.controller.persistence.query.options.StreamFetchingOptions;
import org.cloudfoundry.multiapps.controller.persistence.query.providers.BlobSqlFileQueryProvider;
import org.cloudfoundry.multiapps.controller.persistence.query.providers.SqlFileQueryProvider;

Expand All @@ -26,6 +27,20 @@ protected DatabaseFileService(DataSourceWithDialect dataSourceWithDialect, SqlFi
super(dataSourceWithDialect, sqlFileQueryProvider, null);
}

@Override
public <T> T processFileContentWithOffset(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
throws FileStorageException {
try {
return getSqlQueryExecutor().execute(getSqlFileQueryProvider().getProcessFileWithContentQueryWithOffsetQuery(fileContentToProcess.getSpaceGuid(),
fileContentToProcess.getGuid(),
new StreamFetchingOptions(fileContentToProcess.getStartOffset(),
fileContentToProcess.getEndOffset()),
fileContentProcessor));
} catch (SQLException e) {
throw new FileStorageException(e.getMessage(), e);
}
}

@Override
public <T> T processFileContent(String space, String id, FileContentProcessor<T> fileContentProcessor) throws FileStorageException {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.cloudfoundry.multiapps.controller.persistence.services;

import org.immutables.value.Value;

@Value.Immutable
public interface FileContentToProcess {

String getGuid();

String getSpaceGuid();

long getStartOffset();

long getEndOffset();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import java.util.List;
import java.util.UUID;

import jakarta.xml.bind.DatatypeConverter;

import org.cloudfoundry.multiapps.controller.persistence.Constants;
import org.cloudfoundry.multiapps.controller.persistence.DataSourceWithDialect;
import org.cloudfoundry.multiapps.controller.persistence.Messages;
Expand All @@ -29,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.xml.bind.DatatypeConverter;

public class FileService {

protected static final String DEFAULT_TABLE_NAME = "LM_SL_PERSISTENCE_FILE";
Expand Down Expand Up @@ -128,6 +128,19 @@ public void consumeFileContent(String space, String id, FileContentConsumer file
});
}

public void consumeFileContentWithOffset(FileContentToProcess fileContentToProcess, FileContentConsumer fileContentConsumer)
throws FileStorageException {
processFileContentWithOffset(fileContentToProcess, inputStream -> {
fileContentConsumer.consume(inputStream);
return null;
});
}

public <T> T processFileContentWithOffset(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
throws FileStorageException {
return fileStorage.processArchiveEntryContent(fileContentToProcess, fileContentProcessor);
}

public <T> T processFileContent(String space, String id, FileContentProcessor<T> fileContentProcessor) throws FileStorageException {
return fileStorage.processFileContent(space, id, fileContentProcessor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public interface FileStorage {

void deleteFilesByIds(List<String> fileIds) throws FileStorageException;

<T> T processArchiveEntryContent(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
throws FileStorageException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.GetOptions;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.http.HttpResponseException;
Expand All @@ -37,7 +38,7 @@
public class ObjectStoreFileStorage implements FileStorage {

private static final Logger LOGGER = LoggerFactory.getLogger(ObjectStoreFileStorage.class);

private static final int MAX_RETRIES_COUNT = 3;
private static final long RETRY_BASE_WAIT_TIME_IN_MILLIS = 5000L;

private final BlobStore blobStore;
Expand Down Expand Up @@ -110,6 +111,44 @@ public <T> T processFileContent(String space, String id, FileContentProcessor<T>
}
}

@Override
public <T> T processArchiveEntryContent(FileContentToProcess fileContentToProcess, FileContentProcessor<T> fileContentProcessor)
throws FileStorageException {
FileEntry fileEntry = createFileEntry(fileContentToProcess.getSpaceGuid(), fileContentToProcess.getGuid());
try {
Payload payload = getBlobPayloadWithOffset(fileEntry, fileContentToProcess.getStartOffset(),
fileContentToProcess.getEndOffset());
return processContent(fileContentProcessor, payload);
} catch (Exception e) {
throw new FileStorageException(e);
}
}

private Payload getBlobPayloadWithOffset(FileEntry fileEntry, long startOffset, long endOffset) throws FileStorageException {
Blob blob = getBlobWithRetriesWithOffset(fileEntry, MAX_RETRIES_COUNT, startOffset, endOffset);
if (blob == null) {
throw new FileStorageException(MessageFormat.format(Messages.FILE_WITH_ID_AND_SPACE_DOES_NOT_EXIST, fileEntry.getId(),
fileEntry.getSpace()));
}
return blob.getPayload();
}

private Blob getBlobWithRetriesWithOffset(FileEntry fileEntry, int retries, long startOffset, long endOffset) {
GetOptions getOptions = new GetOptions().range(startOffset, endOffset);
for (int i = 1; i <= retries; i++) {
Blob blob = blobStore.getBlob(container, fileEntry.getId(), getOptions);
if (blob != null) {
return blob;
}
LOGGER.warn(MessageFormat.format(Messages.ATTEMPT_TO_DOWNLOAD_MISSING_BLOB, i, retries, fileEntry.getId()));
if (i == retries) {
break;
}
MiscUtil.sleep(i * getRetryWaitTime());
}
return null;
}

private Payload getBlobPayload(FileEntry fileEntry) throws FileStorageException {
Blob blob = getBlobWithRetries(fileEntry, 3);
if (blob == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void close() throws IOException {
try {
JdbcUtil.commit(connection);
} catch (SQLException e) {
throw new SLException(e.getMessage(), e);
throw new SLException(e, e.getMessage());
} finally {
setAutoCommit();
JdbcUtil.closeQuietly(connection);
Expand All @@ -48,7 +48,7 @@ private void setAutoCommit() {
try {
JdbcUtil.setAutoCommitSafely(connection);
} catch (SQLException e) {
throw new SLException(e.getMessage(), e);
throw new SLException(e, e.getMessage());
}
}
}
4 changes: 4 additions & 0 deletions multiapps-controller-process/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
</build>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
requires java.sql;
requires jakarta.xml.bind;
requires jakarta.inject;
requires org.apache.commons.compress;
requires org.apache.logging.log4j.core;
requires org.apache.logging.log4j;
requires org.apache.commons.collections4;
Expand Down
Loading
Loading