From 9f19ab95b869adea013a4e545f6a20610e516790 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 8 Apr 2026 01:16:24 +0800 Subject: [PATCH 1/5] [python] Fix decimal serialization for non-compact precision (> 18) --- .../pypaimon/table/row/generic_row.py | 115 +++++++--- paimon-python/pypaimon/tests/decimal_test.py | 209 ++++++++++++++++++ 2 files changed, 292 insertions(+), 32 deletions(-) create mode 100644 paimon-python/pypaimon/tests/decimal_test.py diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 4aa740de7219..27f757948b94 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -29,6 +29,38 @@ from pypaimon.table.row.blob import BlobData +def _decimal_to_unscaled(d: Decimal, scale: int) -> int: + """Convert a Decimal to its unscaled integer value without precision loss. + Raises ArithmeticError if the value has more fractional digits than scale.""" + sign, digits, exponent = d.as_tuple() + int_digits = int(''.join(str(x) for x in digits)) if digits != (0,) else 0 + shift = exponent + scale + if shift >= 0: + unscaled = int_digits * (10 ** shift) + else: + divisor = 10 ** (-shift) + if int_digits % divisor != 0: + raise ArithmeticError( + f"Decimal {d} has more fractional digits than scale {scale}") + unscaled = int_digits // divisor + return -unscaled if sign else unscaled + + +def _parse_type_precision_scale(data_type): + """Parse precision and scale from type string like DECIMAL(38, 10) or TIMESTAMP(6).""" + type_str = str(data_type) + if '(' in type_str and ')' in type_str: + try: + params_str = type_str.split('(')[1].split(')')[0] + parts = [p.strip() for p in params_str.split(',')] + precision = int(parts[0]) + scale = int(parts[1]) if len(parts) > 1 else 0 + return precision, scale + except (ValueError, IndexError): + return 0, 0 + return 0, 0 + + @dataclass class GenericRow(InternalRow): @@ -232,22 +264,28 @@ def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> binary_data = cls._parse_binary(bytes_data, base_offset, field_offset) return BlobData.from_bytes(binary_data) + @classmethod + def _unscaled_to_decimal(cls, unscaled_value: int, scale: int) -> Decimal: + sign = 0 if unscaled_value >= 0 else 1 + digits = tuple(int(d) for d in str(abs(unscaled_value))) if unscaled_value != 0 else (0,) + return Decimal((sign, digits, -scale)) + @classmethod def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: - unscaled_long = struct.unpack('> 32) & 0xFFFFFFFF + byte_length = offset_and_len & 0xFFFFFFFF + var_offset = base_offset + cursor + unscaled_bytes = bytes_data[var_offset:var_offset + byte_length] + unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big', signed=True) + return cls._unscaled_to_decimal(unscaled_value, scale) @classmethod def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime: @@ -301,9 +339,23 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}") type_name = field.type.type.upper() - if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', - 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']): - if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING']): + is_var_len_type = any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', + 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']) + is_decimal_type = type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC') + decimal_precision, decimal_scale = _parse_type_precision_scale(field.type) if is_decimal_type else (0, 0) + is_high_precision_decimal = is_decimal_type and decimal_precision > 18 + + if is_var_len_type or is_high_precision_decimal: + if is_high_precision_decimal: + d = value if isinstance(value, Decimal) else Decimal(str(value)) + unscaled_value = _decimal_to_unscaled(d, decimal_scale) + # Convert to big-endian signed bytes (minimal representation) + if unscaled_value == 0: + value_bytes = b'\x00' + else: + byte_length = (unscaled_value.bit_length() + 8) // 8 # +8 for sign bit + value_bytes = unscaled_value.to_bytes(byte_length, byteorder='big', signed=True) + elif any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING']): value_bytes = str(value).encode('utf-8') elif type_name == 'BLOB': value_bytes = value.to_data() @@ -311,14 +363,18 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: value_bytes = bytes(value) length = len(value_bytes) - if length <= cls.MAX_FIX_PART_DATA_SIZE: + if length <= cls.MAX_FIX_PART_DATA_SIZE and not is_high_precision_decimal: fixed_part[field_fixed_offset: field_fixed_offset + length] = value_bytes for j in range(length, 7): fixed_part[field_fixed_offset + j] = 0 header_byte = 0x80 | length fixed_part[field_fixed_offset + 7] = header_byte else: - var_length = cls._round_number_of_bytes_to_nearest_word(len(value_bytes)) + # Non-compact decimal uses fixed 16 bytes, others use 8-byte alignment + if is_high_precision_decimal: + var_length = 16 + else: + var_length = cls._round_number_of_bytes_to_nearest_word(len(value_bytes)) var_value_bytes = value_bytes + b'\x00' * (var_length - length) offset_in_variable_part = current_variable_offset variable_part_data.append(var_value_bytes) @@ -365,6 +421,11 @@ def _serialize_field_value(cls, value: Any, data_type: AtomicType) -> bytes: elif type_name in ['DOUBLE']: return cls._serialize_double(value) elif type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC'): + precision, _ = _parse_type_precision_scale(data_type) + if precision > 18: + raise ValueError( + f"Non-compact decimal (precision={precision}) must be serialized " + f"via the variable-length path in to_bytes(), not _serialize_field_value()") return cls._serialize_decimal(value, data_type) elif type_name.startswith('TIMESTAMP'): return cls._serialize_timestamp(value) @@ -405,20 +466,10 @@ def _serialize_double(cls, value: float) -> bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: - type_str = str(data_type) - if '(' in type_str and ')' in type_str: - try: - precision_scale = type_str.split('(')[1].split(')')[0] - if ',' in precision_scale: - scale = int(precision_scale.split(',')[1]) - else: - scale = 0 - except: - scale = 0 - else: - scale = 0 - - unscaled_value = int(value * (10 ** scale)) + """Serialize compact decimal (precision <= 18) as unscaled long in fixed part.""" + _, scale = _parse_type_precision_scale(data_type) + d = value if isinstance(value, Decimal) else Decimal(str(value)) + unscaled_value = _decimal_to_unscaled(d, scale) return struct.pack(' 0.05 + fields = [ + DataField(0, "d", AtomicType("DECIMAL(4, 2)")), + DataField(1, "d2", AtomicType("DECIMAL(4, 2)")), + ] + row = GenericRow([Decimal("0.05"), None], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + + self.assertEqual(str(result.values[0]), "0.05") + self.assertIsNone(result.values[1]) + + # Another compact value: 0.06 + row2 = GenericRow([Decimal("0.06"), None], fields, RowKind.INSERT) + serialized2 = GenericRowSerializer.to_bytes(row2) + result2 = GenericRowDeserializer.from_bytes(serialized2, fields) + self.assertEqual(str(result2.values[0]), "0.06") + + def test_decimal_not_compact(self): + """Test non-compact decimal (precision > 18) round-trip.""" + # precision=25, scale=5 + fields = [ + DataField(0, "d", AtomicType("DECIMAL(25, 5)")), + DataField(1, "d2", AtomicType("DECIMAL(25, 5)")), + ] + row = GenericRow([Decimal("5.55000"), None], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + + self.assertEqual(str(result.values[0]), "5.55000") + self.assertIsNone(result.values[1]) + + # Another value: 6.55 + row2 = GenericRow([Decimal("6.55000"), None], fields, RowKind.INSERT) + serialized2 = GenericRowSerializer.to_bytes(row2) + result2 = GenericRowDeserializer.from_bytes(serialized2, fields) + self.assertEqual(str(result2.values[0]), "6.55000") + + # Negative value + row3 = GenericRow([Decimal("-123.45000"), None], fields, RowKind.INSERT) + serialized3 = GenericRowSerializer.to_bytes(row3) + result3 = GenericRowDeserializer.from_bytes(serialized3, fields) + self.assertEqual(str(result3.values[0]), "-123.45000") + + def test_decimal_high_precision_large_value(self): + """Test high-precision decimal with large values that exceed long range.""" + fields = [DataField(0, "d", AtomicType("DECIMAL(38, 10)"))] + + test_values = [ + Decimal("12345678901234567890.1234567890"), + Decimal("-99999999999999999999.9999999999"), + Decimal("0E-10"), + ] + + for val in test_values: + with self.subTest(value=val): + row = GenericRow([val], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + self.assertEqual(result.values[0], val) + + def test_decimal_mixed_with_other_types(self): + """Test decimal fields mixed with other types in a single row.""" + fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "name", AtomicType("STRING")), + DataField(2, "compact_dec", AtomicType("DECIMAL(10, 2)")), + DataField(3, "high_dec", AtomicType("DECIMAL(38, 2)")), + DataField(4, "score", AtomicType("DOUBLE")), + ] + + row = GenericRow( + [42, "test_row", Decimal("12345.67"), Decimal("12312455.22"), 3.14], + fields, RowKind.INSERT + ) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + + self.assertEqual(result.values[0], 42) + self.assertEqual(result.values[1], "test_row") + self.assertEqual(result.values[2], Decimal("12345.67")) + self.assertEqual(result.values[3], Decimal("12312455.22")) + self.assertAlmostEqual(result.values[4], 3.14) + + + def test_decimal_compact_binary_format(self): + """Verify compact decimal binary layout: unscaled long in fixed part.""" + fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))] + row = GenericRow([Decimal("0.05")], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + + # Skip 4-byte arity prefix + data = serialized[4:] + null_bits_size = 8 # ((1 + 63 + 8) // 64) * 8 + field_offset = null_bits_size + unscaled_long = struct.unpack(' unscaled = 5 + self.assertEqual(unscaled_long, 5) + + def test_decimal_not_compact_binary_format(self): + """Verify non-compact decimal binary layout: (offset << 32 | length) in fixed part, + 16-byte big-endian unscaled bytes in variable part. + """ + fields = [DataField(0, "d", AtomicType("DECIMAL(25, 5)"))] + row = GenericRow([Decimal("5.55000")], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + + # Skip 4-byte arity prefix + data = serialized[4:] + null_bits_size = 8 + field_offset = null_bits_size + fixed_part_size = null_bits_size + 1 * 8 + + offset_and_len = struct.unpack('> 32) & 0xFFFFFFFF + byte_length = offset_and_len & 0xFFFFFFFF + + # cursor should point to the variable area (== fixed_part_size) + self.assertEqual(cursor, fixed_part_size) + # variable area should be exactly 16 bytes (matching Java's cursor += 16) + var_area = data[cursor:] + self.assertEqual(len(var_area), 16) + # unscaled bytes are big-endian signed + unscaled_bytes = data[cursor:cursor + byte_length] + unscaled_value = int.from_bytes(unscaled_bytes, byteorder='big', signed=True) + # Decimal("5.55000") with scale=5 => unscaled = 555000 + self.assertEqual(unscaled_value, 555000) + + + def test_decimal_boundary_precision(self): + """Test boundary: DECIMAL(18, ...) is compact, DECIMAL(19, ...) is non-compact.""" + # precision=18: last compact + fields_18 = [DataField(0, "d", AtomicType("DECIMAL(18, 4)"))] + row_18 = GenericRow([Decimal("12345678901234.5678")], fields_18, RowKind.INSERT) + s_18 = GenericRowSerializer.to_bytes(row_18) + r_18 = GenericRowDeserializer.from_bytes(s_18, fields_18) + self.assertEqual(r_18.values[0], Decimal("12345678901234.5678")) + # verify compact: no variable area beyond fixed part + data_18 = s_18[4:] + null_bits_size = 8 + fixed_part_size = null_bits_size + 1 * 8 + self.assertEqual(len(data_18), fixed_part_size) + + # precision=19: first non-compact + fields_19 = [DataField(0, "d", AtomicType("DECIMAL(19, 4)"))] + row_19 = GenericRow([Decimal("12345678901234.5678")], fields_19, RowKind.INSERT) + s_19 = GenericRowSerializer.to_bytes(row_19) + r_19 = GenericRowDeserializer.from_bytes(s_19, fields_19) + self.assertEqual(r_19.values[0], Decimal("12345678901234.5678")) + # verify non-compact: has 16-byte variable area + data_19 = s_19[4:] + self.assertEqual(len(data_19), fixed_part_size + 16) + + def test_decimal_zero_different_scales(self): + """Test zero value with different precisions and scales.""" + test_cases = [ + ("DECIMAL(38, 0)", Decimal("0")), + ("DECIMAL(38, 10)", Decimal("0E-10")), + ("DECIMAL(10, 2)", Decimal("0.00")), + ] + for type_str, val in test_cases: + with self.subTest(type=type_str): + fields = [DataField(0, "d", AtomicType(type_str))] + row = GenericRow([val], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + self.assertEqual(result.values[0], val) + + def test_decimal_truncation_raises(self): + """Serializing a value with more fractional digits than scale should raise.""" + fields = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))] + row = GenericRow([Decimal("1.999")], fields, RowKind.INSERT) + with self.assertRaises(ArithmeticError): + GenericRowSerializer.to_bytes(row) + + +if __name__ == '__main__': + unittest.main() From 9f7d9de6bd3312a7f6e1cb99a82374ad713abd9c Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 8 Apr 2026 01:24:50 +0800 Subject: [PATCH 2/5] fix --- paimon-python/pypaimon/table/row/generic_row.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 27f757948b94..14b17aa92636 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -47,7 +47,7 @@ def _decimal_to_unscaled(d: Decimal, scale: int) -> int: def _parse_type_precision_scale(data_type): - """Parse precision and scale from type string like DECIMAL(38, 10) or TIMESTAMP(6).""" + """Parse precision and scale from type string like DECIMAL(38, 10).""" type_str = str(data_type) if '(' in type_str and ')' in type_str: try: From a2bf4f49d09a54e82ba80b6a39b8887e876b38e5 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 8 Apr 2026 07:51:42 +0800 Subject: [PATCH 3/5] fix --- paimon-python/pypaimon/table/row/generic_row.py | 4 ++-- paimon-python/pypaimon/tests/decimal_test.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 14b17aa92636..9fe4632d2b72 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -339,8 +339,8 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}") type_name = field.type.type.upper() - is_var_len_type = any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', - 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']) + is_var_len_type = any(type_name.startswith(p) for p in [ + 'CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']) is_decimal_type = type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC') decimal_precision, decimal_scale = _parse_type_precision_scale(field.type) if is_decimal_type else (0, 0) is_high_precision_decimal = is_decimal_type and decimal_precision > 18 diff --git a/paimon-python/pypaimon/tests/decimal_test.py b/paimon-python/pypaimon/tests/decimal_test.py index 1da2c7560905..7a27ed128982 100644 --- a/paimon-python/pypaimon/tests/decimal_test.py +++ b/paimon-python/pypaimon/tests/decimal_test.py @@ -113,7 +113,6 @@ def test_decimal_mixed_with_other_types(self): self.assertEqual(result.values[3], Decimal("12312455.22")) self.assertAlmostEqual(result.values[4], 3.14) - def test_decimal_compact_binary_format(self): """Verify compact decimal binary layout: unscaled long in fixed part.""" fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))] @@ -157,7 +156,6 @@ def test_decimal_not_compact_binary_format(self): # Decimal("5.55000") with scale=5 => unscaled = 555000 self.assertEqual(unscaled_value, 555000) - def test_decimal_boundary_precision(self): """Test boundary: DECIMAL(18, ...) is compact, DECIMAL(19, ...) is non-compact.""" # precision=18: last compact From 1af6f0d3b748c1cb9c6dcc06a2779950835a86aa Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 8 Apr 2026 20:55:36 +0800 Subject: [PATCH 4/5] Align decimal handling with Java --- .../pypaimon/table/row/generic_row.py | 50 +++++++---- paimon-python/pypaimon/tests/decimal_test.py | 85 +++++++++++++++++-- 2 files changed, 113 insertions(+), 22 deletions(-) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 9fe4632d2b72..29afa80d9cf1 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################ +import decimal import struct from datetime import date, datetime, time, timedelta from decimal import Decimal @@ -29,21 +30,24 @@ from pypaimon.table.row.blob import BlobData -def _decimal_to_unscaled(d: Decimal, scale: int) -> int: - """Convert a Decimal to its unscaled integer value without precision loss. - Raises ArithmeticError if the value has more fractional digits than scale.""" - sign, digits, exponent = d.as_tuple() +_DECIMAL_CTX = decimal.Context(prec=100, rounding=decimal.ROUND_HALF_UP) + + +def _decimal_to_unscaled_with_check(d: Decimal, precision: int, scale: int): + """Round decimal with HALF_UP, check precision overflow, and return unscaled value. + Returns (unscaled_int, True) on overflow, (unscaled_int, False) on success.""" + rounded = d.quantize(Decimal(10) ** -scale, context=_DECIMAL_CTX) + sign, digits, exponent = rounded.as_tuple() + # Precision overflow check + if rounded != 0 and len(digits) > precision: + return 0, True int_digits = int(''.join(str(x) for x in digits)) if digits != (0,) else 0 shift = exponent + scale if shift >= 0: unscaled = int_digits * (10 ** shift) else: - divisor = 10 ** (-shift) - if int_digits % divisor != 0: - raise ArithmeticError( - f"Decimal {d} has more fractional digits than scale {scale}") - unscaled = int_digits // divisor - return -unscaled if sign else unscaled + unscaled = int_digits // (10 ** (-shift)) + return (-unscaled if sign else unscaled), False def _parse_type_precision_scale(data_type): @@ -271,8 +275,10 @@ def _unscaled_to_decimal(cls, unscaled_value: int, scale: int) -> Decimal: return Decimal((sign, digits, -scale)) @classmethod - def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: + def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType): precision, scale = _parse_type_precision_scale(data_type) + if precision <= 0: + raise ValueError(f"Decimal requires precision > 0, got {precision}") if precision <= 18: # Compact format: unscaled long stored directly in fixed part unscaled_long = struct.unpack(' precision: + return None + return result @classmethod def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime: @@ -345,10 +356,17 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: decimal_precision, decimal_scale = _parse_type_precision_scale(field.type) if is_decimal_type else (0, 0) is_high_precision_decimal = is_decimal_type and decimal_precision > 18 + # Precision overflow -> null + if is_decimal_type and value is not None: + d = value if isinstance(value, Decimal) else Decimal(str(value)) + unscaled_value, overflow = _decimal_to_unscaled_with_check(d, decimal_precision, decimal_scale) + if overflow: + cls._set_null_bit(fixed_part, 0, i) + struct.pack_into(' bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: """Serialize compact decimal (precision <= 18) as unscaled long in fixed part.""" - _, scale = _parse_type_precision_scale(data_type) + precision, scale = _parse_type_precision_scale(data_type) d = value if isinstance(value, Decimal) else Decimal(str(value)) - unscaled_value = _decimal_to_unscaled(d, scale) + unscaled_value, _ = _decimal_to_unscaled_with_check(d, precision, scale) return struct.pack(' max 99.99 + fields = [DataField(0, "d", AtomicType("DECIMAL(4, 2)"))] + + # 999.99 needs 5 digits total, exceeds precision=4 + row = GenericRow([Decimal("999.99")], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + self.assertIsNone(result.values[0]) + + # 99.999 rounds to 100.00 (5 digits), also overflows + row2 = GenericRow([Decimal("99.999")], fields, RowKind.INSERT) + serialized2 = GenericRowSerializer.to_bytes(row2) + result2 = GenericRowDeserializer.from_bytes(serialized2, fields) + self.assertIsNone(result2.values[0]) + + # 99.99 fits exactly in DECIMAL(4, 2) + row3 = GenericRow([Decimal("99.99")], fields, RowKind.INSERT) + serialized3 = GenericRowSerializer.to_bytes(row3) + result3 = GenericRowDeserializer.from_bytes(serialized3, fields) + self.assertEqual(result3.values[0], Decimal("99.99")) + + def test_decimal_precision_overflow_high_precision(self): + """Precision overflow check also works for non-compact decimals.""" + # DECIMAL(20, 5) can hold 15 integer + 5 fractional digits + fields = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))] + + # This value fits: 15 integer digits + 5 fractional + row = GenericRow([Decimal("123456789012345.12345")], fields, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields) + self.assertEqual(result.values[0], Decimal("123456789012345.12345")) + + # This value overflows: 16 integer digits + 5 fractional = 21 > 20 + row2 = GenericRow([Decimal("1234567890123456.12345")], fields, RowKind.INSERT) + serialized2 = GenericRowSerializer.to_bytes(row2) + result2 = GenericRowDeserializer.from_bytes(serialized2, fields) + self.assertIsNone(result2.values[0]) + + def test_decimal_deserialization_precision_overflow_non_compact(self): + """Non-compact decimal deserialization returns None if precision overflows.""" + # Serialize with DECIMAL(38, 5) which fits, then deserialize as DECIMAL(20, 5) + fields_wide = [DataField(0, "d", AtomicType("DECIMAL(38, 5)"))] + fields_narrow = [DataField(0, "d", AtomicType("DECIMAL(20, 5)"))] + + # 21 digits total exceeds precision=20 + row = GenericRow([Decimal("1234567890123456.12345")], fields_wide, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + result = GenericRowDeserializer.from_bytes(serialized, fields_narrow) + self.assertIsNone(result.values[0]) + + def test_decimal_deserialization_invalid_precision(self): + """Deserialization with precision <= 0 raises ValueError.""" + fields_valid = [DataField(0, "d", AtomicType("DECIMAL(10, 2)"))] + row = GenericRow([Decimal("1.23")], fields_valid, RowKind.INSERT) + serialized = GenericRowSerializer.to_bytes(row) + + fields_bad = [DataField(0, "d", AtomicType("DECIMAL(0, 2)"))] + with self.assertRaises(ValueError): + GenericRowDeserializer.from_bytes(serialized, fields_bad) if __name__ == '__main__': From c5a393bdca1f68f4d271926425b713351f3e01ba Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 8 Apr 2026 22:38:42 +0800 Subject: [PATCH 5/5] fix timestamp --- .../pypaimon/table/row/generic_row.py | 71 ++++++- .../pypaimon/tests/timestamp_test.py | 200 ++++++++++++++++++ 2 files changed, 262 insertions(+), 9 deletions(-) create mode 100644 paimon-python/pypaimon/tests/timestamp_test.py diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 29afa80d9cf1..1460f649d1c2 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################ +import calendar import decimal import struct from datetime import date, datetime, time, timedelta @@ -65,6 +66,28 @@ def _parse_type_precision_scale(data_type): return 0, 0 +_EPOCH = datetime(1970, 1, 1) + + +def _datetime_to_millis_and_nanos(value: datetime): + """Convert datetime to (epoch_millis, nano_of_millisecond) without float arithmetic.""" + epoch_seconds = calendar.timegm(value.timetuple()) + millis = epoch_seconds * 1000 + value.microsecond // 1000 + nano_of_millisecond = (value.microsecond % 1000) * 1000 + return millis, nano_of_millisecond + + +def _millis_nanos_to_datetime(millis: int, nano_of_millisecond: int = 0) -> datetime: + """Convert (epoch_millis, nano_of_millisecond) to datetime. Nanos truncated to micros.""" + total_micros = millis * 1000 + nano_of_millisecond // 1000 + seconds = total_micros // 1_000_000 + micros = total_micros % 1_000_000 + if micros < 0: + seconds -= 1 + micros += 1_000_000 + return _EPOCH + timedelta(seconds=seconds, microseconds=micros) + + @dataclass class GenericRow(InternalRow): @@ -280,11 +303,11 @@ def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, if precision <= 0: raise ValueError(f"Decimal requires precision > 0, got {precision}") if precision <= 18: - # Compact format: unscaled long stored directly in fixed part + # Compact: unscaled long in fixed part unscaled_long = struct.unpack('> 32) & 0xFFFFFFFF byte_length = offset_and_len & 0xFFFFFFFF @@ -300,8 +323,18 @@ def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, @classmethod def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime: - millis = struct.unpack('> 32) & 0xFFFFFFFF + millis = struct.unpack(' date: @@ -353,8 +386,11 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: is_var_len_type = any(type_name.startswith(p) for p in [ 'CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']) is_decimal_type = type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC') + is_timestamp_type = type_name.startswith('TIMESTAMP') decimal_precision, decimal_scale = _parse_type_precision_scale(field.type) if is_decimal_type else (0, 0) is_high_precision_decimal = is_decimal_type and decimal_precision > 18 + timestamp_precision = _parse_type_precision_scale(field.type)[0] if is_timestamp_type else 0 + is_non_compact_timestamp = is_timestamp_type and timestamp_precision > 3 # Precision overflow -> null if is_decimal_type and value is not None: @@ -365,9 +401,21 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: struct.pack_into(' bytes: header_byte = 0x80 | length fixed_part[field_fixed_offset + 7] = header_byte else: - # Non-compact decimal uses fixed 16 bytes, others use 8-byte alignment + # Non-compact decimal: fixed 16 bytes; others: 8-byte aligned if is_high_precision_decimal: var_length = 16 else: @@ -446,6 +494,11 @@ def _serialize_field_value(cls, value: Any, data_type: AtomicType) -> bytes: f"via the variable-length path in to_bytes(), not _serialize_field_value()") return cls._serialize_decimal(value, data_type) elif type_name.startswith('TIMESTAMP'): + precision = _parse_type_precision_scale(data_type)[0] + if precision > 3: + raise ValueError( + f"Non-compact timestamp (precision={precision}) must be serialized " + f"via the variable-length path in to_bytes(), not _serialize_field_value()") return cls._serialize_timestamp(value) elif type_name in ['DATE']: return cls._serialize_date(value) + b'\x00' * 4 @@ -484,7 +537,7 @@ def _serialize_double(cls, value: float) -> bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: - """Serialize compact decimal (precision <= 18) as unscaled long in fixed part.""" + """Compact decimal: unscaled long in fixed part.""" precision, scale = _parse_type_precision_scale(data_type) d = value if isinstance(value, Decimal) else Decimal(str(value)) unscaled_value, _ = _decimal_to_unscaled_with_check(d, precision, scale) @@ -494,7 +547,7 @@ def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: def _serialize_timestamp(cls, value: datetime) -> bytes: if value.tzinfo is not None: raise RuntimeError("datetime tzinfo not supported yet") - millis = int(value.timestamp() * 1000) + millis, _ = _datetime_to_millis_and_nanos(value) return struct.pack('> 32) & 0xFFFFFFFF + nano_of_millisecond = offset_and_nano & 0xFFFFFFFF + + # cursor should point to variable area + self.assertEqual(cursor, fixed_part_size) + # 123456 us = 123 ms + 456 us = 123 ms + 456000 ns + self.assertEqual(nano_of_millisecond, 456000) + + # Variable area contains epoch millis + var_millis = struct.unpack('