diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 7443dccd6..826d89112 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -5,7 +5,7 @@ import logging import time from abc import ABC, abstractmethod -from datetime import datetime +from datetime import timezone from enum import Enum from io import IOBase from os import makedirs, path @@ -24,6 +24,7 @@ from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile +from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse class FileReadMode(Enum): @@ -105,7 +106,7 @@ def filter_files_by_globs_and_start_date( Utility method for filtering files based on globs. """ start_date = ( - datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) + ab_datetime_parse(self.config.start_date) if self.config and self.config.start_date else None ) @@ -113,7 +114,12 @@ def filter_files_by_globs_and_start_date( for file in files: if self.file_matches_globs(file, globs): - if file.uri not in seen and (not start_date or file.last_modified >= start_date): + last_modified = ( + file.last_modified + if file.last_modified.tzinfo is not None + else file.last_modified.replace(tzinfo=timezone.utc) + ) + if file.uri not in seen and (not start_date or last_modified >= start_date): seen.add(file.uri) yield file diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 13fa1025c..7058545bd 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -408,6 +408,41 @@ def documentation_url(cls) -> AnyUrl: set(), id="all_csvs_modified_exactly_on_start_date", ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-01T00:00:00Z", "streams": []}, + {"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"}, + set(), + id="start_date_without_microseconds", + ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-10T00:00:00Z", "streams": []}, + set(), + set(), + id="start_date_without_microseconds_modified_before", + ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-01T00:00:00+00:00", "streams": []}, + {"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"}, + set(), + id="start_date_with_utc_offset", + ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-05T08:54:07+05:00", "streams": []}, + {"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"}, + set(), + id="start_date_with_non_zero_offset", + ), + pytest.param( + ["**/*.csv"], + {"start_date": "2023-06-01", "streams": []}, + {"a.csv", "a/b.csv", "a/c.csv", "a/b/c.csv", "a/c/c.csv", "a/b/c/d.csv"}, + set(), + id="start_date_date_only", + ), ], ) def test_globs_and_prefixes_from_globs(