Skip to content

Commit ba4bd14

Browse files
committed
Added execution metrics for BQ Pushdown jobs
1 parent 9f76f9c commit ba4bd14

4 files changed

Lines changed: 72 additions & 1 deletion

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,14 @@ public void executeJoin() {
150150
if (queryJob == null) {
151151
throw new SQLEngineException("BigQuery job not found: " + jobId);
152152
} else if (queryJob.getStatus().getError() != null) {
153+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
153154
throw new SQLEngineException(String.format(
154155
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
155156
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
156157
}
157158

158159
LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
160+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
159161
}
160162

161163
@Override

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,14 @@ public BigQuerySelectDataset execute() {
124124
if (queryJob == null) {
125125
throw new SQLEngineException("BigQuery job not found: " + jobId);
126126
} else if (queryJob.getStatus().getError() != null) {
127+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
127128
throw new SQLEngineException(String.format(
128129
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
129130
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
130131
}
131132

132133
LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
134+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
133135
return this;
134136
}
135137

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
228228

229229
// Check for errors
230230
if (queryJob.getStatus().getError() != null) {
231+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
231232
LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
232233
jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
233234
queryJob.getStatus().getError().toString());
@@ -241,6 +242,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
241242
LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
242243
sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
243244
destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
245+
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
244246

245247
return SQLWriteResult.success(datasetName, numRows);
246248
}

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import com.google.cloud.bigquery.BigQuery;
2020
import com.google.cloud.bigquery.DatasetId;
21+
import com.google.cloud.bigquery.Job;
22+
import com.google.cloud.bigquery.JobStatistics;
2123
import com.google.cloud.bigquery.StandardTableDefinition;
2224
import com.google.cloud.bigquery.Table;
2325
import com.google.cloud.bigquery.TableDefinition;
2426
import com.google.cloud.bigquery.TableId;
2527
import com.google.cloud.bigquery.TableInfo;
28+
import com.google.gson.Gson;
2629
import io.cdap.cdap.api.data.schema.Schema;
2730
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
2831
import io.cdap.cdap.etl.api.join.JoinCondition;
@@ -49,6 +52,7 @@
4952
public class BigQuerySQLEngineUtils {
5053

5154
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngineUtils.class);
55+
private static final Gson GSON = new Gson();
5256

5357
public static final String GCS_PATH_FORMAT = BigQuerySinkUtils.GS_PATH_FORMAT + "/%s";
5458
public static final String BQ_TABLE_NAME_FORMAT = "%s_%s";
@@ -218,7 +222,7 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression
218222

219223
/**
220224
* Validates stages for a Join on Key operation
221-
*
225+
* <p>
222226
* TODO: Update logic once BQ SQL engine joins support multiple outer join tables
223227
*
224228
* @param joinDefinition Join Definition to validate
@@ -292,4 +296,65 @@ public static Map<String, String> getJobTags(String operation) {
292296
labels.put("pushdown_operation", operation);
293297
return Collections.unmodifiableMap(labels);
294298
}
299+
300+
/**
301+
* Logs information about a BigQUery Job execution using a specified Logger instance
302+
*
303+
* @param job BigQuery Job
304+
*/
305+
public static void logJobMetrics(Job job) {
306+
// Ensure job has statistics information
307+
if (job.getStatistics() == null) {
308+
LOG.warn("No statistics were found for BigQuery job {}", job.getJobId());
309+
}
310+
311+
String startTimeStr = getISODateTimeString(job.getStatistics().getStartTime());
312+
String endTimeStr = getISODateTimeString(job.getStatistics().getEndTime());
313+
314+
// Print detailed query statistics if available
315+
if (job.getStatistics() instanceof JobStatistics.QueryStatistics) {
316+
JobStatistics.QueryStatistics queryStatistics = (JobStatistics.QueryStatistics) job.getStatistics();
317+
LOG.info("Metrics for job {}:\n" +
318+
" Start: {} ,\n" +
319+
" End: {} ,\n" +
320+
" Processed Bytes: {} ,\n" +
321+
" Billed Bytes: {} ,\n" +
322+
" Total Slot ms: {}",
323+
job.getJobId().getJob(),
324+
startTimeStr,
325+
endTimeStr,
326+
queryStatistics.getTotalBytesProcessed(),
327+
queryStatistics.getTotalBytesBilled(),
328+
queryStatistics.getTotalSlotMs());
329+
330+
if (LOG.isTraceEnabled()) {
331+
LOG.trace("Additional Metrics for job {}:\n" +
332+
" Query Plan: {} ,\n" +
333+
" Query Timeline: {} \n",
334+
job.getJobId().getJob(),
335+
GSON.toJson(queryStatistics.getQueryPlan()),
336+
GSON.toJson(queryStatistics.getTimeline()));
337+
}
338+
339+
return;
340+
}
341+
342+
// Print basic metrics
343+
JobStatistics statistics = job.getStatistics();
344+
LOG.info("Metrics for job: {}\n" +
345+
" Start: {} ,\n" +
346+
" End: {}",
347+
job.getJobId().getJob(),
348+
startTimeStr,
349+
endTimeStr);
350+
351+
}
352+
353+
private static String getISODateTimeString(Long epoch) {
354+
if (epoch == null) {
355+
return "N/A";
356+
}
357+
358+
return Instant.ofEpochMilli(epoch).toString();
359+
}
295360
}

0 commit comments

Comments
 (0)