diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de9..99a8fc8ff6d5 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 14
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
index dcdbdb44c00c..2cc32c44a625 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -28,11 +28,13 @@
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -102,6 +104,11 @@ public static Mutation createMutationFromBeamRows(
     return mutationBuilder.build();
   }
 
+  private static Timestamp toSpannerTimestamp(Instant instant) {
+    long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
+    return Timestamp.ofTimeMicroseconds(micros);
+  }
+
   private static void setBeamValueToKey(
       Key.Builder keyBuilder, Schema.FieldType field, String columnName, Row row) {
     switch (field.getTypeName()) {
@@ -147,6 +154,21 @@ private static void setBeamValueToKey(
         keyBuilder.append(row.getDecimal(columnName));
         break;
         // TODO: Implement logical date and datetime
+      case LOGICAL_TYPE:
+        Schema.LogicalType<?, ?> logicalType = checkNotNull(field.getLogicalType());
+        String identifier = logicalType.getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          @Nullable Instant instant = row.getValue(columnName);
+          if (instant == null) {
+            keyBuilder.append((Timestamp) null);
+          } else {
+            keyBuilder.append(toSpannerTimestamp(instant));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type in key: %s", identifier));
+        }
+        break;
       case DATETIME:
         @Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
         if (dateTime == null) {
@@ -224,7 +246,21 @@ private static void setBeamValueToMutation(
           mutationBuilder.set(columnName).to(decimal);
         }
         break;
-        // TODO: Implement logical date and datetime
+      case LOGICAL_TYPE:
+        Schema.LogicalType<?, ?> logicalType = checkNotNull(fieldType.getLogicalType());
+        String identifier = logicalType.getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          @Nullable Instant instant = row.getValue(columnName);
+          if (instant == null) {
+            mutationBuilder.set(columnName).to((Timestamp) null);
+          } else {
+            mutationBuilder.set(columnName).to(toSpannerTimestamp(instant));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type: %s", identifier));
+        }
+        break;
       case DATETIME:
         @Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
         if (dateTime == null) {
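A note on the helper above: `toSpannerTimestamp` floors the epoch second and truncates nanoseconds to whole microseconds, which matches the precision contract of Beam's `MicrosInstant` logical type; anything below a microsecond is silently dropped. A minimal JDK-only sketch of the arithmetic and its inverse (class name and printed values are illustrative only, not part of this change):

```java
import java.time.Instant;

public class MicrosRoundTripDemo {
  public static void main(String[] args) {
    Instant instant = Instant.parse("2024-01-15T10:30:00.123456789Z");

    // Same arithmetic as toSpannerTimestamp: getNano() is always in
    // [0, 999_999_999] and getEpochSecond() is the floored second, so this
    // stays correct for pre-1970 instants too.
    long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;

    // Mirror of fromSpannerTimestamp in StructUtils further down.
    Instant back =
        Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);

    System.out.println(micros); // 1705314600123456
    System.out.println(back);   // 2024-01-15T10:30:00.123456Z (the trailing 789 ns are dropped)
  }
}
```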
@@ -335,6 +371,24 @@ private static void addIterableToMutationBuilder(
       case STRING:
         mutationBuilder.set(column).toStringArray((Iterable<String>) ((Object) iterable));
         break;
+      case LOGICAL_TYPE:
+        String identifier = checkNotNull(beamIterableType.getLogicalType()).getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          if (iterable == null) {
+            mutationBuilder.set(column).toTimestampArray(null);
+          } else {
+            mutationBuilder
+                .set(column)
+                .toTimestampArray(
+                    StreamSupport.stream(iterable.spliterator(), false)
+                        .map(instant -> toSpannerTimestamp((java.time.Instant) instant))
+                        .collect(toList()));
+          }
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Unsupported logical type in iterable: %s", identifier));
+        }
+        break;
       case DATETIME:
         if (iterable == null) {
           mutationBuilder.set(column).toDateArray(null);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
index 51eda7d16eb9..ac8f4becbd0c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.values.Row;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
@@ -352,6 +353,11 @@ private static void addIterableToStructBuilder(
     }
   }
 
+  private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
+    long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
+    return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
+  }
+
   private static @Nullable Object getStructValue(Struct struct, Schema.Field field) {
     String column = field.getName();
     Type.Code typeCode = struct.getColumnType(column).getCode();
@@ -365,7 +371,19 @@ private static void addIterableToStructBuilder(
         return struct.getBytes(column).toByteArray();
         // TODO: implement logical datetime
       case TIMESTAMP:
-        return Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
+        Timestamp spannerTimestamp = struct.getTimestamp(column);
+
+        // Check whether the Beam schema expects the MicrosInstant logical type.
+        Schema.FieldType fieldType = field.getType();
+        if (fieldType.getTypeName().isLogicalType()) {
+          Schema.@Nullable LogicalType<?, ?> logicalType = fieldType.getLogicalType();
+          if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+            return fromSpannerTimestamp(spannerTimestamp);
+          }
+        }
+        // Default DATETIME behavior: convert to a Joda DateTime (whole seconds only).
+        return Instant.ofEpochSecond(spannerTimestamp.getSeconds()).toDateTime();
+
         // TODO: implement logical date
       case DATE:
         return DateTime.parse(struct.getDate(column).toString());
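Worth calling out in the read path above: the two branches differ in precision. The `MicrosInstant` branch preserves microseconds via `fromSpannerTimestamp`, while the legacy `DATETIME` branch keeps whole seconds only (`Timestamp.getSeconds()`). A sketch of the difference, assuming `com.google.cloud.Timestamp` from google-cloud-core (already a dependency of this module) and using `java.time` for brevity where the production path returns a Joda `DateTime`:

```java
import com.google.cloud.Timestamp;
import java.time.Instant;

public class ReadPrecisionDemo {
  public static void main(String[] args) {
    Timestamp ts = Timestamp.ofTimeMicroseconds(1_705_314_600_123_456L);

    // MicrosInstant path: keeps microseconds (mirrors fromSpannerTimestamp).
    long micros = ts.getSeconds() * 1_000_000L + ts.getNanos() / 1_000L;
    Instant keptMicros =
        Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
    System.out.println(keptMicros); // 2024-01-15T10:30:00.123456Z

    // Legacy DATETIME path: whole seconds only.
    System.out.println(Instant.ofEpochSecond(ts.getSeconds())); // 2024-01-15T10:30:00Z
  }
}
```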
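For context on what exercises these branches: a pipeline author declares `MicrosInstant` fields through the schema builder, and rows then carry `java.time.Instant` values. A hedged sketch using the same Beam APIs the tests below use (class and field names are illustrative):

```java
import java.time.Instant;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;

public class MicrosInstantSchemaDemo {
  public static void main(String[] args) {
    Schema schema =
        Schema.builder()
            .addStringField("f_string")
            .addNullableField(
                "f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
            .build();

    // Rows carry java.time.Instant values for MicrosInstant fields; the new
    // LOGICAL_TYPE branches convert them to Spanner TIMESTAMP on write.
    Row row =
        Row.withSchema(schema)
            .withFieldValue("f_string", "example")
            .withFieldValue("f_micros_instant", Instant.parse("2024-01-15T10:30:00.123456Z"))
            .build();
    System.out.println(row);
  }
}
```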
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
index 6a0a1787deca..c68c2d3a0216 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java
@@ -28,8 +28,10 @@
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.Type;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
@@ -44,6 +46,7 @@ public class MutationUtilsTest {
   private static final Struct EMPTY_STRUCT = Struct.newBuilder().build();
   private static final Struct INT64_STRUCT = Struct.newBuilder().set("int64").to(3L).build();
   private static final String TABLE = "some_table";
+  private static final Instant TEST_INSTANT = Instant.parse("2024-01-15T10:30:00.123456Z");
 
   private static final Schema WRITE_ROW_SCHEMA =
       Schema.builder()
@@ -71,6 +74,10 @@
           .addNullableField("f_decimal", Schema.FieldType.DECIMAL)
           .addNullableField("f_byte", Schema.FieldType.BYTE)
           .addNullableField("f_iterable", Schema.FieldType.iterable(Schema.FieldType.INT64))
+          .addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
+          .addNullableField(
+              "f_micros_instant_array",
+              Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
           .build();
 
   private static final Row WRITE_ROW =
@@ -107,6 +114,8 @@
          .withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
          .withFieldValue("f_byte", Byte.parseByte("127"))
          .withFieldValue("f_iterable", ImmutableList.of(2L, 3L))
+         .withFieldValue("f_micros_instant", TEST_INSTANT)
+         .withFieldValue("f_micros_instant_array", ImmutableList.of(TEST_INSTANT, TEST_INSTANT))
          .build();
 
   private static final Schema WRITE_ROW_SCHEMA_NULLS =
@@ -123,6 +132,10 @@
           .addNullableField("f_array", Schema.FieldType.array(Schema.FieldType.INT64))
           .addNullableField(
               "f_struct_array", Schema.FieldType.array(Schema.FieldType.row(INT64_SCHEMA)))
+          .addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
+          .addNullableField(
+              "f_micros_instant_array",
+              Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
           .build();
 
   private static final Row WRITE_ROW_NULLS =
@@ -138,6 +151,8 @@
           .addValue(null)
           .addValue(null)
           .addValue(null)
+          .addValue(null)
+          .addValue(null)
           .build();
 
   private static final Schema KEY_SCHEMA =
@@ -153,6 +168,7 @@
           .addNullableField("f_int32", Schema.FieldType.INT32)
           .addNullableField("f_decimal", Schema.FieldType.DECIMAL)
           .addNullableField("f_byte", Schema.FieldType.BYTE)
+          .addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
           .build();
 
   private static final Row KEY_ROW =
@@ -168,6 +184,7 @@
           .withFieldValue("f_int32", 0x7fffffff)
           .withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
           .withFieldValue("f_byte", Byte.parseByte("127"))
+          .withFieldValue("f_micros_instant", TEST_INSTANT)
           .build();
 
   private static final Schema KEY_SCHEMA_NULLS =
@@ -178,6 +195,7 @@
           .addNullableField("f_bytes", Schema.FieldType.BYTES)
           .addNullableField("f_date_time", Schema.FieldType.DATETIME)
           .addNullableField("f_bool", Schema.FieldType.BOOLEAN)
+          .addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
           .build();
 
   private static final Row KEY_ROW_NULLS =
@@ -188,6 +206,7 @@
           .addValue(null)
           .addValue(null)
           .addValue(null)
+          .addValue(null)
           .build();
 
   @Test
@@ -264,6 +283,7 @@ public void testCreateDeleteMutationFromRowWithNulls() {
   }
 
   private static Mutation createDeleteMutation() {
+    long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
     Key key =
         Key.newBuilder()
             .append(1L)
@@ -277,6 +297,7 @@ private static Mutation createDeleteMutation() {
             .append(0x7fffffff)
             .append(BigDecimal.valueOf(Long.MIN_VALUE))
             .append(Byte.parseByte("127"))
+            .append(Timestamp.ofTimeMicroseconds(micros))
             .build();
     return Mutation.delete(TABLE, key);
   }
@@ -290,12 +311,14 @@ private static Mutation createDeleteMutationNulls() {
             .append((ByteArray) null)
             .append((Timestamp) null)
             .append((Boolean) null)
+            .append((Timestamp) null)
             .build();
     return Mutation.delete(TABLE, key);
   }
 
   private static Mutation createMutation(Mutation.Op operation) {
     Mutation.WriteBuilder builder = chooseBuilder(operation);
+    long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
     return builder
         .set("f_int64")
         .to(1L)
@@ -353,6 +376,12 @@
         .to(Byte.parseByte("127"))
         .set("f_iterable")
         .toInt64Array(ImmutableList.of(2L, 3L))
+        .set("f_micros_instant")
+        .to(Timestamp.ofTimeMicroseconds(micros))
+        .set("f_micros_instant_array")
+        .toTimestampArray(
+            ImmutableList.of(
+                Timestamp.ofTimeMicroseconds(micros), Timestamp.ofTimeMicroseconds(micros)))
         .build();
   }
 
@@ -381,6 +410,10 @@ private static Mutation createMutationNulls(Mutation.Op operation) {
         .toInt64Array((List<Long>) null)
         .set("f_struct_array")
         .toStructArray(Type.struct(Type.StructField.of("int64", Type.int64())), null)
+        .set("f_micros_instant")
+        .to((Timestamp) null)
+        .set("f_micros_instant_array")
+        .toTimestampArray(null)
         .build();
   }
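The expectations in the tests above are built directly against the Spanner client API, and the `TEST_INSTANT` arithmetic reduces to a fixed microsecond value. A small sketch of the equivalent hand-built write mutation and delete key (table and column names mirror the test fixtures; the class is illustrative):

```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;

public class ExpectedSpannerValuesDemo {
  public static void main(String[] args) {
    // 2024-01-15T10:30:00.123456Z as epoch microseconds, the same arithmetic
    // as the tests' TEST_INSTANT-based `micros` locals.
    long micros = 1_705_314_600L * 1_000_000L + 123_456L;

    Mutation write =
        Mutation.newInsertBuilder("some_table")
            .set("f_micros_instant")
            .to(Timestamp.ofTimeMicroseconds(micros))
            .build();
    Key key = Key.newBuilder().append(Timestamp.ofTimeMicroseconds(micros)).build();

    System.out.println(write);
    System.out.println(Mutation.delete("some_table", key));
  }
}
```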
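One way to sanity-check the `INSTANT` constant above: its seconds/nanos split is equivalent to a single microsecond offset from the epoch. A JDK-only sketch (class name illustrative):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class InstantConstantDemo {
  public static void main(String[] args) {
    long micros = 1234567890123456L;

    // The split used by the test constant...
    Instant viaSplit =
        Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
    // ...is the same point in time as one MICROS offset from the epoch.
    Instant viaPlus = Instant.EPOCH.plus(micros, ChronoUnit.MICROS);

    System.out.println(viaSplit.equals(viaPlus)); // true
    System.out.println(viaSplit); // 2009-02-13T23:31:30.123456Z
  }
}
```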
diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
index 43a74f170531..b5d5304245c4 100644
--- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
@@ -26,6 +26,8 @@
 from typing import NamedTuple
 from typing import Optional
 
+import pytest
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io.gcp.spanner import ReadFromSpanner
@@ -37,6 +39,7 @@
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.utils.timestamp import Timestamp
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -50,6 +53,8 @@
   DockerContainer = None
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
+TIMESTAMPS = [Timestamp.of(1234567890.0 + i) for i in range(1000)]
+
 
 class SpannerTestKey(NamedTuple):
   f_string: str
@@ -59,13 +64,20 @@ class SpannerTestRow(NamedTuple):
   f_string: str
   f_int64: Optional[int]
   f_boolean: Optional[bool]
+  f_timestamp: Optional[Timestamp]
 
 
 class SpannerPartTestRow(NamedTuple):
   f_string: str
   f_int64: Optional[int]
+  f_timestamp: Optional[Timestamp]
 
 
+@pytest.mark.uses_gcp_java_expansion_service
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
 @unittest.skipIf(spanner is None, 'GCP dependencies are not installed.')
 @unittest.skipIf(
     DockerContainer is None, 'testcontainers package is not installed.')
@@ -118,76 +130,112 @@ def tearDown(self):
 
   def test_spanner_insert_or_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+        self.database_id,
+        [('or_update0', 5, False, TIMESTAMPS[1].to_rfc3339()),
+         ('or_update1', 9, False, TIMESTAMPS[0].to_rfc3339())])
 
     def to_row_fn(i):
       return SpannerTestRow(
-          f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+          f_int64=i,
+          f_string=f'or_update{i}',
+          f_boolean=i % 2 == 0,
+          f_timestamp=TIMESTAMPS[i])
 
     self.run_write_pipeline(3, to_row_fn, SpannerTestRow, SpannerInsertOrUpdate)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='or_update'),
-        [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+    results = self.spanner_helper.read_data(
+        self.database_id, prefix='or_update')
+    self.assertEqual(len(results), 3)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'or_update{i}')
+      self.assertEqual(row[1], i)
+      self.assertEqual(row[2], i % 2 == 0)
+      self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
 
   def test_spanner_insert(self):
     def to_row_fn(num):
       return SpannerTestRow(
-          f_string=f'insert{num}', f_int64=num, f_boolean=None)
+          f_string=f'insert{num}',
+          f_int64=num,
+          f_boolean=None,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
 
     def compare_row(row):
       return row[1]
 
-    self.assertEqual(
-        sorted(
-            self.spanner_helper.read_data(self.database_id, 'insert'),
-            key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+    results = sorted(
+        self.spanner_helper.read_data(self.database_id, 'insert'),
+        key=compare_row)
+
+    self.assertEqual(len(results), 1000)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'insert{i}')
+      self.assertEqual(row[1], i)
+      self.assertIsNone(row[2])
+      self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
 
   def test_spanner_replace(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+        self.database_id,
+        [('replace0', 0, True, TIMESTAMPS[10].to_rfc3339()),
+         ('replace1', 1, False, TIMESTAMPS[11].to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'replace{num}',
+          f_int64=num + 10,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
-
+    results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='replace'),
-        [['replace0', 10, None], ['replace1', 11, None]])
+        results,
+        [['replace0', 10, None, TIMESTAMPS[0].to_proto()],
+         ['replace1', 11, None, TIMESTAMPS[1].to_proto()]])
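The assertion pattern in these tests compares protobuf timestamps on both sides: the Spanner Python client returns `DatetimeWithNanoseconds` values for `TIMESTAMP` columns, which expose `timestamp_pb()`, while the expected values are Beam `Timestamp` objects, which expose `to_proto()`. A standalone sketch of that normalization, assuming `apache-beam` and `google-api-core` are installed:

```python
from apache_beam.utils.timestamp import Timestamp
from google.api_core.datetime_helpers import DatetimeWithNanoseconds

expected = Timestamp.of(1234567890.0)

# What insert_values writes to Spanner: an RFC 3339 string.
print(expected.to_rfc3339())

# What a value read back from Spanner looks like, and how the tests compare
# it: both sides reduce to a google.protobuf Timestamp message.
read_value = DatetimeWithNanoseconds.from_rfc3339(expected.to_rfc3339())
assert read_value.timestamp_pb() == expected.to_proto()
```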
 
   def test_spanner_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('update0', 5, False), ('update1', 9, False)])
+        self.database_id,
+        [('update0', 5, False, TIMESTAMPS[10].to_rfc3339()),
+         ('update1', 9, False, TIMESTAMPS[100].to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'update{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'update{num}',
+          f_int64=num + 10,
+          f_timestamp=TIMESTAMPS[num])
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
-
+    results = self.spanner_helper.read_data(self.database_id, 'update')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, 'update'),
-        [['update0', 10, False], ['update1', 11, False]])
+        results,
+        [['update0', 10, False, TIMESTAMPS[0].to_proto()],
+         ['update1', 11, False, TIMESTAMPS[1].to_proto()]])
 
   def test_spanner_delete(self):
     self.spanner_helper.insert_values(
         self.database_id,
         values=[
-            ('delete0', 0, None),
-            ('delete6', 6, False),
-            ('delete20', 20, True),
+            ('delete0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+            ('delete6', 6, False, TIMESTAMPS[0].to_rfc3339()),
+            ('delete20', 20, True, TIMESTAMPS[0].to_rfc3339()),
         ])
 
     def to_row_fn(num):
       return SpannerTestKey(f_string=f'delete{num}')
 
     self.run_write_pipeline(10, to_row_fn, SpannerTestKey, SpannerDelete)
-
+    results = self.spanner_helper.read_data(self.database_id, prefix='delete')
+    for i in range(len(results)):
+      results[i][3] = results[i][3].timestamp_pb()
     self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='delete'),
-        [['delete20', 20, True]])
+        results, [['delete20', 20, True, TIMESTAMPS[0].to_proto()]])
 
   def test_spanner_read_query(self):
     self.insert_read_values('query_read')
@@ -215,9 +263,21 @@ def run_read_pipeline(self, prefix, table=None, query=None):
     assert_that(
         result,
         equal_to([
-            SpannerTestRow(f_int64=0, f_string=f'{prefix}0', f_boolean=None),
-            SpannerTestRow(f_int64=1, f_string=f'{prefix}1', f_boolean=True),
-            SpannerTestRow(f_int64=2, f_string=f'{prefix}2', f_boolean=False),
+            SpannerTestRow(
+                f_int64=0,
+                f_string=f'{prefix}0',
+                f_boolean=None,
+                f_timestamp=TIMESTAMPS[0]),
+            SpannerTestRow(
+                f_int64=1,
+                f_string=f'{prefix}1',
+                f_boolean=True,
+                f_timestamp=TIMESTAMPS[1]),
+            SpannerTestRow(
+                f_int64=2,
+                f_string=f'{prefix}2',
+                f_boolean=False,
+                f_timestamp=TIMESTAMPS[2]),
         ]))
 
   def run_write_pipeline(
@@ -242,9 +302,9 @@ def insert_read_values(self, prefix):
     self.spanner_helper.insert_values(
         self.database_id,
         values=[
-            (f'{prefix}0', 0, None),
-            (f'{prefix}1', 1, True),
-            (f'{prefix}2', 2, False),
+            (f'{prefix}0', 0, None, TIMESTAMPS[0].to_rfc3339()),
+            (f'{prefix}1', 1, True, TIMESTAMPS[1].to_rfc3339()),
+            (f'{prefix}2', 2, False, TIMESTAMPS[2].to_rfc3339()),
         ])
 
 
@@ -288,14 +348,15 @@ def create_database(self, database_id):
             CREATE TABLE {self.table} (
                 f_string STRING(1024) NOT NULL,
                 f_int64 INT64,
-                f_boolean BOOL
+                f_boolean BOOL,
+                f_timestamp TIMESTAMP
             ) PRIMARY KEY (f_string)'''
         ])
     database.create().result(120)
 
   def insert_values(self, database_id, values, columns=None):
     values = values or []
-    columns = columns or ('f_string', 'f_int64', 'f_boolean')
+    columns = columns or ('f_string', 'f_int64', 'f_boolean', 'f_timestamp')
     with self.instance.database(database_id).batch() as batch:
       batch.insert(
           table=self.table,
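End to end, the Python side never names `MicrosInstant` directly: a `NamedTuple` field typed `Optional[Timestamp]` is encoded by `RowCoder` as the micros-instant logical type, which the Java branches above now accept. A hedged sketch of a user pipeline hitting the new write path (the project, instance, database, and table names are placeholders; running it requires a real Spanner database plus the Java expansion service):

```python
from typing import NamedTuple, Optional

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import SpannerInsert
from apache_beam.utils.timestamp import Timestamp


class ExampleRow(NamedTuple):
  f_string: str
  f_timestamp: Optional[Timestamp]


# RowCoder maps Optional[Timestamp] to Beam's micros-instant logical type.
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([ExampleRow('k0', Timestamp.of(1234567890.0))
                     ]).with_output_types(ExampleRow)
      | SpannerInsert(
          project_id='my-project',      # placeholder
          instance_id='my-instance',    # placeholder
          database_id='my-database',    # placeholder
          table='my_table'))            # placeholder
```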