-
Notifications
You must be signed in to change notification settings - Fork 86
Expand file tree
/
Copy pathAbstractBigQuerySink.java
More file actions
413 lines (375 loc) · 18.5 KB
/
AbstractBigQuerySink.java
File metadata and controls
413 lines (375 loc) · 18.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.gcp.bigquery.sink;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.common.base.Strings;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
/**
* Base class for Big Query batch sink plugins.
*/
public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, StructuredRecord, NullWritable> {
// Logger shared by the logic of this base class.
private static final Logger LOG = LoggerFactory.getLogger(AbstractBigQuerySink.class);
// Format of a GCS path: the bucket name followed by a path inside the bucket.
private static final String gcsPathFormat = "gs://%s/%s";
// Metric name for updated records. NOTE(review): not referenced inside this class - presumably used by
// subclasses or related output classes; confirm before changing.
public static final String RECORDS_UPDATED_METRIC = "records.updated";
// UUID for the run. Will be used as bucket name if bucket is not provided.
// UUID is used since GCS bucket names must be globally unique.
private final UUID runUUID = UUID.randomUUID();
// Hadoop configuration assigned in prepareRun. It is copied for each output and used again in onRunFinish
// to delete the temporary GCS directory.
protected Configuration baseConfiguration;
// BigQuery client assigned in prepareRun; it is created for the dataset project.
protected BigQuery bigQuery;
/**
 * Prepares the run. Validation is delegated to {@link #prepareRunValidation(BatchSinkContext)} first. Then the
 * BigQuery client, the base Hadoop configuration and the bucket are set up, and finally the output specific work
 * is delegated to {@link #prepareRunInternal(BatchSinkContext, BigQuery, String)}. This method is final, so
 * child classes add custom logic by implementing those two methods.
 *
 * @param context batch sink context
 * @throws Exception if any of the preparation steps fails
 */
@Override
public final void prepareRun(BatchSinkContext context) throws Exception {
  prepareRunValidation(context);

  AbstractBigQuerySinkConfig sinkConfig = getConfig();

  // Credentials stay null when no service account is configured.
  String serviceAccount = sinkConfig.getServiceAccount();
  Credentials credentials = null;
  if (serviceAccount != null) {
    credentials = GCPUtils.loadServiceAccountCredentials(serviceAccount, sinkConfig.isServiceAccountFilePath());
  }

  String projectId = sinkConfig.getProject();
  String datasetProjectId = sinkConfig.getDatasetProject();
  // The BigQuery client is created for the dataset project, the storage client below for the project.
  bigQuery = GCPUtils.getBigQuery(datasetProjectId, credentials);

  String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
  baseConfiguration = getBaseConfiguration(cmekKey);

  String runId = runUUID.toString();
  String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, sinkConfig.getBucket(), runId);

  // Resources are only created for real runs, not for preview runs.
  boolean previewEnabled = context.isPreviewEnabled();
  if (!previewEnabled) {
    BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(projectId, credentials),
                                      sinkConfig.getDataset(), bucket, sinkConfig.getLocation(), cmekKey);
  }

  prepareRunInternal(context, bigQuery, bucket);
}
/**
 * Deletes the temporary GCS location of this run. A failure to delete is only logged as a warning.
 *
 * @param succeeded whether the run succeeded; not consulted by this method
 * @param context batch sink context; not consulted by this method
 */
@Override
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
  String configuredBucket = getConfig().getBucket();
  // Without a configured bucket the run UUID itself is the bucket name,
  // otherwise the run UUID is a path inside the configured bucket.
  String gcsPath = configuredBucket == null
    ? String.format("gs://%s", runUUID.toString())
    : String.format(gcsPathFormat, configuredBucket, runUUID.toString());

  try {
    BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
  } catch (IOException e) {
    LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
  }
}
/**
 * Emits the input record as the key of a key-value pair whose value is the {@link NullWritable} instance.
 *
 * @param input record to write
 * @param emitter emitter receiving the key-value pair
 */
@Override
public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord, NullWritable>> emitter) {
  KeyValue<StructuredRecord, NullWritable> outputPair = new KeyValue<>(input, NullWritable.get());
  emitter.emit(outputPair);
}
/**
 * Sets up a single output for the given table: resolves the BigQuery fields (which validates the schema),
 * configures a copy of the base configuration for the table, records lineage and registers the output
 * with the context.
 *
 * @param context batch sink context
 * @param bigQuery big query client for the configured project
 * @param outputName output name
 * @param tableName table name
 * @param tableSchema table schema, may be null
 * @param bucket bucket name
 * @param collector failure collector passed on to the schema validation
 * @throws IOException declared for the configuration steps performed here
 */
protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String tableName,
                                @Nullable Schema tableSchema, String bucket,
                                FailureCollector collector) throws IOException {
  LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);

  boolean allowSchemaRelaxation = getConfig().isAllowSchemaRelaxation();
  List<BigQueryTableFieldSchema> tableFields =
    getBigQueryTableFields(bigQuery, tableName, tableSchema, allowSchemaRelaxation, collector);

  // Every output gets its own copy so that table specific settings do not leak into the base configuration.
  Configuration outputConfiguration = new Configuration(baseConfiguration);

  // Build GCS storage path for this bucket output.
  String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, runUUID.toString(), tableName);
  BigQuerySinkUtils.configureOutput(outputConfiguration, getConfig().getDatasetProject(), getConfig().getDataset(),
                                    tableName, temporaryGcsPath, tableFields);

  // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exist.
  // We call emitLineage before since it creates the dataset with schema which is used.
  List<String> fieldNames = tableFields.stream()
    .map(BigQueryTableFieldSchema::getName)
    .collect(Collectors.toList());
  recordLineage(context, outputName, tableSchema, fieldNames);

  OutputFormatProvider outputFormatProvider = getOutputFormatProvider(outputConfiguration, tableName, tableSchema);
  context.addOutput(Output.of(outputName, outputFormatProvider));
}
/**
 * Returns the plugin configuration. Child classes must provide a configuration based on
 * {@link AbstractBigQuerySinkConfig}; this base class reads all of its settings through this method.
 *
 * @return config instance
 */
protected abstract AbstractBigQuerySinkConfig getConfig();
/**
 * Child classes must override this method to provide specific validation logic to be executed before
 * the rest of the {@link #prepareRun(BatchSinkContext)} logic. It is the first call made by
 * {@link #prepareRun(BatchSinkContext)}.
 * For example, Batch Sink plugin can validate schema right away,
 * Batch Multi Sink does not have information at this point to do the validation.
 *
 * @param context batch sink context
 */
protected abstract void prepareRunValidation(BatchSinkContext context);
/**
 * Executes main prepare run logic, i.e. prepares output for given table (for Batch Sink plugin)
 * or for a number of tables (for Batch Multi Sink plugin). It is the last call made by
 * {@link #prepareRun(BatchSinkContext)}, after the BigQuery client, the base configuration and the bucket
 * have been set up.
 *
 * @param context batch sink context
 * @param bigQuery a big query client for the configured project
 * @param bucket bucket name
 * @throws IOException if preparing the output fails
 */
protected abstract void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery,
String bucket) throws IOException;
/**
 * Returns output format provider instance specific to the child classes that extend this class.
 * The result is registered as the output of the table when the output is initialized.
 *
 * @param configuration Hadoop configuration already configured for the table
 * @param tableName table name
 * @param tableSchema table schema; callers in this class may pass null when no schema is configured
 * @return output format provider
 */
protected abstract OutputFormatProvider getOutputFormatProvider(Configuration configuration,
String tableName,
Schema tableSchema);
/**
 * Initializes the base configuration needed to load data into a BigQuery table. On top of the configuration
 * returned by {@link BigQueryUtil}, it sets the schema relaxation flag, the write disposition and the GCS
 * upload chunk size.
 *
 * @param cmekKey CMEK key passed on to the BigQuery configuration, may be null
 * @return base configuration
 * @throws IOException if the BigQuery configuration cannot be created
 */
private Configuration getBaseConfiguration(@Nullable String cmekKey) throws IOException {
  AbstractBigQuerySinkConfig sinkConfig = getConfig();
  Configuration conf = BigQueryUtil.getBigQueryConfig(sinkConfig.getServiceAccount(), sinkConfig.getProject(),
                                                       cmekKey, sinkConfig.getServiceAccountType());

  conf.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION, sinkConfig.isAllowSchemaRelaxation());
  conf.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
                  sinkConfig.getWriteDisposition().name());

  // this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
  // cause OOM issue if there are many tables being written. See this - CDAP-16670
  // A configured chunk size wins; otherwise 8388608 (8 * 1024 * 1024) is used.
  String gcsChunkSize = Strings.isNullOrEmpty(sinkConfig.getGcsChunkSize())
    ? "8388608"
    : sinkConfig.getGcsChunkSize();
  conf.set("fs.gs.outputstream.upload.chunk.size", gcsChunkSize);

  return conf;
}
/**
 * Validates the output schema against the schema of an existing BigQuery table. Failures are added to the
 * collector, and {@code collector.getOrThrowException()} is called at the end of the validation.
 * <p>
 * Nothing is validated when the table has no schema, when no output schema is given, or when truncate table
 * is set. Otherwise two checks are performed:
 * <ul>
 *   <li>every BigQuery column that is missing from the output schema must be nullable;</li>
 *   <li>every output schema field that exists in BigQuery must match the column type and mode.</li>
 * </ul>
 *
 * @param table existing BigQuery table
 * @param tableSchema configured output schema, may be null
 * @param collector failure collector
 */
