From b4bec2ecffd044863ccb389122c4a32b8c1d8091 Mon Sep 17 00:00:00 2001
From: wudi
Date: Fri, 16 Jan 2026 17:40:39 +0800
Subject: [PATCH 01/14] add mysql pg doc

---
 .../streaming-job-multi-table.md              | 289 +++++++++++++++++
 .../streaming-job-tvf.md}                     |   0
 .../job/CREATE-STREAMING-JOB.md               | 139 +++++++--
 .../streaming-job-multi-table.md              | 293 ++++++++++++++++++
 .../streaming-job/streaming-job-tvf.md}       |  18 +-
 .../job/CREATE-STREAMING-JOB.md               | 100 +++++-
 .../streaming-job-multi-table.md              | 293 ++++++++++++++++++
 .../streaming-job/streaming-job-tvf.md}       |   0
 .../job/CREATE-STREAMING-JOB.md               |  98 +++++-
 sidebars.ts                                   |   9 +-
 .../streaming-job-multi-table.md              | 289 +++++++++++++++++
 .../streaming-job-tvf.md}                     |   0
 versioned_sidebars/version-4.x-sidebars.json  |   9 +-
 13 files changed, 1487 insertions(+), 50 deletions(-)
 create mode 100644 docs/data-operate/import/streaming-job/streaming-job-multi-table.md
 rename docs/data-operate/import/{streaming-job.md => streaming-job/streaming-job-tvf.md} (100%)
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md
 rename i18n/zh-CN/docusaurus-plugin-content-docs/{version-4.x/data-operate/import/streaming-job.md => current/data-operate/import/streaming-job/streaming-job-tvf.md} (93%)
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md
 rename i18n/zh-CN/docusaurus-plugin-content-docs/{current/data-operate/import/streaming-job.md => version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md} (100%)
 create mode 100644 versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md
 rename versioned_docs/version-4.x/data-operate/import/{streaming-job.md => streaming-job/streaming-job-tvf.md} (100%)

diff --git a/docs/data-operate/import/streaming-job/streaming-job-multi-table.md b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md
new file mode 100644
index 0000000000000..5c4978546f540
--- /dev/null
+++ b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md
@@ -0,0 +1,289 @@
+---
+{
+  "title": "Multi-table Continuous Import",
+  "language": "en",
+  "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job."
+}
+---
+
+## Overview
+
+A Streaming Job can continuously synchronize both full and incremental data from multiple tables in MySQL, Postgres, and other databases into Doris. It is suitable for scenarios that require real-time synchronization of multiple upstream tables into Doris.
+
+## Supported Data Sources
+
+- MySQL
+- Postgres
+
+## Basic Principles
+
+By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL, Postgres, etc., enabling full and incremental multi-table data synchronization. When synchronizing for the first time, Doris automatically creates the downstream tables (primary key tables) and keeps the primary key consistent with the upstream.
+
+**Notes:**
+
+1. Currently only at-least-once semantics are guaranteed.
+2. Only primary key tables are supported for synchronization.
+3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required.
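+
+For reference, a minimal sketch of granting these privileges, assuming a hypothetical import user `etl_user` and the target database `target_test_db` used in the examples below (adjust the names to your environment):
+
+```sql
+-- Allow etl_user to load data into target_test_db,
+-- and to create the downstream tables on first synchronization.
+GRANT LOAD_PRIV, CREATE_PRIV ON target_test_db.* TO 'etl_user'@'%';
+```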
+
+## Quick Start
+
+### Prerequisites
+
+#### MySQL
+Enable Binlog on MySQL by adding the following to my.cnf:
+```ini
+log-bin=mysql-bin
+binlog_format=ROW
+server-id=1
+```
+
+#### Postgres
+Enable logical replication on Postgres by adding the following to postgresql.conf:
+```ini
+wal_level=logical
+```
+
+### Creating an Import Job
+
+#### MySQL
+
+```sql
+CREATE JOB multi_table_sync
+ON STREAMING
+FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "123456",
+    "database" = "test",
+    "include_tables" = "user_info,order_info",
+    "offset" = "initial"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+#### Postgres
+
+```sql
+CREATE JOB test_postgres_job
+ON STREAMING
+FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
+    "driver_url" = "postgresql-42.5.0.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "postgres",
+    "database" = "postgres",
+    "schema" = "public",
+    "include_tables" = "test_tbls",
+    "offset" = "latest"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+### Check Import Status
+
+```sql
+select * from jobs(type=insert) where ExecuteType = "STREAMING"
+               Id: 1765332859199
+             Name: mysql_db_sync
+          Definer: root
+      ExecuteType: STREAMING
+RecurringStrategy: \N
+           Status: RUNNING
+       ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
+       CreateTime: 2025-12-10 10:19:35
+ SucceedTaskCount: 1
+  FailedTaskCount: 0
+CanceledTaskCount: 0
+          Comment:
+       Properties: \N
+    CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
+        EndOffset: \N
+    LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0}
+         ErrorMsg: \N
+```
+
+### Pause Import Job
+
+```sql
+PAUSE JOB WHERE jobname = <job_name>;
+```
+
+### Resume Import Job
+
+```sql
+RESUME JOB where jobName = <job_name>;
+```
+
+### Modify Import Job
+
+```sql
+ALTER JOB <job_name>
+FROM MYSQL (
+    "user" = "root",
+    "password" = "123456"
+)
+TO DATABASE target_test_db
+```
+
+### Delete Import Job
+
+```sql
+DROP JOB where jobName = <job_name>;
+```
+
+## Reference Manual
+
+### Import Command
+
+Syntax for creating a multi-table synchronization job:
+
+```sql
+CREATE JOB <job_name>
+ON STREAMING
+[job_properties]
+[ COMMENT <comment_string> ]
+FROM <source_type> (
+    [source_properties]
+)
+TO DATABASE <target_db> (
+    [target_properties]
+)
+```
+
+| Module | Description |
+| ------------------ | ---------------------------- |
+| job_name | Job name |
+| job_properties | General import parameters |
+| comment | Job comment |
+| source_properties | Source (MySQL/PG) parameters |
+| target_properties | Doris target DB parameters |
+
+### Import Parameters
+
+#### FE Configuration Parameters
+
+| Parameter | Default | Description |
+| -------------------------------------- | ------- | ------------------------------------------- |
+| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
+| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask |
+| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory |
+
+#### General Job Import 
Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| max_interval | 10s | Idle scheduling interval when no new data | + +#### Source Configuration Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| jdbc_url | - | JDBC connection string (MySQL/PG) | +| driver_url | - | JDBC driver jar path | +| driver_class | - | JDBC driver class name | +| user | - | Database username | +| password | - | Database password | +| database | - | Database name | +| schema | - | Schema name | +| include_tables| - | Tables to synchronize, comma separated | +| offset | initial | initial: full + incremental, latest: incremental only | + +#### Doris Target DB Parameters + +| Parameter | Default | Description | +| ------------------------------- | ------- | ------------------------------------------- | +| table.create.properties.* | - | Table properties when creating, e.g. replication_num | + +### Import Status + +#### Job + +After submitting a job, you can run the following SQL to check the job status: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| ID | Job ID | +| NAME | Job name | +| Definer | Job definer | +| ExecuteType | Job type: ONE_TIME/RECURRING/STREAMING/MANUAL| +| RecurringStrategy | Recurring strategy, empty for Streaming | +| Status | Job status | +| ExecuteSql | Job's Insert SQL statement | +| CreateTime | Job creation time | +| SucceedTaskCount | Number of successful tasks | +| FailedTaskCount | Number of failed tasks | +| CanceledTaskCount | Number of canceled tasks | +| Comment | Job comment | +| Properties | Job properties | +| CurrentOffset | Current offset, only for Streaming jobs | +| EndOffset | Max end offset from source, only for Streaming| +| LoadStatistic | Job statistics | +| ErrorMsg | Job error message | +| JobRuntimeMsg | Job runtime info | + +#### Task + +You can run the following SQL to check the status of each Task: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row ***************************
+       TaskId: 1765336137066
+        JobId: 1765332859199
+      JobName: mysql_db_sync
+        Label: 1765332859199_1765336137066
+       Status: SUCCESS
+     ErrorMsg: \N
+   CreateTime: 2025-12-10 11:09:06
+    StartTime: 2025-12-10 11:09:16
+   FinishTime: 2025-12-10 11:09:18
+  TrackingUrl: \N
+LoadStatistic: {"scannedRows":1,"loadBytes":333}
+         User: root
+FirstErrorMsg:
+RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"}
+```
+
+| Result Column | Description |
+| ------------------ | ------------------------------------------- |
+| TaskId | Task ID |
+| JobID | Job ID |
+| JobName | Job name |
+| Label | Task label |
+| Status | Task status |
+| ErrorMsg | Task error message |
+| CreateTime | Task creation time |
+| StartTime | Task start time |
+| FinishTime | Task finish time |
+| LoadStatistic | Task statistics |
+| User | Task executor |
+| RunningOffset | Current offset, only for Streaming jobs |
\ No newline at end of file
diff --git a/docs/data-operate/import/streaming-job.md b/docs/data-operate/import/streaming-job/streaming-job-tvf.md
similarity index 100%
rename from docs/data-operate/import/streaming-job.md
rename to docs/data-operate/import/streaming-job/streaming-job-tvf.md
diff --git a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index 09a951f53bf2e..050c1aef68234 100644
--- a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -2,55 +2,97 @@
 {
   "title": "CREATE STREAMING JOB",
   "language": "en",
-  "description": "Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted,"
+  "description": "Doris Streaming Job is a continuous import task based on the Job approach. After the Job is submitted, Doris continuously runs the import job, querying TVF or upstream data sources in real time and writing the data into Doris tables."
 }
 ---
 
 ## Description
 
-Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted, Doris will continuously run the import job, querying the data in TVF and writing it into the Doris table in real time.
+Doris Streaming Job is a continuous import task based on the Job approach. After the Job is submitted, Doris continuously runs the import job, querying TVF or upstream data sources in real time and writing the data into Doris tables.
 
 ## Syntax
 
-
 ```SQL
 CREATE JOB <job_name>
 ON STREAMING
-[job_properties]
+[ PROPERTIES (
+    <key> = <value>
+    [ , ... ]
+  )
+]
 [ COMMENT <comment_string> ]
+(
 DO <sql_statement>
+|
+(
+  FROM <source_type> (
+    <key> = <value>
+    [ , ... ])
+  TO DATABASE <target_db>
+  [ PROPERTIES (
+    <key> = <value>
+    [ , ... ])
+  ]
+)
+)
 ```
 
-## Required parameters
+## Required Parameters
 
 **1. `<job_name>`**
-> The job name is used to uniquely identify an event within a database. The job name must be globally unique; an error will occur if a job with the same name already exists.
+> The job name, which uniquely identifies an event in a database. The job name must be globally unique; if a job with the same name already exists, an error will be reported.
+
+**2. `<sql_statement>`**
+> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, only S3 TVF is supported.
+
+**3. `<source_type>`**
+> Supported data sources: currently only MySQL and Postgres.
+
+**4. `<source_properties>`**
+| Parameter | Default | Description |
+| -------------- | ------- | ------------------------------------------------------------ |
+| jdbc_url | - | JDBC connection string (MySQL/PG) |
+| driver_url | - | JDBC driver jar path |
+| driver_class | - | JDBC driver class name |
+| user | - | Database username |
+| password | - | Database password |
+| database | - | Database name |
+| schema | - | Schema name |
+| include_tables | - | Tables to synchronize, comma separated |
+| offset | initial | initial: full + incremental sync, latest: incremental only |
+
+**5. `<target_db>`**
+> Doris target database name to import into.
 
-**3. `<sql_statement>`**
-> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, it only supports S3 TVF.
+**6. `<target_properties>`**
+| Parameter | Default | Description |
+| ------------------------------- | ------- | ------------------------------------------- |
+| table.create.properties.* | - | Table properties when creating, e.g. replication_num |
 
-## Optional parameters
-**1. `<job_properties>`**
-| Parameters | Default Values | Description |
-| ------------------ | ------ | ------------------------------------------------------------ |
-| session.* | None | Supports configuring all session variables in job_properties |
-| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value |
-| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value |
-| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream.
-## Access Control Requirements
+
+## Optional Parameters
+
+**1. `<job_properties>`**
+| Parameter | Default | Description |
+| ------------------ | ------- | ------------------------------------------------------------ |
+| session.* | None | Supports configuring all session variables in job_properties |
+| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value |
+| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value |
+| max_interval | 10s | Idle scheduling interval when there are no new files or data upstream |
+
+## Privilege Control
 
 The user executing this SQL command must have at least the following privileges:
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| LOAD_PRIV | Database (DB) | Currently, only the **LOAD** privilege is supported to perform this operation |
-## Usage Notes
+| Privilege | Object | Notes |
+|:------------|:------------|:--------------------------------------|
+| LOAD_PRIV | Database | Currently, only the **LOAD** privilege is supported for this operation |
 
-- The TASK only retains the latest 100 records.
-- Currently, only the **INSERT internal table Select * From S3(...)** operation is supported; more operations will be supported in the future.
+## Notes
+
+- TASK only retains the latest 100 records.
+- Currently, Insert_Command only supports **INSERT internal table Select * From S3(...)**; more operations will be supported in the future.
 
 ## Examples
 
@@ -73,12 +115,53 @@ The user executing this SQL command must have at least the following privileges:
 );
 ```
 
+- Create a job named multi_table_sync that synchronizes the user_info and order_info tables from the upstream MySQL database into the target_test_db database, starting from the beginning (full + incremental). 
+ + ```sql + CREATE JOB multi_table_sync + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + +- Create a job named test_postgres_job to continuously synchronize incremental data from the test_tbls table in Postgres upstream to the target_test_db database. + + ```sql + CREATE JOB test_postgres_job + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + ## CONFIG **fe.conf** -| Parameters | Default Values ​​| | -| ------------------------------------ | ------ | ------------------------------------------- | -| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | -| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks | -| max_streaming_task_show_count | 100 | Maximum number of task execution records that a StreamingTask keeps in memory | \ No newline at end of file +| Parameter | Default | Description | +| -------------------------------------- | ------- | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask | +| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory| \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..3075733a2b2a2 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,293 @@ +--- +{ + "title": "多表持续导入", + "language": "zh-CN", + "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL、Postgres 等多张表的全量和增量数据持续同步到 Doris 中。" +} +--- + +## 概述 + +支持通过 Job 将 MySQL、Postgres 等数据库的多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步多表数据到 Doris 的场景。 + +## 支持的数据源 + +- MySQL +- Postgres + +## 基本原理 + +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL、Postgres 等数据库读取变更日志,实现多表的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 + +**注意事项:** + +1. 目前只能保证 at-least-once 语义。 +2. 目前只支持主键表同步。 +3. 
需要 Load 权限,若下游表不存在还需有 Create 权限。
+
+## 快速上手
+
+### 前提条件
+
+#### MySQL
+需要在 MySQL 端开启 Binlog,即 my.cnf 配置文件中增加:
+```ini
+log-bin=mysql-bin
+binlog_format=ROW
+server-id=1
+```
+
+#### Postgres
+需要在 Postgres 端配置逻辑复制,即 postgresql.conf 增加:
+```ini
+wal_level=logical
+```
+
+### 创建导入作业
+
+#### MySQL
+
+```sql
+CREATE JOB multi_table_sync
+ON STREAMING
+FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "123456",
+    "database" = "test",
+    "include_tables" = "user_info,order_info",
+    "offset" = "initial"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+#### Postgres
+
+```sql
+CREATE JOB test_postgres_job
+ON STREAMING
+FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
+    "driver_url" = "postgresql-42.5.0.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "postgres",
+    "database" = "postgres",
+    "schema" = "public",
+    "include_tables" = "test_tbls",
+    "offset" = "latest"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+### 查看导入状态
+
+```sql
+select * from jobs(type=insert) where ExecuteType = "STREAMING"
+               Id: 1765332859199
+             Name: mysql_db_sync
+          Definer: root
+      ExecuteType: STREAMING
+RecurringStrategy: \N
+           Status: RUNNING
+       ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
+       CreateTime: 2025-12-10 10:19:35
+ SucceedTaskCount: 1
+  FailedTaskCount: 0
+CanceledTaskCount: 0
+          Comment:
+       Properties: \N
+    CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
+        EndOffset: \N
+    LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0}
+         ErrorMsg: \N
+```
+
+### 暂停导入作业
+
+```sql
+PAUSE JOB WHERE jobname = <job_name>;
+```
+
+### 恢复导入作业
+
+```sql
+RESUME JOB where jobName = <job_name>;
+```
+
+### 修改导入作业
+
+```sql
+ALTER JOB <job_name>
+FROM MYSQL (
+    "user" = "root",
+    "password" = "123456"
+)
+TO DATABASE target_test_db
+```
+
+### 删除导入作业
+
+```sql
+DROP JOB where jobName = <job_name>;
+```
+
+## 参考手册
+
+### 导入命令
+
+创建一个多表同步作业语法如下:
+
+```sql
+CREATE JOB <job_name>
+ON STREAMING
+[job_properties]
+[ COMMENT <comment_string> ]
+FROM <source_type> (
+    [source_properties]
+)
+TO DATABASE <target_db> (
+    [target_properties]
+)
+```
+
+| 模块 | 说明 |
+| ------------------ | ------------------------- |
+| job_name | 任务名 |
+| job_properties | 用于指定 Job 的通用导入参数 |
+| comment | 用于描述 Job 作业的备注信息 |
+| source_properties | 源端(MySQL/PG 等)相关参数 |
+| target_properties | Doris 目标库相关参数 |
+
+### 导入参数
+
+#### FE 配置参数
+
+| 参数 | 默认值 | 说明 |
+| ------------------------------------ | ------ | -------------------------------------- |
+| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 |
+| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 |
+| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 |
+
+#### Job 通用导入配置参数
+
+| 参数 | 默认值 | 说明 |
+| ------------ | ------ | -------------------------------------- |
+| max_interval | 10s | 当上游没有新增数据时,空闲的调度间隔。 |
+
+#### 数据源配置参数
+
+| 参数 | 默认值 | 说明 |
+| -------------- | ------- | ------------------------------------------------------------ |
+| jdbc_url | - | JDBC 连接串(MySQL/PG) |
+| driver_url | - | JDBC 驱动 jar 包路径 |
+| driver_class | - | JDBC 驱动类名 |
+| user | - | 数据库用户名 |
+| password | - | 数据库密码 |
+| database | - | 数据库名 |
+| schema | - | schema 名称 |
+| include_tables | - | 需要同步的表名,多个表用逗号分隔 |
+| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 |
+
+#### Doris 目标库端配置参数
+
+| 参数 | 默认值 | 说明 |
+| -------------- | ------- | ------------------------------------------------------------ |
+| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num |
+
+### 导入状态
+
+#### Job
+
+Job 提交成功后,可以执行如下 SQL 查看 Job 当前状态:
+
+```sql
+select * from jobs(type=insert) where ExecuteType = "STREAMING"
+*************************** 1. row ***************************
+               Id: 1765332859199
+             Name: mysql_db_sync
+          Definer: root
+      ExecuteType: STREAMING
+RecurringStrategy: \N
+           Status: RUNNING
+       ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
+       CreateTime: 2025-12-10 10:19:35
+ SucceedTaskCount: 2
+  FailedTaskCount: 0
+CanceledTaskCount: 0
+          Comment:
+       Properties: \N
+    CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
+        EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"}
+    LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0}
+         ErrorMsg: \N
+```
+
+具体显示参数结果如下
+
+| 结果列 | 说明 |
+| ----------------- | ------------------------------------------------------------ |
+| ID | Job ID |
+| NAME | Job 名称 |
+| Definer | job 定义者 |
+| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* |
+| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 |
+| Status | Job 状态 |
+| ExecuteSql | Job 的 Insert SQL 语句 |
+| CreateTime | job 创建时间 |
+| SucceedTaskCount | 成功任务数量 |
+| FailedTaskCount | 失败任务数量 |
+| CanceledTaskCount | 取消任务数量 |
+| Comment | job 注释 |
+| Properties | job 的属性 |
+| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 |
+| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 |
+| LoadStatistic | Job 的统计信息 |
+| ErrorMsg | Job 执行的错误信息 |
+| JobRuntimeMsg | Job 运行时的一些提示信息 |
+
+#### Task
+
+可以执行如下 SQL 查看每次 Task 的运行状态:
+
+```sql
+select * from tasks(type='insert') where jobId='1765336137066'
+*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + + + +| 结果列 | 说明 | +| ------------- | ---------------------------------------------------- | +| TaskId | 任务 ID | +| JobID | JobID | +| JobName | Job 名称 | +| Label | Task 导入 的 Label | +| Status | Task 的状态 | +| ErrorMsg | task 失败信息 | +| CreateTime | Task 的创建时间 | +| StartTime | Task 的开始时间 | +| FinishTime | Task 的完成时间 | +| LoadStatistic | Task 的统计信息 | +| User | task 的执行者 | +| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 93% rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md index e77f5b09f0308..091e18405a32b 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "持续导入", + "title": "TVF 持续导入", "language": "zh-CN", "description": "Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" } @@ -8,17 +8,19 @@ ## 概述 -Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 +Doris 可以通过 Job 创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询数据源中的数据写入到 Doris 表中。 -## 支持的 TVF +## 支持的数据源 -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +- [S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +- MySQL +- Postgres ## 基本原理 ### S3 -遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 +通过 S3 TVF 遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 **增量读取方式** @@ -26,10 +28,16 @@ Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提 注意:新文件的名称必须按字典序大于上一次已导入的文件名,否则 Doris 不会将其作为新文件处理。比如,文件命名为 file1、file2、file3 时会按顺序导入;如果随后新增一个 file0,由于它在字典序上小于最后已导入的文件 file3,Doris 将不会导入该文件。 +### MySQL / Postgres + +集成了 Flink CDC 摄入数据的能力,可以将 MySQL 和 Postgres 里面的数据全量和增量数据同步到 Doris 中。 + + ## 快速上手 ### 创建导入作业 +#### S3 假设 S3 的目录下,会定期的产生以 CSV 结尾的文件。此时可以创建 Job ```SQL diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 033c4ff1fb1bb..24ddd922d4680 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -2,13 +2,13 @@ { "title": "CREATE STREAMING JOB", "language": "zh-CN", - 
"description": "Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" + "description": "Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。" } --- ## 描述 -Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 +Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。。 ## 语法 @@ -16,9 +16,26 @@ Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任 ```SQL CREATE JOB ON STREAMING -[job_properties] +[ PROPERTIES ( + + [ , ... ] + ) +] [ COMMENT ] +( DO +| +( + FROM ( + + [ , ... ]) + TO DATABASE + [ PROPERTIES ( + + -- Other属性 + [ , ... ]) + ] +) ``` @@ -27,15 +44,41 @@ DO **1. ``** > 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。 -**3. ``** -> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF +**2. ``** +> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF。 + +**3. ``** +> 支持的数据源,目前只支持 MySQL 和 Postgres。 + +**4. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +**5. ``** +> 需要导入的 Doris 目标库名称。 + +**6. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num + + ## 可选参数 -**1. ``** +**1. ``** | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | @@ -52,7 +95,7 @@ DO - TASK 只保留最新的 100 条记录。 -- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 +- 目前 Insert_Command 仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 ## 示例 @@ -75,6 +118,47 @@ DO ); ``` +- 创建一个名为 my_job 的作业,从头开始同步 MySQL 上游的 user_info,order_info 表的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB multi_table_sync + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + +- 创建一个名为 my_job 的作业,持续同步 Postgrees 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB test_postgres_job + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + ## CONFIG **fe.conf** diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..3075733a2b2a2 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,293 @@ +--- +{ + "title": "多表持续导入", + "language": "zh-CN", + "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL、Postgres 等多张表的全量和增量数据持续同步到 Doris 中。" +} +--- + +## 概述 + +支持通过 Job 将 MySQL、Postgres 等数据库的多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步多表数据到 Doris 的场景。 + +## 支持的数据源 + +- MySQL +- Postgres + +## 基本原理 + +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL、Postgres 等数据库读取变更日志,实现多表的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 + +**注意事项:** + +1. 目前只能保证 at-least-once 语义。 +2. 目前只支持主键表同步。 +3. 需要 Load 权限,若下游表不存在还需有 Create 权限。 + +## 快速上手 + +### 前提条件 + +#### MySQL +需要在 MySQL 端开启 Binlog,即 my.cnf 配置文件中增加: +```ini +log-bin=mysql-bin +binlog_format=ROW +server-id=1 +``` + +#### Postgres +需要在 Postgres 端配置逻辑复制,即 postgresql.conf 增加: +```ini +wal_level=logical +``` + +### 创建导入作业 + +#### MySQL + +```sql +CREATE JOB multi_table_sync +ON STREAMING +FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +#### Postgres + +```sql +CREATE JOB test_postgres_job +ON STREAMING +FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +### 查看导入状态 + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: \N + LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +### 暂停导入作业 + +```sql +PAUSE JOB WHERE jobname = ; +``` + +### 恢复导入作业 + +```sql +RESUME JOB where jobName = ; +``` + +### 修改导入作业 + +```sql +ALTER JOB +FROM MYSQL ( + "user" = "root", + "password" = "123456" +) +TO DATABASE target_test_db +``` + +### 删除导入作业 + +```sql +DROP JOB where jobName = ; +``` + +## 参考手册 + +### 导入命令 + +创建一个多表同步作业语法如下: + +```sql +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +FROM ( + [source_properties] +) 
+TO DATABASE ( + [target_properties] +) +``` + +| 模块 | 说明 | +| ------------------ | ------------------------- | +| job_name | 任务名 | +| job_properties | 用于指定 Job 的通用导入参数 | +| comment | 用于描述 Job 作业的备注信息 | +| source_properties | 源端(MySQL/PG 等)相关参数 | +| target_properties | Doris 目标库相关参数 | + +### 导入参数 + +#### FE 配置参数 + +| 参数 | 默认值 | 说明 | +| ------------------------------------ | ------ | -------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | + +#### Job 通用导入配置参数 + +| 参数 | 默认值 | 说明 | +| ------------ | ------ | -------------------------------------- | +| max_interval | 10s | 当上游没有新增数据时,空闲的调度间隔。 | + +#### 数据源配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +#### Doris 目标库端配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | + +### 导入状态 + +#### Job + +Job 提交成功后,可以执行如下 SQL 查看 Job 当前状态: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +具体显示参数结果如下 + +| 结果列 | 说明 | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job 名称 | +| Definer | job 定义者 | +| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 | +| Status | Job 状态 | +| ExecuteSql | Job 的 Insert SQL 语句 | +| CreateTime | job 创建时间 | +| SucceedTaskCount | 成功任务数量 | +| FailedTaskCount | 失败任务数量 | +| CanceledTaskCount | 取消任务数量 | +| Comment | job 注释 | +| Properties | job 的属性 | +| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 | +| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 | +| LoadStatistic | Job 的统计信息 | +| ErrorMsg | Job 执行的错误信息 | +| JobRuntimeMsg | Job 运行时的一些提示信息 | + +#### Task + +可以执行如下 SQL 查看每次 Task 的运行状态: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row ***************************
+       TaskId: 1765336137066
+        JobId: 1765332859199
+      JobName: mysql_db_sync
+        Label: 1765332859199_1765336137066
+       Status: SUCCESS
+     ErrorMsg: \N
+   CreateTime: 2025-12-10 11:09:06
+    StartTime: 2025-12-10 11:09:16
+   FinishTime: 2025-12-10 11:09:18
+  TrackingUrl: \N
+LoadStatistic: {"scannedRows":1,"loadBytes":333}
+         User: root
+FirstErrorMsg:
+RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"}
+```
+
+| 结果列 | 说明 |
+| ------------- | ---------------------------------------------------- |
+| TaskId | 任务 ID |
+| JobID | JobID |
+| JobName | Job 名称 |
+| Label | Task 导入的 Label |
+| Status | Task 的状态 |
+| ErrorMsg | task 失败信息 |
+| CreateTime | Task 的创建时间 |
+| StartTime | Task 的开始时间 |
+| FinishTime | Task 的完成时间 |
+| LoadStatistic | Task 的统计信息 |
+| User | task 的执行者 |
+| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md
similarity index 100%
rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md
rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index 37b21d2b0e6e9..24ddd922d4680 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -2,13 +2,13 @@
 {
   "title": "CREATE STREAMING JOB",
   "language": "zh-CN",
-  "description": "Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。"
+  "description": "Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。"
 }
 ---
 
 ## 描述
 
-Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。
+Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。
 
 ## 语法
 
@@ -16,9 +16,26 @@
 
 ```SQL
 CREATE JOB <job_name>
 ON STREAMING
-[job_properties]
+[ PROPERTIES (
+    <key> = <value>
+    [ , ... ]
+  )
+]
 [ COMMENT <comment_string> ]
+(
 DO <sql_statement>
+|
+(
+  FROM <source_type> (
+    <key> = <value>
+    [ , ... ])
+  TO DATABASE <target_db>
+  [ PROPERTIES (
+    <key> = <value>
+    -- 其他属性
+    [ , ... ])
+  ]
+)
+)
 ```
 
@@ -27,12 +44,38 @@ DO
 **1. `<job_name>`**
 > 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。
 
-**3. `<sql_statement>`**
-> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF
+**2. `<sql_statement>`**
+> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF。
+
+**3. `<source_type>`**
+> 支持的数据源,目前只支持 MySQL 和 Postgres。
+
+**4. `<source_properties>`**
+| 参数 | 默认值 | 说明 |
+| -------------- | ------- | ------------------------------------------------------------ |
+| jdbc_url | - | JDBC 连接串(MySQL/PG) |
+| driver_url | - | JDBC 驱动 jar 包路径 |
+| driver_class | - | JDBC 驱动类名 |
+| user | - | 数据库用户名 |
+| password | - | 数据库密码 |
+| database | - | 数据库名 |
+| schema | - | schema 名称 |
+| include_tables | - | 需要同步的表名,多个表用逗号分隔 |
+| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 |
+
+**5. `<target_db>`**
+> 需要导入的 Doris 目标库名称。
+
+**6. `<target_properties>`**
+| 参数 | 默认值 | 说明 |
+| -------------- | ------- | ------------------------------------------------------------ |
+| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num |
+
+
 ## 可选参数
 
-**1. `<job_properties>`**
+**1. `<job_properties>`**
 | 参数 | 默认值 | 说明 |
 | ------------------ | ------ | ------------------------------------------------------------ |
 | session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 |
 | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 |
 | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 |
 | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 |
@@ -52,7 +95,7 @@ DO
 
 - TASK 只保留最新的 100 条记录。
 
-- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。
+- 目前 Insert_Command 仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。
 
 ## 示例
 
@@ -75,6 +118,47 @@ DO
 );
 ```
 
+- 创建一个名为 multi_table_sync 的作业,从头开始同步 MySQL 上游的 user_info、order_info 表的数据,到 target_test_db 库下面。
+
+  ```sql
+  CREATE JOB multi_table_sync
+  ON STREAMING
+  FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "123456",
+    "database" = "test",
+    "include_tables" = "user_info,order_info",
+    "offset" = "initial"
+  )
+  TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+  )
+  ```
+
+- 创建一个名为 test_postgres_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量数据,到 target_test_db 库下面。
+
+  ```sql
+  CREATE JOB test_postgres_job
+  ON STREAMING
+  FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
+    "driver_url" = "postgresql-42.5.0.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "postgres",
+    "database" = "postgres",
+    "schema" = "public",
+    "include_tables" = "test_tbls",
+    "offset" = "latest"
+  )
+  TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+  )
+  ```
+
 ## CONFIG
 
 **fe.conf**
diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md
new file mode 100644
index 0000000000000..5c4978546f540
--- /dev/null
+++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md
@@ -0,0 +1,289 @@
+---
+{
+  "title": "Multi-table Continuous Import",
+  "language": "en",
+  "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job."
+}
+---
+
+## Overview
+
+A Streaming Job can continuously synchronize both full and incremental data from multiple tables in MySQL, Postgres, and other databases into Doris. It is suitable for scenarios that require real-time synchronization of multiple upstream tables into Doris.
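+
+As an illustration of what this synchronization produces on the Doris side, below is a minimal sketch of the kind of primary key table the job creates automatically, assuming a hypothetical upstream MySQL table `user_info(id INT PRIMARY KEY, name VARCHAR(50))`; the actual column type mapping and bucketing are handled by the job and may differ:
+
+```sql
+-- Hypothetical result of automatic table creation in Doris:
+-- the upstream primary key becomes the Doris unique key.
+CREATE TABLE target_test_db.user_info (
+    id   INT,
+    name VARCHAR(50)
+)
+UNIQUE KEY(id)
+DISTRIBUTED BY HASH(id) BUCKETS AUTO
+PROPERTIES ("replication_num" = "1");
+```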
+
+## Supported Data Sources
+
+- MySQL
+- Postgres
+
+## Basic Principles
+
+By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL, Postgres, etc., enabling full and incremental multi-table data synchronization. When synchronizing for the first time, Doris automatically creates the downstream tables (primary key tables) and keeps the primary key consistent with the upstream.
+
+**Notes:**
+
+1. Currently only at-least-once semantics are guaranteed.
+2. Only primary key tables are supported for synchronization.
+3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required.
+
+## Quick Start
+
+### Prerequisites
+
+#### MySQL
+Enable Binlog on MySQL by adding the following to my.cnf:
+```ini
+log-bin=mysql-bin
+binlog_format=ROW
+server-id=1
+```
+
+#### Postgres
+Enable logical replication on Postgres by adding the following to postgresql.conf:
+```ini
+wal_level=logical
+```
+
+### Creating an Import Job
+
+#### MySQL
+
+```sql
+CREATE JOB multi_table_sync
+ON STREAMING
+FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "123456",
+    "database" = "test",
+    "include_tables" = "user_info,order_info",
+    "offset" = "initial"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+#### Postgres
+
+```sql
+CREATE JOB test_postgres_job
+ON STREAMING
+FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
+    "driver_url" = "postgresql-42.5.0.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "postgres",
+    "database" = "postgres",
+    "schema" = "public",
+    "include_tables" = "test_tbls",
+    "offset" = "latest"
+)
+TO DATABASE target_test_db (
+    "table.create.properties.replication_num" = "1"
+)
+```
+
+### Check Import Status
+
+```sql
+select * from jobs(type=insert) where ExecuteType = "STREAMING"
+               Id: 1765332859199
+             Name: mysql_db_sync
+          Definer: root
+      ExecuteType: STREAMING
+RecurringStrategy: \N
+           Status: RUNNING
+       ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
+       CreateTime: 2025-12-10 10:19:35
+ SucceedTaskCount: 1
+  FailedTaskCount: 0
+CanceledTaskCount: 0
+          Comment:
+       Properties: \N
+    CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
+        EndOffset: \N
+    LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0}
+         ErrorMsg: \N
+```
+
+### Pause Import Job
+
+```sql
+PAUSE JOB WHERE jobname = <job_name>;
+```
+
+### Resume Import Job
+
+```sql
+RESUME JOB where jobName = <job_name>;
+```
+
+### Modify Import Job
+
+```sql
+ALTER JOB <job_name>
+FROM MYSQL (
+    "user" = "root",
+    "password" = "123456"
+)
+TO DATABASE target_test_db
+```
+
+### Delete Import Job
+
+```sql
+DROP JOB where jobName = <job_name>;
+```
+
+## Reference Manual
+
+### Import Command
+
+Syntax for creating a multi-table synchronization job:
+
+```sql
+CREATE JOB <job_name>
+ON STREAMING
+[job_properties]
+[ COMMENT <comment_string> ]
+FROM <source_type> (
+    [source_properties]
+)
+TO DATABASE <target_db> (
+    [target_properties]
+)
+```
+
+| Module | Description |
+| ------------------ | ---------------------------- |
+| job_name | Job name |
+| job_properties | General import parameters |
+| comment | Job comment |
+| source_properties | Source (MySQL/PG) parameters |
+| target_properties | Doris target DB parameters |
+
+### Import Parameters
+
+#### FE Configuration Parameters
+
+| Parameter | Default | Description |
+| -------------------------------------- | ------- | ------------------------------------------- |
+| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
+| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask |
+| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory |
+
+#### General Job Import Parameters
+
+| Parameter | Default | Description |
+| ------------- | ------- | ------------------------------------------- |
+| max_interval | 10s | Idle scheduling interval when no new data |
+
+#### Source Configuration Parameters
+
+| Parameter | Default | Description |
+| ------------- | ------- | ------------------------------------------- |
+| jdbc_url | - | JDBC connection string (MySQL/PG) |
+| driver_url | - | JDBC driver jar path |
+| driver_class | - | JDBC driver class name |
+| user | - | Database username |
+| password | - | Database password |
+| database | - | Database name |
+| schema | - | Schema name |
+| include_tables| - | Tables to synchronize, comma separated |
+| offset | initial | initial: full + incremental, latest: incremental only |
+
+#### Doris Target DB Parameters
+
+| Parameter | Default | Description |
+| ------------------------------- | ------- | ------------------------------------------- |
+| table.create.properties.* | - | Table properties when creating, e.g. replication_num |
+
+### Import Status
+
+#### Job
+
+After submitting a job, you can run the following SQL to check the job status:
+
+```sql
+select * from jobs(type=insert) where ExecuteType = "STREAMING"
+*************************** 1. 
row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| ID | Job ID | +| NAME | Job name | +| Definer | Job definer | +| ExecuteType | Job type: ONE_TIME/RECURRING/STREAMING/MANUAL| +| RecurringStrategy | Recurring strategy, empty for Streaming | +| Status | Job status | +| ExecuteSql | Job's Insert SQL statement | +| CreateTime | Job creation time | +| SucceedTaskCount | Number of successful tasks | +| FailedTaskCount | Number of failed tasks | +| CanceledTaskCount | Number of canceled tasks | +| Comment | Job comment | +| Properties | Job properties | +| CurrentOffset | Current offset, only for Streaming jobs | +| EndOffset | Max end offset from source, only for Streaming| +| LoadStatistic | Job statistics | +| ErrorMsg | Job error message | +| JobRuntimeMsg | Job runtime info | + +#### Task + +You can run the following SQL to check the status of each Task: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| TaskId | Task ID | +| JobID | Job ID | +| JobName | Job name | +| Label | Task label | +| Status | Task status | +| ErrorMsg | Task error message | +| CreateTime | Task creation time | +| StartTime | Task start time | +| FinishTime | Task finish time | +| LoadStatistic | Task statistics | +| User | Task executor | +| RunningOffset | Current offset, only for Streaming jobs | \ No newline at end of file diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 100% rename from versioned_docs/version-4.x/data-operate/import/streaming-job.md rename to versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json index ee402b0ae9aee..ac3c418e4761f 100644 --- a/versioned_sidebars/version-4.x-sidebars.json +++ b/versioned_sidebars/version-4.x-sidebars.json @@ -229,7 +229,14 @@ "data-operate/import/load-internals/routine-load-internals" ] }, - "data-operate/import/streaming-job" + { + "type": "category", + "label": "Continuous Load", + "items": [ + "data-operate/import/streaming-job/streaming-job-tvf", + "data-operate/import/streaming-job/streaming-job-multi-table", + ] + } ] }, { From 6b6821542c71963e02ed19b2bfc6db601c603ab3 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 20 Jan 2026 11:58:10 +0800 Subject: [PATCH 02/14] fix --- .../import/streaming-job/streaming-job-multi-table.md | 2 +- docs/data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- .../data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- .../import/streaming-job/streaming-job-multi-table.md | 2 +- .../data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- versioned_sidebars/version-4.x-sidebars.json | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/data-operate/import/streaming-job/streaming-job-multi-table.md b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md index 5c4978546f540..f97f6b86d57b3 100644 --- a/docs/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -1,6 +1,6 @@ --- { - "title": "Multi-table Continuous Import", + "title": "Multi-table Continuous Load", "language": "en", "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job." 
} diff --git a/docs/data-operate/import/streaming-job/streaming-job-tvf.md b/docs/data-operate/import/streaming-job/streaming-job-tvf.md index 3ad9f085c81eb..a528f285d5e99 100644 --- a/docs/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/docs/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "Continuous Load", + "title": "TVF Continuous Load", "language": "en", "description": "Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job," } diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index e77f5b09f0308..0379b34a28372 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "持续导入", + "title": "TVF 持续导入", "language": "zh-CN", "description": "Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" } diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md index 5c4978546f540..f97f6b86d57b3 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -1,6 +1,6 @@ --- { - "title": "Multi-table Continuous Import", + "title": "Multi-table Continuous Load", "language": "en", "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job." } diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index 3ad9f085c81eb..a528f285d5e99 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "Continuous Load", + "title": "TVF Continuous Load", "language": "en", "description": "Doris allows you to create a continuous import task using a Job + TVF approach. 
After submitting the Job, Doris continuously runs the import job," } diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json index ac3c418e4761f..547471107ef83 100644 --- a/versioned_sidebars/version-4.x-sidebars.json +++ b/versioned_sidebars/version-4.x-sidebars.json @@ -234,7 +234,7 @@ "label": "Continuous Load", "items": [ "data-operate/import/streaming-job/streaming-job-tvf", - "data-operate/import/streaming-job/streaming-job-multi-table", + "data-operate/import/streaming-job/streaming-job-multi-table" ] } ] From 513662b0e9b83141e2a340dce5e4f6dd5d2bb0a0 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 20 Jan 2026 14:09:47 +0800 Subject: [PATCH 03/14] fix --- .../import/streaming-job/streaming-job-tvf.md | 2 +- .../import/streaming-job/streaming-job-tvf.md | 16 ++++------------ .../import/streaming-job/streaming-job-tvf.md | 2 +- .../import/streaming-job/streaming-job-tvf.md | 2 +- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/docs/data-operate/import/streaming-job/streaming-job-tvf.md b/docs/data-operate/import/streaming-job/streaming-job-tvf.md index a528f285d5e99..28314353e2bec 100644 --- a/docs/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/docs/data-operate/import/streaming-job/streaming-job-tvf.md @@ -12,7 +12,7 @@ Doris allows you to create a continuous import task using a Job + TVF approach. ## Supported TVFs -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## Basic Principles diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md index 091e18405a32b..f711895b57cc2 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md @@ -8,19 +8,17 @@ ## 概述 -Doris 可以通过 Job 创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询数据源中的数据写入到 Doris 表中。 +Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 -## 支持的数据源 +## 支持的 TVF -- [S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF -- MySQL -- Postgres +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## 基本原理 ### S3 -通过 S3 TVF 遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 +遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 **增量读取方式** @@ -28,16 +26,10 @@ Doris 可以通过 Job 创建一个持续导入任务。在提交 Job 作业后 注意:新文件的名称必须按字典序大于上一次已导入的文件名,否则 Doris 不会将其作为新文件处理。比如,文件命名为 file1、file2、file3 时会按顺序导入;如果随后新增一个 file0,由于它在字典序上小于最后已导入的文件 file3,Doris 将不会导入该文件。 -### MySQL / Postgres - -集成了 Flink CDC 摄入数据的能力,可以将 MySQL 和 Postgres 里面的数据全量和增量数据同步到 Doris 中。 - - ## 快速上手 ### 创建导入作业 -#### S3 假设 S3 的目录下,会定期的产生以 CSV 结尾的文件。此时可以创建 Job ```SQL diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index 0379b34a28372..f711895b57cc2 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -12,7 +12,7 @@ Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提 ## 支持的 TVF 
-[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## 基本原理 diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index a528f285d5e99..28314353e2bec 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -12,7 +12,7 @@ Doris allows you to create a continuous import task using a Job + TVF approach. ## Supported TVFs -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## Basic Principles From e850e0e1c9ecb6367c382a79f7e87fae2524a74d Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 20 Jan 2026 14:12:09 +0800 Subject: [PATCH 04/14] fix deadline link --- docs/data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- .../data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- .../data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- .../data-operate/import/streaming-job/streaming-job-tvf.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/data-operate/import/streaming-job/streaming-job-tvf.md b/docs/data-operate/import/streaming-job/streaming-job-tvf.md index 28314353e2bec..4948f23c44df0 100644 --- a/docs/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/docs/data-operate/import/streaming-job/streaming-job-tvf.md @@ -156,7 +156,7 @@ The module description is as follows: | Parameter | Default Value | Description | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | +| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | | s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. | | s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. | | max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. 
| diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md index f711895b57cc2..6bc6b3c98a6a8 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md @@ -155,7 +155,7 @@ DO | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index f711895b57cc2..6bc6b3c98a6a8 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -155,7 +155,7 @@ DO | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index 28314353e2bec..4948f23c44df0 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -156,7 +156,7 @@ The module description is as follows: | Parameter | Default Value | Description | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | +| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | | s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. | | s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. 
| | max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. | From b112e68c76b1a894888892af8ac21129a6958e04 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:22:00 +0800 Subject: [PATCH 05/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 24ddd922d4680..ae8d8619fbf45 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -8,7 +8,7 @@ ## 描述 -Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。。 +Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。 ## 语法 From 52a78a4174d4a7138d049ef3bb15fb438b9b8ea8 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:22:12 +0800 Subject: [PATCH 06/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 24ddd922d4680..6b9f58f3c88aa 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -138,7 +138,7 @@ DO ) ``` -- 创建一个名为 my_job 的作业,持续同步 Postgrees 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 +- 创建一个名为 my_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 ```sql CREATE JOB test_postgres_job From 4ae52174a2b31263628aad97b203122573919a84 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:22:19 +0800 Subject: [PATCH 07/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index ae8d8619fbf45..ff00e4a7b1567 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -138,7 +138,7 @@ DO ) ``` -- 创建一个名为 my_job 的作业,持续同步 Postgrees 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 +- 
创建一个名为 my_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 ```sql CREATE JOB test_postgres_job From 7569b3083d071b1b1ecc780327cff3a857fdef72 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:23:13 +0800 Subject: [PATCH 08/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index ff00e4a7b1567..caea0b8c191b6 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -69,7 +69,7 @@ DO **6. ``** | 参数 | 默认值 | 说明 | | -------------- | ------- | ------------------------------------------------------------ | -| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | From 369e158d4e0645c9fd679904152e05e5dad2d86e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:23:32 +0800 Subject: [PATCH 09/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../import/streaming-job/streaming-job-multi-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md index 3075733a2b2a2..19f04ccb034dc 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -216,7 +216,7 @@ select * from jobs(type=insert) where ExecuteType = "STREAMING" ExecuteType: STREAMING RecurringStrategy: \N Status: RUNNING - ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') CreateTime: 2025-12-10 10:19:35 SucceedTaskCount: 2 FailedTaskCount: 0 From 459d2a8dd172aeadd51756325703767665c59d61 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:23:40 +0800 Subject: [PATCH 10/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- 
.../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 6b9f58f3c88aa..9bb7816e1f890 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -69,7 +69,7 @@ DO **6. ``** | 参数 | 默认值 | 说明 | | -------------- | ------- | ------------------------------------------------------------ | -| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | From f2c119436ab66f6719c1cd1a54041e15df8522f4 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:23:49 +0800 Subject: [PATCH 11/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 9bb7816e1f890..0727dbeaf535e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -8,7 +8,7 @@ ## 描述 -Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。。 +Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。 ## 语法 From 861f72544e3992eeed4b965f2a8b5b6ae05f9fe9 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:23:54 +0800 Subject: [PATCH 12/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../import/streaming-job/streaming-job-multi-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md index 19f04ccb034dc..d5f00102616fa 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -96,7 +96,7 @@ select * from jobs(type=insert) where ExecuteType = "STREAMING" ExecuteType: STREAMING RecurringStrategy: \N Status: RUNNING - ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + ExecuteSql: FROM 
MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') CreateTime: 2025-12-10 10:19:35 SucceedTaskCount: 1 FailedTaskCount: 0 From 6c96141924e60ba4fd6f59151d124579df14ff80 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:24:03 +0800 Subject: [PATCH 13/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../import/streaming-job/streaming-job-multi-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md index 3075733a2b2a2..e6100ad0aba45 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -96,7 +96,7 @@ select * from jobs(type=insert) where ExecuteType = "STREAMING" ExecuteType: STREAMING RecurringStrategy: \N Status: RUNNING - ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') CreateTime: 2025-12-10 10:19:35 SucceedTaskCount: 1 FailedTaskCount: 0 From 87774f7df2649ccddeff4f5470b00fb0156dbc21 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 22 Jan 2026 11:24:09 +0800 Subject: [PATCH 14/14] Update i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../import/streaming-job/streaming-job-multi-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md index e6100ad0aba45..d5f00102616fa 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -216,7 +216,7 @@ select * from jobs(type=insert) where ExecuteType = "STREAMING" ExecuteType: STREAMING RecurringStrategy: \N Status: RUNNING - ExecuteSql: FROM 
MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABSE target_test_db ('table.create.properties.replication_num'='1') + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') CreateTime: 2025-12-10 10:19:35 SucceedTaskCount: 2 FailedTaskCount: 0