diff --git a/CHANGELOG.md b/CHANGELOG.md index c021310..662293e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Bug Fixes 1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` fail with batching mode. +1. [#197](https://github.com/InfluxCommunity/influxdb3-python/pull/197): InfluxDB 3 Core/Enterprise write errors details handling. ## 0.17.0 [2026-01-08] diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index 4e2141f..d2b4b2d 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -63,9 +63,41 @@ def _get_message(self, response): def get(d, key): if not key or d is None: return d + if not isinstance(d, dict): + return None return get(d.get(key[0]), key[1:]) try: node = json.loads(response.data) + if isinstance(node, dict): + # InfluxDB v3 error format: { "code": "...", "message": "..." } + code = node.get("code") + message = node.get("message") + if message: + return f"{code}: {message}" if code else message + # InfluxDB v3 write error format: + # { + # "error": "...", + # "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ] + # } + error_text = node.get("error") + data = node.get("data") + if error_text and isinstance(data, list): + details = [] + for item in data: + if not isinstance(item, dict): + continue + line_number = item.get("line_number") + error_message = item.get("error_message") + original_line = item.get("original_line") + if line_number is not None and error_message and original_line: + details.append( + f"\tline {line_number}: {error_message} ({original_line})" + ) + elif error_message: + details.append(f"\t{error_message}") + if details: + return error_text + ":\n" + "\n".join(details) + return error_text for key in [['message'], ['data', 'error_message'], ['error']]: value = get(node, key) if value is not None: diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 22a8b60..411c9aa 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -124,6 +124,58 @@ def test_api_error_unknown(self): self._test_api_error(response_body) self.assertEqual(response_body, err.exception.message) + def test_api_error_v3_with_detail(self): + cases = [ + # all details available + ( + "two-line details", + '{"error":"partial write of line protocol occurred","data":[' + '{"error_message":"invalid column type for column \'v\', expected iox::column_type::field::float, ' + 'got iox::column_type::field::uinteger","line_number":2,"original_line":"**.DBG.remote_***"},' + '{"error_message":"invalid column type for column \'v\', expected iox::column_type::field::float, ' + 'got iox::column_type::field::uinteger","line_number":3,"original_line":"***.INF.remote_***"}' + ']}', + "partial write of line protocol occurred:\n" + "\tline 2: invalid column type for column 'v', expected iox::column_type::field::float, " + "got iox::column_type::field::uinteger (**.DBG.remote_***)\n" + "\tline 3: invalid column type for column 'v', expected iox::column_type::field::float, " + "got iox::column_type::field::uinteger (***.INF.remote_***)", + ), + # error_message only (no line_number/original_line) + ( + "message-only detail", + '{"error":"partial write of line protocol occurred","data":[' + '{"error_message":"only error message"}]}', + "partial write of line protocol occurred:\n" + "\tonly error message", + ), + # non-dict item in data list is skipped + ( + "non-dict item skipped", + '{"error":"partial write of line protocol occurred","data":[null,' + '{"error_message":"bad line","line_number":2,"original_line":"bad lp"}]}', + "partial write of line protocol occurred:\n" + "\tline 2: bad line (bad lp)", + ), + # details empty -> return error_text + ( + "no detail fields", + '{"error":"partial write of line protocol occurred","data":[{"line_number":2}]}', + "partial write of line protocol occurred", + ), + # data is not a dict when resolving fallback keys + ( + "data not dict for fallback", + '{"error":"data not list","data":"oops"}', + "data not list", + ), + ] + for name, response_body, expected in cases: + with self.subTest(name): + with self.assertRaises(InfluxDBError) as err: + self._test_api_error(response_body) + self.assertEqual(expected, err.exception.message) + def test_api_error_headers(self): body = '{"error": "test error"}' body_dic = json.loads(body) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 4f7b7ef..df700ee 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -12,6 +12,7 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ WriteType, InfluxDB3ClientQueryError +from influxdb_client_3.write_client.rest import ApiException from influxdb_client_3.exceptions import InfluxDBError from tests.util import asyncio_run, lp_to_py_object @@ -124,6 +125,39 @@ def test_write_and_query(self): self.assertEqual(test_id, df['test_id'][0]) self.assertEqual(123.0, df['value'][0]) + def test_v3_error(self): + measurement = f'test{random_hex(3)}'.lower() + lp = "\n".join([ + f"{measurement} v=1i 1770291280", + f"{measurement} v=1 1770291281", + ]) + + with InfluxDBClient3( + host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options( + write_options=WriteOptions( + write_type=WriteType.synchronous, + no_sync=True + ) + ) + ) as client: + try: + client.write(lp) + self.fail("Expected InfluxDBError from invalid line protocol.") + except ApiException as err: + if "Server doesn't support write with no_sync=true" in str(err): + self.skipTest("no_sync not supported by this server.") + msg = err.message + self.assertIn("partial write of line protocol occurred", msg) + self.assertIn(( + "invalid column type for column 'v', expected iox::column_type::field::integer, " + "got iox::column_type::field::float" + ), msg) + self.assertIn("line 2", msg) + self.assertIn(measurement, msg) + def test_auth_error_token(self): self.client = InfluxDBClient3(host=self.host, database=self.database, token='fake token') test_id = time.time_ns()