protected void validateInsertSchema(Table table, @Nullable Schema tableSchema, FailureCollector collector) {
  com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
  if (bqSchema == null || bqSchema.getFields().isEmpty()) {
    // Table is created without schema, so no further validation is required.
    return;
  }

  if (getConfig().isTruncateTableSet() || tableSchema == null) {
    // No validation is required if truncate table is set or if there is no output schema to compare with.
    // BQ will overwrite the schema for normal tables when write disposition is WRITE_TRUNCATE
    // note - If write to single partition is supported in future, schema validation will be necessary
    return;
  }

  FieldList bqFields = bqSchema.getFields();
  List<Schema.Field> outputSchemaFields = Objects.requireNonNull(tableSchema.getFields());

  // Columns that exist in BigQuery but not in the output schema must accept missing values.
  List<String> remainingBQFields = BigQueryUtil.getBqFieldsMinusSchema(bqFields, outputSchemaFields);
  for (String field : remainingBQFields) {
    Field.Mode mode = bqFields.get(field).getMode();
    // Mode is optional. If the mode is unspecified, the column defaults to NULLABLE,
    // so a null mode must not be reported as a required column.
    if (mode != null && mode != Field.Mode.NULLABLE) {
      collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                           String.format("Add '%s' to the schema.", field));
    }
  }

  String tableName = table.getTableId().getTable();
  List<String> missingBQFields = BigQueryUtil.getSchemaMinusBqFields(outputSchemaFields, bqFields);
  // Match output schema field type with BigQuery column type
  for (Schema.Field field : outputSchemaFields) {
    String fieldName = field.getName();
    // skip checking schema if field is missing in BigQuery
    if (missingBQFields.contains(fieldName)) {
      continue;
    }
    Field bqField = bqFields.get(fieldName);
    ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(
      bqField, field, getConfig().getDataset(), tableName,
      AbstractBigQuerySinkConfig.SUPPORTED_TYPES, collector);
    if (failure != null) {
      failure.withInputSchemaField(fieldName).withOutputSchemaField(fieldName);
    }
    BigQueryUtil.validateFieldModeMatches(bqField, field, getConfig().isAllowSchemaRelaxation(), collector);
  }
  collector.getOrThrowException();
}
/**
 * Validates output schema against Big Query table schema. Failures are added to the collector, and
 * {@code collector.getOrThrowException()} is called at the end of the validation.
 * <p>
 * Nothing is validated when the table has no schema or no output schema is given. Otherwise:
 * <ul>
 *   <li>with schema relaxation but without truncate table, new fields must be nullable;</li>
 *   <li>without schema relaxation, the output schema must not have fields unknown to the table, and
 *   table columns missing from the output schema must be nullable;</li>
 *   <li>unless both schema relaxation and truncate table are set, field types and modes must match.</li>
 * </ul>
 *
 * @param tableName big query table
 * @param bqSchema BigQuery table schema
 * @param tableSchema Configured table schema
 * @param allowSchemaRelaxation allows schema relaxation policy
 * @param collector failure collector
 */
