Skip to content
Draft
77 changes: 42 additions & 35 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,13 @@
import traceback
from datetime import datetime
from io import BytesIO, IOBase
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
from typing import IO, Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast

import backoff
import dpath
import nltk
import requests
from unstructured.file_utils.filetype import (
EXT_TO_FILETYPE,
FILETYPE_TO_MIMETYPE,
STR_TO_FILETYPE,
FileType,
detect_filetype,
)
from unstructured.file_utils.filetype import FileType, detect_filetype

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
Expand Down Expand Up @@ -85,14 +79,23 @@ def _import_unstructured() -> None:
global unstructured_partition_docx
global unstructured_partition_pptx
from unstructured.partition.docx import partition_docx
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.pptx import partition_pptx

# separate global variables to properly propagate typing
unstructured_partition_pdf = partition_pdf
unstructured_partition_docx = partition_docx
unstructured_partition_pptx = partition_pptx

try:
from unstructured.partition.pdf import partition_pdf

unstructured_partition_pdf = partition_pdf
except (ImportError, ModuleNotFoundError):
# partition_pdf requires the heavy unstructured_inference package;
# PDF support is disabled when it is not installed.
logger = logging.getLogger(__name__)
logger.info(
"Could not import unstructured.partition.pdf (requires unstructured_inference). PDF parsing will be unavailable."
)


def user_error(e: Exception) -> bool:
"""
Expand Down Expand Up @@ -207,13 +210,6 @@ def _read_file(
logger: logging.Logger,
) -> str:
_import_unstructured()
if (
(not unstructured_partition_pdf)
or (not unstructured_partition_docx)
or (not unstructured_partition_pptx)
):
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

filetype: FileType | None = self._get_filetype(file_handle, remote_file)

Expand Down Expand Up @@ -335,7 +331,7 @@ def _read_file_remotely(

data = self._params_to_dict(format.parameters, strategy)

file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}
file_data = {"files": ("filename", file_handle, filetype.mime_type)}

response = requests.post(
f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data
Expand All @@ -356,13 +352,6 @@ def _read_file_locally(
self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile
) -> str:
_import_unstructured()
if (
(not unstructured_partition_pdf)
or (not unstructured_partition_docx)
or (not unstructured_partition_pptx)
):
# check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
raise Exception("unstructured library is not available")

file: Any = file_handle

Expand All @@ -373,15 +362,29 @@ def _read_file_locally(

try:
if filetype == FileType.PDF:
# for PDF, read the file into a BytesIO object because some code paths in pdf parsing are doing an instance check on the file object and don't work with file-like objects
if not unstructured_partition_pdf:
raise self._create_parse_error(
remote_file,
"PDF parsing requires the 'unstructured_inference' package. Install it with: pip install unstructured-inference",
)
file_handle.seek(0)
with BytesIO(file_handle.read()) as file:
file_handle.seek(0)
elements = unstructured_partition_pdf(file=file, strategy=strategy)
elif filetype == FileType.DOCX:
if not unstructured_partition_docx:
raise self._create_parse_error(
remote_file, "DOCX partition function is not available"
)
elements = unstructured_partition_docx(file=file)
elif filetype == FileType.PPTX:
if not unstructured_partition_pptx:
raise self._create_parse_error(
remote_file, "PPTX partition function is not available"
)
elements = unstructured_partition_pptx(file=file)
except RecordParseError:
raise
except Exception as e:
raise self._create_parse_error(remote_file, str(e))

Expand All @@ -405,8 +408,11 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
2. Use the file name if available
3. Use the file content
"""
if remote_file.mime_type and remote_file.mime_type in STR_TO_FILETYPE:
return STR_TO_FILETYPE[remote_file.mime_type]
if remote_file.mime_type:
try:
return FileType.from_mime_type(remote_file.mime_type)
except ValueError:
pass

# set name to none, otherwise unstructured will try to get the modified date from the local file system
if hasattr(file, "name"):
Expand All @@ -418,7 +424,7 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
file_type: FileType | None = None
try:
file_type = detect_filetype(
filename=remote_file.uri,
file_path=remote_file.uri,
)
except Exception:
# Path doesn't exist locally. Try something else...
Expand All @@ -427,16 +433,17 @@ def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileT
if file_type and file_type != FileType.UNK:
return file_type

type_based_on_content = detect_filetype(file=file)
extension = "." + remote_file.uri.split(".")[-1].lower()
ext_type = FileType.from_extension(extension)
if ext_type is not None:
return ext_type

type_based_on_content = detect_filetype(file=cast(IO[bytes], file))
file.seek(0) # detect_filetype is reading to read the file content, so we need to reset

if type_based_on_content and type_based_on_content != FileType.UNK:
return type_based_on_content

extension = "." + remote_file.uri.split(".")[-1].lower()
if extension in EXT_TO_FILETYPE:
return EXT_TO_FILETYPE[extension]

return None

def _supported_file_types(self) -> List[Any]:
Expand Down
Loading
Loading