3 changes: 3 additions & 0 deletions docs/BigQueryMultiTable-batchsink.md
@@ -59,6 +59,9 @@ If the bucket was created automatically, it will be deleted after the run finish
**Truncate Table:** Whether or not to truncate the table before writing to it.
Should only be used with the Insert operation.

**Write Disposition**: Whether a truncating load job should overwrite the table's metadata along with its data (`WRITE_TRUNCATE`) or replace only the data while preserving the metadata (`WRITE_TRUNCATE_DATA`).
For more details, see [here](https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.WriteDisposition).

**Location:** The location where the big query datasets will get created. This value is ignored
if the dataset or temporary bucket already exist.

3 changes: 3 additions & 0 deletions docs/BigQueryTable-batchsink.md
@@ -76,6 +76,9 @@ will be dropped.
**Truncate Table**: Whether or not to truncate the table before writing to it.
Should only be used with the Insert operation.

**Write Disposition**: Whether a truncating load job should overwrite the table's metadata along with its data (`WRITE_TRUNCATE`) or replace only the data while preserving the metadata (`WRITE_TRUNCATE_DATA`).
For more details, see [here](https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.WriteDisposition).
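
For example, a minimal sketch of how these properties might appear in a pipeline's plugin configuration (the surrounding structure and the `operation` value are illustrative assumptions, not taken from this change):

```json
{
  "name": "BigQuery",
  "type": "batchsink",
  "properties": {
    "operation": "insert",
    "truncateTable": "true",
    "writeDisposition": "WRITE_TRUNCATE_DATA"
  }
}
```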

**Table Key**: List of fields that determines relation between tables during Update and Upsert operations.

**Dedupe By**: Column names and sort order used to choose which input record to update/upsert when there are
@@ -270,7 +270,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION,
config.isAllowSchemaRelaxation());
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
- config.getWriteDisposition().name());
+ config.getWriteDisposition());
baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
// this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
// cause OOM issue if there are many tables being written. See this - CDAP-16670
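
With `getWriteDisposition()` now returning a plain `String`, the `.name()` call above is gone and the value is handed to the Hadoop `Configuration` untouched. A minimal sketch of that hand-off, assuming a hypothetical key name (the plugin reads the real key from `BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey()`):

```java
import org.apache.hadoop.conf.Configuration;

public class DispositionHandOffSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Illustrative key only; the actual key comes from
    // BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey().
    String key = "mapred.bq.output.table.writedisposition";
    // Storing the disposition as a raw string lets values such as
    // WRITE_TRUNCATE_DATA pass through without an enum lookup.
    conf.setStrings(key, "WRITE_TRUNCATE_DATA");
    System.out.println(conf.get(key)); // WRITE_TRUNCATE_DATA
  }
}
```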
@@ -49,6 +49,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD);

public static final String NAME_TRUNCATE_TABLE = "truncateTable";
public static final String NAME_WRITE_DISPOSITION = "writeDisposition";
public static final String NAME_LOCATION = "location";
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
@@ -79,9 +80,19 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
@Macro
@Nullable
@Description("Whether or not to truncate the table before writing to it. "
+ "Should only be used with the Insert operation. This could overwrite the table schema")
+ "Should only be used with the Insert operation. This could overwrite the table schema based "
+ "on write disposition value.")
protected Boolean truncateTable;

@Name(NAME_WRITE_DISPOSITION)
@Macro
@Nullable
@Description("WRITE_TRUNCATE_DATA preserves the table metadata where as WRITE_TRUNCATE does not. "
+ "For more details, see "
+ "https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/"
+ "BigQueryAuditMetadata.WriteDisposition.")
protected String writeDisposition;

@Name(NAME_LOCATION)
@Macro
@Nullable
@@ -155,9 +166,16 @@ public boolean isAllowSchemaRelaxation() {
return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
}

- public JobInfo.WriteDisposition getWriteDisposition() {
-   return isTruncateTableSet() ? JobInfo.WriteDisposition.WRITE_TRUNCATE
-     : JobInfo.WriteDisposition.WRITE_APPEND;
- }
+ private String getTruncateTableWriteDisposition() {
+   if (writeDisposition == null) {
+     return JobInfo.WriteDisposition.WRITE_TRUNCATE.name();
+   }
+   return writeDisposition;
+ }
+
+ public String getWriteDisposition() {
+   return isTruncateTableSet() ? getTruncateTableWriteDisposition()
+     : JobInfo.WriteDisposition.WRITE_APPEND.name();
+ }

public boolean isTruncateTableSet() {
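
Taken together, the two new methods resolve the final disposition from `truncateTable` and `writeDisposition`. A self-contained sketch of the same decision logic (class and method names here are illustrative, not from the plugin):

```java
import com.google.cloud.bigquery.JobInfo;

public class WriteDispositionResolver {
  // Mirrors the config logic above. Strings are returned rather than
  // JobInfo.WriteDisposition values so that WRITE_TRUNCATE_DATA, which this
  // change implies is absent from the client enum, can pass through unchanged.
  static String resolve(boolean truncateTable, String writeDisposition) {
    if (!truncateTable) {
      return JobInfo.WriteDisposition.WRITE_APPEND.name();
    }
    // Default keeps the pre-change behavior when no disposition is configured.
    return writeDisposition == null
        ? JobInfo.WriteDisposition.WRITE_TRUNCATE.name()
        : writeDisposition;
  }

  public static void main(String[] args) {
    System.out.println(resolve(false, null));                 // WRITE_APPEND
    System.out.println(resolve(true, null));                  // WRITE_TRUNCATE
    System.out.println(resolve(true, "WRITE_TRUNCATE_DATA")); // WRITE_TRUNCATE_DATA
  }
}
```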
@@ -363,7 +363,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable

Map<String, String> fieldDescriptions = new HashMap<>();
if (JobInfo.WriteDisposition.WRITE_TRUNCATE
.equals(JobInfo.WriteDisposition.valueOf(writeDisposition)) && tableExists) {
.equals(writeDisposition.toUpperCase()) && tableExists) {
List<TableFieldSchema> tableFieldSchemas = Optional.ofNullable(bigQueryHelper.getTable(tableRef))
.map(it -> it.getSchema())
.map(it -> it.getFields())
@@ -411,8 +411,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
// Schema update options should only be specified with WRITE_APPEND disposition,
// or with WRITE_TRUNCATE disposition on a table partition - The logic below should change when we support
// insertion into single partition
if (allowSchemaRelaxation && !JobInfo.WriteDisposition.WRITE_TRUNCATE
.equals(JobInfo.WriteDisposition.valueOf(writeDisposition))) {
if (allowSchemaRelaxation && !JobInfo.WriteDisposition.WRITE_TRUNCATE.name()
.equals(writeDisposition.toUpperCase())) {
loadConfig.setSchemaUpdateOptions(Arrays.asList(
JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION.name(),
JobInfo.SchemaUpdateOption.ALLOW_FIELD_RELAXATION.name()));
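
The switch from `JobInfo.WriteDisposition.valueOf(writeDisposition)` to plain string comparison is what makes the new value usable at all: `valueOf` throws for any name that is not an enum constant. A short illustration, assuming (as this change implies) that the client enum does not define `WRITE_TRUNCATE_DATA`:

```java
import com.google.cloud.bigquery.JobInfo;

public class ValueOfPitfall {
  public static void main(String[] args) {
    // Fine: WRITE_TRUNCATE is a constant of the client enum.
    System.out.println(JobInfo.WriteDisposition.valueOf("WRITE_TRUNCATE"));
    try {
      // Throws IllegalArgumentException: the REST API accepts this disposition,
      // but valueOf only recognizes declared enum constants.
      JobInfo.WriteDisposition.valueOf("WRITE_TRUNCATE_DATA");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```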
@@ -440,7 +440,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
if (operation.equals(Operation.INSERT) && gcsPaths.size() <= BQ_IMPORT_MAX_BATCH_SIZE) {
// Directly load data into destination table when total no of input paths is loadable into BQ
loadConfig.setSourceUris(gcsPaths);
- loadConfig.setWriteDisposition(writeDisposition);
+ loadConfig.setWriteDisposition(writeDisposition.toUpperCase());
loadConfig.setDestinationTable(tableRef);

JobConfiguration config = new JobConfiguration();
@@ -499,7 +499,7 @@ private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationL
.setTableId(temporaryTableName);

loadConfig.setDestinationTable(temporaryTableReference);
- loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND.toString());
+ loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND.name());

// Split the list of files in batches 10000 (current bq load job limit) and import /append onto a temp table
List<List<String>> gcsPathsInBatches = Lists.partition(gcsPaths, BQ_IMPORT_MAX_BATCH_SIZE);
@@ -747,7 +747,7 @@ private static TableSchema createTableSchemaFromFields(String fieldsJson) throws
private void updateFieldDescriptions(String writeDisposition, TableReference tableRef,
Map<String, String> fieldDescriptions) throws IOException {
if (JobInfo.WriteDisposition.WRITE_TRUNCATE
- .equals(JobInfo.WriteDisposition.valueOf(writeDisposition))) {
+ .name().equals(writeDisposition.toUpperCase())) {

Table table = bigQueryHelper.getTable(tableRef);
List<TableFieldSchema> tableFieldSchemas = Optional.ofNullable(table)
@@ -199,14 +199,17 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String proj
@Nullable String serviceAccountJson,
@Nullable String dataset, @Nullable String table, @Nullable String location,
@Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue,
- @Nullable String timePartitioningType) {
+ @Nullable String timePartitioningType, @Nullable String writeDisposition,
+ @Nullable Boolean truncateTable) {
super(new BigQueryConnectorConfig(project, project, serviceAccountType,
serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket);
this.referenceName = referenceName;
this.table = table;
this.location = location;
this.jobLabelKeyValue = jobLabelKeyValue;
this.timePartitioningType = timePartitioningType;
this.writeDisposition = writeDisposition;
this.truncateTable = truncateTable;
}

public String getTable() {
@@ -734,6 +737,9 @@ public static class Builder {
private String bucket;
private String jobLabelKeyValue;
private String timePartitioningType;
private String writeDisposition;
private Boolean truncateTable;


public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) {
this.referenceName = referenceName;
@@ -794,6 +800,16 @@ public BigQuerySinkConfig.Builder setTimePartitioningType(@Nullable String timeP
return this;
}

public BigQuerySinkConfig.Builder setWriteDisposition(@Nullable String writeDisposition) {
this.writeDisposition = writeDisposition;
return this;
}

public BigQuerySinkConfig.Builder setTruncateTable(@Nullable Boolean truncateTable) {
this.truncateTable = truncateTable;
return this;
}

public BigQuerySinkConfig build() {
return new BigQuerySinkConfig(
referenceName,
@@ -807,7 +823,9 @@ public BigQuerySinkConfig build() {
cmekKey,
bucket,
jobLabelKeyValue,
- timePartitioningType
+ timePartitioningType,
+ writeDisposition,
+ truncateTable
);
}

@@ -16,6 +16,7 @@

package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TimePartitioning;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -56,6 +57,27 @@ public void setup() throws NoSuchMethodException {
arguments = new HashMap<>();
}

@Test
public void testValidateWriteDisposition() {
BigQuerySinkConfig bigQuerySinkConfig =
BigQuerySinkConfig.builder()
.setTruncateTable(true)
.setWriteDisposition("WRITE_TRUNCATE_DATA")
.build();
Assert.assertEquals("WRITE_TRUNCATE_DATA", bigQuerySinkConfig.getWriteDisposition());

bigQuerySinkConfig = BigQuerySinkConfig.builder().setWriteDisposition("WRITE_APPEND").build();
Assert.assertEquals(bigQuerySinkConfig.getWriteDisposition(),
JobInfo.WriteDisposition.WRITE_APPEND.name());

bigQuerySinkConfig = BigQuerySinkConfig.builder()
.setTruncateTable(true)
.setWriteDisposition("WRITE_TRUNCATE")
.build();
Assert.assertEquals(bigQuerySinkConfig.getWriteDisposition(),
JobInfo.WriteDisposition.WRITE_TRUNCATE.name());
}

@Test
public void testValidateTimePartitioningColumnWithHourAndDate() throws
InvocationTargetException, IllegalAccessException {
31 changes: 31 additions & 0 deletions widgets/BigQueryMultiTable-batchsink.json
@@ -124,6 +124,25 @@
"label": "False"
}
}
},
{
"name": "writeDisposition",
"widget-type": "radio-group",
"label": "Write Disposition",
"widget-attributes": {
"layout": "inline",
"default": "WRITE_TRUNCATE",
"options": [
{
"id": "WRITE_TRUNCATE",
"label": "WRITE_TRUNCATE"
},
{
"id": "WRITE_TRUNCATE_DATA",
"label": "WRITE_TRUNCATE_DATA"
}
]
}
}
]
},
@@ -296,6 +315,18 @@
"name": "connection"
}
]
},
{
"name": "showWriteDisposition",
"condition": {
"expression": "truncateTable == true"
},
"show": [
{
"type": "property",
"name": "writeDisposition"
}
]
}
],
"jump-config": {
31 changes: 31 additions & 0 deletions widgets/BigQueryTable-batchsink.json
@@ -221,6 +221,25 @@
}
}
},
{
"name": "writeDisposition",
"widget-type": "radio-group",
"label": "Write Disposition",
"widget-attributes": {
"layout": "inline",
"default": "WRITE_TRUNCATE",
"options": [
{
"id": "WRITE_TRUNCATE",
"label": "WRITE_TRUNCATE"
},
{
"id": "WRITE_TRUNCATE_DATA",
"label": "WRITE_TRUNCATE_DATA"
}
]
}
},
{
"name": "allowSchemaRelaxation",
"widget-type": "toggle",
@@ -551,6 +570,18 @@
"name": "relationTableKey"
}
]
},
{
"name": "showWriteDisposition",
"condition": {
"expression": "truncateTable == true"
},
"show": [
{
"type": "property",
"name": "writeDisposition"
}
]
}
],
"jump-config": {