protected void validateSchema(
  String tableName,
  com.google.cloud.bigquery.Schema bqSchema,
  @Nullable Schema tableSchema,
  boolean allowSchemaRelaxation,
  FailureCollector collector) {
  if (bqSchema == null || bqSchema.getFields().isEmpty() || tableSchema == null) {
    // Table is created without schema, so no further validation is required.
    return;
  }

  FieldList bqFields = bqSchema.getFields();
  List<Schema.Field> outputSchemaFields = Objects.requireNonNull(tableSchema.getFields());
  List<String> missingBQFields = BigQueryUtil.getSchemaMinusBqFields(outputSchemaFields, bqFields);

  if (allowSchemaRelaxation) {
    if (!getConfig().isTruncateTableSet()) {
      // Required fields can be added only if truncate table option is set.
      for (String missingName : missingBQFields) {
        Schema.Field missingField = tableSchema.getField(missingName);
        if (missingField == null || missingField.getSchema().isNullable()) {
          continue;
        }
        String nonNullableField = missingField.getName();
        collector.addFailure(
          String.format("Required field '%s' does not exist in BigQuery table '%s.%s'.",
                        nonNullableField, getConfig().getDataset(), tableName),
          "Change the field to be nullable.")
          .withInputSchemaField(nonNullableField).withOutputSchemaField(nonNullableField);
      }
    }
  } else {
    // schema should not have fields that are not present in BigQuery table,
    for (String missingField : missingBQFields) {
      collector.addFailure(
        String.format("Field '%s' does not exist in BigQuery table '%s.%s'.",
                      missingField, getConfig().getDataset(), tableName),
        String.format("Remove '%s' from the input, or add a column to the BigQuery table.", missingField))
        .withInputSchemaField(missingField).withOutputSchemaField(missingField);
    }

    // validate the missing columns in output schema are nullable fields in BigQuery
    List<String> remainingBQFields = BigQueryUtil.getBqFieldsMinusSchema(bqFields, outputSchemaFields);
    for (String field : remainingBQFields) {
      Field.Mode mode = bqFields.get(field).getMode();
      // Mode is optional. If the mode is unspecified, the column defaults to NULLABLE.
      boolean nullableColumn = mode == null || mode == Field.Mode.NULLABLE;
      if (!nullableColumn) {
        collector.addFailure(String.format("Required Column '%s' is not present in the schema.", field),
                             String.format("Add '%s' to the schema.", field));
      }
    }
  }

  // column type changes should be disallowed if either allowSchemaRelaxation or truncate table are not set.
  boolean typeChangesAllowed = allowSchemaRelaxation && getConfig().isTruncateTableSet();
  if (!typeChangesAllowed) {
    // Match output schema field type with BigQuery column type
    for (Schema.Field field : tableSchema.getFields()) {
      String fieldName = field.getName();
      // skip checking schema if field is missing in BigQuery
      if (missingBQFields.contains(fieldName)) {
        continue;
      }
      ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(
        bqFields.get(fieldName), field, getConfig().getDataset(), tableName,
        AbstractBigQuerySinkConfig.SUPPORTED_TYPES, collector);
      if (failure != null) {
        failure.withInputSchemaField(fieldName).withOutputSchemaField(fieldName);
      }
      BigQueryUtil.validateFieldModeMatches(bqFields.get(fieldName), field, allowSchemaRelaxation, collector);
    }
  }

  collector.getOrThrowException();
}
/**
 * Generates Big Query field instances based on given CDAP table schema after schema validation.
 * <p>
 * The table is looked up in the dataset project, i.e. the same project the BigQuery client is created for and
 * the output is configured with. If the table exists, its schema is validated against the given table schema;
 * if it does not exist, validation is skipped.
 *
 * @param bigQuery big query object
 * @param tableName table name
 * @param tableSchema table schema, may be null
 * @param allowSchemaRelaxation if schema relaxation policy is allowed
 * @param collector failure collector
 * @return list of Big Query fields, empty if no table schema is given
 */
private List<BigQueryTableFieldSchema> getBigQueryTableFields(BigQuery bigQuery, String tableName,
                                                              @Nullable Schema tableSchema,
                                                              boolean allowSchemaRelaxation,
                                                              FailureCollector collector) {
  if (tableSchema == null) {
    return Collections.emptyList();
  }

  // Use the dataset project rather than the project: the dataset may live in a different project, and both
  // the BigQuery client and the output configuration refer to the dataset project.
  TableId tableId = TableId.of(getConfig().getDatasetProject(), getConfig().getDataset(), tableName);
  try {
    Table table = bigQuery.getTable(tableId);
    // if table is null that mean it does not exist. So there is no need to perform validation
    if (table != null) {
      com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
      validateSchema(tableName, bqSchema, tableSchema, allowSchemaRelaxation, collector);
    }
  } catch (BigQueryException e) {
    // The lookup failure is reported against the 'table' property and raised right away.
    collector.addFailure("Unable to get details about the BigQuery table: " + e.getMessage(), null)
      .withConfigProperty("table");
    throw collector.getOrThrowException();
  }

  return BigQuerySinkUtils.getBigQueryTableFieldsFromSchema(tableSchema);
}
/**
 * Creates a new Hadoop configuration instance as a copy of the base configuration.
 *
 * @return Hadoop configuration
 * @throws IOException declared for callers and overriding classes; the copy itself is made in memory
 */
protected Configuration getOutputConfiguration() throws IOException {
  return new Configuration(baseConfiguration);
}
/**
 * Creates the external dataset for the output and, if there are any fields, records a write operation
 * for them.
 *
 * @param context batch sink context
 * @param outputName output name the lineage is recorded for
 * @param tableSchema table schema used to create the external dataset
 * @param fieldNames names of the fields written to the table
 */
private void recordLineage(BatchSinkContext context,
                           String outputName,
                           Schema tableSchema,
                           List<String> fieldNames) {
  LineageRecorder recorder = new LineageRecorder(context, outputName);
  recorder.createExternalDataset(tableSchema);

  // Without fields there is no write operation to record.
  if (fieldNames.isEmpty()) {
    return;
  }
  recorder.recordWrite("Write", "Wrote to BigQuery table.", fieldNames);
}
}