diff --git a/docs/data-operate/import/streaming-job/streaming-job-multi-table.md b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..f97f6b86d57b3 --- /dev/null +++ b/docs/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,289 @@ +--- +{ + "title": "Multi-table Continuous Load", + "language": "en", + "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job." +} +--- + +## Overview + +Supports using Job to continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris via Streaming Job. Suitable for scenarios requiring real-time multi-table data synchronization to Doris. + +## Supported Data Sources + +- MySQL +- Postgres + +## Basic Principles + +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL, Postgres, etc., enabling full and incremental multi-table data synchronization. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream. + +**Notes:** + +1. Currently only at-least-once semantics are guaranteed. +2. Only primary key tables are supported for synchronization. +3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required. + +## Quick Start + +### Prerequisites + +#### MySQL +Enable Binlog on MySQL by adding the following to my.cnf: +```ini +log-bin=mysql-bin +binlog_format=ROW +server-id=1 +``` + +#### Postgres +Enable logical replication on Postgres by adding the following to postgresql.conf: +```ini +wal_level=logical +``` + +### Creating an Import Job + +#### MySQL + +```sql +CREATE JOB multi_table_sync +ON STREAMING +FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +#### Postgres + +```sql +CREATE JOB test_postgres_job +ON STREAMING +FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +### Check Import Status + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: 
{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: \N + LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +### Pause Import Job + +```sql +PAUSE JOB WHERE jobname = ; +``` + +### Resume Import Job + +```sql +RESUME JOB where jobName = ; +``` + +### Modify Import Job + +```sql +ALTER JOB +FROM MYSQL ( + "user" = "root", + "password" = "123456" +) +TO DATABASE target_test_db +``` + +### Delete Import Job + +```sql +DROP JOB where jobName = ; +``` + +## Reference Manual + +### Import Command + +Syntax for creating a multi-table synchronization job: + +```sql +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +FROM ( + [source_properties] +) +TO DATABASE ( + [target_properties] +) +``` + +| Module | Description | +| ------------------ | --------------------------- | +| job_name | Job name | +| job_properties | General import parameters | +| comment | Job comment | +| source_properties | Source (MySQL/PG) parameters| +| target_properties | Doris target DB parameters | + +### Import Parameters + +#### FE Configuration Parameters + +| Parameter | Default | Description | +| -------------------------------------- | ------- | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask | +| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory| + +#### General Job Import Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| max_interval | 10s | Idle scheduling interval when no new data | + +#### Source Configuration Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| jdbc_url | - | JDBC connection string (MySQL/PG) | +| driver_url | - | JDBC driver jar path | +| driver_class | - | JDBC driver class name | +| user | - | Database username | +| password | - | Database password | +| database | - | Database name | +| schema | - | Schema name | +| include_tables| - | Tables to synchronize, comma separated | +| offset | initial | initial: full + incremental, latest: incremental only | + +#### Doris Target DB Parameters + +| Parameter | Default | Description | +| ------------------------------- | ------- | ------------------------------------------- | +| table.create.properties.* | - | Table properties when creating, e.g. replication_num | + +### Import Status + +#### Job + +After submitting a job, you can run the following SQL to check the job status: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. 
row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| ID | Job ID | +| NAME | Job name | +| Definer | Job definer | +| ExecuteType | Job type: ONE_TIME/RECURRING/STREAMING/MANUAL| +| RecurringStrategy | Recurring strategy, empty for Streaming | +| Status | Job status | +| ExecuteSql | Job's Insert SQL statement | +| CreateTime | Job creation time | +| SucceedTaskCount | Number of successful tasks | +| FailedTaskCount | Number of failed tasks | +| CanceledTaskCount | Number of canceled tasks | +| Comment | Job comment | +| Properties | Job properties | +| CurrentOffset | Current offset, only for Streaming jobs | +| EndOffset | Max end offset from source, only for Streaming| +| LoadStatistic | Job statistics | +| ErrorMsg | Job error message | +| JobRuntimeMsg | Job runtime info | + +#### Task + +You can run the following SQL to check the status of each Task: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| TaskId | Task ID | +| JobID | Job ID | +| JobName | Job name | +| Label | Task label | +| Status | Task status | +| ErrorMsg | Task error message | +| CreateTime | Task creation time | +| StartTime | Task start time | +| FinishTime | Task finish time | +| LoadStatistic | Task statistics | +| User | Task executor | +| RunningOffset | Current offset, only for Streaming jobs | \ No newline at end of file diff --git a/docs/data-operate/import/streaming-job.md b/docs/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 97% rename from docs/data-operate/import/streaming-job.md rename to docs/data-operate/import/streaming-job/streaming-job-tvf.md index 3ad9f085c81eb..4948f23c44df0 100644 --- a/docs/data-operate/import/streaming-job.md +++ b/docs/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "Continuous Load", + "title": "TVF Continuous Load", "language": "en", "description": "Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job," } @@ -12,7 +12,7 @@ Doris allows you to create a continuous import task using a Job + TVF approach. ## Supported TVFs -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## Basic Principles @@ -156,7 +156,7 @@ The module description is as follows: | Parameter | Default Value | Description | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | +| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | | s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. | | s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. | | max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. 
| diff --git a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 09a951f53bf2e..050c1aef68234 100644
--- a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -2,55 +2,97 @@
{
  "title": "CREATE STREAMING JOB",
  "language": "en",
-  "description": "Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted,"
+  "description": "Doris Streaming Job is a continuous import task based on the Job approach. After the Job is submitted, Doris continuously runs the import job, querying TVF or upstream data sources in real time and writing the data into Doris tables."
}
---

## Description

-Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted, Doris will continuously run the import job, querying the data in TVF and writing it into the Doris table in real time.
+Doris Streaming Job is a continuous import task based on the Job approach. After the Job is submitted, Doris continuously runs the import job, querying TVF or upstream data sources in real time and writing the data into Doris tables.

## Syntax
-
```SQL
CREATE JOB <job_name>
ON STREAMING
-[job_properties]
+[ PROPERTIES (
+    <key> = <value>
+    [ , ... ]
+  )
+]
[ COMMENT <comment> ]
+(
+  DO <sql_statement>
+|
+  FROM <datasource_name> (
+    <key> = <value>
+    [ , ... ]
+  )
+  TO DATABASE <target_db_name>
+  [ PROPERTIES (
+    <key> = <value>
+    [ , ... ]
+  )
+  ]
+)
```

-## Required parameters
**1. ``**
-> The job name is used to uniquely identify an event within a database. The job name must be globally unique; an error will occur if a job with the same name already exists.
+## Required Parameters
+
+**1. `<job_name>`**
+> The job name, which uniquely identifies an event in a database. The job name must be globally unique; if a job with the same name already exists, an error will be reported.
+
+**2. `<sql_statement>`**
+> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, only S3 TVF is supported.
+
+**3. `<datasource_name>`**
+> Supported data sources: currently only MySQL and Postgres.
+
+**4. `<source_properties>`**
+| Parameter | Default | Description |
+| -------------- | ------- | ------------------------------------------------------------ |
+| jdbc_url | - | JDBC connection string (MySQL/PG) |
+| driver_url | - | JDBC driver jar path |
+| driver_class | - | JDBC driver class name |
+| user | - | Database username |
+| password | - | Database password |
+| database | - | Database name |
+| schema | - | Schema name |
+| include_tables | - | Tables to synchronize, comma separated |
+| offset | initial | initial: full + incremental sync, latest: incremental only |
+
+**5. `<target_db_name>`**
+> Doris target database name to import into.
-**3. ``**
-> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, it only supports S3 TVF.
+**6. `<target_properties>`**
+| Parameter | Default | Description |
+| ------------------------------- | ------- | ------------------------------------------- |
+| table.create.properties.* | - | Table properties when creating, e.g. replication_num |
-## Optional parameters
**1. 
``** -| Parameters | Default Values ​​| Description | -| ------------------ | ------ | ------------------------------------------------------------ | -| session.* | None | Supports configuring all session variables in job_properties | -| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value | -| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value | -| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. -## Access Control Requirements +## Optional Parameters + +**1. ``** +| Parameter | Default | Description | +| ------------------ | ------- | ------------------------------------------------------------ | +| session.* | None | Supports configuring all session variables in job_properties | +| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value | +| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value | +| max_interval | 10s | Idle scheduling interval when there are no new files or data upstream | + +## Privilege Control The user executing this SQL command must have at least the following privileges: -| Privilege | Object | Notes | -|:--------------|:-----------|:------------------------| -| LOAD_PRIV | Database (DB) | Currently, only the **LOAD** privilege is supported to perform this operation | -## Usage Notes +| Privilege | Object | Notes | +|:------------|:------------|:--------------------------------------| +| LOAD_PRIV | Database | Currently, only the **LOAD** privilege is supported for this operation | -- The TASK only retains the latest 100 records. -- Currently, only the **INSERT internal table Select * From S3(...)** operation is supported; more operations will be supported in the future. +## Notes + +- TASK only retains the latest 100 records. +- Currently, Insert_Command only supports **INSERT internal table Select * From S3(...)**; more operations will be supported in the future. ## Examples @@ -73,12 +115,53 @@ The user executing this SQL command must have at least the following privileges: ); ``` +- Create a job named multi_table_sync to synchronize user_info and order_info tables from MySQL upstream to the target_test_db database from the beginning. + + ```sql + CREATE JOB multi_table_sync + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + +- Create a job named test_postgres_job to continuously synchronize incremental data from the test_tbls table in Postgres upstream to the target_test_db database. 
+ + ```sql + CREATE JOB test_postgres_job + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + ## CONFIG **fe.conf** -| Parameters | Default Values ​​| | -| ------------------------------------ | ------ | ------------------------------------------- | -| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | -| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks | -| max_streaming_task_show_count | 100 | Maximum number of task execution records that a StreamingTask keeps in memory | \ No newline at end of file +| Parameter | Default | Description | +| -------------------------------------- | ------- | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask | +| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory| \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..d5f00102616fa --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,293 @@ +--- +{ + "title": "多表持续导入", + "language": "zh-CN", + "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL、Postgres 等多张表的全量和增量数据持续同步到 Doris 中。" +} +--- + +## 概述 + +支持通过 Job 将 MySQL、Postgres 等数据库的多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步多表数据到 Doris 的场景。 + +## 支持的数据源 + +- MySQL +- Postgres + +## 基本原理 + +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL、Postgres 等数据库读取变更日志,实现多表的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 + +**注意事项:** + +1. 目前只能保证 at-least-once 语义。 +2. 目前只支持主键表同步。 +3. 
需要 Load 权限,若下游表不存在还需有 Create 权限。 + +## 快速上手 + +### 前提条件 + +#### MySQL +需要在 MySQL 端开启 Binlog,即 my.cnf 配置文件中增加: +```ini +log-bin=mysql-bin +binlog_format=ROW +server-id=1 +``` + +#### Postgres +需要在 Postgres 端配置逻辑复制,即 postgresql.conf 增加: +```ini +wal_level=logical +``` + +### 创建导入作业 + +#### MySQL + +```sql +CREATE JOB multi_table_sync +ON STREAMING +FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +#### Postgres + +```sql +CREATE JOB test_postgres_job +ON STREAMING +FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +### 查看导入状态 + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: \N + LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +### 暂停导入作业 + +```sql +PAUSE JOB WHERE jobname = ; +``` + +### 恢复导入作业 + +```sql +RESUME JOB where jobName = ; +``` + +### 修改导入作业 + +```sql +ALTER JOB +FROM MYSQL ( + "user" = "root", + "password" = "123456" +) +TO DATABASE target_test_db +``` + +### 删除导入作业 + +```sql +DROP JOB where jobName = ; +``` + +## 参考手册 + +### 导入命令 + +创建一个多表同步作业语法如下: + +```sql +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +FROM ( + [source_properties] +) +TO DATABASE ( + [target_properties] +) +``` + +| 模块 | 说明 | +| ------------------ | ------------------------- | +| job_name | 任务名 | +| job_properties | 用于指定 Job 的通用导入参数 | +| comment | 用于描述 Job 作业的备注信息 | +| source_properties | 源端(MySQL/PG 等)相关参数 | +| target_properties | Doris 目标库相关参数 | + +### 导入参数 + +#### FE 配置参数 + +| 参数 | 默认值 | 说明 | +| ------------------------------------ | ------ | -------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | + +#### Job 通用导入配置参数 + +| 参数 | 默认值 | 说明 | +| ------------ | ------ | -------------------------------------- | +| max_interval | 10s | 当上游没有新增数据时,空闲的调度间隔。 | + +#### 数据源配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | 
JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +#### Doris 目标库端配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | + +### 导入状态 + +#### Job + +Job 提交成功后,可以执行如下 SQL 查看 Job 当前状态: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +具体显示参数结果如下 + +| 结果列 | 说明 | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job 名称 | +| Definer | job 定义者 | +| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 | +| Status | Job 状态 | +| ExecuteSql | Job 的 Insert SQL 语句 | +| CreateTime | job 创建时间 | +| SucceedTaskCount | 成功任务数量 | +| FailedTaskCount | 失败任务数量 | +| CanceledTaskCount | 取消任务数量 | +| Comment | job 注释 | +| Properties | job 的属性 | +| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 | +| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 | +| LoadStatistic | Job 的统计信息 | +| ErrorMsg | Job 执行的错误信息 | +| JobRuntimeMsg | Job 运行时的一些提示信息 | + +#### Task + +可以执行如下 SQL 查看每次 Task 的运行状态: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + + + +| 结果列 | 说明 | +| ------------- | ---------------------------------------------------- | +| TaskId | 任务 ID | +| JobID | JobID | +| JobName | Job 名称 | +| Label | Task 导入 的 Label | +| Status | Task 的状态 | +| ErrorMsg | task 失败信息 | +| CreateTime | Task 的创建时间 | +| StartTime | Task 的开始时间 | +| FinishTime | Task 的完成时间 | +| LoadStatistic | Task 的统计信息 | +| User | task 的执行者 | +| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 98% rename from i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md index e77f5b09f0308..6bc6b3c98a6a8 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "持续导入", + "title": "TVF 持续导入", "language": "zh-CN", "description": "Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" } @@ -12,7 +12,7 @@ Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提 ## 支持的 TVF -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## 基本原理 @@ -155,7 +155,7 @@ DO | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 033c4ff1fb1bb..0727dbeaf535e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -2,13 +2,13 @@ { "title": "CREATE STREAMING JOB", "language": "zh-CN", - "description": "Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 
中的数据写入到 Doris 表中。" + "description": "Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。" } --- ## 描述 -Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 +Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。 ## 语法 @@ -16,9 +16,26 @@ Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任 ```SQL CREATE JOB ON STREAMING -[job_properties] +[ PROPERTIES ( + + [ , ... ] + ) +] [ COMMENT ] +( DO +| +( + FROM ( + + [ , ... ]) + TO DATABASE + [ PROPERTIES ( + + -- Other属性 + [ , ... ]) + ] +) ``` @@ -27,15 +44,41 @@ DO **1. ``** > 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。 -**3. ``** -> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF +**2. ``** +> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF。 + +**3. ``** +> 支持的数据源,目前只支持 MySQL 和 Postgres。 + +**4. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +**5. ``** +> 需要导入的 Doris 目标库名称。 + +**6. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | + + ## 可选参数 -**1. ``** +**1. ``** | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | @@ -52,7 +95,7 @@ DO - TASK 只保留最新的 100 条记录。 -- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 +- 目前 Insert_Command 仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 ## 示例 @@ -75,6 +118,47 @@ DO ); ``` +- 创建一个名为 my_job 的作业,从头开始同步 MySQL 上游的 user_info,order_info 表的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB multi_table_sync + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + +- 创建一个名为 my_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB test_postgres_job + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + ## CONFIG **fe.conf** diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md 
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..d5f00102616fa --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,293 @@ +--- +{ + "title": "多表持续导入", + "language": "zh-CN", + "description": "Doris 可以通过 Streaming Job 的方式,将 MySQL、Postgres 等多张表的全量和增量数据持续同步到 Doris 中。" +} +--- + +## 概述 + +支持通过 Job 将 MySQL、Postgres 等数据库的多张表的全量和增量数据,通过 Stream Load 的方式持续同步到 Doris 中。适用于需要实时同步多表数据到 Doris 的场景。 + +## 支持的数据源 + +- MySQL +- Postgres + +## 基本原理 + +通过集成 [Flink CDC](https://github.com/apache/flink-cdc) 能力,Doris 支持从 MySQL、Postgres 等数据库读取变更日志,实现多表的全量和增量数据同步。首次同步时会自动创建 Doris 下游表(主键表),并保持主键与上游一致。 + +**注意事项:** + +1. 目前只能保证 at-least-once 语义。 +2. 目前只支持主键表同步。 +3. 需要 Load 权限,若下游表不存在还需有 Create 权限。 + +## 快速上手 + +### 前提条件 + +#### MySQL +需要在 MySQL 端开启 Binlog,即 my.cnf 配置文件中增加: +```ini +log-bin=mysql-bin +binlog_format=ROW +server-id=1 +``` + +#### Postgres +需要在 Postgres 端配置逻辑复制,即 postgresql.conf 增加: +```ini +wal_level=logical +``` + +### 创建导入作业 + +#### MySQL + +```sql +CREATE JOB multi_table_sync +ON STREAMING +FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +#### Postgres + +```sql +CREATE JOB test_postgres_job +ON STREAMING +FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +### 查看导入状态 + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: \N + LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +### 暂停导入作业 + +```sql +PAUSE JOB WHERE jobname = ; +``` + +### 恢复导入作业 + +```sql +RESUME JOB where jobName = ; +``` + +### 修改导入作业 + +```sql +ALTER JOB +FROM MYSQL ( + "user" = "root", + "password" = "123456" +) +TO DATABASE target_test_db +``` + +### 删除导入作业 + +```sql +DROP JOB where jobName = ; +``` + +## 参考手册 + +### 导入命令 + +创建一个多表同步作业语法如下: + +```sql +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +FROM ( + [source_properties] +) +TO DATABASE ( + [target_properties] +) +``` + +| 模块 | 说明 | +| ------------------ | ------------------------- | +| 
job_name | 任务名 | +| job_properties | 用于指定 Job 的通用导入参数 | +| comment | 用于描述 Job 作业的备注信息 | +| source_properties | 源端(MySQL/PG 等)相关参数 | +| target_properties | Doris 目标库相关参数 | + +### 导入参数 + +#### FE 配置参数 + +| 参数 | 默认值 | 说明 | +| ------------------------------------ | ------ | -------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | + +#### Job 通用导入配置参数 + +| 参数 | 默认值 | 说明 | +| ------------ | ------ | -------------------------------------- | +| max_interval | 10s | 当上游没有新增数据时,空闲的调度间隔。 | + +#### 数据源配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +#### Doris 目标库端配置参数 + +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | + +### 导入状态 + +#### Job + +Job 提交成功后,可以执行如下 SQL 查看 Job 当前状态: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +具体显示参数结果如下 + +| 结果列 | 说明 | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job 名称 | +| Definer | job 定义者 | +| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 | +| Status | Job 状态 | +| ExecuteSql | Job 的 Insert SQL 语句 | +| CreateTime | job 创建时间 | +| SucceedTaskCount | 成功任务数量 | +| FailedTaskCount | 失败任务数量 | +| CanceledTaskCount | 取消任务数量 | +| Comment | job 注释 | +| Properties | job 的属性 | +| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 | +| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 | +| LoadStatistic | Job 的统计信息 | +| ErrorMsg | Job 执行的错误信息 | +| JobRuntimeMsg | Job 运行时的一些提示信息 | + +#### Task + +可以执行如下 SQL 查看每次 Task 的运行状态: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + + + +| 结果列 | 说明 | +| ------------- | ---------------------------------------------------- | +| TaskId | 任务 ID | +| JobID | JobID | +| JobName | Job 名称 | +| Label | Task 导入 的 Label | +| Status | Task 的状态 | +| ErrorMsg | task 失败信息 | +| CreateTime | Task 的创建时间 | +| StartTime | Task 的开始时间 | +| FinishTime | Task 的完成时间 | +| LoadStatistic | Task 的统计信息 | +| User | task 的执行者 | +| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 98% rename from i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md rename to i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index e77f5b09f0308..6bc6b3c98a6a8 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "持续导入", + "title": "TVF 持续导入", "language": "zh-CN", "description": "Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" } @@ -12,7 +12,7 @@ Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提 ## 支持的 TVF -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## 基本原理 @@ -155,7 +155,7 @@ DO | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | | s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | | s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | | max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md index 37b21d2b0e6e9..caea0b8c191b6 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -2,13 +2,13 @@ { "title": "CREATE STREAMING JOB", "language": "zh-CN", - "description": "Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 
Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。" + "description": "Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。" } --- ## 描述 -Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 +Doris Streaming Job 是基于 Job 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 或上游数据源中的数据写入到 Doris 表中。 ## 语法 @@ -16,9 +16,26 @@ Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任 ```SQL CREATE JOB ON STREAMING -[job_properties] +[ PROPERTIES ( + + [ , ... ] + ) +] [ COMMENT ] +( DO +| +( + FROM ( + + [ , ... ]) + TO DATABASE + [ PROPERTIES ( + + -- Other属性 + [ , ... ]) + ] +) ``` @@ -27,12 +44,38 @@ DO **1. ``** > 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。 -**3. ``** -> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF +**2. ``** +> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF。 + +**3. ``** +> 支持的数据源,目前只支持 MySQL 和 Postgres。 + +**4. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| jdbc_url | - | JDBC 连接串(MySQL/PG) | +| driver_url | - | JDBC 驱动 jar 包路径 | +| driver_class | - | JDBC 驱动类名 | +| user | - | 数据库用户名 | +| password | - | 数据库密码 | +| database | - | 数据库名 | +| schema | - | schema 名称 | +| include_tables | - | 需要同步的表名,多个表用逗号分隔 | +| offset | initial | initial: 全量 + 增量同步,latest: 仅增量同步 | + +**5. ``** +> 需要导入的 Doris 目标库名称。 + +**6. ``** +| 参数 | 默认值 | 说明 | +| -------------- | ------- | ------------------------------------------------------------ | +| table.create.properties.* | - | 支持创建表的时候指定 table 的 properties,比如 replication_num | + + ## 可选参数 -**1. ``** +**1. ``** | 参数 | 默认值 | 说明 | | ------------------ | ------ | ------------------------------------------------------------ | | session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | @@ -52,7 +95,7 @@ DO - TASK 只保留最新的 100 条记录。 -- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 +- 目前 Insert_Command 仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 ## 示例 @@ -75,6 +118,47 @@ DO ); ``` +- 创建一个名为 my_job 的作业,从头开始同步 MySQL 上游的 user_info,order_info 表的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB multi_table_sync + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + +- 创建一个名为 my_job 的作业,持续同步 Postgres 上游的 test_tbls 表的增量的数据,到 target_test_db 库下面。 + + ```sql + CREATE JOB test_postgres_job + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" + ) + TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" + ) + ``` + ## CONFIG **fe.conf** diff --git a/sidebars.ts b/sidebars.ts index 7b1387f77d694..2b51aaa48c36d 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -224,7 +224,14 @@ const sidebars: SidebarsConfig = { 'data-operate/import/load-internals/routine-load-internals', ], }, - "data-operate/import/streaming-job" + { + type: 'category', + label: 'Continuous Load', + items: [ + 
'data-operate/import/streaming-job/streaming-job-tvf', + 'data-operate/import/streaming-job/streaming-job-multi-table', + ], + } ], }, { diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md new file mode 100644 index 0000000000000..f97f6b86d57b3 --- /dev/null +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-multi-table.md @@ -0,0 +1,289 @@ +--- +{ + "title": "Multi-table Continuous Load", + "language": "en", + "description": "Doris can continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris using Streaming Job." +} +--- + +## Overview + +Supports using Job to continuously synchronize full and incremental data from multiple tables in MySQL, Postgres, etc. to Doris via Streaming Job. Suitable for scenarios requiring real-time multi-table data synchronization to Doris. + +## Supported Data Sources + +- MySQL +- Postgres + +## Basic Principles + +By integrating [Flink CDC](https://github.com/apache/flink-cdc), Doris supports reading change logs from MySQL, Postgres, etc., enabling full and incremental multi-table data synchronization. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream. + +**Notes:** + +1. Currently only at-least-once semantics are guaranteed. +2. Only primary key tables are supported for synchronization. +3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required. + +## Quick Start + +### Prerequisites + +#### MySQL +Enable Binlog on MySQL by adding the following to my.cnf: +```ini +log-bin=mysql-bin +binlog_format=ROW +server-id=1 +``` + +#### Postgres +Enable logical replication on Postgres by adding the following to postgresql.conf: +```ini +wal_level=logical +``` + +### Creating an Import Job + +#### MySQL + +```sql +CREATE JOB multi_table_sync +ON STREAMING +FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://127.0.0.1:3306", + "driver_url" = "mysql-connector-j-8.0.31.jar", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "test", + "include_tables" = "user_info,order_info", + "offset" = "initial" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +#### Postgres + +```sql +CREATE JOB test_postgres_job +ON STREAMING +FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", + "driver_url" = "postgresql-42.5.0.jar", + "driver_class" = "org.postgresql.Driver", + "user" = "postgres", + "password" = "postgres", + "database" = "postgres", + "schema" = "public", + "include_tables" = "test_tbls", + "offset" = "latest" +) +TO DATABASE target_test_db ( + "table.create.properties.replication_num" = "1" +) +``` + +### Check Import Status + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 1 + 
FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: \N + LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +### Pause Import Job + +```sql +PAUSE JOB WHERE jobname = ; +``` + +### Resume Import Job + +```sql +RESUME JOB where jobName = ; +``` + +### Modify Import Job + +```sql +ALTER JOB +FROM MYSQL ( + "user" = "root", + "password" = "123456" +) +TO DATABASE target_test_db +``` + +### Delete Import Job + +```sql +DROP JOB where jobName = ; +``` + +## Reference Manual + +### Import Command + +Syntax for creating a multi-table synchronization job: + +```sql +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +FROM ( + [source_properties] +) +TO DATABASE ( + [target_properties] +) +``` + +| Module | Description | +| ------------------ | --------------------------- | +| job_name | Job name | +| job_properties | General import parameters | +| comment | Job comment | +| source_properties | Source (MySQL/PG) parameters| +| target_properties | Doris target DB parameters | + +### Import Parameters + +#### FE Configuration Parameters + +| Parameter | Default | Description | +| -------------------------------------- | ------- | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads for StreamingTask | +| max_streaming_task_show_count | 100 | Max number of StreamingTask records in memory| + +#### General Job Import Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| max_interval | 10s | Idle scheduling interval when no new data | + +#### Source Configuration Parameters + +| Parameter | Default | Description | +| ------------- | ------- | ------------------------------------------- | +| jdbc_url | - | JDBC connection string (MySQL/PG) | +| driver_url | - | JDBC driver jar path | +| driver_class | - | JDBC driver class name | +| user | - | Database username | +| password | - | Database password | +| database | - | Database name | +| schema | - | Schema name | +| include_tables| - | Tables to synchronize, comma separated | +| offset | initial | initial: full + incremental, latest: incremental only | + +#### Doris Target DB Parameters + +| Parameter | Default | Description | +| ------------------------------- | ------- | ------------------------------------------- | +| table.create.properties.* | - | Table properties when creating, e.g. replication_num | + +### Import Status + +#### Job + +After submitting a job, you can run the following SQL to check the job status: + +```sql +select * from jobs(type=insert) where ExecuteType = "STREAMING" +*************************** 1. 
row *************************** + Id: 1765332859199 + Name: mysql_db_sync + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1') + CreateTime: 2025-12-10 10:19:35 + SucceedTaskCount: 2 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: \N + CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"} + EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"} + LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| ID | Job ID | +| NAME | Job name | +| Definer | Job definer | +| ExecuteType | Job type: ONE_TIME/RECURRING/STREAMING/MANUAL| +| RecurringStrategy | Recurring strategy, empty for Streaming | +| Status | Job status | +| ExecuteSql | Job's Insert SQL statement | +| CreateTime | Job creation time | +| SucceedTaskCount | Number of successful tasks | +| FailedTaskCount | Number of failed tasks | +| CanceledTaskCount | Number of canceled tasks | +| Comment | Job comment | +| Properties | Job properties | +| CurrentOffset | Current offset, only for Streaming jobs | +| EndOffset | Max end offset from source, only for Streaming| +| LoadStatistic | Job statistics | +| ErrorMsg | Job error message | +| JobRuntimeMsg | Job runtime info | + +#### Task + +You can run the following SQL to check the status of each Task: + +```sql +select * from tasks(type='insert') where jobId='1765336137066' +*************************** 1. 
row *************************** + TaskId: 1765336137066 + JobId: 1765332859199 + JobName: mysql_db_sync + Label: 1765332859199_1765336137066 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-12-10 11:09:06 + StartTime: 2025-12-10 11:09:16 + FinishTime: 2025-12-10 11:09:18 + TrackingUrl: \N +LoadStatistic: {"scannedRows":1,"loadBytes":333} + User: root +FirstErrorMsg: +RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"} +``` + +| Result Column | Description | +| ------------------ | ------------------------------------------- | +| TaskId | Task ID | +| JobID | Job ID | +| JobName | Job name | +| Label | Task label | +| Status | Task status | +| ErrorMsg | Task error message | +| CreateTime | Task creation time | +| StartTime | Task start time | +| FinishTime | Task finish time | +| LoadStatistic | Task statistics | +| User | Task executor | +| RunningOffset | Current offset, only for Streaming jobs | \ No newline at end of file diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job.md b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md similarity index 97% rename from versioned_docs/version-4.x/data-operate/import/streaming-job.md rename to versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md index 3ad9f085c81eb..4948f23c44df0 100644 --- a/versioned_docs/version-4.x/data-operate/import/streaming-job.md +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job/streaming-job-tvf.md @@ -1,6 +1,6 @@ --- { - "title": "Continuous Load", + "title": "TVF Continuous Load", "language": "en", "description": "Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job," } @@ -12,7 +12,7 @@ Doris allows you to create a continuous import task using a Job + TVF approach. ## Supported TVFs -[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF +[S3](../../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF ## Basic Principles @@ -156,7 +156,7 @@ The module description is as follows: | Parameter | Default Value | Description | | ------------------ | ------ | ------------------------------------------------------------ | -| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | +| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | | s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. | | s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. | | max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. 
| diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json index ee402b0ae9aee..547471107ef83 100644 --- a/versioned_sidebars/version-4.x-sidebars.json +++ b/versioned_sidebars/version-4.x-sidebars.json @@ -229,7 +229,14 @@ "data-operate/import/load-internals/routine-load-internals" ] }, - "data-operate/import/streaming-job" + { + "type": "category", + "label": "Continuous Load", + "items": [ + "data-operate/import/streaming-job/streaming-job-tvf", + "data-operate/import/streaming-job/streaming-job-multi-table" + ] + } ] }, {