Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ pipeline:
<td>sink.connect.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
<td>String</td>
<td>Integer</td>
<td>与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。</td>
</tr>
<tr>
<td>sink.wait-for-continue.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
<td>String</td>
<td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。</td>
<td>Integer</td>
<td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 600000]。</td>
</tr>
<tr>
<td>sink.buffer-flush.max-bytes</td>
Expand Down Expand Up @@ -174,6 +174,13 @@ pipeline:
<td>Boolean</td>
<td>at-least-once 下是否使用 transaction stream load。</td>
</tr>
<tr>
<td>sink.metric.histogram-window-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>直方图指标的窗口大小。</td>
</tr>
<tr>
<td>sink.properties.*</td>
<td>optional</td>
Expand Down Expand Up @@ -297,6 +304,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME</td>
<td>VARCHAR</td>
<td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME(p) 值以字符串形式存储:当 p = 0 时格式为 "HH:mm:ss",当 p > 0 时格式为 "HH:mm:ss.&lt;p 位小数&gt;"(例如 p = 3 时为 "HH:mm:ss.SSS")。</td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
Expand Down
16 changes: 14 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/starrocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ pipeline:
<td>sink.connect.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
<td>String</td>
<td>Integer</td>
<td>The timeout in milliseconds for establishing an HTTP connection. Valid values: 100 to 60000.</td>
</tr>
<tr>
<td>sink.wait-for-continue.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
<td>String</td>
<td>Integer</td>
<td>Timeout in milliseconds to wait for a 100-continue response from the FE HTTP server.
Valid values: 3000 to 600000.</td>
</tr>
Expand Down Expand Up @@ -177,6 +177,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to use transaction stream load for at-least-once when it's available.</td>
</tr>
<tr>
<td>sink.metric.histogram-window-size</td>
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Window size of histogram metrics.</td>
</tr>
<tr>
<td>sink.properties.*</td>
<td>optional</td>
Expand Down Expand Up @@ -306,6 +313,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIME</td>
<td>VARCHAR</td>
<td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.&lt;p digits&gt;" when p &gt; 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.metadata.DataLakeFormat;
Expand Down Expand Up @@ -117,7 +119,8 @@ public class FlussPipelineITCase {
protected TableEnvironment tBatchEnv;

@BeforeEach
void before() {
void before() throws Exception {
waitForFlussClusterReady();
// open a catalog so that we can get table from the catalog
String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();

Expand All @@ -137,6 +140,27 @@ void before() {
tBatchEnv.useDatabase(DEFAULT_DB);
}

private void waitForFlussClusterReady() throws Exception {
int maxRetries = 30;
int retryIntervalMs = 1000;
Exception lastException = null;

for (int i = 0; i < maxRetries; i++) {
try (Connection connection =
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
// Connection successful, cluster is ready
return;
} catch (Exception e) {
lastException = e;
Thread.sleep(retryIntervalMs);
Comment thread
Hisoka-X marked this conversation as resolved.
}
}

throw new IllegalStateException(
"Failed to connect to Fluss cluster after " + maxRetries + " attempts",
lastException);
}

@AfterEach
void after() {
tBatchEnv.useDatabase(BUILTIN_DATABASE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarCharType;
Expand All @@ -43,6 +44,8 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -132,6 +135,35 @@ public static void toStarRocksDataType(
private static final DateTimeFormatter DATETIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/** Formatter for TIME values with precision 0: {@code "HH:mm:ss"}. */
private static final DateTimeFormatter TIME_FORMATTER =
        new DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();

/**
 * Lazily populated cache of fractional-second formatters, indexed by precision (1..9).
 * Writes may race across threads, but {@link DateTimeFormatter} is immutable, so a
 * duplicate construction is harmless.
 */
private static final DateTimeFormatter[] TIME_FORMATTERS = new DateTimeFormatter[10];

/**
 * Returns the formatter for a TIME value of the given precision: {@code "HH:mm:ss"} when
 * {@code precision <= 0}, otherwise {@code "HH:mm:ss."} followed by exactly
 * {@code precision} fraction digits. Formatters for precisions 1..9 are cached; any other
 * precision gets a freshly built, uncached formatter.
 */
private static DateTimeFormatter timeFormatter(int precision) {
    if (precision <= 0) {
        return TIME_FORMATTER;
    }
    if (precision >= TIME_FORMATTERS.length) {
        // Outside the cache range: build on the fly without caching.
        return buildFractionalTimeFormatter(precision);
    }
    DateTimeFormatter cached = TIME_FORMATTERS[precision];
    if (cached == null) {
        cached = buildFractionalTimeFormatter(precision);
        TIME_FORMATTERS[precision] = cached;
    }
    return cached;
}

/** Builds an {@code "HH:mm:ss"} formatter with a fixed-width fraction of the given digits. */
private static DateTimeFormatter buildFractionalTimeFormatter(int precision) {
    return new DateTimeFormatterBuilder()
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true)
            .toFormatter();
}

/**
* Creates an accessor for getting elements in an internal RecordData structure at the given
* position.
Expand Down Expand Up @@ -183,6 +215,13 @@ record ->
fieldGetter =
record -> record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
break;
case TIME_WITHOUT_TIME_ZONE:
fieldGetter =
record ->
record.getTime(fieldPos)
.toLocalTime()
.format(timeFormatter(getPrecision(fieldType)));
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
fieldGetter =
record ->
Expand Down Expand Up @@ -374,6 +413,21 @@ public StarRocksColumn.Builder visit(DateType dateType) {
return builder;
}

@Override
public StarRocksColumn.Builder visit(TimeType timeType) {
// StarRocks does not support TIME type, so map it to VARCHAR.
// Format: HH:mm:ss for precision 0, HH:mm:ss.<p digits> for precision > 0
// Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
// For precision 0: "HH:mm:ss" = 8 characters
// For precision > 0: "HH:mm:ss." + precision digits
builder.setDataType(VARCHAR);
Comment thread
Hisoka-X marked this conversation as resolved.
builder.setNullable(timeType.isNullable());
int precision = timeType.getPrecision();
int length = precision > 0 ? 8 + 1 + precision : 8;
builder.setColumnSize(length);
return builder;
}

@Override
public StarRocksColumn.Builder visit(TimestampType timestampType) {
builder.setDataType(DATETIME);
Expand Down Expand Up @@ -404,7 +458,8 @@ public static String convertInvalidTimestampDefaultValue(
|| dataType instanceof org.apache.flink.cdc.common.types.TimestampType
|| dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {

if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)
|| defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
return DEFAULT_DATETIME;
}
}
Expand Down
Loading
Loading