diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index ba28fc608a0c..71b1e5082a6b 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -22,14 +22,21 @@ import logging import os from functools import partial -from typing import TYPE_CHECKING -from typing import Any -from typing import Dict -from typing import Optional -from typing import Union +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Optional, + Union, + Tuple, + Callable, + Iterator, + BinaryIO, +) from apache_beam import typehints from apache_beam.coders import coders +from apache_beam.coders.coders import Coder from apache_beam.io import filebasedsink from apache_beam.io import filebasedsource from apache_beam.io import iobase @@ -41,7 +48,7 @@ from apache_beam.transforms.display import DisplayDataItem if TYPE_CHECKING: - from apache_beam.io import fileio + from apache_beam.io import fileio __all__ = [ 'ReadFromText', @@ -59,502 +66,495 @@ class _TextSource(filebasedsource.FileBasedSource): - r"""A source for reading text files. - - Parses a text file as newline-delimited elements. Supports newline delimiters - '\n' and '\r\n. - - This implementation reads encoded text and uses the input coder's encoding to - decode from bytes to str. This does not support ``UTF-16`` or ``UTF-32`` - encodings. - """ - - DEFAULT_READ_BUFFER_SIZE = 8192 - - class ReadBuffer(object): - # A buffer that gives the buffered data and next position in the - # buffer that should be read. - - def __init__(self, data, position): - self._data = data - self._position = position - - @property - def data(self): - return self._data - - @data.setter - def data(self, value): - assert isinstance(value, bytes) - self._data = value - - @property - def position(self): - return self._position - - @position.setter - def position(self, value): - assert isinstance(value, int) - if value > len(self._data): - raise ValueError( - 'Cannot set position to %d since it\'s larger than ' - 'size of data %d.' % (value, len(self._data))) - self._position = value - - def reset(self): - self.data = b'' - self.position = 0 - - def __init__( - self, - file_pattern, - min_bundle_size, - compression_type, - strip_trailing_newlines, - coder: coders.Coder, - buffer_size=DEFAULT_READ_BUFFER_SIZE, - validate=True, - skip_header_lines=0, - header_processor_fns=(None, None), - delimiter=None, - escapechar=None): - """Initialize a _TextSource - - Args: - header_processor_fns (tuple): a tuple of a `header_matcher` function - and a `header_processor` function. The `header_matcher` should - return `True` for all lines at the start of the file that are part - of the file header and `False` otherwise. These header lines will - not be yielded when reading records and instead passed into - `header_processor` to be handled. If `skip_header_lines` and a - `header_matcher` are both provided, the value of `skip_header_lines` - lines will be skipped and the header will be processed from - there. - delimiter (bytes) Optional: delimiter to split records. - Must not self-overlap, because self-overlapping delimiters cause - ambiguous parsing. - escapechar (bytes) Optional: a single byte to escape the records - delimiter, can also escape itself. - Raises: - ValueError: if skip_lines is negative. - - Please refer to documentation in class `ReadFromText` for the rest - of the arguments. + r"""A source for reading text files. + + Parses a text file as newline-delimited elements. Supports newline delimiters + '\n' and '\r\n. 
+ + This implementation reads encoded text and uses the input coder's encoding to + decode from bytes to str. This does not support ``UTF-16`` or ``UTF-32`` + encodings. """ - super().__init__( - file_pattern, - min_bundle_size, - compression_type=compression_type, - validate=validate) - - self._strip_trailing_newlines = strip_trailing_newlines - self._compression_type = compression_type - self._coder = coder - self._buffer_size = buffer_size - if skip_header_lines < 0: - raise ValueError( - 'Cannot skip negative number of header lines: %d' % skip_header_lines) - elif skip_header_lines > 10: - _LOGGER.warning( - 'Skipping %d header lines. Skipping large number of header ' - 'lines might significantly slow down processing.') - self._skip_header_lines = skip_header_lines - self._header_matcher, self._header_processor = header_processor_fns - if delimiter is not None: - if not isinstance(delimiter, bytes) or len(delimiter) == 0: - raise ValueError('Delimiter must be a non-empty bytes sequence.') - if self._is_self_overlapping(delimiter): - raise ValueError('Delimiter must not self-overlap.') - self._delimiter = delimiter - if escapechar is not None: - if not (isinstance(escapechar, bytes) and len(escapechar) == 1): - raise ValueError( - "escapechar must be bytes of size 1: '%s'" % escapechar) - self._escapechar = escapechar - - def display_data(self): - parent_dd = super().display_data() - parent_dd['strip_newline'] = DisplayDataItem( - self._strip_trailing_newlines, label='Strip Trailing New Lines') - parent_dd['buffer_size'] = DisplayDataItem( - self._buffer_size, label='Buffer Size') - parent_dd['coder'] = DisplayDataItem(self._coder.__class__, label='Coder') - return parent_dd - - def read_records(self, file_name, range_tracker): - start_offset = range_tracker.start_position() - read_buffer = _TextSource.ReadBuffer(b'', 0) - - next_record_start_position = -1 - - def split_points_unclaimed(stop_position): - return ( - 0 if stop_position <= next_record_start_position else - iobase.RangeTracker.SPLIT_POINTS_UNKNOWN) - - range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) - - with self.open_file(file_name) as file_to_read: - position_after_processing_header_lines = ( - self._process_header(file_to_read, read_buffer)) - start_offset = max(start_offset, position_after_processing_header_lines) - if start_offset > position_after_processing_header_lines: - # Seeking to one delimiter length before the start index and ignoring - # the current line. If start_position is at beginning of the line, that - # line belongs to the current bundle, hence ignoring that is incorrect. - # Seeking to one delimiter before prevents that. - - if self._delimiter is not None and start_offset >= len(self._delimiter): - required_position = start_offset - len(self._delimiter) - else: - required_position = start_offset - 1 - - if self._escapechar is not None: - # Need more bytes to check if the delimiter is escaped. - # Seek until the first escapechar if any. 
- while required_position > 0: - file_to_read.seek(required_position - 1) - if file_to_read.read(1) == self._escapechar: - required_position -= 1 + + DEFAULT_READ_BUFFER_SIZE = 8192 + + class ReadBuffer(object): + def __init__(self, data: bytes, position: int) -> None: + self._data = data + self._position = position + + @property + def data(self) -> bytes: + return self._data + + @data.setter + def data(self, value: bytes) -> None: + assert isinstance(value, bytes) + self._data = value + + @property + def position(self) -> int: + return self._position + + @position.setter + def position(self, value: int) -> None: + assert isinstance(value, int) + if value > len(self._data): + raise ValueError( + f"Cannot set position to {value} since it's larger than size of data {len(self._data)}." + ) + self._position = value + + def reset(self) -> None: + self.data = b'' + self.position = 0 + + def __init__( + self, + file_pattern: Optional[str], + min_bundle_size: Optional[int], + compression_type: CompressionTypes, + strip_trailing_newlines: bool, + coder: Coder, + buffer_size: int = DEFAULT_READ_BUFFER_SIZE, + validate: bool = True, + skip_header_lines: int = 0, + header_processor_fns: Tuple[Optional[Callable[..., Any]], Optional[Callable[..., Any]]] = (None, None), + delimiter: Optional[bytes] = None, + escapechar: Optional[bytes] = None, + ) -> None: + """Initialize a _TextSource + + Args: + header_processor_fns (tuple): a tuple of a `header_matcher` function + and a `header_processor` function. The `header_matcher` should + return `True` for all lines at the start of the file that are part + of the file header and `False` otherwise. These header lines will + not be yielded when reading records and instead passed into + `header_processor` to be handled. If `skip_header_lines` and a + `header_matcher` are both provided, the value of `skip_header_lines` + lines will be skipped and the header will be processed from + there. + delimiter (bytes) Optional: delimiter to split records. + Must not self-overlap, because self-overlapping delimiters cause + ambiguous parsing. + escapechar (bytes) Optional: a single byte to escape the records + delimiter, can also escape itself. + Raises: + ValueError: if skip_lines is negative. + + Please refer to documentation in class `ReadFromText` for the rest + of the arguments. + """ + super().__init__( + file_pattern, + min_bundle_size, + compression_type=compression_type, + validate=validate, + ) + + self._strip_trailing_newlines = strip_trailing_newlines + self._compression_type = compression_type + self._coder = coder + self._buffer_size = buffer_size + if skip_header_lines < 0: + raise ValueError( + 'Cannot skip negative number of header lines: %d' % skip_header_lines + ) + elif skip_header_lines > 10: + _LOGGER.warning( + 'Skipping %d header lines. Skipping large number of header ' + 'lines might significantly slow down processing.' 
+ ) + self._skip_header_lines = skip_header_lines + self._header_matcher, self._header_processor = header_processor_fns + if delimiter is not None: + if not isinstance(delimiter, bytes) or len(delimiter) == 0: + raise ValueError('Delimiter must be a non-empty bytes sequence.') + if self._is_self_overlapping(delimiter): + raise ValueError('Delimiter must not self-overlap.') + self._delimiter = delimiter + if escapechar is not None: + if not (isinstance(escapechar, bytes) and len(escapechar) == 1): + raise ValueError("escapechar must be bytes of size 1: '%s'" % escapechar) + self._escapechar = escapechar + + def display_data(self): + parent_dd = super().display_data() + parent_dd['strip_newline'] = DisplayDataItem(self._strip_trailing_newlines, label='Strip Trailing New Lines') + parent_dd['buffer_size'] = DisplayDataItem(self._buffer_size, label='Buffer Size') + parent_dd['coder'] = DisplayDataItem(self._coder.__class__, label='Coder') + return parent_dd + + def read_records(self, file_name: str, range_tracker: iobase.RangeTracker) -> Iterator[Any]: + start_offset = range_tracker.start_position() + read_buffer = _TextSource.ReadBuffer(b'', 0) + + next_record_start_position = -1 + + def split_points_unclaimed(stop_position): + return ( + 0 if stop_position <= next_record_start_position else iobase.RangeTracker.SPLIT_POINTS_UNKNOWN + ) + + range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) + + with self.open_file(file_name) as file_to_read: + position_after_processing_header_lines = self._process_header(file_to_read, read_buffer) + start_offset = max(start_offset, position_after_processing_header_lines) + if start_offset > position_after_processing_header_lines: + # Seeking to one delimiter length before the start index and ignoring + # the current line. If start_position is at beginning of the line, that + # line belongs to the current bundle, hence ignoring that is incorrect. + # Seeking to one delimiter before prevents that. + + if self._delimiter is not None and start_offset >= len(self._delimiter): + required_position = start_offset - len(self._delimiter) + else: + required_position = start_offset - 1 + + if self._escapechar is not None: + # Need more bytes to check if the delimiter is escaped. + # Seek until the first escapechar if any. + while required_position > 0: + file_to_read.seek(required_position - 1) + if file_to_read.read(1) == self._escapechar: + required_position -= 1 + else: + break + + file_to_read.seek(required_position) + read_buffer.reset() + sep_bounds = self._find_separator_bounds(file_to_read, read_buffer) + if not sep_bounds: + # Could not find a delimiter after required_position. This means that + # none of the records within the file belongs to the current source. + return + + _, sep_end = sep_bounds + read_buffer.data = read_buffer.data[sep_end:] + next_record_start_position = required_position + sep_end else: - break + next_record_start_position = position_after_processing_header_lines + + while range_tracker.try_claim(next_record_start_position): + record, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer) + # For compressed text files that use an unsplittable OffsetRangeTracker + # with infinity as the end position, above 'try_claim()' invocation + # would pass for an empty record at the end of file that is not + # followed by a new line character. Since such a record is at the last + # position of a file, it should not be a part of the considered range. + # We do this check to ignore such records. 
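The bundle-splitting behaviour in read_records is easier to see with concrete byte offsets. A minimal, stand-alone sketch (made-up data and offsets, no Beam classes) of how a bundle whose start offset lands inside a record leaves that record to the previous bundle:

data = b"aaa\nbbb\nccc\n"
start_offset = 5                      # falls inside the record b"bbb"
# Back up one delimiter length, then scan forward for the next separator;
# everything before that separator belongs to the bundle that owns offset 4.
scan_from = start_offset - 1          # 4
sep_end = data.index(b"\n", scan_from) + 1
next_record_start = sep_end           # 8: this bundle's first claim is b"ccc"
assert data[next_record_start:next_record_start + 3] == b"ccc"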
+ if len(record) == 0 and num_bytes_to_next_record < 0: # pylint: disable=len-as-condition + break + + # Record delimiter must be larger than zero bytes. + assert num_bytes_to_next_record != 0 + if num_bytes_to_next_record > 0: + next_record_start_position += num_bytes_to_next_record + + yield self._coder.decode(record) + if num_bytes_to_next_record < 0: + break + + def _process_header(self, file_to_read: BinaryIO, read_buffer: " _TextSource.ReadBuffer") -> int: + # Returns a tuple containing the position in file after processing header + # records and a list of decoded header lines that match + # 'header_matcher'. + header_lines = [] + position = self._skip_lines(file_to_read, read_buffer, self._skip_header_lines) if self._skip_header_lines else 0 + if self._header_matcher: + while True: + record, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer) + decoded_line = self._coder.decode(record) + if not self._header_matcher(decoded_line): + # We've read past the header section at this point, so go back a line. + file_to_read.seek(position) + read_buffer.reset() + break + header_lines.append(decoded_line) + if num_bytes_to_next_record < 0: + break + position += num_bytes_to_next_record + + if self._header_processor: + self._header_processor(header_lines) + + return position + + def _find_separator_bounds( + self, file_to_read: BinaryIO, read_buffer: " _TextSource.ReadBuffer" + ) -> Optional[Tuple[int, int]]: + # Determines the start and end positions within 'read_buffer.data' of the + # next delimiter starting from position 'read_buffer.position'. + # Use the custom delimiter to be used in place of + # the default ones ('\n' or '\r\n')' + # This method may increase the size of buffer but it will not decrease the + # size of it. + + current_pos = read_buffer.position + + # b'\n' use as default + delimiter = self._delimiter or b'\n' + delimiter_len = len(delimiter) + + while True: + if current_pos >= len(read_buffer.data) - delimiter_len + 1: + # Ensuring that there are enough bytes to determine + # at current_pos. + if not self._try_to_ensure_num_bytes_in_buffer(file_to_read, read_buffer, current_pos + delimiter_len): + return None + + # Using find() here is more efficient than a linear scan + # of the byte array. + next_delim = read_buffer.data.find(delimiter, current_pos) + + if next_delim >= 0: + if (self._delimiter is None and read_buffer.data[next_delim - 1:next_delim] == b'\r'): + if self._escapechar is not None and self._is_escaped(read_buffer, next_delim - 1): + # Accept '\n' as a default delimiter, because '\r' is escaped. + return (next_delim, next_delim + 1) + else: + # Accept both '\r\n' and '\n' as a default delimiter. + return (next_delim - 1, next_delim + 1) + else: + if self._escapechar is not None and self._is_escaped(read_buffer, next_delim): + # Skip an escaped delimiter. + current_pos = next_delim + delimiter_len + 1 + continue + else: + # Found a delimiter. Accepting that as the next delimiter. + return (next_delim, next_delim + delimiter_len) + + elif self._delimiter is not None: + # Corner case: custom delimiter is truncated at the end of the buffer. + next_delim = read_buffer.data.find(delimiter[0], len(read_buffer.data) - delimiter_len + 1) + if next_delim >= 0: + # Delimiters longer than 1 byte may cross the buffer boundary. + # Defer full matching till the next iteration. 
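A stand-alone illustration of that corner case, using a made-up two-byte delimiter: the buffer can end in a partial match whose confirmation has to wait for the next read.

delimiter = b"|;"                 # two bytes, not self-overlapping
buffer = b"record one|"           # first buffer fill ends mid-delimiter
assert buffer.find(delimiter) == -1
# Only the first delimiter byte is visible near the tail of the buffer.
assert buffer.find(delimiter[0:1], len(buffer) - len(delimiter) + 1) == 10
buffer += b";record two|;"        # after the next fill the match completes
assert buffer.find(delimiter, 10) == 10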
+ current_pos = next_delim + continue + + current_pos = len(read_buffer.data) + + def _try_to_ensure_num_bytes_in_buffer( + self, file_to_read: BinaryIO, read_buffer: " _TextSource.ReadBuffer", num_bytes: int + ) -> bool: + # Tries to ensure that there are at least num_bytes bytes in the buffer. + # Returns True if this can be fulfilled, returned False if this cannot be + # fulfilled due to reaching EOF. + while len(read_buffer.data) < num_bytes: + read_data = file_to_read.read(self._buffer_size) + if not read_data: + return False + + read_buffer.data += read_data + + return True - file_to_read.seek(required_position) - read_buffer.reset() + def _skip_lines(self, file_to_read: BinaryIO, read_buffer: " _TextSource.ReadBuffer", num_lines: int) -> int: + """Skip num_lines from file_to_read, return num_lines+1 start position.""" + if file_to_read.tell() > 0: + file_to_read.seek(0) + position = 0 + for _ in range(num_lines): + _, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer) + if num_bytes_to_next_record < 0: + # We reached end of file. It is OK to just break here + # because subsequent _read_record will return same result. + break + position += num_bytes_to_next_record + return position + + def _read_record(self, file_to_read: BinaryIO, read_buffer: " _TextSource.ReadBuffer") -> Tuple[bytes, int]: + # Returns a tuple containing the current_record and number of bytes to the + # next record starting from 'read_buffer.position'. If EOF is + # reached, returns a tuple containing the current record and -1. + + if read_buffer.position > self._buffer_size: + # read_buffer is too large. Truncating and adjusting it. + read_buffer.data = read_buffer.data[read_buffer.position:] + read_buffer.position = 0 + + record_start_position_in_buffer = read_buffer.position sep_bounds = self._find_separator_bounds(file_to_read, read_buffer) + read_buffer.position = sep_bounds[1] if sep_bounds else len(read_buffer.data) + if not sep_bounds: - # Could not find a delimiter after required_position. This means that - # none of the records within the file belongs to the current source. - return - - _, sep_end = sep_bounds - read_buffer.data = read_buffer.data[sep_end:] - next_record_start_position = required_position + sep_end - else: - next_record_start_position = position_after_processing_header_lines - - while range_tracker.try_claim(next_record_start_position): - record, num_bytes_to_next_record = self._read_record(file_to_read, - read_buffer) - # For compressed text files that use an unsplittable OffsetRangeTracker - # with infinity as the end position, above 'try_claim()' invocation - # would pass for an empty record at the end of file that is not - # followed by a new line character. Since such a record is at the last - # position of a file, it should not be a part of the considered range. - # We do this check to ignore such records. - if len(record) == 0 and num_bytes_to_next_record < 0: # pylint: disable=len-as-condition - break - - # Record delimiter must be larger than zero bytes. - assert num_bytes_to_next_record != 0 - if num_bytes_to_next_record > 0: - next_record_start_position += num_bytes_to_next_record - - yield self._coder.decode(record) - if num_bytes_to_next_record < 0: - break - - def _process_header(self, file_to_read, read_buffer): - # Returns a tuple containing the position in file after processing header - # records and a list of decoded header lines that match - # 'header_matcher'. 
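For reference, the header_processor_fns pair documented in _TextSource.__init__ has the following shape; the matcher and processor below are placeholders, not functions defined by this module.

def header_matcher(line: str) -> bool:
    # True for every line that is still part of the file header.
    return line.startswith('#')

def header_processor(header_lines) -> None:
    # Called once with every decoded line the matcher accepted.
    print('skipped %d header lines' % len(header_lines))

header_processor_fns = (header_matcher, header_processor)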
- header_lines = [] - position = self._skip_lines( - file_to_read, read_buffer, - self._skip_header_lines) if self._skip_header_lines else 0 - if self._header_matcher: - while True: - record, num_bytes_to_next_record = self._read_record(file_to_read, - read_buffer) - decoded_line = self._coder.decode(record) - if not self._header_matcher(decoded_line): - # We've read past the header section at this point, so go back a line. - file_to_read.seek(position) - read_buffer.reset() - break - header_lines.append(decoded_line) - if num_bytes_to_next_record < 0: - break - position += num_bytes_to_next_record - - if self._header_processor: - self._header_processor(header_lines) - - return position - - def _find_separator_bounds(self, file_to_read, read_buffer): - # Determines the start and end positions within 'read_buffer.data' of the - # next delimiter starting from position 'read_buffer.position'. - # Use the custom delimiter to be used in place of - # the default ones ('\n' or '\r\n')' - # This method may increase the size of buffer but it will not decrease the - # size of it. - - current_pos = read_buffer.position - - # b'\n' use as default - delimiter = self._delimiter or b'\n' - delimiter_len = len(delimiter) - - while True: - if current_pos >= len(read_buffer.data) - delimiter_len + 1: - # Ensuring that there are enough bytes to determine - # at current_pos. - if not self._try_to_ensure_num_bytes_in_buffer( - file_to_read, read_buffer, current_pos + delimiter_len): - return - - # Using find() here is more efficient than a linear scan - # of the byte array. - next_delim = read_buffer.data.find(delimiter, current_pos) - - if next_delim >= 0: - if (self._delimiter is None and - read_buffer.data[next_delim - 1:next_delim] == b'\r'): - if self._escapechar is not None and self._is_escaped(read_buffer, - next_delim - 1): - # Accept '\n' as a default delimiter, because '\r' is escaped. - return (next_delim, next_delim + 1) - else: - # Accept both '\r\n' and '\n' as a default delimiter. - return (next_delim - 1, next_delim + 1) + # Reached EOF. Bytes up to the EOF is the next record. Returning '-1' for + # the starting position of the next record. + return (read_buffer.data[record_start_position_in_buffer:], -1) + + if self._strip_trailing_newlines: + # Current record should not contain the delimiter. + return ( + read_buffer.data[record_start_position_in_buffer:sep_bounds[0]], + sep_bounds[1] - record_start_position_in_buffer, + ) else: - if self._escapechar is not None and self._is_escaped(read_buffer, - next_delim): - # Skip an escaped delimiter. - current_pos = next_delim + delimiter_len + 1 - continue - else: - # Found a delimiter. Accepting that as the next delimiter. - return (next_delim, next_delim + delimiter_len) - - elif self._delimiter is not None: - # Corner case: custom delimiter is truncated at the end of the buffer. - next_delim = read_buffer.data.find( - delimiter[0], len(read_buffer.data) - delimiter_len + 1) - if next_delim >= 0: - # Delimiters longer than 1 byte may cross the buffer boundary. - # Defer full matching till the next iteration. - current_pos = next_delim - continue - - current_pos = len(read_buffer.data) - - def _try_to_ensure_num_bytes_in_buffer( - self, file_to_read, read_buffer, num_bytes): - # Tries to ensure that there are at least num_bytes bytes in the buffer. - # Returns True if this can be fulfilled, returned False if this cannot be - # fulfilled due to reaching EOF. 
- while len(read_buffer.data) < num_bytes: - read_data = file_to_read.read(self._buffer_size) - if not read_data: + # Current record should contain the delimiter. + return ( + read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], + sep_bounds[1] - record_start_position_in_buffer, + ) + + @staticmethod + def _is_self_overlapping(delimiter: bytes) -> bool: + # A delimiter self-overlaps if it has a prefix that is also its suffix. + for i in range(1, len(delimiter)): + if delimiter[0:i] == delimiter[len(delimiter) - i:]: + return True return False - read_buffer.data += read_data - - return True - - def _skip_lines(self, file_to_read, read_buffer, num_lines): - """Skip num_lines from file_to_read, return num_lines+1 start position.""" - if file_to_read.tell() > 0: - file_to_read.seek(0) - position = 0 - for _ in range(num_lines): - _, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer) - if num_bytes_to_next_record < 0: - # We reached end of file. It is OK to just break here - # because subsequent _read_record will return same result. - break - position += num_bytes_to_next_record - return position - - def _read_record(self, file_to_read, read_buffer): - # Returns a tuple containing the current_record and number of bytes to the - # next record starting from 'read_buffer.position'. If EOF is - # reached, returns a tuple containing the current record and -1. - - if read_buffer.position > self._buffer_size: - # read_buffer is too large. Truncating and adjusting it. - read_buffer.data = read_buffer.data[read_buffer.position:] - read_buffer.position = 0 - - record_start_position_in_buffer = read_buffer.position - sep_bounds = self._find_separator_bounds(file_to_read, read_buffer) - read_buffer.position = sep_bounds[1] if sep_bounds else len( - read_buffer.data) - - if not sep_bounds: - # Reached EOF. Bytes up to the EOF is the next record. Returning '-1' for - # the starting position of the next record. - return (read_buffer.data[record_start_position_in_buffer:], -1) - - if self._strip_trailing_newlines: - # Current record should not contain the delimiter. - return ( - read_buffer.data[record_start_position_in_buffer:sep_bounds[0]], - sep_bounds[1] - record_start_position_in_buffer) - else: - # Current record should contain the delimiter. - return ( - read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], - sep_bounds[1] - record_start_position_in_buffer) - - @staticmethod - def _is_self_overlapping(delimiter): - # A delimiter self-overlaps if it has a prefix that is also its suffix. - for i in range(1, len(delimiter)): - if delimiter[0:i] == delimiter[len(delimiter) - i:]: - return True - return False - - def _is_escaped(self, read_buffer, position): - # Returns True if byte at position is preceded with an odd number - # of escapechar bytes or False if preceded by 0 or even escapes - # (the even number means that all the escapes are escaped themselves). - escape_count = 0 - for current_pos in reversed(range(0, position)): - if read_buffer.data[current_pos:current_pos + 1] != self._escapechar: - break - escape_count += 1 - return escape_count % 2 == 1 - - def output_type_hint(self): - try: - return self._coder.to_type_hint() - except NotImplementedError: - return Any + def _is_escaped(self, read_buffer: " _TextSource.ReadBuffer", position: int) -> bool: + # Returns True if byte at position is preceded with an odd number + # of escapechar bytes or False if preceded by 0 or even escapes + # (the even number means that all the escapes are escaped themselves). 
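The parity rule described in those comments can be restated as a small stand-alone helper; this is an illustration of the rule, not the method itself.

def is_escaped(data: bytes, position: int, escapechar: bytes = b"\\") -> bool:
    # A byte is escaped only if an odd number of escape characters
    # immediately precede it; pairs of escapes escape each other.
    count = 0
    while position - count > 0 and \
            data[position - count - 1:position - count] == escapechar:
        count += 1
    return count % 2 == 1

assert is_escaped(b"a\\,b", 2)        # one escape: the ',' is escaped
assert not is_escaped(b"a\\\\,b", 3)  # two escapes cancel out
assert is_escaped(b"a\\\\\\,b", 4)    # three escapes: ',' escaped again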
+ escape_count = 0 + for current_pos in reversed(range(0, position)): + if read_buffer.data[current_pos:current_pos + 1] != self._escapechar: + break + escape_count += 1 + return escape_count % 2 == 1 + + def output_type_hint(self) -> Any: + try: + return self._coder.to_type_hint() + except NotImplementedError: + return Any class _TextSourceWithFilename(_TextSource): - def read_records(self, file_name, range_tracker): - records = super().read_records(file_name, range_tracker) - for record in records: - yield (file_name, record) + def read_records(self, file_name: str, range_tracker: iobase.RangeTracker) -> Iterator[Tuple[str, Any]]: + records = super().read_records(file_name, range_tracker) + for record in records: + yield (file_name, record) - def output_type_hint(self): - return typehints.KV[str, super().output_type_hint()] + def output_type_hint(self) -> Any: + return typehints.KV[str, super().output_type_hint()] class _TextSink(filebasedsink.FileBasedSink): - """A sink to a GCS or local text file or files.""" - def __init__( - self, - file_path_prefix, - file_name_suffix='', - append_trailing_newlines=True, - num_shards=0, - shard_name_template=None, - coder: coders.Coder = coders.ToBytesCoder(), - compression_type=CompressionTypes.AUTO, - header=None, - footer=None, - *, - max_records_per_shard=None, - max_bytes_per_shard=None, - skip_if_empty=False, - triggering_frequency=None): - """Initialize a _TextSink. - - Args: - file_path_prefix: The file path to write to. The files written will begin - with this prefix, followed by a shard identifier (see num_shards), and - end in a common extension, if given by file_name_suffix. In most cases, - only this argument is specified and num_shards, shard_name_template, and - file_name_suffix use default values. - file_name_suffix: Suffix for the files written. - append_trailing_newlines: indicate whether this sink should write an - additional newline char after writing each element. - num_shards: The number of files (shards) used for output. If not set, the - service will decide on the optimal number of shards. - Constraining the number of shards is likely to reduce - the performance of a pipeline. Setting this value is not recommended - unless you require a specific number of output files. - In streaming if not set, the service will write a file per bundle. - shard_name_template: A template string containing placeholders for - the shard number and shard count. Currently only ``''``, - ``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and - ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service. - When constructing a filename for a particular shard number, the - upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded - shard number and shard count respectively. This argument can be ``''`` - in which case it behaves as if num_shards was set to 1 and only one file - will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for - bounded PCollections and for ``'-W-SSSSS-of-NNNNN'`` unbounded - PCollections. - W is used for windowed shard naming and is replaced with - ``[window.start, window.end)`` - V is used for windowed shard naming and is replaced with - ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"), - window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S")`` - coder: Coder used to encode each line. - compression_type: Used to handle compressed output files. 
Typical value - is CompressionTypes.AUTO, in which case the final file path's - extension (as determined by file_path_prefix, file_name_suffix, - num_shards and shard_name_template) will be used to detect the - compression. - header: String to write at beginning of file as a header. If not None and - append_trailing_newlines is set, '\n' will be added. - footer: String to write at the end of file as a footer. If not None and - append_trailing_newlines is set, '\n' will be added. - max_records_per_shard: Maximum number of records to write to any - individual shard. - max_bytes_per_shard: Target maximum number of bytes to write to any - individual shard. This may be exceeded slightly, as a new shard is - created once this limit is hit, but the remainder of a given record, a - subsequent newline, and a footer may cause the actual shard size - to exceed this value. This also tracks the uncompressed, - not compressed, size of the shard. - skip_if_empty: Don't write any shards if the PCollection is empty. - triggering_frequency: (int) Every triggering_frequency duration, a window - will be triggered and all bundles in the window will be written. - If set it overrides user windowing. Mandatory for GlobalWindow. - - - Returns: - A _TextSink object usable for writing. - """ - super().__init__( + """A sink to a GCS or local text file or files.""" + + def __init__( + self, file_path_prefix, - file_name_suffix=file_name_suffix, - num_shards=num_shards, - shard_name_template=shard_name_template, - coder=coder, - mime_type='text/plain', - compression_type=compression_type, - max_records_per_shard=max_records_per_shard, - max_bytes_per_shard=max_bytes_per_shard, - skip_if_empty=skip_if_empty, - triggering_frequency=triggering_frequency) - self._append_trailing_newlines = append_trailing_newlines - self._header = header - self._footer = footer - - def open(self, temp_path): - file_handle = super().open(temp_path) - if self._header is not None: - file_handle.write(coders.ToBytesCoder().encode(self._header)) - if self._append_trailing_newlines: - file_handle.write(b'\n') - return file_handle - - def close(self, file_handle): - if self._footer is not None: - file_handle.write(coders.ToBytesCoder().encode(self._footer)) - if self._append_trailing_newlines: - file_handle.write(b'\n') - super().close(file_handle) - - def display_data(self): - dd_parent = super().display_data() - dd_parent['append_newline'] = DisplayDataItem( - self._append_trailing_newlines, label='Append Trailing New Lines') - return dd_parent - - def write_encoded_record(self, file_handle, encoded_value): - """Writes a single encoded record.""" - file_handle.write(encoded_value) - if self._append_trailing_newlines: - file_handle.write(b'\n') + file_name_suffix='', + append_trailing_newlines=True, + num_shards=0, + shard_name_template=None, + coder: coders.Coder = coders.ToBytesCoder(), + compression_type=CompressionTypes.AUTO, + header=None, + footer=None, + *, + max_records_per_shard=None, + max_bytes_per_shard=None, + skip_if_empty=False, + triggering_frequency=None, + ): + """Initialize a _TextSink. + + Args: + file_path_prefix: The file path to write to. The files written will begin + with this prefix, followed by a shard identifier (see num_shards), and + end in a common extension, if given by file_name_suffix. In most cases, + only this argument is specified and num_shards, shard_name_template, and + file_name_suffix use default values. + file_name_suffix: Suffix for the files written. 
+ append_trailing_newlines: indicate whether this sink should write an + additional newline char after writing each element. + num_shards: The number of files (shards) used for output. If not set, the + service will decide on the optimal number of shards. + Constraining the number of shards is likely to reduce + the performance of a pipeline. Setting this value is not recommended + unless you require a specific number of output files. + In streaming if not set, the service will write a file per bundle. + shard_name_template: A template string containing placeholders for + the shard number and shard count. Currently only ``''``, ``'-SSSSS-of-NNNNN'``, + ``'-W-SSSSS-of-NNNNN'`` and ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service. + When constructing a filename for a particular shard number, the + upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded + shard number and shard count respectively. This argument can be ``''`` + in which case it behaves as if num_shards was set to 1 and only one file + will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for + bounded PCollections and for ``'-W-SSSSS-of-NNNNN'`` unbounded + PCollections. + W is used for windowed shard naming and is replaced with + ``[window.start, window.end)`` + V is used for windowed shard naming and is replaced with + ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"), + window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S")`` + + coder: Coder used to encode each line. + compression_type: Used to handle compressed output files. Typical value + is CompressionTypes.AUTO, in which case the final file path's + extension (as determined by file_path_prefix, file_name_suffix, + num_shards and shard_name_template) will be used to detect the + compression. + header: String to write at beginning of file as a header. If not None and + append_trailing_newlines is set, '\n' will be added. + footer: String to write at the end of file as a footer. If not None and + append_trailing_newlines is set, '\n' will be added. + max_records_per_shard: Maximum number of records to write to any + individual shard. + max_bytes_per_shard: Target maximum number of bytes to write to any + individual shard. This may be exceeded slightly, as a new shard is + created once this limit is hit, but the remainder of a given record, a + subsequent newline, and a footer may cause the actual shard size + to exceed this value. This also tracks the uncompressed, + not compressed, size of the shard. + skip_if_empty: Don't write any shards if the PCollection is empty. + triggering_frequency: (int) Every triggering_frequency duration, a window + will be triggered and all bundles in the window will be written. + If set it overrides user windowing. Mandatory for GlobalWindow. + + Returns: + A _TextSink object usable for writing. 
+ """ + super().__init__( + file_path_prefix, + file_name_suffix=file_name_suffix, + num_shards=num_shards, + shard_name_template=shard_name_template, + coder=coder, + mime_type='text/plain', + compression_type=compression_type, + max_records_per_shard=max_records_per_shard, + max_bytes_per_shard=max_bytes_per_shard, + skip_if_empty=skip_if_empty, + triggering_frequency=triggering_frequency, + ) + self._append_trailing_newlines = append_trailing_newlines + self._header = header + self._footer = footer + + def open(self, temp_path): + file_handle = super().open(temp_path) + if self._header is not None: + file_handle.write(coders.ToBytesCoder().encode(self._header)) + if self._append_trailing_newlines: + file_handle.write(b'\n') + return file_handle + + def close(self, file_handle): + if self._footer is not None: + file_handle.write(coders.ToBytesCoder().encode(self._footer)) + if self._append_trailing_newlines: + file_handle.write(b'\n') + super().close(file_handle) + + def display_data(self): + dd_parent = super().display_data() + dd_parent['append_newline'] = DisplayDataItem(self._append_trailing_newlines, label='Append Trailing New Lines') + return dd_parent + + def write_encoded_record(self, file_handle, encoded_value): + """Writes a single encoded record.""" + file_handle.write(encoded_value) + if self._append_trailing_newlines: + file_handle.write(b'\n') def _create_text_source( @@ -566,552 +566,332 @@ def _create_text_source( validate=False, skip_header_lines=None, delimiter=None, - escapechar=None): - return _TextSource( - file_pattern=file_pattern, - min_bundle_size=min_bundle_size, - compression_type=compression_type, - strip_trailing_newlines=strip_trailing_newlines, - coder=coder, - validate=validate, - skip_header_lines=skip_header_lines, - delimiter=delimiter, - escapechar=escapechar) - - -class ReadAllFromText(PTransform): - """A ``PTransform`` for reading a ``PCollection`` of text files. - - Reads a ``PCollection`` of text files or file patterns and produces a - ``PCollection`` of strings. - - Parses a text file as newline-delimited elements, by default assuming - UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'. - - If `with_filename` is ``True`` the output will include the file name. This is - similar to ``ReadFromTextWithFilename`` but this ``PTransform`` can be placed - anywhere in the pipeline. - - If reading from a text file that that requires a different encoding, you may - provide a custom :class:`~apache_beam.coders.coders.Coder` that encodes and - decodes with the appropriate codec. For example, see the implementation of - :class:`~apache_beam.coders.coders.StrUtf8Coder`. - - This does not support ``UTF-16`` or ``UTF-32`` encodings. - - This implementation is only tested with batch pipeline. In streaming, - reading may happen with delay due to the limitation in ReShuffle involved. - """ - DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024 # 64MB - - def __init__( - self, - min_bundle_size=0, - desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, - compression_type=CompressionTypes.AUTO, - strip_trailing_newlines=True, - validate=False, - coder: coders.Coder = coders.StrUtf8Coder(), - skip_header_lines=0, - with_filename=False, - delimiter=None, - escapechar=None, - **kwargs): - """Initialize the ``ReadAllFromText`` transform. - - Args: - min_bundle_size: Minimum size of bundles that should be generated when - splitting this source into bundles. See ``FileBasedSource`` for more - details. 
- desired_bundle_size: Desired size of bundles that should be generated when - splitting this source into bundles. See ``FileBasedSource`` for more - details. - compression_type: Used to handle compressed input files. Typical value - is ``CompressionTypes.AUTO``, in which case the underlying file_path's - extension will be used to detect the compression. - strip_trailing_newlines: Indicates whether this source should remove - the newline char in each line it reads before decoding that line. - validate: flag to verify that the files exist during the pipeline - creation time. - skip_header_lines: Number of header lines to skip. Same number is skipped - from each source file. Must be 0 or higher. Large number of skipped - lines might impact performance. - coder: Coder used to decode each line. - with_filename: If True, returns a Key Value with the key being the file - name and the value being the actual data. If False, it only returns - the data. - delimiter (bytes) Optional: delimiter to split records. - Must not self-overlap, because self-overlapping delimiters cause - ambiguous parsing. - escapechar (bytes) Optional: a single byte to escape the records - delimiter, can also escape itself. - """ - super().__init__(**kwargs) - self._source_from_file = partial( - _create_text_source, + escapechar=None, +): + return _TextSource( + file_pattern=file_pattern, min_bundle_size=min_bundle_size, compression_type=compression_type, strip_trailing_newlines=strip_trailing_newlines, - validate=validate, coder=coder, + validate=validate, skip_header_lines=skip_header_lines, delimiter=delimiter, - escapechar=escapechar) - self._desired_bundle_size = desired_bundle_size - self._min_bundle_size = min_bundle_size - self._compression_type = compression_type - self._with_filename = with_filename - self._read_all_files = ReadAllFiles( - True, - self._compression_type, - self._desired_bundle_size, - self._min_bundle_size, - self._source_from_file, - self._with_filename) - - def expand(self, pvalue): - return pvalue | 'ReadAllFiles' >> self._read_all_files + escapechar=escapechar, + ) -class ReadAllFromTextContinuously(ReadAllFromText): - """A ``PTransform`` for reading text files in given file patterns. - This PTransform acts as a Source and produces continuously a ``PCollection`` - of strings. - - For more details, see ``ReadAllFromText`` for text parsing settings; - see ``apache_beam.io.fileio.MatchContinuously`` for watching settings. - - ReadAllFromTextContinuously is experimental. No backwards-compatibility - guarantees. Due to the limitation on Reshuffle, current implementation does - not scale. - """ - _ARGS_FOR_MATCH = ( - 'interval', - 'has_deduplication', - 'start_timestamp', - 'stop_timestamp', - 'match_updated_files', - 'apply_windowing') - _ARGS_FOR_READ = ( - 'min_bundle_size', - 'desired_bundle_size', - 'compression_type', - 'strip_trailing_newlines', - 'validate', - 'coder', - 'skip_header_lines', - 'with_filename', - 'delimiter', - 'escapechar') - - def __init__(self, file_pattern, **kwargs): - """Initialize the ``ReadAllFromTextContinuously`` transform. - - Accepts args for constructor args of both :class:`ReadAllFromText` and - :class:`~apache_beam.io.fileio.MatchContinuously`. 
- """ - kwargs_for_match = { - k: v - for (k, v) in kwargs.items() if k in self._ARGS_FOR_MATCH - } - kwargs_for_read = { - k: v - for (k, v) in kwargs.items() if k in self._ARGS_FOR_READ - } - kwargs_additinal = { - k: v - for (k, v) in kwargs.items() - if k not in self._ARGS_FOR_MATCH and k not in self._ARGS_FOR_READ - } - super().__init__(**kwargs_for_read, **kwargs_additinal) - self._file_pattern = file_pattern - self._kwargs_for_match = kwargs_for_match - - def expand(self, pbegin): - # Importing locally to prevent circular dependency issues. - from apache_beam.io.fileio import MatchContinuously - - # TODO(BEAM-14497) always reshuffle once gbk always trigger works. - return ( - pbegin - | MatchContinuously(self._file_pattern, **self._kwargs_for_match) - | 'ReadAllFiles' >> self._read_all_files._disable_reshuffle()) +class ReadAllFromText(PTransform): + """A ``PTransform`` for reading a ``PCollection`` of text files. + Reads a ``PCollection`` of text files or file patterns and produces a + ``PCollection`` of strings. -class ReadFromText(PTransform): - r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text - files. - - Parses a text file as newline-delimited elements, by default assuming - ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n`` - or specified delimiter. - - If reading from a text file that that requires a different encoding, you may - provide a custom :class:`~apache_beam.coders.coders.Coder` that encodes and - decodes with the appropriate codec. For example, see the implementation of - :class:`~apache_beam.coders.coders.StrUtf8Coder`. - - This does not support ``UTF-16`` or ``UTF-32`` encodings. - """ - - _source_class = _TextSource - - def __init__( - self, - file_pattern=None, - min_bundle_size=0, - compression_type=CompressionTypes.AUTO, - strip_trailing_newlines=True, - coder: coders.Coder = coders.StrUtf8Coder(), - validate=True, - skip_header_lines=0, - delimiter=None, - escapechar=None, - **kwargs): - """Initialize the :class:`ReadFromText` transform. - - Args: - file_pattern (str): The file path to read from as a local file path or a - GCS ``gs://`` path. The path can contain glob characters - (``*``, ``?``, and ``[...]`` sets). - min_bundle_size (int): Minimum size of bundles that should be generated - when splitting this source into bundles. See - :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more - details. - compression_type (str): Used to handle compressed input files. - Typical value is :attr:`CompressionTypes.AUTO - `, in which case the - underlying file_path's extension will be used to detect the compression. - strip_trailing_newlines (bool): Indicates whether this source should - remove the newline char in each line it reads before decoding that line. - validate (bool): flag to verify that the files exist during the pipeline - creation time. - skip_header_lines (int): Number of header lines to skip. Same number is - skipped from each source file. Must be 0 or higher. Large number of - skipped lines might impact performance. - coder (~apache_beam.coders.coders.Coder): Coder used to decode each line. - delimiter (bytes) Optional: delimiter to split records. - Must not self-overlap, because self-overlapping delimiters cause - ambiguous parsing. - escapechar (bytes) Optional: a single byte to escape the records - delimiter, can also escape itself. - """ + Parses a text file as newline-delimited elements, by default assuming + UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'. 
- super().__init__(**kwargs) - if file_pattern: - try: - if not os.path.dirname(file_pattern): - file_pattern = os.path.join('.', file_pattern) - except TypeError: - pass - - self._source = self._source_class( - file_pattern, - min_bundle_size, - compression_type, - strip_trailing_newlines, - coder, - validate=validate, - skip_header_lines=skip_header_lines, - delimiter=delimiter, - escapechar=escapechar) + If `with_filename` is ``True`` the output will include the file name. This is + similar to ``ReadFromTextWithFilename`` but this ``PTransform`` can be placed + anywhere in the pipeline. - def expand(self, pvalue): - return pvalue.pipeline | Read(self._source).with_output_types( - self._source.output_type_hint()) + If reading from a text file that that requires a different encoding, you may + provide a custom :class:`~apache_beam.coders.coders.Coder` that encodes and + decodes with the appropriate codec. For example, see the implementation of + :class:`~apache_beam.coders.coders.StrUtf8Coder`. + This does not support ``UTF-16`` or ``UTF-32`` encodings. -class ReadFromTextWithFilename(ReadFromText): - r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text - files returning the name of the file and the content of the file. + This implementation is only tested with batch pipeline. In streaming, + reading may happen with delay due to the limitation in ReShuffle involved. + """ + DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024 # 64MB + + def __init__( + self, + min_bundle_size=0, + desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, + compression_type=CompressionTypes.AUTO, + strip_trailing_newlines=True, + validate=False, + coder: coders.Coder = coders.StrUtf8Coder(), + skip_header_lines=0, + with_filename=False, + delimiter=None, + escapechar=None, + **kwargs, + ): + """Initialize the ``ReadAllFromText`` transform.""" + super().__init__(**kwargs) + self._source_from_file = partial( + _create_text_source, + min_bundle_size=min_bundle_size, + compression_type=compression_type, + strip_trailing_newlines=strip_trailing_newlines, + validate=validate, + coder=coder, + skip_header_lines=skip_header_lines, + delimiter=delimiter, + escapechar=escapechar, + ) + self._desired_bundle_size = desired_bundle_size + self._min_bundle_size = min_bundle_size + self._compression_type = compression_type + self._with_filename = with_filename + self._read_all_files = ReadAllFiles( + True, + self._compression_type, + self._desired_bundle_size, + self._min_bundle_size, + self._source_from_file, + self._with_filename, + ) + + def expand(self, pvalue): + return pvalue | 'ReadAllFiles' >> self._read_all_files - This class extend ReadFromText class just setting a different - _source_class attribute. - """ - _source_class = _TextSourceWithFilename +class ReadAllFromTextContinuously(ReadAllFromText): + """A ``PTransform`` for reading text files in given file patterns. + This PTransform acts as a Source and produces continuously a ``PCollection`` + of strings. + For more details, see ``ReadAllFromText`` for text parsing settings; + see ``apache_beam.io.fileio.MatchContinuously`` for watching settings. 
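A usage sketch under the usual pipeline setup; the bucket paths and the 60-second interval are placeholders. ReadAllFromText consumes a PCollection of patterns, while ReadAllFromTextContinuously is handed the pattern directly together with MatchContinuously keyword arguments.

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText, ReadAllFromTextContinuously

with beam.Pipeline() as p:
    # Batch: file patterns arrive as elements of a PCollection.
    lines = (
        p
        | beam.Create(['gs://my-bucket/logs/*.txt'])
        | ReadAllFromText(with_filename=True))

# Streaming (experimental): the pattern is re-matched every `interval` seconds.
# lines = p | ReadAllFromTextContinuously(
#     'gs://my-bucket/incoming/*.txt', interval=60)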
-class WriteToText(PTransform): - """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to - text files.""" - def __init__( - self, - file_path_prefix: str, - file_name_suffix='', - append_trailing_newlines=True, - num_shards=0, - shard_name_template: Optional[str] = None, - coder: coders.Coder = coders.ToBytesCoder(), - compression_type=CompressionTypes.AUTO, - header=None, - footer=None, - *, - max_records_per_shard=None, - max_bytes_per_shard=None, - skip_if_empty=False, - triggering_frequency=None): - r"""Initialize a :class:`WriteToText` transform. - - Args: - file_path_prefix (str): The file path to write to. The files written will - begin with this prefix, followed by a shard identifier (see - **num_shards**), and end in a common extension, if given by - **file_name_suffix**. In most cases, only this argument is specified and - **num_shards**, **shard_name_template**, and **file_name_suffix** use - default values. - file_name_suffix (str): Suffix for the files written. - append_trailing_newlines (bool): indicate whether this sink should write - an additional newline char after writing each element. - num_shards (int): The number of files (shards) used for output. - If not set, the service will decide on the optimal number of shards. - Constraining the number of shards is likely to reduce - the performance of a pipeline. Setting this value is not recommended - unless you require a specific number of output files. - shard_name_template (str): A template string containing placeholders for - the shard number and shard count. Currently only ``''``, - ``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and - ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service. - When constructing a filename for a particular shard number, the - upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded - shard number and shard count respectively. This argument can be ``''`` - in which case it behaves as if num_shards was set to 1 and only one file - will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for - bounded PCollections and for ``'-W-SSSSS-of-NNNNN'`` unbounded - PCollections. - W is used for windowed shard naming and is replaced with - ``[window.start, window.end)`` - V is used for windowed shard naming and is replaced with - ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"), - window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S")`` - coder (~apache_beam.coders.coders.Coder): Coder used to encode each line. - compression_type (str): Used to handle compressed output files. - Typical value is :class:`CompressionTypes.AUTO - `, in which case the - final file path's extension (as determined by **file_path_prefix**, - **file_name_suffix**, **num_shards** and **shard_name_template**) will - be used to detect the compression. - header (str): String to write at beginning of file as a header. - If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will - be added. - footer (str): String to write at the end of file as a footer. - If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will - be added. - max_records_per_shard: Maximum number of records to write to any - individual shard. - max_bytes_per_shard: Target maximum number of bytes to write to any - individual shard. This may be exceeded slightly, as a new shard is - created once this limit is hit, but the remainder of a given record, a - subsequent newline, and a footer may cause the actual shard size - to exceed this value. 
This also tracks the uncompressed, - not compressed, size of the shard. - skip_if_empty: Don't write any shards if the PCollection is empty. - In case of an empty PCollection, this will still delete existing - files having same file path and not create new ones. - triggering_frequency: (int) Every triggering_frequency duration, a window - will be triggered and all bundles in the window will be written. + ReadAllFromTextContinuously is experimental. No backwards-compatibility + guarantees. Due to the limitation on Reshuffle, current implementation does + not scale. """ + _ARGS_FOR_MATCH = ( + 'interval', + 'has_deduplication', + 'start_timestamp', + 'stop_timestamp', + 'match_updated_files', + 'apply_windowing', + ) + _ARGS_FOR_READ = ( + 'min_bundle_size', + 'desired_bundle_size', + 'compression_type', + 'strip_trailing_newlines', + 'validate', + 'coder', + 'skip_header_lines', + 'with_filename', + 'delimiter', + 'escapechar', + ) + + def __init__(self, file_pattern, **kwargs): + """Initialize the ``ReadAllFromTextContinuously`` transform.""" + kwargs_for_match = {k: v for (k, v) in kwargs.items() if k in self._ARGS_FOR_MATCH} + kwargs_for_read = {k: v for (k, v) in kwargs.items() if k in self._ARGS_FOR_READ} + kwargs_additinal = {k: v for (k, v) in kwargs.items() if k not in self._ARGS_FOR_MATCH and k not in self._ARGS_FOR_READ} + super().__init__(**kwargs_for_read, **kwargs_additinal) + self._file_pattern = file_pattern + self._kwargs_for_match = kwargs_for_match + + def expand(self, pbegin): + # Importing locally to prevent circular dependency issues. + from apache_beam.io.fileio import MatchContinuously + + # TODO(BEAM-14497) always reshuffle once gbk always trigger works. + return ( + pbegin + | MatchContinuously(self._file_pattern, **self._kwargs_for_match) + | 'ReadAllFiles' >> self._read_all_files._disable_reshuffle() + ) - self._sink = _TextSink( - file_path_prefix, - file_name_suffix, - append_trailing_newlines, - num_shards, - shard_name_template, - coder, - compression_type, - header, - footer, - max_records_per_shard=max_records_per_shard, - max_bytes_per_shard=max_bytes_per_shard, - skip_if_empty=skip_if_empty, - triggering_frequency=triggering_frequency) - - def expand(self, pcoll): - if (not pcoll.is_bounded and self._sink.shard_name_template - == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE): - self._sink.shard_name_template = ( - filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE) - self._sink.shard_name_format = self._sink._template_to_format( - self._sink.shard_name_template) - self._sink.shard_name_glob_format = self._sink._template_to_glob_format( - self._sink.shard_name_template) - return pcoll | Write(self._sink) +class ReadFromText(PTransform): + r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text + files. -try: - import pandas - - def append_pandas_args(src, exclude): - def append(dest): - state = None - skip = False - extra_lines = [] - for line in src.__doc__.split('\n'): - if line.strip() == 'Parameters': - indent = len(line) - len(line.lstrip()) - extra_lines = ['\n\nPandas Parameters'] - state = 'append' - continue - elif line.strip().startswith('Returns'): - break - - if state == 'append': - if skip: - if line and not line[indent:].startswith(' '): - skip = False - if any(line.strip().startswith(arg + ' : ') for arg in exclude): - skip = True - if not skip: - extra_lines.append(line[indent:]) - # Expand title underline due to Parameters -> Pandas Parameters. 
- extra_lines[1] += '-------' - dest.__doc__ += '\n'.join(extra_lines) - return dest - - return append - - @append_pandas_args( - pandas.read_csv, exclude=['filepath_or_buffer', 'iterator']) - def ReadFromCsv( - path: str, - *, - splittable: bool = True, - filename_column: Optional[str] = None, - **kwargs): - """A PTransform for reading comma-separated values (csv) files into a - PCollection. - - Args: - path (str): The file path to read from. The path can contain glob - characters such as ``*`` and ``?``. - splittable (bool): Whether the csv files are splittable at line - boundaries, i.e. each line of this file represents a complete record. - This should be set to False if single records span multiple lines (e.g. - a quoted field has a newline inside of it). Setting this to false may - disable liquid sharding. - filename_column (str): If not None, the name of the column to add - to each record, containing the filename of the source file. - **kwargs: Extra arguments passed to `pandas.read_csv` (see below). + Parses a text file as newline-delimited elements, by default assuming + ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n`` + or specified delimiter. """ - from apache_beam.dataframe.io import ReadViaPandas - return 'ReadFromCsv' >> ReadViaPandas( - 'csv', - path, - splittable=splittable, - filename_column=filename_column, - **kwargs) - - @append_pandas_args( - pandas.DataFrame.to_csv, exclude=['path_or_buf', 'index', 'index_label']) - def WriteToCsv( - path: str, - num_shards: Optional[int] = None, - file_naming: Optional['fileio.FileNaming'] = None, - **kwargs): - # pylint: disable=line-too-long - - """A PTransform for writing a schema'd PCollection as a (set of) - comma-separated values (csv) files. - - Args: - path (str): The file path to write to. The files written will - begin with this prefix, followed by a shard identifier (see - `num_shards`) according to the `file_naming` parameter. - num_shards (optional int): The number of shards to use in the distributed - write. Defaults to None, letting the system choose an optimal value. - file_naming (optional callable): A file-naming strategy, determining the - actual shard names given their shard number, etc. - See the section on `file naming - `_ - Defaults to `fileio.default_file_naming`, which names files as - `path-XXXXX-of-NNNNN`. - **kwargs: Extra arguments passed to `pandas.Dataframe.to_csv` (see below). - """ - from apache_beam.dataframe.io import WriteViaPandas - if num_shards is not None: - kwargs['num_shards'] = num_shards - if file_naming is not None: - kwargs['file_naming'] = file_naming - return 'WriteToCsv' >> WriteViaPandas('csv', path, index=False, **kwargs) - - @append_pandas_args(pandas.read_json, exclude=['path_or_buf']) - def ReadFromJson( - path: str, - *, - orient: str = 'records', - lines: bool = True, - dtype: Union[bool, Dict[str, Any]] = False, - **kwargs): - """A PTransform for reading json values from files into a PCollection. - - Args: - path (str): The file path to read from. The path can contain glob - characters such as ``*`` and ``?``. - orient (str): Format of the json elements in the file. - Default to 'records', meaning the file is expected to contain a list - of json objects like `{field1: value1, field2: value2, ...}`. - lines (bool): Whether each line should be considered a separate record, - as opposed to the entire file being a valid JSON object or list. - Defaults to True (unlike Pandas). 
- dtype (bool): If True, infer dtypes; if a dict of column to dtype, - then use those; if False, then don’t infer dtypes at all. - Defaults to False (unlike Pandas). - **kwargs: Extra arguments passed to `pandas.read_json` (see below). - """ - from apache_beam.dataframe.io import ReadViaPandas - return 'ReadFromJson' >> ReadViaPandas( - 'json', path, orient=orient, lines=lines, dtype=dtype, **kwargs) - - @append_pandas_args( - pandas.DataFrame.to_json, exclude=['path_or_buf', 'index']) - def WriteToJson( - path: str, - *, - num_shards: Optional[int] = None, - file_naming: Optional['fileio.FileNaming'] = None, - orient: str = 'records', - lines: Optional[bool] = None, - **kwargs): - # pylint: disable=line-too-long - - """A PTransform for writing a PCollection as json values to files. - - Args: - path (str): The file path to write to. The files written will - begin with this prefix, followed by a shard identifier (see - `num_shards`) according to the `file_naming` parameter. - num_shards (optional int): The number of shards to use in the distributed - write. Defaults to None, letting the system choose an optimal value. - file_naming (optional callable): A file-naming strategy, determining the - actual shard names given their shard number, etc. - See the section on `file naming - `_ - Defaults to `fileio.default_file_naming`, which names files as - `path-XXXXX-of-NNNNN`. - orient (str): Format of the json elements in the file. - Default to 'records', meaning the file will to contain a list - of json objects like `{field1: value1, field2: value2, ...}`. - lines (bool): Whether each line should be considered a separate record, - as opposed to the entire file being a valid JSON object or list. - Defaults to True if orient is 'records' (unlike Pandas). - **kwargs: Extra arguments passed to `pandas.Dataframe.to_json` - (see below). + _source_class = _TextSource + + def __init__( + self, + file_pattern=None, + min_bundle_size=0, + compression_type=CompressionTypes.AUTO, + strip_trailing_newlines=True, + coder: coders.Coder = coders.StrUtf8Coder(), + validate=True, + skip_header_lines=0, + delimiter=None, + escapechar=None, + **kwargs, + ): + """Initialize the :class:`ReadFromText` transform.""" + super().__init__(**kwargs) + if file_pattern: + try: + if not os.path.dirname(file_pattern): + file_pattern = os.path.join('.', file_pattern) + except TypeError: + pass + + self._source = self._source_class( + file_pattern, + min_bundle_size, + compression_type, + strip_trailing_newlines, + coder, + validate=validate, + skip_header_lines=skip_header_lines, + delimiter=delimiter, + escapechar=escapechar, + ) + + def expand(self, pvalue): + return pvalue.pipeline | Read(self._source).with_output_types(self._source.output_type_hint()) + + +class ReadFromTextWithFilename(ReadFromText): + r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text + files returning the name of the file and the content of the file. 
""" - from apache_beam.dataframe.io import WriteViaPandas - if num_shards is not None: - kwargs['num_shards'] = num_shards - if file_naming is not None: - kwargs['file_naming'] = file_naming - if lines is None: - lines = orient == 'records' - return 'WriteToJson' >> WriteViaPandas( - 'json', path, orient=orient, lines=lines, **kwargs) + _source_class = _TextSourceWithFilename + + +class WriteToText(PTransform): + """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to + text files.""" + + def __init__( + self, + file_path_prefix: str, + file_name_suffix='', + append_trailing_newlines=True, + num_shards=0, + shard_name_template: Optional[str] = None, + coder: coders.Coder = coders.ToBytesCoder(), + compression_type=CompressionTypes.AUTO, + header=None, + footer=None, + *, + max_records_per_shard=None, + max_bytes_per_shard=None, + skip_if_empty=False, + triggering_frequency=None, + ): + r"""Initialize a :class:`WriteToText` transform.""" + self._sink = _TextSink( + file_path_prefix, + file_name_suffix, + append_trailing_newlines, + num_shards, + shard_name_template, + coder, + compression_type, + header, + footer, + max_records_per_shard=max_records_per_shard, + max_bytes_per_shard=max_bytes_per_shard, + skip_if_empty=skip_if_empty, + triggering_frequency=triggering_frequency, + ) + + def expand(self, pcoll): + if (not pcoll.is_bounded and self._sink.shard_name_template == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE): + self._sink.shard_name_template = filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE + self._sink.shard_name_format = self._sink._template_to_format(self._sink.shard_name_template) + self._sink.shard_name_glob_format = self._sink._template_to_glob_format(self._sink.shard_name_template) + return pcoll | Write(self._sink) + + +try: + import pandas + + def append_pandas_args(src, exclude): + def append(dest): + state = None + skip = False + extra_lines = [] + for line in src.__doc__.split('\n'): + if line.strip() == 'Parameters': + indent = len(line) - len(line.lstrip()) + extra_lines = ['\n\nPandas Parameters'] + state = 'append' + continue + elif line.strip().startswith('Returns'): + break + + if state == 'append': + if skip: + if line and not line[indent:].startswith(' '): + skip = False + if any(line.strip().startswith(arg + ' : ') for arg in exclude): + skip = True + if not skip: + extra_lines.append(line[indent:]) + # Expand title underline due to Parameters -> Pandas Parameters. 
+ extra_lines[1] += '-------' + dest.__doc__ += '\n'.join(extra_lines) + return dest + + return append + + @append_pandas_args(pandas.read_csv, exclude=['filepath_or_buffer', 'iterator']) + def ReadFromCsv( + path: str, + *, + splittable: bool = True, + filename_column: Optional[str] = None, + **kwargs, + ): + """A PTransform for reading comma-separated values (csv) files into a PCollection.""" + from apache_beam.dataframe.io import ReadViaPandas + + return 'ReadFromCsv' >> ReadViaPandas('csv', path, splittable=splittable, filename_column=filename_column, **kwargs) + + @append_pandas_args(pandas.DataFrame.to_csv, exclude=['path_or_buf', 'index', 'index_label']) + def WriteToCsv(path: str, num_shards: Optional[int] = None, file_naming: Optional['fileio.FileNaming'] = None, **kwargs): + # pylint: disable=line-too-long + """A PTransform for writing a schema'd PCollection as a (set of) comma-separated values (csv) files.""" + from apache_beam.dataframe.io import WriteViaPandas + if num_shards is not None: + kwargs['num_shards'] = num_shards + if file_naming is not None: + kwargs['file_naming'] = file_naming + return 'WriteToCsv' >> WriteViaPandas('csv', path, index=False, **kwargs) + + @append_pandas_args(pandas.read_json, exclude=['path_or_buf']) + def ReadFromJson(path: str, *, orient: str = 'records', lines: bool = True, dtype: Union[bool, Dict[str, Any]] = False, **kwargs): + """A PTransform for reading json values from files into a PCollection.""" + from apache_beam.dataframe.io import ReadViaPandas + return 'ReadFromJson' >> ReadViaPandas('json', path, orient=orient, lines=lines, dtype=dtype, **kwargs) + + @append_pandas_args(pandas.DataFrame.to_json, exclude=['path_or_buf', 'index']) + def WriteToJson( + path: str, + *, + num_shards: Optional[int] = None, + file_naming: Optional['fileio.FileNaming'] = None, + orient: str = 'records', + lines: Optional[bool] = None, + **kwargs, + ): + # pylint: disable=line-too-long + """A PTransform for writing a PCollection as json values to files.""" + from apache_beam.dataframe.io import WriteViaPandas + if num_shards is not None: + kwargs['num_shards'] = num_shards + if file_naming is not None: + kwargs['file_naming'] = file_naming + if lines is None: + lines = orient == 'records' + return 'WriteToJson' >> WriteViaPandas('json', path, orient=orient, lines=lines, **kwargs) except ImportError: - def no_pandas(*args, **kwargs): - raise ImportError('Please install apache_beam[dataframe]') + def no_pandas(*args, **kwargs): + raise ImportError('Please install apache_beam[dataframe]') - for transform in ('ReadFromCsv', 'WriteToCsv', 'ReadFromJson', 'WriteToJson'): - globals()[transform] = no_pandas + for transform in ('ReadFromCsv', 'WriteToCsv', 'ReadFromJson', 'WriteToJson'): + globals()[transform] = no_pandas
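# --------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of this patch): basic batch
# usage of ReadFromText and ReadFromTextWithFilename as defined above.
# ReadFromText emits one str per line (UTF-8 by default); the WithFilename
# variant emits (filename, line) tuples. The /tmp glob and skip_header_lines
# value are assumptions for illustration.
import apache_beam as beam
from apache_beam.io.textio import ReadFromText, ReadFromTextWithFilename

with beam.Pipeline() as p:
  _ = (
      p
      | 'ReadLines' >> ReadFromText('/tmp/input*.txt', skip_header_lines=1)
      | 'CountLines' >> beam.combiners.Count.Globally()
      | 'PrintCount' >> beam.Map(print))
  _ = (
      p
      | 'ReadWithName' >> ReadFromTextWithFilename('/tmp/input*.txt')
      | 'Tag' >> beam.MapTuple(lambda name, line: '%s: %s' % (name, line))
      | 'PrintTagged' >> beam.Map(print))
# --------------------------------------------------------------------------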
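# --------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of this patch): writing a
# PCollection of strings with the WriteToText sink defined above. The output
# prefix, suffix, header/footer strings, and shard count are illustrative
# assumptions; leaving num_shards at 0 would let the runner pick the
# sharding instead.
import apache_beam as beam
from apache_beam.io.textio import ReadFromText, WriteToText

with beam.Pipeline() as p:
  _ = (
      p
      | ReadFromText('/tmp/input*.txt')
      | beam.Map(str.upper)
      | WriteToText(
          '/tmp/output/result',  # shards named like result-00000-of-00002.txt
          file_name_suffix='.txt',
          num_shards=2,
          header='# upper-cased lines',
          footer='# end of shard'))
# --------------------------------------------------------------------------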
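# --------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of this patch): the pandas-
# backed transforms above require the apache_beam[dataframe] extra to be
# installed. This sketch converts CSV records to newline-delimited JSON; the
# /tmp paths are assumptions, and WriteToCsv / ReadFromJson follow the same
# shape.
import apache_beam as beam
from apache_beam.io.textio import ReadFromCsv, WriteToJson

with beam.Pipeline() as p:
  _ = (
      p
      # One schema'd row per CSV record, with fields named after the columns.
      | ReadFromCsv('/tmp/orders*.csv')
      # orient='records' with lines=True writes one JSON object per line.
      | WriteToJson('/tmp/orders_json/part', num_shards=1))
# --------------------------------------------------------------------------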