From 69f24ae25d9ed913d09f852c0e3cb48c18642c4a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 5 May 2026 09:55:10 +0200 Subject: [PATCH 01/30] feat: support partial writes --- README.md | 45 +++++ influxdb_client_3/__init__.py | 40 +++- influxdb_client_3/exceptions/__init__.py | 3 +- influxdb_client_3/exceptions/exceptions.py | 182 ++++++++++++++++-- .../write_client/client/write_api.py | 30 ++- .../write_client/service/write_service.py | 36 ++-- tests/test_api_client.py | 54 +++++- tests/test_influxdb_client_3.py | 29 +++ tests/test_influxdb_client_3_integration.py | 14 +- tests/test_write_local_server.py | 72 +++++-- 10 files changed, 434 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index b1f4734..59cc18d 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,51 @@ client.write_dataframe( ) ``` +### Accept partial writes and inspect failed lines +`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines. +On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`. + +```python +from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3.exceptions import InfluxDBPartialWriteError + +client = InfluxDBClient3(host="http://localhost:8181", token="token", database="db") +lp = "m v=1i 1\nm v=1.2 2" + +try: + client.write(lp) # accept_partial=True by default +except InfluxDBPartialWriteError as e: + for line_err in e.line_errors: + print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})") +``` + +Disable partial writes: +```python +from influxdb_client_3 import WriteOptions, write_client_options + +client = InfluxDBClient3( + host="http://localhost:8181", + token="token", + database="db", + write_client_options=write_client_options( + write_options=WriteOptions(accept_partial=False) + ), +) +``` + +### V2 compatibility mode (Clustered) +Set `use_v2_api=True` to route writes through `/api/v2/write` for Clustered/v2-compatible backends. + +`use_v2_api` can be configured by: +- `WriteOptions(use_v2_api=True)` +- constructor kwarg: `write_use_v2_api=True` +- env var: `INFLUX_WRITE_USE_V2_API=true` + +When `use_v2_api=True`: +- `accept_partial` is ignored by the backend +- `no_sync=True` is invalid and rejected before dispatch with: + `invalid write options: no_sync cannot be used with use_v2_api` + ## Querying ### Querying with SQL diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 888a455..43adf77 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -11,7 +11,7 @@ from pyarrow import ArrowException from influxdb_client_3.exceptions import InfluxDB3ClientQueryError -from influxdb_client_3.exceptions import InfluxDBError +from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder from influxdb_client_3.read_file import UploadFile from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point @@ -29,6 +29,8 @@ INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC" +INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL" +INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API" INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT" INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT" INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION" @@ -155,19 +157,23 @@ def _parse_gzip_threshold(threshold: str) -> int: return threshold -def _parse_write_no_sync(write_no_sync: str): +def _parse_write_bool(value): """ Parses and validates the provided write no sync value. This function ensures that the given value is a valid boolean, and it raises an appropriate error if the value is not valid. - :param write_no_sync: The input value to be parsed and validated. + :param value: The input value to be parsed and validated. :type write_no_sync: Any :return: The validated write no sync value as an boolean. :rtype: bool """ - return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes'] + return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes'] + + +def _parse_write_no_sync(write_no_sync: str): + return _parse_write_bool(write_no_sync) def _parse_timeout(to: str) -> int: @@ -233,6 +239,8 @@ def __init__( :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key str query_timeout: int value used to set the client query API timeout in milliseconds. :key str write_timeout: int value used to set the client write API timeout in milliseconds. + :key bool write_accept_partial: allow partial writes when some lines fail. + :key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint. :key list[str] profilers: list of enabled Flux profilers """ self._org = org if org is not None else "default" @@ -243,6 +251,8 @@ def __init__( write_type = DefaultWriteOptions.write_type.value write_precision = DefaultWriteOptions.write_precision.value write_no_sync = DefaultWriteOptions.no_sync.value + write_accept_partial = DefaultWriteOptions.accept_partial.value + write_use_v2_api = DefaultWriteOptions.use_v2_api.value write_timeout = DefaultWriteOptions.timeout.value if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: @@ -250,15 +260,25 @@ def __init__( write_type = getattr(write_opts, 'write_type', write_type) write_precision = getattr(write_opts, 'write_precision', write_precision) write_no_sync = getattr(write_opts, 'no_sync', write_no_sync) + write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial) + write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api) write_timeout = getattr(write_opts, 'timeout', write_timeout) if kw_keys.__contains__('write_timeout'): write_timeout = kwargs.get('write_timeout') + if kw_keys.__contains__('write_accept_partial'): + write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial')) + + if kw_keys.__contains__('write_use_v2_api'): + write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api')) + write_options = WriteOptions( write_type=write_type, write_precision=write_precision, no_sync=write_no_sync, + accept_partial=write_accept_partial, + use_v2_api=write_use_v2_api, ) self._write_client_options = { @@ -347,7 +367,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC) if write_no_sync is not None: - write_options.no_sync = _parse_write_no_sync(write_no_sync) + write_options.no_sync = _parse_write_bool(write_no_sync) + + write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL) + if write_accept_partial is not None: + write_options.accept_partial = _parse_write_bool(write_accept_partial) + + write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API) + if write_use_v2_api is not None: + write_options.use_v2_api = _parse_write_bool(write_use_v2_api) precision = os.getenv(INFLUX_PRECISION) if precision is not None: @@ -404,7 +432,7 @@ def write(self, record=None, database=None, **kwargs): try: return self._write_api.write(bucket=database, record=record, **kwargs) - except InfluxDBError as e: + except (InfluxDBError, InfluxDBPartialWriteError) as e: raise e def write_dataframe( diff --git a/influxdb_client_3/exceptions/__init__.py b/influxdb_client_3/exceptions/__init__.py index d725d03..eec96cb 100644 --- a/influxdb_client_3/exceptions/__init__.py +++ b/influxdb_client_3/exceptions/__init__.py @@ -1,3 +1,4 @@ # flake8: noqa -from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError +from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError, InfluxDBPartialWriteError, \ + InfluxDBPartialWriteLineError diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index d2b4b2d..f7004ea 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -1,6 +1,9 @@ """Exceptions utils for InfluxDB.""" +import json import logging +from dataclasses import dataclass +from typing import List, Optional, Tuple from urllib3 import HTTPResponse @@ -39,6 +42,114 @@ def __init__(self, error_message, *args, **kwargs): self.message = error_message +def _is_partial_write_error(error_message) -> bool: + if not isinstance(error_message, str) or len(error_message) == 0: + return False + normalized = error_message.lower() + return ( + "partial write of line protocol occurred" in normalized or + "parsing failed for write_lp endpoint" in normalized + ) + + +def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]: + if item is None: + return None + if not isinstance(item, dict): + raise ValueError("array item is not an object") + + error_message = item.get("error_message") + if not isinstance(error_message, str): + raise ValueError("error_message must be string") + if len(error_message) == 0: + return None + + line_number_raw = item.get("line_number") + if line_number_raw is None: + line_number = 0 + elif isinstance(line_number_raw, int): + line_number = line_number_raw + else: + raise ValueError("line_number must be int") + + original_line_raw = item.get("original_line") + if original_line_raw is None: + original_line = "" + elif isinstance(original_line_raw, str): + original_line = original_line_raw + else: + raise ValueError("original_line must be string") + + return error_message, line_number, original_line + + +def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str]]]: + if not isinstance(data, list): + return None + line_errors: List[Tuple[str, int, str]] = [] + try: + for item in data: + parsed = _parse_partial_write_data_item(item) + if parsed is None: + continue + line_errors.append(parsed) + except ValueError: + return None + return line_errors if len(line_errors) > 0 else None + + +def _parse_raw_array_details(data) -> Optional[List[str]]: + if not isinstance(data, list): + return None + details: List[str] = [] + for item in data: + if item is None: + continue + raw = json.dumps(item, separators=(',', ':')) + if raw and raw.lower() != "null": + details.append(raw) + return details + + +def _parse_typed_partial_write_object(data) -> Optional[Tuple[str, int, str]]: + if data is None: + return None + try: + return _parse_partial_write_data_item(data) + except ValueError: + return None + + +def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]: + details: List[str] = [] + for error_message, line_number, original_line in line_errors: + if line_number != 0 and original_line != "": + details.append(f"\tline {line_number}: {error_message} ({original_line})") + elif error_message: + details.append(f"\t{error_message}") + return details + + +def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str]], List[str]]: + if data is None: + return [], [] + + typed_array = _parse_typed_partial_write_array(data) + if typed_array is not None: + return typed_array, _format_partial_write_details(typed_array) + + raw_details = _parse_raw_array_details(data) + if raw_details is not None: + return [], raw_details + + typed_single = _parse_typed_partial_write_object(data) + if typed_single is not None: + line_errors = [typed_single] + return line_errors, _format_partial_write_details(line_errors) + + return [], [] + + # This error is for all write operations class InfluxDBError(InfluxDB3ClientError): """Raised when a server error occurs.""" @@ -56,10 +167,7 @@ def __init__(self, response: HTTPResponse = None, message: str = None): super().__init__(self.message) def _get_message(self, response): - # Body if response.data: - import json - def get(d, key): if not key or d is None: return d @@ -80,23 +188,15 @@ def get(d, key): # "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ] # } error_text = node.get("error") - data = node.get("data") - if error_text and isinstance(data, list): - details = [] - for item in data: - if not isinstance(item, dict): - continue - line_number = item.get("line_number") - error_message = item.get("error_message") - original_line = item.get("original_line") - if line_number is not None and error_message and original_line: - details.append( - f"\tline {line_number}: {error_message} ({original_line})" - ) - elif error_message: - details.append(f"\t{error_message}") + if error_text and _is_partial_write_error(error_text): + _, details = _parse_partial_write_line_error_info(node.get("data")) if details: - return error_text + ":\n" + "\n".join(details) + return error_text + ":\n" + "\n".join( + detail if detail.startswith("\t") else f"\t{detail}" + for detail in details + ) + return error_text + if error_text: return error_text for key in [['message'], ['data', 'error_message'], ['error']]: value = get(node, key) @@ -119,3 +219,47 @@ def get(d, key): def getheaders(self): """Helper method to make response headers more accessible.""" return self.response.getheaders() + + +@dataclass(frozen=True) +class InfluxDBPartialWriteLineError: + line_number: int + error_message: str + original_line: str + + +class InfluxDBPartialWriteError(InfluxDBError): + """Structured partial-write error with per-line failures.""" + + def __init__(self, response: HTTPResponse, message: str, line_errors: List[InfluxDBPartialWriteLineError]): + super().__init__(response=response) + self.message = message + self.line_errors = line_errors + self.args = (self.message,) + + @classmethod + def from_response(cls, response: HTTPResponse): + if response is None or not response.data: + return None + try: + node = json.loads(response.data) + except Exception: + return None + if not isinstance(node, dict): + return None + error_text = node.get("error") + if not _is_partial_write_error(error_text): + return None + parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data")) + if len(parsed_line_errors) == 0: + return None + line_errors = [ + InfluxDBPartialWriteLineError( + line_number=line_number, + error_message=error_message, + original_line=original_line, + ) + for error_message, line_number, original_line in parsed_line_errors + ] + message = InfluxDBError(response=response).message + return cls(response=response, message=message, line_errors=line_errors) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 250a07e..5c67875 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -29,6 +29,8 @@ DEFAULT_WRITE_NO_SYNC = False DEFAULT_WRITE_TIMEOUT = 10_000 +DEFAULT_WRITE_ACCEPT_PARTIAL = True +DEFAULT_WRITE_USE_V2_API = False # Kwargs consumed during serialization that should not be passed to _post_write SERIALIZER_KWARGS = { @@ -66,6 +68,8 @@ class DefaultWriteOptions(Enum): write_type = WriteType.synchronous write_precision = DEFAULT_WRITE_PRECISION no_sync = DEFAULT_WRITE_NO_SYNC + accept_partial = DEFAULT_WRITE_ACCEPT_PARTIAL + use_v2_api = DEFAULT_WRITE_USE_V2_API timeout = DEFAULT_WRITE_TIMEOUT @@ -84,6 +88,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, write_precision=DEFAULT_WRITE_PRECISION, no_sync=DEFAULT_WRITE_NO_SYNC, tag_order=None, + accept_partial=DEFAULT_WRITE_ACCEPT_PARTIAL, + use_v2_api=DEFAULT_WRITE_USE_V2_API, timeout=DEFAULT_WRITE_TIMEOUT, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ @@ -103,7 +109,9 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_precision: precision to use when writing points to InfluxDB :param no_sync: skip waiting for WAL persistence on write + :param accept_partial: allow partial writes when some lines fail :param tag_order: optional list of tag names used to prioritize tag serialization order + :param use_v2_api: use /api/v2/write compatibility endpoint :param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000 :param write_scheduler: """ @@ -121,8 +129,14 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_precision = write_precision self.timeout = timeout self.no_sync = no_sync + self.accept_partial = accept_partial + self.use_v2_api = use_v2_api self.tag_order = sanitize_tag_order(tag_order) + def validate(self): + if self.use_v2_api and self.no_sync: + raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") + def to_retry_strategy(self, **kwargs): """ Create a Retry strategy from write options. @@ -385,6 +399,8 @@ def write(self, bucket: str, org: str = None, if write_precision is None: write_precision = self._write_options.write_precision + self._write_options.validate() + if 'tag_order' in kwargs: kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order')) else: @@ -395,6 +411,8 @@ def write(self, bucket: str, org: str = None, write_precision, **kwargs) no_sync = self._write_options.no_sync + accept_partial = self._write_options.accept_partial + use_v2_api = self._write_options.use_v2_api payloads = defaultdict(list) self._serialize(record, write_precision, payloads, **kwargs) @@ -403,7 +421,8 @@ def write(self, bucket: str, org: str = None, def write_payload(payload): final_string = b'\n'.join(payload[1]) - return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs) + return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, + accept_partial, use_v2_api, **kwargs) results = list(map(write_payload, payloads.items())) if not _async_req: @@ -584,21 +603,26 @@ def _retry_callback_delegate(exception): _retry_callback_delegate = None no_sync = self._write_options.no_sync + accept_partial = self._write_options.accept_partial + use_v2_api = self._write_options.use_v2_api retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate) self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data, - batch_item.key.precision, no_sync, urlopen_kw={'retries': retry}, **kwargs) + batch_item.key.precision, no_sync, accept_partial, use_v2_api, + urlopen_kw={'retries': retry}, **kwargs) logger.debug("Write request finished %s", batch_item) return _BatchResponse(data=batch_item) - def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs): + def _post_write(self, _async_req, bucket, org, body, precision, no_sync, accept_partial, use_v2_api, **kwargs): # Filter out serializer-specific kwargs before passing to _post_write http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, no_sync=no_sync, + accept_partial=accept_partial, + use_v2_api=use_v2_api, async_req=_async_req, content_type="text/plain; charset=utf-8", **http_kwargs) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index b349b41..c0f954a 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -8,6 +8,7 @@ from influxdb_client_3.write_client.domain.write_precision_converter import WritePrecisionConverter from influxdb_client_3.write_client.rest import ApiException from influxdb_client_3.write_client.service._base_service import _BaseService +from influxdb_client_3.exceptions import InfluxDBPartialWriteError class WriteService(_BaseService): @@ -158,11 +159,13 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, collection_formats={}, urlopen_kw=kwargs.get('urlopen_kw', None)) except ApiException as e: - no_sync = 'no_sync' in local_var_params and local_var_params['no_sync'] - if no_sync and e.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = "Server doesn't support write with no_sync=true " \ - "(supported by InfluxDB 3 Core/Enterprise servers only)." + use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] + if not use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = "Server doesn't support v3 write API. Set use_v2_api=True for v2 compatibility endpoint." raise ApiException(status=0, reason=message) + partial = InfluxDBPartialWriteError.from_response(e.response) + if partial is not None: + raise partial raise e async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 @@ -210,7 +213,8 @@ async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D40 def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 local_var_params = dict(locals()) - all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', 'accept', 'org_id', 'precision', 'no_sync'] # noqa: E501 + all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', + 'accept', 'org_id', 'precision', 'no_sync', 'accept_partial', 'use_v2_api'] # noqa: E501 self._check_operation_params('post_write', all_params, local_var_params) # verify the required parameter 'org' is set if ('org' not in local_var_params or @@ -228,26 +232,30 @@ def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D path_params = {} query_params = [] + use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] no_sync = 'no_sync' in local_var_params and local_var_params['no_sync'] + accept_partial = local_var_params['accept_partial'] if 'accept_partial' in local_var_params else True if 'org' in local_var_params: query_params.append(('org', local_var_params['org'])) # noqa: E501 if 'org_id' in local_var_params: query_params.append(('orgID', local_var_params['org_id'])) # noqa: E501 if 'bucket' in local_var_params: - query_params.append(('db' if no_sync else 'bucket', local_var_params['bucket'])) # noqa: E501 - if no_sync: - # Setting no_sync=true is supported only in the v3 API. - path = '/api/v3/write_lp' + query_params.append(('bucket' if use_v2_api else 'db', local_var_params['bucket'])) # noqa: E501 + + if use_v2_api: + path = '/api/v2/write' if 'precision' in local_var_params: precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 - query_params.append(('no_sync', 'true')) + query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 else: - # By default, use the v2 API. - path = '/api/v2/write' + path = '/api/v3/write_lp' if 'precision' in local_var_params: precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 + query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 + if no_sync: + query_params.append(('no_sync', 'true')) + if accept_partial is False: + query_params.append(('accept_partial', 'false')) header_params = {} if 'zap_trace_span' in local_var_params: diff --git a/tests/test_api_client.py b/tests/test_api_client.py index e5600b5..85e77d0 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -9,7 +9,7 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.exceptions import InfluxDBError +from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -114,7 +114,8 @@ def test_api_error_oss_with_detail(self): 'in line protocol for field \'val\' on line 1"}}') with self.assertRaises(InfluxDBError) as err: self._test_api_error(response_body) - self.assertEqual('invalid field value in line protocol for field \'val\' on line 1', err.exception.message) + self.assertEqual("parsing failed for write_lp endpoint:\n\tinvalid field value in line protocol for field " + "'val' on line 1", err.exception.message) def test_api_error_unknown(self): response_body = '{"detail":"no info"}' @@ -138,6 +139,7 @@ def test_api_error_v3_with_detail(self): "got iox::column_type::field::uinteger (**.DBG.remote_***)\n" "\tline 3: invalid column type for column 'v', expected iox::column_type::field::float, " "got iox::column_type::field::uinteger (***.INF.remote_***)", + True, ), # error_message only (no line_number/original_line) ( @@ -146,6 +148,7 @@ def test_api_error_v3_with_detail(self): '{"error_message":"only error message"}]}', "partial write of line protocol occurred:\n" "\tonly error message", + True, ), # non-dict item in data list is skipped ( @@ -154,25 +157,68 @@ def test_api_error_v3_with_detail(self): '{"error_message":"bad line","line_number":2,"original_line":"bad lp"}]}', "partial write of line protocol occurred:\n" "\tline 2: bad line (bad lp)", + True, ), # details empty -> return error_text ( "no detail fields", '{"error":"partial write of line protocol occurred","data":[{"line_number":2}]}', - "partial write of line protocol occurred", + "partial write of line protocol occurred:\n" + "\t{\"line_number\":2}", + False, + ), + # typed parse fails due line_number type -> raw fallback details + ( + "textual line_number falls back to raw", + '{"error":"partial write of line protocol occurred","data":[{"error_message":"bad line","line_number":"x","original_line":"bad lp"}]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}", + False, + ), + # mixed valid + malformed in array -> raw fallback for whole array + ( + "mixed array malformed item falls back to raw", + '{"error":"partial write of line protocol occurred","data":[{"error_message":"bad line","line_number":2,"original_line":"bad lp"},1]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" + "\t1", + False, ), # data is not a dict when resolving fallback keys ( "data not dict for fallback", '{"error":"data not list","data":"oops"}', "data not list", + False, ), ] - for name, response_body, expected in cases: + for name, response_body, expected, is_partial in cases: with self.subTest(name): with self.assertRaises(InfluxDBError) as err: self._test_api_error(response_body) self.assertEqual(expected, err.exception.message) + if is_partial: + self.assertIsInstance(err.exception, InfluxDBPartialWriteError) + self.assertGreaterEqual(len(err.exception.line_errors), 1) + else: + self.assertNotIsInstance(err.exception, InfluxDBPartialWriteError) + + def test_api_error_v3_parsing_failed_object_returns_partial_error(self): + response_body = ('{"error":"parsing failed for write_lp endpoint","data":' + '{"error_message":"invalid field value","line_number":2,"original_line":"m,t=a f=bad"}}') + with self.assertRaises(InfluxDBPartialWriteError) as err: + self._test_api_error(response_body) + self.assertEqual(1, len(err.exception.line_errors)) + self.assertEqual(2, err.exception.line_errors[0].line_number) + + def test_api_error_v3_partial_write_with_message_only_object_returns_partial_error(self): + response_body = ('{"error":"partial write of line protocol occurred","data":' + '{"error_message":"only error message"}}') + with self.assertRaises(InfluxDBPartialWriteError) as err: + self._test_api_error(response_body) + self.assertEqual(1, len(err.exception.line_errors)) + self.assertEqual(0, err.exception.line_errors[0].line_number) + self.assertEqual("", err.exception.line_errors[0].original_line) def test_api_error_headers(self): body = '{"error": "test error"}' diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index a37339f..9e38b40 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -123,6 +123,10 @@ def test_default_write_options(self): self.assertEqual(DefaultWriteOptions.write_type.value, client._write_client_options["write_options"].write_type) self.assertEqual(DefaultWriteOptions.no_sync.value, client._write_client_options["write_options"].no_sync) + self.assertEqual(DefaultWriteOptions.accept_partial.value, + client._write_client_options["write_options"].accept_partial) + self.assertEqual(DefaultWriteOptions.use_v2_api.value, + client._write_client_options["write_options"].use_v2_api) self.assertEqual(DefaultWriteOptions.write_precision.value, client._write_client_options["write_options"].write_precision) self.assertEqual(DefaultWriteOptions.timeout.value, client._write_client_options["write_options"].timeout) @@ -215,6 +219,8 @@ def test_default_client(self): expected_precision = DefaultWriteOptions.write_precision.value expected_write_type = DefaultWriteOptions.write_type.value expected_no_sync = DefaultWriteOptions.no_sync.value + expected_accept_partial = DefaultWriteOptions.accept_partial.value + expected_use_v2_api = DefaultWriteOptions.use_v2_api.value import os try: @@ -237,11 +243,15 @@ def verify_client_write_options(c): self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) self.assertEqual(write_options.no_sync, expected_no_sync) + self.assertEqual(write_options.accept_partial, expected_accept_partial) + self.assertEqual(write_options.use_v2_api, expected_use_v2_api) self.assertEqual(write_options.tag_order, []) self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) self.assertEqual(c._write_api._write_options.no_sync, expected_no_sync) + self.assertEqual(c._write_api._write_options.accept_partial, expected_accept_partial) + self.assertEqual(c._write_api._write_options.use_v2_api, expected_use_v2_api) self.assertEqual(c._write_api._write_options.tag_order, []) env_client = InfluxDBClient3.from_env() @@ -254,6 +264,7 @@ def verify_client_write_options(c): 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_AUTH_SCHEME': 'custom_scheme', 'INFLUX_GZIP_THRESHOLD': '2000', 'INFLUX_WRITE_NO_SYNC': 'true', + 'INFLUX_WRITE_ACCEPT_PARTIAL': 'false', 'INFLUX_WRITE_USE_V2_API': 'true', 'INFLUX_WRITE_TIMEOUT': '1234', 'INFLUX_QUERY_TIMEOUT': '5678'}) def test_from_env_all_env_vars_set(self): client = InfluxDBClient3.from_env() @@ -268,6 +279,8 @@ def test_from_env_all_env_vars_set(self): write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.write_precision, WritePrecision.MS) self.assertEqual(write_options.no_sync, True) + self.assertEqual(write_options.accept_partial, False) + self.assertEqual(write_options.use_v2_api, True) self.assertEqual(1234, write_options.timeout) self.assertEqual(5.678, client._query_api._default_timeout) @@ -340,6 +353,22 @@ def test_parse_write_no_sync_anything_else_is_false(self): write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.no_sync, False) + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}) + def test_parse_write_accept_partial_false(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.accept_partial, False) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_USE_V2_API': 'true'}) + def test_parse_write_use_v2_api_true(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.use_v2_api, True) + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_TIMEOUT': '6789'}) def test_parse_valid_write_timeout(self): diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index df700ee..e39b88a 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -12,8 +12,7 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ WriteType, InfluxDB3ClientQueryError -from influxdb_client_3.write_client.rest import ApiException -from influxdb_client_3.exceptions import InfluxDBError +from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError from tests.util import asyncio_run, lp_to_py_object @@ -136,19 +135,12 @@ def test_v3_error(self): host=self.host, database=self.database, token=self.token, - write_client_options=write_client_options( - write_options=WriteOptions( - write_type=WriteType.synchronous, - no_sync=True - ) - ) + write_client_options=write_client_options(write_options=WriteOptions(write_type=WriteType.synchronous)) ) as client: try: client.write(lp) self.fail("Expected InfluxDBError from invalid line protocol.") - except ApiException as err: - if "Server doesn't support write with no_sync=true" in str(err): - self.skipTest("no_sync not supported by this server.") + except InfluxDBPartialWriteError as err: msg = err.message self.assertIn("partial write of line protocol occurred", msg) self.assertIn(( diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index ab71ff2..5cdb7ff 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -38,8 +38,8 @@ def test_write_default_params(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "ns"})) + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) def test_write_with_write_options(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -56,8 +56,8 @@ def test_write_with_write_options(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond"})) def test_write_with_no_sync_true(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -77,23 +77,55 @@ def test_write_with_no_sync_true(self, httpserver: HTTPServer): method="POST", uri="/api/v3/write_lp", query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"})) - def test_write_with_no_sync_true_on_v2_server(self, httpserver: HTTPServer): - self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) + def test_write_with_accept_partial_false(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) - client = InfluxDBClient3( + InfluxDBClient3( host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", write_client_options=write_client_options( write_options=WriteOptions( write_type=WriteType.synchronous, - no_sync=True))) + accept_partial=False + ) + ) + ).write(self.SAMPLE_RECORD) - with pytest.raises(ApiException, match=r".*Server doesn't support write with no_sync=true " - r"\(supported by InfluxDB 3 Core/Enterprise servers only\)."): + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond", "accept_partial": "false"})) + + def test_write_with_use_v2_api_true(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions( + write_type=WriteType.synchronous, + write_precision=WritePrecision.US, + use_v2_api=True, + accept_partial=False + ) + ) + ).write(self.SAMPLE_RECORD) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + + def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): + self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) + + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN") + + with pytest.raises(ApiException, match=r".*Server doesn't support v3 write API\. " + r"Set use_v2_api=True for v2 compatibility endpoint\."): client.write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( method="POST", uri="/api/v3/write_lp", - query_string={"org": "ORG", "db": "DB", "precision": "nanosecond", "no_sync": "true"})) + query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -111,8 +143,8 @@ def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v2/write", - query_string={"org": "ORG", "bucket": "DB", "precision": "us"}, + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond"}, headers={"Content-Encoding": "gzip"}, )) def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): @@ -135,6 +167,20 @@ def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"}, headers={"Content-Encoding": "gzip"}, )) + def test_write_invalid_use_v2_api_and_no_sync(self, httpserver: HTTPServer): + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=True, + no_sync=True + ) + ) + ) + with pytest.raises(ValueError, match=r".*invalid write options: no_sync cannot be used with use_v2_api.*"): + client.write(self.SAMPLE_RECORD) + def test_write_with_timeout_in_write_options(self, httpserver: HTTPServer): self.delay_response(httpserver, 0.5) From f5d0ee50cd639ae999635351d383cfd84834ba46 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 5 May 2026 10:09:31 +0200 Subject: [PATCH 02/30] test: fix E501 --- tests/test_api_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 85e77d0..a0184c0 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -170,7 +170,8 @@ def test_api_error_v3_with_detail(self): # typed parse fails due line_number type -> raw fallback details ( "textual line_number falls back to raw", - '{"error":"partial write of line protocol occurred","data":[{"error_message":"bad line","line_number":"x","original_line":"bad lp"}]}', + '{"error":"partial write of line protocol occurred","data":' + '[{"error_message":"bad line","line_number":"x","original_line":"bad lp"}]}', "partial write of line protocol occurred:\n" "\t{\"error_message\":\"bad line\",\"line_number\":\"x\",\"original_line\":\"bad lp\"}", False, @@ -178,7 +179,8 @@ def test_api_error_v3_with_detail(self): # mixed valid + malformed in array -> raw fallback for whole array ( "mixed array malformed item falls back to raw", - '{"error":"partial write of line protocol occurred","data":[{"error_message":"bad line","line_number":2,"original_line":"bad lp"},1]}', + '{"error":"partial write of line protocol occurred","data":' + '[{"error_message":"bad line","line_number":2,"original_line":"bad lp"},1]}', "partial write of line protocol occurred:\n" "\t{\"error_message\":\"bad line\",\"line_number\":2,\"original_line\":\"bad lp\"}\n" "\t1", From 9b0efb4b09a5225ebb4c7d2190e505cb871ba449 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 5 May 2026 12:21:58 +0200 Subject: [PATCH 03/30] test: fix stale test --- tests/test_polars.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_polars.py b/tests/test_polars.py index 44350e1..4272546 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -207,6 +207,8 @@ def test_write_polars_batching(self): bucket=ANY, precision=ANY, no_sync=ANY, + accept_partial=ANY, + use_v2_api=ANY, async_req=ANY, content_type=ANY, urlopen_kw=ANY, From 00d49a2331094f1eec2e568e0e49143d5ddd3ac7 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 5 May 2026 17:42:40 +0200 Subject: [PATCH 04/30] fix: docstring --- influxdb_client_3/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 43adf77..de38a75 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -159,14 +159,14 @@ def _parse_gzip_threshold(threshold: str) -> int: def _parse_write_bool(value): """ - Parses and validates the provided write no sync value. + Parses a truthy/falsy value for write options. - This function ensures that the given value is a valid boolean, - and it raises an appropriate error if the value is not valid. + The input is normalized to string and matched against common truthy values. + Any non-truthy value is treated as False. :param value: The input value to be parsed and validated. - :type write_no_sync: Any - :return: The validated write no sync value as an boolean. + :type value: Any + :return: Parsed boolean value. :rtype: bool """ return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes'] From b514d943279b240ea7ac9d28bb81eade82a8291f Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 5 May 2026 17:46:20 +0200 Subject: [PATCH 05/30] refactor: remove redundant try-catch-raise --- influxdb_client_3/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index de38a75..f84e74a 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -11,7 +11,7 @@ from pyarrow import ArrowException from influxdb_client_3.exceptions import InfluxDB3ClientQueryError -from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError +from influxdb_client_3.exceptions import InfluxDBError from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder from influxdb_client_3.read_file import UploadFile from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point @@ -430,10 +430,7 @@ def write(self, record=None, database=None, **kwargs): if database is None: database = self._database - try: - return self._write_api.write(bucket=database, record=record, **kwargs) - except (InfluxDBError, InfluxDBPartialWriteError) as e: - raise e + return self._write_api.write(bucket=database, record=record, **kwargs) def write_dataframe( self, From 8de987586a3777f90ce71585744babbf63e6b5ef Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:23:39 +0200 Subject: [PATCH 06/30] fix: preserve ApiException.message for v3-on-v2 compatibility error --- influxdb_client_3/write_client/service/write_service.py | 5 ++++- tests/test_write_local_server.py | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index c0f954a..4b680b0 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -162,7 +162,10 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] if not use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: message = "Server doesn't support v3 write API. Set use_v2_api=True for v2 compatibility endpoint." - raise ApiException(status=0, reason=message) + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + raise ex partial = InfluxDBPartialWriteError.from_response(e.response) if partial is not None: raise partial diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index 5cdb7ff..bebd750 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -119,9 +119,14 @@ def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): client = InfluxDBClient3( host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN") + expected = ("Server doesn't support v3 write API. " + "Set use_v2_api=True for v2 compatibility endpoint.") with pytest.raises(ApiException, match=r".*Server doesn't support v3 write API\. " - r"Set use_v2_api=True for v2 compatibility endpoint\."): + r"Set use_v2_api=True for v2 compatibility endpoint\.") as err: client.write(self.SAMPLE_RECORD) + assert err.value.message == expected + assert err.value.reason == expected + assert err.value.args == (expected,) self.assert_request_made(httpserver, RequestMatcher( method="POST", uri="/api/v3/write_lp", From 9bae3f73a9641ac383661e60ac0e7fc5a217a9e6 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:24:26 +0200 Subject: [PATCH 07/30] refactor: avoid double parsing when building partial write error --- influxdb_client_3/exceptions/exceptions.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index f7004ea..90ba5bb 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -231,11 +231,9 @@ class InfluxDBPartialWriteLineError: class InfluxDBPartialWriteError(InfluxDBError): """Structured partial-write error with per-line failures.""" - def __init__(self, response: HTTPResponse, message: str, line_errors: List[InfluxDBPartialWriteLineError]): + def __init__(self, response: HTTPResponse, line_errors: List[InfluxDBPartialWriteLineError]): super().__init__(response=response) - self.message = message self.line_errors = line_errors - self.args = (self.message,) @classmethod def from_response(cls, response: HTTPResponse): @@ -261,5 +259,4 @@ def from_response(cls, response: HTTPResponse): ) for error_message, line_number, original_line in parsed_line_errors ] - message = InfluxDBError(response=response).message - return cls(response=response, message=message, line_errors=line_errors) + return cls(response=response, line_errors=line_errors) From f387a7f3db4c76cbe2d451138c76f5d003b0568e Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:45:44 +0200 Subject: [PATCH 08/30] test: add more coverage for exceptions --- tests/test_api_client.py | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index a0184c0..f183aef 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -10,6 +10,7 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError +from influxdb_client_3.exceptions.exceptions import _parse_typed_partial_write_object from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -193,6 +194,23 @@ def test_api_error_v3_with_detail(self): "data not list", False, ), + # typed object with empty message is dropped + ( + "empty error_message in object", + '{"error":"partial write of line protocol occurred","data":' + '{"error_message":"","line_number":2,"original_line":"bad lp"}}', + "partial write of line protocol occurred", + False, + ), + # typed array parse fails, raw fallback skips null item + ( + "raw fallback skips null details", + '{"error":"partial write of line protocol occurred","data":' + '[null,{"error_message":123}]}', + "partial write of line protocol occurred:\n" + "\t{\"error_message\":123}", + False, + ), ] for name, response_body, expected, is_partial in cases: with self.subTest(name): @@ -222,6 +240,29 @@ def test_api_error_v3_partial_write_with_message_only_object_returns_partial_err self.assertEqual(0, err.exception.line_errors[0].line_number) self.assertEqual("", err.exception.line_errors[0].original_line) + def test_parse_typed_partial_write_object_guards(self): + self.assertIsNone(_parse_typed_partial_write_object(None)) + self.assertIsNone(_parse_typed_partial_write_object({"error_message": 123})) + + def test_partial_write_from_response_guards(self): + self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) + + empty_body = response.HTTPResponse(status=400, reason="Bad Request", body=b"") + self.assertIsNone(InfluxDBPartialWriteError.from_response(empty_body)) + + invalid_json = response.HTTPResponse(status=400, reason="Bad Request", body=b"{") + self.assertIsNone(InfluxDBPartialWriteError.from_response(invalid_json)) + + non_dict_json = response.HTTPResponse(status=400, reason="Bad Request", body=b"[]") + self.assertIsNone(InfluxDBPartialWriteError.from_response(non_dict_json)) + + object_without_typed_line_error = response.HTTPResponse( + status=400, + reason="Bad Request", + body=b'{"error":"partial write of line protocol occurred","data":{"error_message":123}}', + ) + self.assertIsNone(InfluxDBPartialWriteError.from_response(object_without_typed_line_error)) + def test_api_error_headers(self): body = '{"error": "test error"}' body_dic = json.loads(body) From 775468137f4eb9f8e0791a07db9ef6c2a6ad8a29 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:48:16 +0200 Subject: [PATCH 09/30] refactor: error parsing simplified --- influxdb_client_3/exceptions/exceptions.py | 36 ++++++++-------------- tests/test_api_client.py | 8 ++--- 2 files changed, 17 insertions(+), 27 deletions(-) diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index 90ba5bb..15a5f11 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -98,25 +98,10 @@ def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str] return line_errors if len(line_errors) > 0 else None -def _parse_raw_array_details(data) -> Optional[List[str]]: - if not isinstance(data, list): - return None - details: List[str] = [] - for item in data: - if item is None: - continue - raw = json.dumps(item, separators=(',', ':')) - if raw and raw.lower() != "null": - details.append(raw) - return details - - -def _parse_typed_partial_write_object(data) -> Optional[Tuple[str, int, str]]: - if data is None: - return None +def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]: try: return _parse_partial_write_data_item(data) - except ValueError: + except (TypeError, ValueError): return None @@ -138,14 +123,19 @@ def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str if typed_array is not None: return typed_array, _format_partial_write_details(typed_array) - raw_details = _parse_raw_array_details(data) - if raw_details is not None: - return [], raw_details + if isinstance(data, list): + details: List[str] = [] + for item in data: + if item is None: + continue + raw = json.dumps(item, separators=(',', ':')) + if raw and raw.lower() != "null": + details.append(raw) + return [], details - typed_single = _parse_typed_partial_write_object(data) + typed_single = _parse_typed_partial_write_object_or_none(data) if typed_single is not None: - line_errors = [typed_single] - return line_errors, _format_partial_write_details(line_errors) + return [typed_single], _format_partial_write_details([typed_single]) return [], [] diff --git a/tests/test_api_client.py b/tests/test_api_client.py index f183aef..3e8a992 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError -from influxdb_client_3.exceptions.exceptions import _parse_typed_partial_write_object +from influxdb_client_3.exceptions.exceptions import _parse_typed_partial_write_object_or_none from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -240,9 +240,9 @@ def test_api_error_v3_partial_write_with_message_only_object_returns_partial_err self.assertEqual(0, err.exception.line_errors[0].line_number) self.assertEqual("", err.exception.line_errors[0].original_line) - def test_parse_typed_partial_write_object_guards(self): - self.assertIsNone(_parse_typed_partial_write_object(None)) - self.assertIsNone(_parse_typed_partial_write_object({"error_message": 123})) + def test_parse_typed_partial_write_object_or_none_guards(self): + self.assertIsNone(_parse_typed_partial_write_object_or_none(None)) + self.assertIsNone(_parse_typed_partial_write_object_or_none({"error_message": 123})) def test_partial_write_from_response_guards(self): self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) From 728ca7129f32dd49aa9ded82d9e81a2d73790f41 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:58:01 +0200 Subject: [PATCH 10/30] test: refactor for higher-level testing --- tests/test_api_client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 3e8a992..90c184e 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -10,7 +10,6 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError -from influxdb_client_3.exceptions.exceptions import _parse_typed_partial_write_object_or_none from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -240,10 +239,6 @@ def test_api_error_v3_partial_write_with_message_only_object_returns_partial_err self.assertEqual(0, err.exception.line_errors[0].line_number) self.assertEqual("", err.exception.line_errors[0].original_line) - def test_parse_typed_partial_write_object_or_none_guards(self): - self.assertIsNone(_parse_typed_partial_write_object_or_none(None)) - self.assertIsNone(_parse_typed_partial_write_object_or_none({"error_message": 123})) - def test_partial_write_from_response_guards(self): self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) From 3c9251cecaafb47ced0d40b01c3e7a7caeb8718b Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 09:58:46 +0200 Subject: [PATCH 11/30] refactor: simplify partial write error parsing guards --- influxdb_client_3/exceptions/exceptions.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index 15a5f11..8bb8e25 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -43,7 +43,7 @@ def __init__(self, error_message, *args, **kwargs): def _is_partial_write_error(error_message) -> bool: - if not isinstance(error_message, str) or len(error_message) == 0: + if not isinstance(error_message, str) or not error_message: return False normalized = error_message.lower() return ( @@ -61,7 +61,7 @@ def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]: error_message = item.get("error_message") if not isinstance(error_message, str): raise ValueError("error_message must be string") - if len(error_message) == 0: + if not error_message: return None line_number_raw = item.get("line_number") @@ -101,7 +101,7 @@ def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str] def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]: try: return _parse_partial_write_data_item(data) - except (TypeError, ValueError): + except ValueError: return None @@ -239,7 +239,7 @@ def from_response(cls, response: HTTPResponse): if not _is_partial_write_error(error_text): return None parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data")) - if len(parsed_line_errors) == 0: + if not parsed_line_errors: return None line_errors = [ InfluxDBPartialWriteLineError( From 38180936b76884389068761b7a2d1ba297e2fc9b Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 6 May 2026 11:16:17 +0200 Subject: [PATCH 12/30] fix: resolve write option kwargs before post_write call --- .../write_client/client/write_api.py | 22 +++++++---- tests/test_write_local_server.py | 39 +++++++++++++++++++ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 5c67875..6f20777 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -320,6 +320,14 @@ def __init__(self, # TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API warnings.warn(message, DeprecationWarning) + def _resolve_write_request_options(self, kwargs): + no_sync = kwargs.pop('no_sync', self._write_options.no_sync) + accept_partial = kwargs.pop('accept_partial', self._write_options.accept_partial) + use_v2_api = kwargs.pop('use_v2_api', self._write_options.use_v2_api) + if use_v2_api and no_sync: + raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") + return no_sync, accept_partial, use_v2_api + def write(self, bucket: str, org: str = None, record: Union[ str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], @@ -400,6 +408,8 @@ def write(self, bucket: str, org: str = None, write_precision = self._write_options.write_precision self._write_options.validate() + kwargs = dict(kwargs) + no_sync, accept_partial, use_v2_api = self._resolve_write_request_options(kwargs) if 'tag_order' in kwargs: kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order')) @@ -407,13 +417,12 @@ def write(self, bucket: str, org: str = None, kwargs['tag_order'] = self._write_options.tag_order if self._write_options.write_type is WriteType.batching: + kwargs['no_sync'] = no_sync + kwargs['accept_partial'] = accept_partial + kwargs['use_v2_api'] = use_v2_api return self._write_batching(bucket, org, record, write_precision, **kwargs) - no_sync = self._write_options.no_sync - accept_partial = self._write_options.accept_partial - use_v2_api = self._write_options.use_v2_api - payloads = defaultdict(list) self._serialize(record, write_precision, payloads, **kwargs) @@ -602,9 +611,8 @@ def _retry_callback_delegate(exception): else: _retry_callback_delegate = None - no_sync = self._write_options.no_sync - accept_partial = self._write_options.accept_partial - use_v2_api = self._write_options.use_v2_api + kwargs = dict(kwargs) + no_sync, accept_partial, use_v2_api = self._resolve_write_request_options(kwargs) retry = self._write_options.to_retry_strategy(retry_callback=_retry_callback_delegate) diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index bebd750..93391eb 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -77,6 +77,20 @@ def test_write_with_no_sync_true(self, httpserver: HTTPServer): method="POST", uri="/api/v3/write_lp", query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"})) + def test_write_with_no_sync_true_in_kwargs(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.US) + ) + ).write(self.SAMPLE_RECORD, no_sync=True) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v3/write_lp", + query_string={"org": "ORG", "db": "DB", "precision": "microsecond", "no_sync": "true"})) + def test_write_with_accept_partial_false(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -113,6 +127,20 @@ def test_write_with_use_v2_api_true(self, httpserver: HTTPServer): method="POST", uri="/api/v2/write", query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + def test_write_with_use_v2_api_true_in_kwargs(self, httpserver: HTTPServer): + self.set_response_status(httpserver, 200) + + InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.US) + ) + ).write(self.SAMPLE_RECORD, use_v2_api=True, accept_partial=False) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) + def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) @@ -186,6 +214,17 @@ def test_write_invalid_use_v2_api_and_no_sync(self, httpserver: HTTPServer): with pytest.raises(ValueError, match=r".*invalid write options: no_sync cannot be used with use_v2_api.*"): client.write(self.SAMPLE_RECORD) + def test_write_invalid_use_v2_api_and_no_sync_in_kwargs(self, httpserver: HTTPServer): + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous) + ) + ) + + with pytest.raises(ValueError, match=r".*invalid write options: no_sync cannot be used with use_v2_api.*"): + client.write(self.SAMPLE_RECORD, use_v2_api=True, no_sync=True) + def test_write_with_timeout_in_write_options(self, httpserver: HTTPServer): self.delay_response(httpserver, 0.5) From 5c91d255e7edee6395a0db5b0c8668a81c19a335 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 7 May 2026 18:22:12 +0200 Subject: [PATCH 13/30] docs: update CHANGELOG --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95854c1..6f66072 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## 0.20.0 [unreleased] +### Breaking Changes + +1. [#210](https://github.com/InfluxCommunity/influxdb3-python/pull/210): Adds partial write support and aligns write routing with v3 defaults. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. + For InfluxDB Clustered, set `use_v2_api=True` for writes. + ### Features 1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. From 2b59836e567698a4ce8ffb4113773bd901f62b32 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 12:44:42 +0200 Subject: [PATCH 14/30] fix: use V2 API by default --- README.md | 28 ++++++--- influxdb_client_3/exceptions/exceptions.py | 7 ++- .../write_client/client/write_api.py | 2 +- .../write_client/service/write_service.py | 10 +++- tests/test_api_client.py | 8 +++ tests/test_influxdb_client_3_integration.py | 16 ++--- tests/test_write_local_server.py | 60 ++++++++++++++----- 7 files changed, 96 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 9d6562d..104de74 100644 --- a/README.md +++ b/README.md @@ -249,13 +249,19 @@ On partial failure, the client raises `InfluxDBPartialWriteError` with structure ```python from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import WriteOptions, write_client_options from influxdb_client_3.exceptions import InfluxDBPartialWriteError -client = InfluxDBClient3(host="http://localhost:8181", token="token", database="db") -lp = "m v=1i 1\nm v=1.2 2" +client = InfluxDBClient3( + host="http://localhost:8181", + token="token", + database="db", + write_client_options=write_client_options(write_options=WriteOptions(use_v2_api=False)), +) +lp = "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200" try: - client.write(lp) # accept_partial=True by default + client.write(lp) # accept_partial=True by default on V3 API endpoint except InfluxDBPartialWriteError as e: for line_err in e.line_errors: print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})") @@ -270,24 +276,30 @@ client = InfluxDBClient3( token="token", database="db", write_client_options=write_client_options( - write_options=WriteOptions(accept_partial=False) + write_options=WriteOptions( + use_v2_api=False, + accept_partial=False + ) ), ) ``` -### V2 compatibility mode (Clustered) -Set `use_v2_api=True` to route writes through `/api/v2/write` for Clustered/v2-compatible backends. +### Compatibility with InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless +Writes use the V2 API endpoint by default, so no additional configuration is required for these products. `use_v2_api` can be configured by: -- `WriteOptions(use_v2_api=True)` +- `WriteOptions(use_v2_api=False)` (for V3 API endpoint features) - constructor kwarg: `write_use_v2_api=True` - env var: `INFLUX_WRITE_USE_V2_API=true` When `use_v2_api=True`: -- `accept_partial` is ignored by the backend +- `accept_partial` is not used - `no_sync=True` is invalid and rejected before dispatch with: `invalid write options: no_sync cannot be used with use_v2_api` +To use `no_sync` or `accept_partial` controls, set `use_v2_api=False` +(for example with InfluxDB 3 Core/Enterprise). + ## Querying ### Querying with SQL diff --git a/influxdb_client_3/exceptions/exceptions.py b/influxdb_client_3/exceptions/exceptions.py index 8bb8e25..2492fe3 100644 --- a/influxdb_client_3/exceptions/exceptions.py +++ b/influxdb_client_3/exceptions/exceptions.py @@ -108,8 +108,11 @@ def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]: details: List[str] = [] for error_message, line_number, original_line in line_errors: - if line_number != 0 and original_line != "": - details.append(f"\tline {line_number}: {error_message} ({original_line})") + if line_number != 0: + if original_line != "": + details.append(f"\tline {line_number}: {error_message} ({original_line})") + else: + details.append(f"\tline {line_number}: {error_message}") elif error_message: details.append(f"\t{error_message}") return details diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 6f20777..eec7a2a 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -30,7 +30,7 @@ DEFAULT_WRITE_NO_SYNC = False DEFAULT_WRITE_TIMEOUT = 10_000 DEFAULT_WRITE_ACCEPT_PARTIAL = True -DEFAULT_WRITE_USE_V2_API = False +DEFAULT_WRITE_USE_V2_API = True # Kwargs consumed during serialization that should not be passed to _post_write SERIALIZER_KWARGS = { diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 4b680b0..19f51ac 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -160,8 +160,16 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, urlopen_kw=kwargs.get('urlopen_kw', None)) except ApiException as e: use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] + if use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " + "Set use_v2_api=False to use the V3 API endpoint.") + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + raise ex if not use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = "Server doesn't support v3 write API. Set use_v2_api=True for v2 compatibility endpoint." + message = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.") ex = ApiException(status=0, reason=message) ex.message = message ex.args = (message,) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 90c184e..50db50c 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -239,6 +239,14 @@ def test_api_error_v3_partial_write_with_message_only_object_returns_partial_err self.assertEqual(0, err.exception.line_errors[0].line_number) self.assertEqual("", err.exception.line_errors[0].original_line) + def test_api_error_v3_partial_write_with_line_number_without_original_line(self): + response_body = ('{"error":"partial write of line protocol occurred","data":' + '{"error_message":"invalid field value","line_number":2}}') + with self.assertRaises(InfluxDBPartialWriteError) as err: + self._test_api_error(response_body) + self.assertEqual(1, len(err.exception.line_errors)) + self.assertEqual("partial write of line protocol occurred:\n\tline 2: invalid field value", err.exception.message) + def test_partial_write_from_response_guards(self): self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index e39b88a..118c5df 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -125,17 +125,19 @@ def test_write_and_query(self): self.assertEqual(123.0, df['value'][0]) def test_v3_error(self): - measurement = f'test{random_hex(3)}'.lower() lp = "\n".join([ - f"{measurement} v=1i 1770291280", - f"{measurement} v=1 1770291281", + "home,room=Sunroom temp=96 1735545600", + "home,room=Sunroom temp=\"hi\" 1735549200", ]) with InfluxDBClient3( host=self.host, database=self.database, token=self.token, - write_client_options=write_client_options(write_options=WriteOptions(write_type=WriteType.synchronous)) + write_client_options=write_client_options(write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=False + )) ) as client: try: client.write(lp) @@ -144,11 +146,11 @@ def test_v3_error(self): msg = err.message self.assertIn("partial write of line protocol occurred", msg) self.assertIn(( - "invalid column type for column 'v', expected iox::column_type::field::integer, " - "got iox::column_type::field::float" + "invalid column type for column 'temp', expected iox::column_type::field::float, " + "got iox::column_type::field::string" ), msg) self.assertIn("line 2", msg) - self.assertIn(measurement, msg) + self.assertIn("home,room=Sunroom", msg) def test_auth_error_token(self): self.client = InfluxDBClient3(host=self.host, database=self.database, token='fake token') diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index 93391eb..de8928c 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -38,8 +38,8 @@ def test_write_default_params(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v3/write_lp", - query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "ns"})) def test_write_with_write_options(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -56,8 +56,8 @@ def test_write_with_write_options(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v3/write_lp", - query_string={"org": "ORG", "db": "DB", "precision": "microsecond"})) + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"})) def test_write_with_no_sync_true(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -68,7 +68,8 @@ def test_write_with_no_sync_true(self, httpserver: HTTPServer): write_options=WriteOptions( write_type=WriteType.synchronous, write_precision=WritePrecision.US, - no_sync=True + no_sync=True, + use_v2_api=False ) ) ).write(self.SAMPLE_RECORD) @@ -85,7 +86,7 @@ def test_write_with_no_sync_true_in_kwargs(self, httpserver: HTTPServer): write_client_options=write_client_options( write_options=WriteOptions(write_type=WriteType.synchronous, write_precision=WritePrecision.US) ) - ).write(self.SAMPLE_RECORD, no_sync=True) + ).write(self.SAMPLE_RECORD, no_sync=True, use_v2_api=False) self.assert_request_made(httpserver, RequestMatcher( method="POST", uri="/api/v3/write_lp", @@ -99,7 +100,8 @@ def test_write_with_accept_partial_false(self, httpserver: HTTPServer): write_client_options=write_client_options( write_options=WriteOptions( write_type=WriteType.synchronous, - accept_partial=False + accept_partial=False, + use_v2_api=False ) ) ).write(self.SAMPLE_RECORD) @@ -145,12 +147,15 @@ def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) client = InfluxDBClient3( - host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN") + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous, use_v2_api=False) + )) - expected = ("Server doesn't support v3 write API. " - "Set use_v2_api=True for v2 compatibility endpoint.") - with pytest.raises(ApiException, match=r".*Server doesn't support v3 write API\. " - r"Set use_v2_api=True for v2 compatibility endpoint\.") as err: + expected = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.") + with pytest.raises(ApiException, match=r".*Server doesn't support the V3 API endpoint " + r"\(/api/v3/write_lp\)\. Set use_v2_api=True to use the V2 API endpoint\.") as err: client.write(self.SAMPLE_RECORD) assert err.value.message == expected assert err.value.reason == expected @@ -160,6 +165,25 @@ def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): method="POST", uri="/api/v3/write_lp", query_string={"org": "ORG", "db": "DB", "precision": "nanosecond"})) + def test_write_with_v2_on_v3_server(self, httpserver: HTTPServer): + self.set_response_status(httpserver, HTTPStatus.METHOD_NOT_ALLOWED) + + client = InfluxDBClient3( + host=(httpserver.url_for("/")), org="ORG", database="DB", token="TOKEN") + + expected = ("Server doesn't support the V2 API endpoint (/api/v2/write). " + "Set use_v2_api=False to use the V3 API endpoint.") + with pytest.raises(ApiException, match=r".*Server doesn't support the V2 API endpoint " + r"\(/api/v2/write\)\. Set use_v2_api=False to use the V3 API endpoint\.") as err: + client.write(self.SAMPLE_RECORD) + assert err.value.message == expected + assert err.value.reason == expected + assert err.value.args == (expected,) + + self.assert_request_made(httpserver, RequestMatcher( + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "ns"})) + def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): self.set_response_status(httpserver, 200) @@ -176,8 +200,8 @@ def test_write_with_no_sync_false_and_gzip(self, httpserver: HTTPServer): ).write(self.SAMPLE_RECORD) self.assert_request_made(httpserver, RequestMatcher( - method="POST", uri="/api/v3/write_lp", - query_string={"org": "ORG", "db": "DB", "precision": "microsecond"}, + method="POST", uri="/api/v2/write", + query_string={"org": "ORG", "bucket": "DB", "precision": "us"}, headers={"Content-Encoding": "gzip"}, )) def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): @@ -189,7 +213,8 @@ def test_write_with_no_sync_true_and_gzip(self, httpserver: HTTPServer): write_options=WriteOptions( write_type=WriteType.synchronous, write_precision=WritePrecision.US, - no_sync=True + no_sync=True, + use_v2_api=False ) ), enable_gzip=True @@ -236,7 +261,8 @@ def test_write_with_timeout_in_write_options(self, httpserver: HTTPServer): write_type=WriteType.synchronous, write_precision=WritePrecision.US, timeout=30, - no_sync=True + no_sync=True, + use_v2_api=False ) ), enable_gzip=True @@ -254,6 +280,7 @@ def test_write_with_write_timeout(self, httpserver: HTTPServer): write_type=WriteType.synchronous, write_precision=WritePrecision.US, no_sync=True, + use_v2_api=False, ) ), enable_gzip=True @@ -270,6 +297,7 @@ def test_write_with_timeout_arg(self, httpserver: HTTPServer): write_type=WriteType.synchronous, write_precision=WritePrecision.US, no_sync=True, + use_v2_api=False, ) ), enable_gzip=True From 805133531733e751b3cd4084d8789cca503d3cfa Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 12:47:03 +0200 Subject: [PATCH 15/30] test: fix formatting --- tests/test_api_client.py | 3 ++- tests/test_write_local_server.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 50db50c..8609306 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -245,7 +245,8 @@ def test_api_error_v3_partial_write_with_line_number_without_original_line(self) with self.assertRaises(InfluxDBPartialWriteError) as err: self._test_api_error(response_body) self.assertEqual(1, len(err.exception.line_errors)) - self.assertEqual("partial write of line protocol occurred:\n\tline 2: invalid field value", err.exception.message) + self.assertEqual("partial write of line protocol occurred:\n\tline 2: invalid field value", + err.exception.message) def test_partial_write_from_response_guards(self): self.assertIsNone(InfluxDBPartialWriteError.from_response(None)) diff --git a/tests/test_write_local_server.py b/tests/test_write_local_server.py index de8928c..d84db35 100644 --- a/tests/test_write_local_server.py +++ b/tests/test_write_local_server.py @@ -155,7 +155,8 @@ def test_write_with_v3_on_v2_server(self, httpserver: HTTPServer): expected = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " "Set use_v2_api=True to use the V2 API endpoint.") with pytest.raises(ApiException, match=r".*Server doesn't support the V3 API endpoint " - r"\(/api/v3/write_lp\)\. Set use_v2_api=True to use the V2 API endpoint\.") as err: + r"\(/api/v3/write_lp\)\. " + r"Set use_v2_api=True to use the V2 API endpoint\.") as err: client.write(self.SAMPLE_RECORD) assert err.value.message == expected assert err.value.reason == expected @@ -174,7 +175,8 @@ def test_write_with_v2_on_v3_server(self, httpserver: HTTPServer): expected = ("Server doesn't support the V2 API endpoint (/api/v2/write). " "Set use_v2_api=False to use the V3 API endpoint.") with pytest.raises(ApiException, match=r".*Server doesn't support the V2 API endpoint " - r"\(/api/v2/write\)\. Set use_v2_api=False to use the V3 API endpoint\.") as err: + r"\(/api/v2/write\)\. " + r"Set use_v2_api=False to use the V3 API endpoint\.") as err: client.write(self.SAMPLE_RECORD) assert err.value.message == expected assert err.value.reason == expected From 0060be752486c176b3fdbe8dde52e5489c6184ca Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 13:07:44 +0200 Subject: [PATCH 16/30] feat: add kwargs handler for write_no_sync --- README.md | 13 +++---------- influxdb_client_3/__init__.py | 4 ++++ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 104de74..0527873 100644 --- a/README.md +++ b/README.md @@ -249,14 +249,13 @@ On partial failure, the client raises `InfluxDBPartialWriteError` with structure ```python from influxdb_client_3 import InfluxDBClient3 -from influxdb_client_3 import WriteOptions, write_client_options from influxdb_client_3.exceptions import InfluxDBPartialWriteError client = InfluxDBClient3( host="http://localhost:8181", token="token", database="db", - write_client_options=write_client_options(write_options=WriteOptions(use_v2_api=False)), + write_use_v2_api=False, ) lp = "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200" @@ -269,18 +268,12 @@ except InfluxDBPartialWriteError as e: Disable partial writes: ```python -from influxdb_client_3 import WriteOptions, write_client_options - client = InfluxDBClient3( host="http://localhost:8181", token="token", database="db", - write_client_options=write_client_options( - write_options=WriteOptions( - use_v2_api=False, - accept_partial=False - ) - ), + write_use_v2_api=False, + write_accept_partial=False, ) ``` diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index f84e74a..0b6ed9e 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -241,6 +241,7 @@ def __init__( :key str write_timeout: int value used to set the client write API timeout in milliseconds. :key bool write_accept_partial: allow partial writes when some lines fail. :key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint. + :key bool write_no_sync: disable sync confirmation on V3 API endpoint writes. :key list[str] profilers: list of enabled Flux profilers """ self._org = org if org is not None else "default" @@ -273,6 +274,9 @@ def __init__( if kw_keys.__contains__('write_use_v2_api'): write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api')) + if kw_keys.__contains__('write_no_sync'): + write_no_sync = _parse_write_bool(kwargs.get('write_no_sync')) + write_options = WriteOptions( write_type=write_type, write_precision=write_precision, From d300ef8bacb462445129651f9f65cb3f92506039 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 13:21:33 +0200 Subject: [PATCH 17/30] docs: update CHANGELOG --- CHANGELOG.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f66072..dd805ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,14 +2,11 @@ ## 0.20.0 [unreleased] -### Breaking Changes - -1. [#210](https://github.com/InfluxCommunity/influxdb3-python/pull/210): Adds partial write support and aligns write routing with v3 defaults. - See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. - For InfluxDB Clustered, set `use_v2_api=True` for writes. - ### Features +1. [#213](https://github.com/InfluxCommunity/influxdb3-python/pull/213): Add partial write support and default writes to the V2 API endpoint. + See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. + `no_sync` requires `use_v2_api=False`; `accept_partial` applies only to V3 API endpoint writes. 1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. ### Bug Fixes From 57a26a431487bf9bb2e818bf0d90094b373ed11c Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 13:29:09 +0200 Subject: [PATCH 18/30] test: improve coverage --- tests/test_influxdb_client_3.py | 76 +++++++++++++-------------------- 1 file changed, 29 insertions(+), 47 deletions(-) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 9e38b40..44e2518 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -321,53 +321,35 @@ def test_parse_invalid_write_precision(self): InfluxDBClient3.from_env() self.assertIn("Invalid precision value: invalid_value", str(context.exception)) - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_NO_SYNC': 'true'}) - def test_parse_write_no_sync_true(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.no_sync, True) - - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_NO_SYNC': 'TrUe'}) - def test_parse_write_no_sync_true_mixed_chars(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.no_sync, True) - - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_NO_SYNC': 'false'}) - def test_parse_write_no_sync_false(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.no_sync, False) - - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_NO_SYNC': 'anything-else'}) - def test_parse_write_no_sync_anything_else_is_false(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.no_sync, False) - - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}) - def test_parse_write_accept_partial_false(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.accept_partial, False) - - @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', - 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_USE_V2_API': 'true'}) - def test_parse_write_use_v2_api_true(self): - client = InfluxDBClient3.from_env() - self.assertIsInstance(client, InfluxDBClient3) - write_options = client._write_client_options.get("write_options") - self.assertEqual(write_options.use_v2_api, True) + def test_write_bool_options_from_env(self): + _base = {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db'} + cases = [ + ('no_sync true', {'INFLUX_WRITE_NO_SYNC': 'true'}, 'no_sync', True), + ('no_sync TrUe mixed case', {'INFLUX_WRITE_NO_SYNC': 'TrUe'}, 'no_sync', True), + ('no_sync false', {'INFLUX_WRITE_NO_SYNC': 'false'}, 'no_sync', False), + ('no_sync anything-else', {'INFLUX_WRITE_NO_SYNC': 'anything-else'}, 'no_sync', False), + ('accept_partial false', {'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}, 'accept_partial', False), + ('use_v2_api true', {'INFLUX_WRITE_USE_V2_API': 'true'}, 'use_v2_api', True), + ] + for name, env_extra, field, expected in cases: + with self.subTest(name): + with patch.dict('os.environ', {**_base, **env_extra}): + client = InfluxDBClient3.from_env() + write_options = client._write_client_options.get("write_options") + self.assertEqual(getattr(write_options, field), expected) + + def test_write_bool_options_from_constructor_kwargs(self): + _base = {'host': 'localhost', 'token': 'test_token', 'database': 'test_db'} + cases = [ + ('use_v2_api False', {'write_use_v2_api': False}, 'use_v2_api', False), + ('accept_partial False', {'write_accept_partial': False}, 'accept_partial', False), + ('no_sync True', {'write_no_sync': True}, 'no_sync', True), + ] + for name, kwargs, field, expected in cases: + with self.subTest(name): + client = InfluxDBClient3(**_base, **kwargs) + write_options = client._write_client_options.get("write_options") + self.assertEqual(getattr(write_options, field), expected) @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db', 'INFLUX_WRITE_TIMEOUT': '6789'}) From 1d0134e6829930b8c959d25523eb72dc59983cdc Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 13:30:59 +0200 Subject: [PATCH 19/30] test: fix E241 linter complaint --- tests/test_influxdb_client_3.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 44e2518..554b25a 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -324,12 +324,12 @@ def test_parse_invalid_write_precision(self): def test_write_bool_options_from_env(self): _base = {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', 'INFLUX_DATABASE': 'test_db'} cases = [ - ('no_sync true', {'INFLUX_WRITE_NO_SYNC': 'true'}, 'no_sync', True), - ('no_sync TrUe mixed case', {'INFLUX_WRITE_NO_SYNC': 'TrUe'}, 'no_sync', True), - ('no_sync false', {'INFLUX_WRITE_NO_SYNC': 'false'}, 'no_sync', False), - ('no_sync anything-else', {'INFLUX_WRITE_NO_SYNC': 'anything-else'}, 'no_sync', False), - ('accept_partial false', {'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}, 'accept_partial', False), - ('use_v2_api true', {'INFLUX_WRITE_USE_V2_API': 'true'}, 'use_v2_api', True), + ('no_sync true', {'INFLUX_WRITE_NO_SYNC': 'true'}, 'no_sync', True), + ('no_sync TrUe mixed case', {'INFLUX_WRITE_NO_SYNC': 'TrUe'}, 'no_sync', True), + ('no_sync false', {'INFLUX_WRITE_NO_SYNC': 'false'}, 'no_sync', False), + ('no_sync anything-else', {'INFLUX_WRITE_NO_SYNC': 'anything-else'}, 'no_sync', False), + ('accept_partial false', {'INFLUX_WRITE_ACCEPT_PARTIAL': 'false'}, 'accept_partial', False), + ('use_v2_api true', {'INFLUX_WRITE_USE_V2_API': 'true'}, 'use_v2_api', True), ] for name, env_extra, field, expected in cases: with self.subTest(name): @@ -341,9 +341,9 @@ def test_write_bool_options_from_env(self): def test_write_bool_options_from_constructor_kwargs(self): _base = {'host': 'localhost', 'token': 'test_token', 'database': 'test_db'} cases = [ - ('use_v2_api False', {'write_use_v2_api': False}, 'use_v2_api', False), - ('accept_partial False', {'write_accept_partial': False}, 'accept_partial', False), - ('no_sync True', {'write_no_sync': True}, 'no_sync', True), + ('use_v2_api False', {'write_use_v2_api': False}, 'use_v2_api', False), + ('accept_partial False', {'write_accept_partial': False}, 'accept_partial', False), + ('no_sync True', {'write_no_sync': True}, 'no_sync', True), ] for name, kwargs, field, expected in cases: with self.subTest(name): From 9131053f33003e27a9ab5801d6396bee805dbd63 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 13:50:03 +0200 Subject: [PATCH 20/30] test: add coverage --- tests/test_influxdb_client_3.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 554b25a..7bba7f1 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -215,6 +215,23 @@ def test_write_api_custom_options_no_error(self): if sync_client is not None: sync_client.close() + def test_write_async_multiple_precisions_returns_list(self): + import warnings + client = InfluxDBClient3( + host="localhost", token="test_token", database="test_db", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.asynchronous)) + ) + point_s = Point.measurement("m").field("v", 1).time(1, WritePrecision.S) + point_ms = Point.measurement("m").field("v", 2).time(1000, WritePrecision.MS) + with patch.object(client._write_api, "_post_write", return_value="ok") as mock_post: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = client.write(record=[point_s, point_ms]) + self.assertEqual(mock_post.call_count, 2) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + def test_default_client(self): expected_precision = DefaultWriteOptions.write_precision.value expected_write_type = DefaultWriteOptions.write_type.value From 1191884300660d5af14e0eea19da34327d298e2a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 14:52:13 +0200 Subject: [PATCH 21/30] fix: option default --- influxdb_client_3/write_client/service/write_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 19f51ac..85fb41d 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -243,7 +243,7 @@ def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D path_params = {} query_params = [] - use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] + use_v2_api = local_var_params['use_v2_api'] if 'use_v2_api' in local_var_params else True no_sync = 'no_sync' in local_var_params and local_var_params['no_sync'] accept_partial = local_var_params['accept_partial'] if 'accept_partial' in local_var_params else True if 'org' in local_var_params: From 39941eb6515f088ec0b48a011c6b0f7a75a5aa3a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 14:54:18 +0200 Subject: [PATCH 22/30] fix: options in batching --- .../write_client/client/write_api.py | 12 ++++++++-- tests/test_influxdb_client_3.py | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index eec7a2a..e238e2c 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -205,15 +205,23 @@ def __init__(self, bucket, org, precision=DEFAULT_WRITE_PRECISION, **kwargs) -> self.bucket = bucket self.org = org self.precision = precision + self.no_sync = kwargs.get('no_sync', DEFAULT_WRITE_NO_SYNC) + self.accept_partial = kwargs.get('accept_partial', DEFAULT_WRITE_ACCEPT_PARTIAL) + self.use_v2_api = kwargs.get('use_v2_api', DEFAULT_WRITE_USE_V2_API) self.kwargs = kwargs pass def __hash__(self) -> int: - return hash((self.bucket, self.org, self.precision)) + return hash((self.bucket, self.org, self.precision, self.no_sync, self.accept_partial, self.use_v2_api)) def __eq__(self, o: object) -> bool: return isinstance(o, self.__class__) \ - and self.bucket == o.bucket and self.org == o.org and self.precision == o.precision + and self.bucket == o.bucket \ + and self.org == o.org \ + and self.precision == o.precision \ + and self.no_sync == o.no_sync \ + and self.accept_partial == o.accept_partial \ + and self.use_v2_api == o.use_v2_api def __str__(self) -> str: return '_BatchItemKey[bucket:\'{}\', org:\'{}\', precision:\'{}\', kwargs: \'{}\']' \ diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 7bba7f1..5079fb0 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -7,6 +7,7 @@ from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType, \ write_client_options from influxdb_client_3.exceptions import InfluxDB3ClientQueryError +from influxdb_client_3.write_client.client.write_api import _BatchItemKey from influxdb_client_3.write_client.rest import ApiException from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData, ErrorFlightServer @@ -132,6 +133,27 @@ def test_default_write_options(self): self.assertEqual(DefaultWriteOptions.timeout.value, client._write_client_options["write_options"].timeout) self.assertEqual([], client._write_client_options["write_options"].tag_order) + def test_batch_item_key_includes_write_routing_options(self): + k1 = _BatchItemKey("bucket", "org", WritePrecision.NS, + use_v2_api=True, no_sync=False, accept_partial=True) + k2 = _BatchItemKey("bucket", "org", WritePrecision.NS, + use_v2_api=False, no_sync=False, accept_partial=True) + k3 = _BatchItemKey("bucket", "org", WritePrecision.NS, + use_v2_api=True, no_sync=True, accept_partial=True) + k4 = _BatchItemKey("bucket", "org", WritePrecision.NS, + use_v2_api=True, no_sync=False, accept_partial=False) + + self.assertNotEqual(k1, k2) + self.assertNotEqual(k1, k3) + self.assertNotEqual(k1, k4) + self.assertNotEqual(hash(k1), hash(k2)) + self.assertNotEqual(hash(k1), hash(k3)) + self.assertNotEqual(hash(k1), hash(k4)) + + k1_same = _BatchItemKey("bucket", "org", WritePrecision.NS, + use_v2_api=True, no_sync=False, accept_partial=True) + self.assertEqual(k1, k1_same) + @asyncio_run async def test_query_async(self): with ConstantFlightServer() as server: From 19659d6bed3c0c39a61b9f9d8cde59aeb6281c55 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 15:11:51 +0200 Subject: [PATCH 23/30] docs: align use_v2_api config examples for V3 feature usage --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0527873..bff1f70 100644 --- a/README.md +++ b/README.md @@ -282,8 +282,8 @@ Writes use the V2 API endpoint by default, so no additional configuration is req `use_v2_api` can be configured by: - `WriteOptions(use_v2_api=False)` (for V3 API endpoint features) -- constructor kwarg: `write_use_v2_api=True` -- env var: `INFLUX_WRITE_USE_V2_API=true` +- constructor kwarg: `write_use_v2_api=False` +- env var: `INFLUX_WRITE_USE_V2_API=false` When `use_v2_api=True`: - `accept_partial` is not used From 0f8c7e6bc7a50b2424feae5a81cf492ad8ab60d9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 15:12:22 +0200 Subject: [PATCH 24/30] fix: default use_v2_api=True in 405 error handler path --- influxdb_client_3/write_client/service/write_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 85fb41d..3a746c2 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -159,7 +159,7 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, collection_formats={}, urlopen_kw=kwargs.get('urlopen_kw', None)) except ApiException as e: - use_v2_api = 'use_v2_api' in local_var_params and local_var_params['use_v2_api'] + use_v2_api = local_var_params['use_v2_api'] if 'use_v2_api' in local_var_params else True if use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " "Set use_v2_api=False to use the V3 API endpoint.") From c3f540742ebc6794269f132ab16577c0040caab0 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 May 2026 15:51:45 +0200 Subject: [PATCH 25/30] fix: translate async write errors consistently --- .../write_client/service/write_service.py | 55 +++++++++++------- tests/test_api_client.py | 58 +++++++++++++++++++ 2 files changed, 93 insertions(+), 20 deletions(-) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 3a746c2..d280e6c 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -141,8 +141,9 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, local_var_params, path, path_params, query_params, header_params, body_params = \ self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501 + use_v2_api = local_var_params['use_v2_api'] if 'use_v2_api' in local_var_params else True try: - return self.api_client.call_api( + result = self.api_client.call_api( path, 'POST', path_params, query_params, @@ -158,26 +159,40 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501, _request_timeout=local_var_params.get('_request_timeout'), collection_formats={}, urlopen_kw=kwargs.get('urlopen_kw', None)) + if local_var_params.get('async_req'): + original_get = result.get + + def translated_get(timeout=None): + try: + return original_get(timeout=timeout) + except ApiException as e: + raise self._translate_write_exception(e, use_v2_api) + + result.get = translated_get + return result except ApiException as e: - use_v2_api = local_var_params['use_v2_api'] if 'use_v2_api' in local_var_params else True - if use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " - "Set use_v2_api=False to use the V3 API endpoint.") - ex = ApiException(status=0, reason=message) - ex.message = message - ex.args = (message,) - raise ex - if not use_v2_api and e.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " - "Set use_v2_api=True to use the V2 API endpoint.") - ex = ApiException(status=0, reason=message) - ex.message = message - ex.args = (message,) - raise ex - partial = InfluxDBPartialWriteError.from_response(e.response) - if partial is not None: - raise partial - raise e + raise self._translate_write_exception(e, use_v2_api) + + @staticmethod + def _translate_write_exception(exc, use_v2_api): + if use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " + "Set use_v2_api=False to use the V3 API endpoint.") + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + return ex + if not use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.") + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + return ex + partial = InfluxDBPartialWriteError.from_response(exc.response) + if partial is not None: + return partial + return exc async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 """Write data. diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 8609306..79eb89c 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -10,6 +10,7 @@ from influxdb_client_3.write_client._sync.api_client import ApiClient from influxdb_client_3.write_client.configuration import Configuration from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError +from influxdb_client_3.write_client.rest import ApiException from influxdb_client_3.write_client.service import WriteService from influxdb_client_3.version import VERSION @@ -348,3 +349,60 @@ def test_should_gzip(self): # Test exact threshold match and less than threshold payload = "x" * 1000 self.assertTrue(ApiClient.should_gzip(payload, enable_gzip=True, gzip_threshold=1000)) + + def test_post_write_with_http_info_async_translates_exceptions(self): + cases = [ + ( + "v2 on v3-only backend", + True, + response.HTTPResponse(status=405, reason="Method Not Allowed", body=b""), + ApiException, + "Server doesn't support the V2 API endpoint (/api/v2/write). " + "Set use_v2_api=False to use the V3 API endpoint.", + ), + ( + "v3 on v2-only backend", + False, + response.HTTPResponse(status=405, reason="Method Not Allowed", body=b""), + ApiException, + "Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.", + ), + ( + "v3 partial write response", + False, + response.HTTPResponse( + status=400, + reason="Bad Request", + body=( + b'{"error":"partial write of line protocol occurred","data":[{"error_message":"bad line",' + b'"line_number":2,"original_line":"home,room=Sunroom temp=\\"hi\\" 1735549200"}]}' + ), + ), + InfluxDBPartialWriteError, + None, + ), + ] + for name, use_v2_api, http_resp, expected_type, expected_message in cases: + with self.subTest(name): + conf = Configuration() + local_client = ApiClient(conf) + local_client.call_api = mock.Mock() + thread = mock.Mock() + thread.get.side_effect = ApiException(http_resp=http_resp) + local_client.call_api.return_value = thread + service = WriteService(local_client) + result = service.post_write_with_http_info( + "TEST_ORG", + "TEST_BUCKET", + "home,room=Sunroom temp=96 1735545600", + async_req=True, + use_v2_api=use_v2_api, + ) + with self.assertRaises(expected_type) as err: + result.get() + if expected_message: + self.assertEqual(expected_message, err.exception.message) + self.assertEqual(expected_message, err.exception.reason) + else: + self.assertEqual(1, len(err.exception.line_errors)) From 23243512f3cbe287efe2009e8c24adcc6e9b3cc6 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 May 2026 09:40:11 +0200 Subject: [PATCH 26/30] fix: translate post_write_async API errors consistently --- .../write_client/service/write_service.py | 36 ++++++++++--------- tests/test_api_client.py | 26 ++++++++++++++ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index d280e6c..6b866a1 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -218,23 +218,27 @@ async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D40 """ # noqa: E501 local_var_params, path, path_params, query_params, header_params, body_params = \ self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501 + use_v2_api = local_var_params['use_v2_api'] if 'use_v2_api' in local_var_params else True - return await self.api_client.call_api( - path, 'POST', - path_params, - query_params, - header_params, - body=body_params, - post_params=[], - files={}, - response_type=None, # noqa: E501 - auth_settings=[], - async_req=local_var_params.get('async_req'), - _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 - _preload_content=local_var_params.get('_preload_content', True), - _request_timeout=local_var_params.get('_request_timeout'), - collection_formats={}, - urlopen_kw=kwargs.get('urlopen_kw', None)) + try: + return await self.api_client.call_api( + path, 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=[], + files={}, + response_type=None, # noqa: E501 + auth_settings=[], + async_req=local_var_params.get('async_req'), + _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 + _preload_content=local_var_params.get('_preload_content', True), + _request_timeout=local_var_params.get('_request_timeout'), + collection_formats={}, + urlopen_kw=kwargs.get('urlopen_kw', None)) + except ApiException as e: + raise self._translate_write_exception(e, use_v2_api) def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 local_var_params = dict(locals()) diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 79eb89c..6d0c520 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -1,3 +1,4 @@ +import asyncio import json import unittest import uuid @@ -406,3 +407,28 @@ def test_post_write_with_http_info_async_translates_exceptions(self): self.assertEqual(expected_message, err.exception.reason) else: self.assertEqual(1, len(err.exception.line_errors)) + + def test_post_write_async_translates_v3_unsupported(self): + conf = Configuration() + local_client = ApiClient(conf) + local_client.call_api = mock.AsyncMock( + side_effect=ApiException( + http_resp=response.HTTPResponse(status=405, reason="Method Not Allowed", body=b"") + ) + ) + service = WriteService(local_client) + + async def run(): + await service.post_write_async( + "TEST_ORG", + "TEST_BUCKET", + "home,room=Sunroom temp=96 1735545600", + use_v2_api=False, + ) + + with self.assertRaises(ApiException) as err: + asyncio.run(run()) + + expected = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.") + self.assertEqual(expected, err.exception.message) From f12a12ae3bb9ddaa66f6a49ccc5bd9cbad48442a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 May 2026 09:56:03 +0200 Subject: [PATCH 27/30] fix: ignore None write kwargs and remove dead no-sync parser --- influxdb_client_3/__init__.py | 10 +++------- tests/test_influxdb_client_3.py | 5 +++++ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 0b6ed9e..36bcd6f 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -172,10 +172,6 @@ def _parse_write_bool(value): return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes'] -def _parse_write_no_sync(write_no_sync: str): - return _parse_write_bool(write_no_sync) - - def _parse_timeout(to: str) -> int: try: timeout = int(to) @@ -268,13 +264,13 @@ def __init__( if kw_keys.__contains__('write_timeout'): write_timeout = kwargs.get('write_timeout') - if kw_keys.__contains__('write_accept_partial'): + if kw_keys.__contains__('write_accept_partial') and kwargs.get('write_accept_partial') is not None: write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial')) - if kw_keys.__contains__('write_use_v2_api'): + if kw_keys.__contains__('write_use_v2_api') and kwargs.get('write_use_v2_api') is not None: write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api')) - if kw_keys.__contains__('write_no_sync'): + if kw_keys.__contains__('write_no_sync') and kwargs.get('write_no_sync') is not None: write_no_sync = _parse_write_bool(kwargs.get('write_no_sync')) write_options = WriteOptions( diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 5079fb0..1183aad 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -383,6 +383,11 @@ def test_write_bool_options_from_constructor_kwargs(self): ('use_v2_api False', {'write_use_v2_api': False}, 'use_v2_api', False), ('accept_partial False', {'write_accept_partial': False}, 'accept_partial', False), ('no_sync True', {'write_no_sync': True}, 'no_sync', True), + ('use_v2_api None keeps default', {'write_use_v2_api': None}, + 'use_v2_api', DefaultWriteOptions.use_v2_api.value), + ('accept_partial None keeps default', {'write_accept_partial': None}, + 'accept_partial', DefaultWriteOptions.accept_partial.value), + ('no_sync None keeps default', {'write_no_sync': None}, 'no_sync', DefaultWriteOptions.no_sync.value), ] for name, kwargs, field, expected in cases: with self.subTest(name): From b92c58e306c9c21ffb933754551e712541aea670 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 May 2026 12:13:13 +0200 Subject: [PATCH 28/30] docs: fix CHANGELOG entries order --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd805ad..4871ddc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,10 @@ ### Features +1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. 1. [#213](https://github.com/InfluxCommunity/influxdb3-python/pull/213): Add partial write support and default writes to the V2 API endpoint. See [Partial writes](https://docs.influxdata.com/influxdb3/core/write-data/http-api/v3-write-lp/#partial-writes) for more. `no_sync` requires `use_v2_api=False`; `accept_partial` applies only to V3 API endpoint writes. -1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. ### Bug Fixes From bcc0459755b2fe3f30052a00e37b9f3432fad014 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 May 2026 12:40:43 +0200 Subject: [PATCH 29/30] docs: align across clients --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bff1f70..66e1a21 100644 --- a/README.md +++ b/README.md @@ -244,7 +244,7 @@ client.write_dataframe( ``` ### Accept partial writes and inspect failed lines -`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines. +`accept_partial` defaults to `True` and allows partial success when writing through the V3 API endpoint (`use_v2_api=False`) and a batch contains invalid lines. On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`. ```python From f716553680838bd97b7cb1c8f9dbc5b6ce55d2b9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 May 2026 14:28:13 +0200 Subject: [PATCH 30/30] test: add missing e2e tests --- tests/test_influxdb_client_3_integration.py | 52 ++++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 118c5df..8d3dffa 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -130,27 +130,55 @@ def test_v3_error(self): "home,room=Sunroom temp=\"hi\" 1735549200", ]) + for accept_partial in [True, False]: + with self.subTest(accept_partial=accept_partial): + with InfluxDBClient3( + host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options(write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=False, + accept_partial=accept_partial + )) + ) as client: + with self.assertRaises(InfluxDBPartialWriteError) as err: + client.write(lp) + + msg = err.exception.message + self.assertTrue( + "partial write of line protocol occurred" in msg or "parsing failed for write_lp endpoint" in msg + ) + self.assertIn(( + "invalid column type for column 'temp', expected iox::column_type::field::float, " + "got iox::column_type::field::string" + ), msg) + self.assertIn("line 2", msg) + self.assertIn("home,room=Sunroom", msg) + + def test_v2_error(self): + lp = "\n".join([ + "home,room=Sunroom temp=96 1735545600", + "home,room=Sunroom temp=\"hi\" 1735549200", + ]) + with InfluxDBClient3( host=self.host, database=self.database, token=self.token, write_client_options=write_client_options(write_options=WriteOptions( write_type=WriteType.synchronous, - use_v2_api=False + use_v2_api=True, + accept_partial=False )) ) as client: - try: + with self.assertRaises(InfluxDBError) as err: client.write(lp) - self.fail("Expected InfluxDBError from invalid line protocol.") - except InfluxDBPartialWriteError as err: - msg = err.message - self.assertIn("partial write of line protocol occurred", msg) - self.assertIn(( - "invalid column type for column 'temp', expected iox::column_type::field::float, " - "got iox::column_type::field::string" - ), msg) - self.assertIn("line 2", msg) - self.assertIn("home,room=Sunroom", msg) + + self.assertNotIsInstance(err.exception, InfluxDBPartialWriteError) + self.assertIsNotNone(err.exception.response) + self.assertEqual(400, err.exception.response.status) + self.assertTrue(err.exception.message) def test_auth_error_token(self): self.client = InfluxDBClient3(host=self.host, database=self.database, token='fake token')