Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.catalog.StarRocksTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
public class StarRocksEnrichedCatalog extends StarRocksCatalog {
Expand All @@ -38,6 +41,70 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password

private static final Logger LOG = LoggerFactory.getLogger(StarRocksEnrichedCatalog.class);

@Override
public void createTable(StarRocksTable table, boolean ignoreIfExists)
throws StarRocksCatalogException {
String createTableSql = buildCreateTableSql(table, ignoreIfExists);
try {
executeUpdateStatement(createTableSql);
LOG.info(
"Success to create table {}.{}, sql: {}",
table.getDatabaseName(),
table.getDatabaseName(),
createTableSql);
} catch (Exception e) {
LOG.error(
"Failed to create table {}.{}, sql: {}",
table.getDatabaseName(),
table.getDatabaseName(),
createTableSql,
e);
throw new StarRocksCatalogException(
String.format(
"Failed to create table %s.%s",
table.getDatabaseName(), table.getDatabaseName()),
e);
}
}

@Override
public void alterAddColumns(
String databaseName,
String tableName,
List<StarRocksColumn> addColumns,
long timeoutSecond)
throws StarRocksCatalogException {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(databaseName),
"database name cannot be null or empty.");
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tableName),
"table name cannot be null or empty.");
Preconditions.checkArgument(!addColumns.isEmpty(), "Added columns should not be empty.");

String alterSql =
buildAlterAddColumnsSql(databaseName, tableName, addColumns, timeoutSecond);
try {
long startTimeMillis = System.currentTimeMillis();
executeAlter(databaseName, tableName, alterSql, timeoutSecond);
LOG.info(
"Success to add columns to {}.{}, duration: {}ms, sql: {}",
databaseName,
tableName,
System.currentTimeMillis() - startTimeMillis,
alterSql);
} catch (Exception e) {
LOG.error(
"Failed to add columns to {}.{}, sql: {}",
databaseName,
tableName,
alterSql,
e);
throw new StarRocksCatalogException(
String.format("Failed to add columns to %s.%s ", databaseName, tableName), e);
}
}

public void truncateTable(String databaseName, String tableName)
throws StarRocksCatalogException {
checkTableArgument(databaseName, tableName);
Expand Down Expand Up @@ -137,6 +204,80 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu
}
}

private String buildAlterAddColumnsSql(
String databaseName,
String tableName,
List<StarRocksColumn> addColumns,
long timeoutSecond) {
StringBuilder builder = new StringBuilder();
builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName));
String columnsStmt =
addColumns.stream()
.map(col -> "ADD COLUMN " + buildColumnStmt(col))
.collect(Collectors.joining(", "));
builder.append(columnsStmt);
builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond));
builder.append(";");
return builder.toString();
}

private String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) {
StringBuilder builder = new StringBuilder();
builder.append(
String.format(
"CREATE TABLE %s`%s`.`%s`",
ignoreIfExists ? "IF NOT EXISTS " : "",
table.getDatabaseName(),
table.getTableName()));
builder.append(" (\n");
String columnsStmt =
table.getColumns().stream()
.map(this::buildColumnStmt)
.collect(Collectors.joining(",\n"));
builder.append(columnsStmt);
builder.append("\n) ");

Preconditions.checkArgument(
table.getTableType() == StarRocksTable.TableType.PRIMARY_KEY,
"Not support to build create table sql for table type " + table.getTableType());
Preconditions.checkArgument(
table.getTableKeys().isPresent(),
"Can't build create table sql because there is no table keys");
String tableKeys =
table.getTableKeys().get().stream()
.map(key -> "`" + key + "`")
.collect(Collectors.joining(", "));
builder.append(String.format("PRIMARY KEY (%s)\n", tableKeys));

Preconditions.checkArgument(
table.getDistributionKeys().isPresent(),
"Can't build create table sql because there is no distribution keys");
String distributionKeys =
table.getDistributionKeys().get().stream()
.map(key -> "`" + key + "`")
.collect(Collectors.joining(", "));
builder.append(String.format("DISTRIBUTED BY HASH (%s)", distributionKeys));
if (table.getNumBuckets().isPresent()) {
builder.append(" BUCKETS ");
builder.append(table.getNumBuckets().get());
}
if (!table.getProperties().isEmpty()) {
builder.append("\nPROPERTIES (\n");
String properties =
table.getProperties().entrySet().stream()
.map(
entry ->
String.format(
"\"%s\" = \"%s\"",
entry.getKey(), entry.getValue()))
.collect(Collectors.joining(",\n"));
builder.append(properties);
builder.append("\n)");
}
builder.append(";");
return builder.toString();
}

private String buildTruncateTableSql(String databaseName, String tableName) {
return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
}
Expand Down Expand Up @@ -171,6 +312,26 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException
}
}

private void executeAlter(
String databaseName, String tableName, String alterSql, long timeoutSecond)
throws StarRocksCatalogException {
try {
Method m =
getClass()
.getSuperclass()
.getDeclaredMethod(
"executeAlter",
String.class,
String.class,
String.class,
long.class);
m.setAccessible(true);
m.invoke(this, databaseName, tableName, alterSql, timeoutSecond);
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

private void checkTableArgument(String databaseName, String tableName) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(databaseName),
Expand All @@ -191,15 +352,25 @@ private String buildColumnStmt(StarRocksColumn column) {
builder.append(" ");
builder.append(column.isNullable() ? "NULL" : "NOT NULL");
if (column.getDefaultValue().isPresent()) {
builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
builder.append(
String.format(
" DEFAULT \"%s\"",
escapeForDoubleQuotedSqlString(column.getDefaultValue().get())));
}

if (column.getColumnComment().isPresent()) {
builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
builder.append(
String.format(
" COMMENT \"%s\"",
escapeForDoubleQuotedSqlString(column.getColumnComment().get())));
}
return builder.toString();
}

private String escapeForDoubleQuotedSqlString(String value) {
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}

private String getFullColumnType(
String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
String dataType = type.toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ private List<Event> generateAddColumnEvents(TableId tableId) {
Schema.newBuilder()
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.column(
new PhysicalColumn(
"name", DataTypes.VARCHAR(17), "\"name\"", "\"\""))
.primaryKey("id")
.build();

Expand All @@ -123,9 +125,16 @@ private List<Event> generateAddColumnEvents(TableId tableId) {
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_decimal",
DataTypes.DECIMAL(17, 0),
null)))));
"extra_decimal", DataTypes.DECIMAL(17, 0), null)))),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"extra_string",
DataTypes.VARCHAR(17),
"\"extra_string\"",
"\"\"")))));
}

private List<Event> generateDropColumnEvents(TableId tableId) {
Expand Down Expand Up @@ -285,10 +294,11 @@ void testStarRocksAddColumn() throws Exception {
Arrays.asList(
"id | int | NO | true | null",
"number | double | YES | false | null",
"name | varchar(51) | YES | false | null",
"name | varchar(51) | YES | false | \"\"",
"extra_date | date | YES | false | null",
"extra_bool | boolean | YES | false | null",
"extra_decimal | decimal(17,0) | YES | false | null");
"extra_decimal | decimal(17,0) | YES | false | null",
"extra_string | varchar(51) | YES | false | \"\"");

assertEqualsInOrder(expected, actual);
}
Expand Down
Loading