diff --git a/python/samples/getting_started/image_spec_poc/database_logic_for_storing_images.py b/python/samples/getting_started/image_spec_poc/database_logic_for_storing_images.py
new file mode 100644
index 0000000000..402b389331
--- /dev/null
+++ b/python/samples/getting_started/image_spec_poc/database_logic_for_storing_images.py
@@ -0,0 +1,39 @@
+import base64
+from pathlib import Path
+
+import dotenv
+from datetime import datetime, timezone
+from db_setup import SQLiteImageStore
+
+dotenv.load_dotenv()
+
+store: SQLiteImageStore | None = None
+
+async def create_and_store_base64_encoded_images() -> None:
+    """Load every image under ./images, base64-encode it, and persist it in SQLite."""
+    global store
+    store = SQLiteImageStore()
+
+    folder = Path("./images")
+
+    for file_path in sorted(folder.iterdir()):
+        if not file_path.is_file():
+            continue
+        with open(file_path, "rb") as f:
+            image_data = f.read()
+        image_base64 = base64.b64encode(image_data).decode("utf-8")
+        image_uri = f"data:image/jpeg;base64,{image_base64}"  # NOTE(review): assumes JPEG inputs — confirm ./images content
+
+        text_id = f"{file_path.name}-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}"
+        file_name = file_path.name.split(".")[0]
+        record = await store.add_image_from_base64(
+            text_id=text_id,
+            base64_data=image_base64,
+            image_name=file_name,
+            description=f"{file_name} image",
+            metadata={"source": "sample", "scenario": "azure_ai_chat_client"},
+            tags=[file_name],
+            mime_type="image/jpeg",
+            image_uri=image_uri,
+        )
+        print(f"Stored image in SQLite with id={record.id} text_id={record.text_id}")
diff --git a/python/samples/getting_started/image_spec_poc/db_setup.py b/python/samples/getting_started/image_spec_poc/db_setup.py
new file mode 100644
index 0000000000..c83aa37618
--- /dev/null
+++ b/python/samples/getting_started/image_spec_poc/db_setup.py
@@ -0,0 +1,522 @@
+"""SQLite-backed image storage utilities aligned with agent-framework conventions."""
+
+from __future__ import annotations
+
+import asyncio
+import base64
+import json
+import sqlite3
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, TypeVar
+from collections.abc import Callable, Iterable, Sequence
+
+from agent_framework._serialization import SerializationMixin
+from agent_framework.observability import get_tracer
+from opentelemetry.trace import SpanKind
+
+__all__ = ["ImageRecord", "SQLiteImageStore", "SQLiteImageStoreState"]
+
+DEFAULT_DB_PATH = Path(__file__).with_name("image_database.db")
+DEFAULT_EXPORT_DIR = Path(__file__).with_name("extracted_images")
+
+T = TypeVar("T")
+
+
+@dataclass(slots=True)
+class ImageRecord:
+    """Structural representation of a row in the images table."""
+
+    id: int
+    text_id: str
+    metadata: dict[str, Any] | None
+    image_path: str | None
+    image_name: str | None
+    mime_type: str
+    description: str | None
+    created_at: str
+    updated_at: str
+    image_data: bytes | None = None
+
+    def tags(self) -> list[str]:
+        payload: Iterable[str] | str | None = None
+        if isinstance(self.metadata, dict):
+            payload = self.metadata.get("tags")
+        if payload is None:
+            return []
+        if isinstance(payload, str):
+            items = [item.strip() for item in payload.split(",")]
+        else:
+            items = [str(item).strip() for item in payload]
+        unique: list[str] = []
+        seen: set[str] = set()
+        for item in items:
+            if item and item not in seen:
+                seen.add(item)
+                unique.append(item)
+        return unique
+
+    def to_dict(self, *, include_data: bool = False) -> dict[str, Any]:
+        result: dict[str, Any] = {
+            "id": self.id,
+            "text_id": self.text_id,
+            "metadata": self.metadata,
+            "image_path": self.image_path,
+            "image_name": self.image_name,
+            "mime_type": self.mime_type,
+            "description": self.description,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+            "tags": self.tags(),
+        }
+        if include_data and self.image_data is not None:
+            encoded = base64.b64encode(self.image_data).decode("utf-8")
+            result["base64_data"] = encoded
+            result["data_url"] = f"data:{self.mime_type};base64,{encoded}"
+        return result
+
+
+class SQLiteImageStoreState(SerializationMixin):
+    """Serializable configuration for SQLiteImageStore."""
+
+    def __init__(self, *, db_path: str | None = None, export_dir: str | None = None) -> None:
+        self.db_path = db_path or str(DEFAULT_DB_PATH)
+        self.export_dir = export_dir or str(DEFAULT_EXPORT_DIR)
+
+
+class SQLiteImageStore:
+    """Async-first SQLite store used by the observability samples."""
+
+    def __init__(self, *, db_path: str | Path = DEFAULT_DB_PATH, export_dir: str | Path = DEFAULT_EXPORT_DIR) -> None:
+        self._db_path = Path(db_path)
+        self._export_dir = Path(export_dir)
+        self._export_dir.mkdir(parents=True, exist_ok=True)
+        self._tracer = get_tracer()
+        self._ensure_schema()
+
+    async def add_image_from_bytes(
+        self,
+        *,
+        text_id: str,
+        image_bytes: bytes,
+        image_name: str | None = None,
+        description: str | None = None,
+        metadata: dict[str, Any] | str | None = None,
+        tags: Sequence[str] | None = None,
+        mime_type: str = "image/png",
+        source_path: str | None = None,
+    ) -> ImageRecord:
+        metadata_json = self._prepare_metadata(metadata, tags)
+        now = datetime.now(timezone.utc).isoformat()  # timezone-aware; datetime.utcnow() is deprecated
+
+        def _operation(connection: sqlite3.Connection) -> int:
+            cursor = connection.execute(
+                """
+                INSERT INTO images (
+                    text_id,
+                    metadata,
+                    image_path,
+                    image_data,
+                    image_name,
+                    mime_type,
+                    description,
+                    created_at,
+                    updated_at
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                (
+                    text_id,
+                    metadata_json,
+                    source_path,
+                    sqlite3.Binary(image_bytes),
+                    image_name,
+                    mime_type,
+                    description,
+                    now,
+                    now,
+                ),
+            )
+            return cursor.lastrowid
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.add_image_from_bytes",
+            kind=SpanKind.INTERNAL,
+        ) as span:
+            span.set_attribute("image.text_id", text_id)
+            span.set_attribute("image.mime_type", mime_type)
+            span.set_attribute("image.has_metadata", metadata_json is not None)
+            record_id = await self._run(_operation)
+            record = await self.get_image_by_id(record_id, include_data=False)
+            if record is None:
+                raise RuntimeError("Image insert succeeded but record fetch failed")
+            return record
+
+    async def add_image_from_file(
+        self,
+        *,
+        text_id: str,
+        file_path: str | Path,
+        description: str | None = None,
+        metadata: dict[str, Any] | str | None = None,
+        tags: Sequence[str] | None = None,
+        mime_type: str | None = None,
+        persist_copy: bool = False,
+    ) -> ImageRecord:
+        source = Path(file_path)
+        image_bytes = source.read_bytes()
+        name = source.name
+        stored_path: str | None
+        if persist_copy:
+            target_dir = self._export_dir / "imports"
+            target_dir.mkdir(parents=True, exist_ok=True)
+            target_path = target_dir / name
+            target_path.write_bytes(image_bytes)
+            stored_path = str(target_path)
+        else:
+            stored_path = str(source)
+        return await self.add_image_from_bytes(
+            text_id=text_id,
+            image_bytes=image_bytes,
+            image_name=name,
+            description=description,
+            metadata=metadata,
+            tags=tags,
+            mime_type=mime_type or self._guess_mime_type(source.suffix),
+            source_path=stored_path,
+        )
+
+    async def add_image_from_base64(
+        self,
+        *,
+        text_id: str,
+        base64_data: str,
+        image_name: str | None = None,
+        description: str | None = None,
+        metadata: dict[str, Any] | str | None = None,
+        tags: Sequence[str] | None = None,
+        mime_type: str = "image/png",
+        image_uri: str | None = None,
+    ) -> ImageRecord:
+        payload = base64.b64decode(base64_data)
+
+        # If image_uri is supplied, fold it into metadata so it is persisted.
+        merged_metadata: dict[str, Any] | str | None
+        if image_uri is None:
+            merged_metadata = metadata
+        elif metadata is None:
+            merged_metadata = {"image_uri": image_uri}
+        elif isinstance(metadata, dict):
+            merged = dict(metadata)
+            merged.setdefault("image_uri", image_uri)
+            merged_metadata = merged
+        else:
+            # metadata is a non-dict (e.g., str); wrap to preserve both
+            merged_metadata = {"metadata": metadata, "image_uri": image_uri}
+
+        return await self.add_image_from_bytes(
+            text_id=text_id,
+            image_bytes=payload,
+            image_name=image_name,
+            description=description,
+            metadata=merged_metadata,
+            tags=tags,
+            mime_type=mime_type,
+        )
+
+    async def get_image_by_id(self, record_id: int, *, include_data: bool = False) -> ImageRecord | None:
+        def _operation(connection: sqlite3.Connection) -> ImageRecord | None:
+            cursor = connection.execute("SELECT * FROM images WHERE id = ?", (record_id,))
+            row = cursor.fetchone()
+            if row is None:
+                return None
+            return self._row_to_record(row, include_data=include_data)
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.get_image_by_id",
+            kind=SpanKind.INTERNAL,
+        ) as span:
+            span.set_attribute("image.id", record_id)
+            record = await self._run(_operation)
+            span.set_attribute("result.found", record is not None)
+            return record
+
+    async def get_image_by_text_id(self, text_id: str, *, include_data: bool = False) -> ImageRecord | None:
+        def _operation(connection: sqlite3.Connection) -> ImageRecord | None:
+            cursor = connection.execute(
+                """
+                SELECT * FROM images
+                WHERE text_id = ?
+                ORDER BY created_at DESC, id DESC
+                LIMIT 1
+                """,
+                (text_id,),
+            )
+            row = cursor.fetchone()
+            if row is None:
+                return None
+            return self._row_to_record(row, include_data=include_data)
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.get_image_by_text_id",
+            kind=SpanKind.INTERNAL,
+        ) as span:
+            span.set_attribute("image.text_id", text_id)
+            record = await self._run(_operation)
+            span.set_attribute("result.found", record is not None)
+            return record
+
+    async def list_images(self, *, include_data: bool = False) -> list[ImageRecord]:
+        def _operation(connection: sqlite3.Connection) -> list[ImageRecord]:
+            cursor = connection.execute(
+                "SELECT * FROM images ORDER BY created_at DESC, id DESC",
+            )
+            rows = cursor.fetchall()
+            return [self._row_to_record(row, include_data=include_data) for row in rows]
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.list_images",
+            kind=SpanKind.INTERNAL,
+        ):
+            return await self._run(_operation)
+
+    async def search_images(self, query: str, *, include_data: bool = False) -> list[ImageRecord]:
+        pattern = f"%{query}%"
+
+        def _operation(connection: sqlite3.Connection) -> list[ImageRecord]:
+            cursor = connection.execute(
+                """
+                SELECT * FROM images
+                WHERE metadata LIKE ? OR description LIKE ?
+                ORDER BY created_at DESC, id DESC
+                """,
+                (pattern, pattern),
+            )
+            rows = cursor.fetchall()
+            return [self._row_to_record(row, include_data=include_data) for row in rows]
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.search_images",
+            kind=SpanKind.INTERNAL,
+        ) as span:
+            span.set_attribute("search.query", query)
+            return await self._run(_operation)
+
+    async def save_image_to_file(
+        self,
+        *,
+        record_id: int | None = None,
+        text_id: str | None = None,
+        output_dir: str | Path | None = None,
+    ) -> Path | None:
+        if record_id is None and text_id is None:
+            raise ValueError("Either record_id or text_id must be provided")
+        record = (
+            await self.get_image_by_id(record_id, include_data=True) if record_id is not None else await self.get_image_by_text_id(text_id or "", include_data=True)
+        )
+        if record is None or record.image_data is None:
+            return None
+        directory = Path(output_dir) if output_dir else self._export_dir
+        directory.mkdir(parents=True, exist_ok=True)
+        extension = self._guess_extension(record.mime_type)
+        filename = record.image_name or f"{record.text_id}_{record.id}.{extension}"
+        target = directory / filename
+        target.write_bytes(record.image_data)
+        await self._update_image_path(record.id, str(target))
+        return target
+
+    async def add_tags(self, *, text_id: str, tags: Sequence[str], replace_existing: bool = False) -> ImageRecord | None:
+        record = await self.get_image_by_text_id(text_id, include_data=False)
+        if record is None:
+            return None
+        metadata_dict = dict(record.metadata) if isinstance(record.metadata, dict) else {}
+        existing = [] if replace_existing else record.tags()
+        merged = sorted({tag.strip().lower() for tag in (*existing, *tags) if tag.strip()})
+        if merged:
+            metadata_dict["tags"] = merged
+        elif "tags" in metadata_dict:
+            metadata_dict.pop("tags")
+        await self._update_metadata(record.id, metadata_dict)
+        return await self.get_image_by_id(record.id, include_data=False)
+
+    async def get_all_available_tags(self) -> dict[str, Any]:
+        records = await self.list_images(include_data=False)
+        counts: dict[str, int] = {}
+        for record in records:
+            for tag in record.tags():
+                counts[tag] = counts.get(tag, 0) + 1
+        ordered = sorted(counts.items(), key=lambda item: item[1], reverse=True)
+        return {
+            "unique_tags": len(counts),
+            "tags": [{"tag": tag, "count": count} for tag, count in ordered],
+        }
+
+    async def get_all_images_as_base64(self) -> list[dict[str, Any]]:
+        records = await self.list_images(include_data=True)
+        return [record.to_dict(include_data=True) for record in records if record.image_data is not None]
+
+    async def export_images_as_base64_json(self, output_file: str | Path) -> Path:
+        payload = {
+            "exported_at": datetime.now(timezone.utc).isoformat(),
+            "images": await self.get_all_images_as_base64(),
+        }
+        path = Path(output_file)
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+        return path
+
+    async def clear(self) -> None:
+        def _operation(connection: sqlite3.Connection) -> None:
+            connection.execute("DELETE FROM images")
+
+        with self._tracer.start_as_current_span(
+            "sqlite_image_store.clear",
+            kind=SpanKind.INTERNAL,
+        ):
+            await self._run(_operation)
+
+    async def serialize(self, **kwargs: Any) -> dict[str, Any]:
+        state = SQLiteImageStoreState(db_path=str(self._db_path), export_dir=str(self._export_dir))
+        return state.to_dict(**kwargs)
+
+    @classmethod
+    async def deserialize(cls, serialized_store_state: Any, **kwargs: Any) -> SQLiteImageStore:
+        state = SQLiteImageStoreState.from_dict(serialized_store_state, **kwargs)
+        return cls(db_path=state.db_path, export_dir=state.export_dir)
+
+    async def update_from_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
+        if not serialized_store_state:
+            return
+        state = SQLiteImageStoreState.from_dict(serialized_store_state, **kwargs)
+        self._db_path = Path(state.db_path)
+        self._export_dir = Path(state.export_dir)
+        self._export_dir.mkdir(parents=True, exist_ok=True)
+        self._ensure_schema()
+
+    async def _update_metadata(self, record_id: int, metadata: dict[str, Any] | None) -> None:
+        timestamp = datetime.now(timezone.utc).isoformat()
+        metadata_json = json.dumps(metadata, ensure_ascii=False) if metadata else None
+
+        def _operation(connection: sqlite3.Connection) -> None:
+            connection.execute(
+                "UPDATE images SET metadata = ?, updated_at = ? WHERE id = ?",
+                (metadata_json, timestamp, record_id),
+            )
+
+        await self._run(_operation)
+
+    async def _update_image_path(self, record_id: int, image_path: str) -> None:
+        timestamp = datetime.now(timezone.utc).isoformat()
+
+        def _operation(connection: sqlite3.Connection) -> None:
+            connection.execute(
+                "UPDATE images SET image_path = ?, updated_at = ? WHERE id = ?",
+                (image_path, timestamp, record_id),
+            )
+
+        await self._run(_operation)
+
+    async def _run(self, operation: Callable[[sqlite3.Connection], T]) -> T:
+        def _wrapper() -> T:
+            with sqlite3.connect(self._db_path) as connection:
+                connection.row_factory = sqlite3.Row
+                result = operation(connection)
+                connection.commit()
+                return result
+
+        return await asyncio.to_thread(_wrapper)
+
+    def _ensure_schema(self) -> None:
+        self._db_path.parent.mkdir(parents=True, exist_ok=True)
+        with sqlite3.connect(self._db_path) as connection:
+            connection.execute(
+                """
+                CREATE TABLE IF NOT EXISTS images (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    text_id TEXT NOT NULL,
+                    metadata TEXT,
+                    image_path TEXT,
+                    image_data BLOB,
+                    image_name TEXT,
+                    mime_type TEXT NOT NULL DEFAULT 'image/png',
+                    description TEXT,
+                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
+                    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
+                )
+                """,
+            )
+            connection.execute("CREATE INDEX IF NOT EXISTS idx_images_text_id ON images(text_id)")
+            connection.execute("CREATE INDEX IF NOT EXISTS idx_images_metadata ON images(metadata)")
+            connection.commit()
+
+    def _row_to_record(self, row: sqlite3.Row, *, include_data: bool) -> ImageRecord:
+        metadata_payload = row["metadata"]
+        metadata: dict[str, Any] | None
+        if metadata_payload:
+            try:
+                loaded = json.loads(metadata_payload)
+                metadata = loaded if isinstance(loaded, dict) else {"value": loaded}
+            except json.JSONDecodeError:
+                metadata = {"value": metadata_payload}
+        else:
+            metadata = None
+        image_data = row["image_data"] if include_data else None
+        return ImageRecord(
+            id=row["id"],
+            text_id=row["text_id"],
+            metadata=metadata,
+            image_path=row["image_path"],
+            image_name=row["image_name"],
+            mime_type=row["mime_type"] or "image/png",
+            description=row["description"],
+            created_at=row["created_at"],
+            updated_at=row["updated_at"],
+            image_data=image_data,
+        )
+
+    def _prepare_metadata(self, metadata: dict[str, Any] | str | None, tags: Sequence[str] | None) -> str | None:
+        payload = self._metadata_to_dict(metadata)
+        clean_tags = [tag.strip().lower() for tag in tags] if tags else []
+        if clean_tags:
+            payload["tags"] = sorted({tag for tag in clean_tags if tag})
+        if not payload:
+            return None
+        payload.setdefault("metadata_updated_at", datetime.now(timezone.utc).isoformat())
+        return json.dumps(payload, ensure_ascii=False)
+
+    @staticmethod
+    def _metadata_to_dict(metadata: dict[str, Any] | str | None) -> dict[str, Any]:
+        if metadata is None:
+            return {}
+        if isinstance(metadata, dict):
+            return dict(metadata)
+        text = metadata.strip()
+        if not text:
+            return {}
+        try:
+            loaded = json.loads(text)
+            if isinstance(loaded, dict):
+                return dict(loaded)
+            return {"value": loaded}
+        except json.JSONDecodeError:
+            return {"value": text}
+
+    @staticmethod
+    def _guess_extension(mime_type: str | None) -> str:
+        if not mime_type or "/" not in mime_type:
+            return "png"
+        tail = mime_type.split("/")[-1].strip().lower()
+        return tail or "png"
+
+    @staticmethod
+    def _guess_mime_type(suffix: str) -> str:
+        mapping = {
+            ".jpg": "image/jpeg",
+            ".jpeg": "image/jpeg",
+            ".png": "image/png",
+            ".gif": "image/gif",
+            ".bmp": "image/bmp",
+            ".tiff": "image/tiff",
+            ".webp": "image/webp",
+        }
+        return mapping.get(suffix.lower(), "image/png")
diff --git a/python/samples/getting_started/image_spec_poc/db_tool.py b/python/samples/getting_started/image_spec_poc/db_tool.py
new file mode 100644
index 0000000000..80bf29f2c5
--- /dev/null
+++ b/python/samples/getting_started/image_spec_poc/db_tool.py
@@ -0,0 +1,329 @@
+"""Async helpers for working with the local SQLite image store."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Any
+from collections.abc import Awaitable, Sequence
+
+from agent_framework.observability import get_tracer
+from opentelemetry.trace import SpanKind
+
+from db_setup import ImageRecord, SQLiteImageStore
+
+__all__ = ["DatabaseImageTool"]
+
+
+class DatabaseImageTool:
+    """High-level façade that wraps :class:`SQLiteImageStore` operations.
+
+    The observability samples previously depended on Semantic Kernel decorators and a
+    bespoke ``ImageDatabase`` class. This version aligns the tool with the new
+    :class:`SQLiteImageStore` implementation so callers can reuse the modern async API
+    while still having convenient synchronous entry points for quick scripts.
+    """
+
+    def __init__(
+        self,
+        *,
+        store: SQLiteImageStore | None = None,
+        db_path: str | Path | None = None,
+        export_dir: str | Path | None = None,
+    ) -> None:
+        store_kwargs: dict[str, Any] = {}
+        if db_path is not None:
+            store_kwargs["db_path"] = db_path
+        if export_dir is not None:
+            store_kwargs["export_dir"] = export_dir
+        self._store = store or SQLiteImageStore(**store_kwargs)
+        self._tracer = get_tracer()
+
+    async def get_image_by_text_id_async(
+        self,
+        text_id: str,
+        *,
+        include_data: bool = False,
+        export_dir: str | Path | None = None,
+    ) -> dict[str, Any]:
+        with self._tracer.start_as_current_span(
+            "database_image_tool.get_image_by_text_id", kind=SpanKind.INTERNAL
+        ) as span:
+            span.set_attribute("image.text_id", text_id)
+            record = await self._store.get_image_by_text_id(text_id, include_data=include_data)
+            if record is None:
+                span.set_attribute("result.found", False)
+                return {
+                    "success": False,
+                    "error": f"No image found with text_id: {text_id}",
+                    "text_id": text_id,
+                }
+
+            payload = self._record_to_payload(record, include_data=include_data)
+            saved_path: Path | None = None
+            if export_dir is not None:
+                saved_path = await self._store.save_image_to_file(text_id=text_id, output_dir=export_dir)
+                if saved_path is not None:
+                    payload["image_path"] = str(saved_path)
+
+            span.set_attribute("result.found", True)
+            span.set_attribute("result.tags_count", len(payload.get("tags", [])))
+            if saved_path is not None:
+                span.set_attribute("result.export_path", str(saved_path))
+            return {"success": True, "image": payload}
+
+    def get_image_by_text_id(
+        self,
+        text_id: str,
+        *,
+        include_data: bool = False,
+        export_dir: str | Path | None = None,
+        as_json: bool = True,
+    ) -> str | dict[str, Any]:
+        result = self._run(
+            self.get_image_by_text_id_async(
+                text_id,
+                include_data=include_data,
+                export_dir=export_dir,
+            )
+        )
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def get_image_by_metadata_tags_async(
+        self,
+        tags: Sequence[str] | str,
+        match_mode: str = "any",
+    ) -> dict[str, Any]:
+        normalized_tags = self._normalize_tags(tags)
+        mode = match_mode.lower().strip() or "any"
+        with self._tracer.start_as_current_span(
+            "database_image_tool.get_image_by_metadata_tags", kind=SpanKind.INTERNAL
+        ) as span:
+            span.set_attribute("search.tags", normalized_tags)
+            span.set_attribute("search.match_mode", mode)
+
+            if not normalized_tags:
+                span.set_attribute("result.count", 0)
+                return {
+                    "success": False,
+                    "error": "No valid tags provided",
+                    "search_tags": [],
+                    "match_mode": mode,
+                }
+
+            records = await self._store.list_images(include_data=False)
+            matches: list[ImageRecord] = []
+            for record in records:
+                record_tags = [tag.lower() for tag in record.tags()]
+                if not record_tags:
+                    continue
+                if mode == "all":
+                    if all(tag in record_tags for tag in normalized_tags):
+                        matches.append(record)
+                else:
+                    if any(tag in record_tags for tag in normalized_tags):
+                        matches.append(record)
+
+            serialized = [self._record_to_payload(record, include_data=False) for record in matches]
+            span.set_attribute("result.count", len(serialized))
+            return {
+                "success": True,
+                "search_tags": normalized_tags,
+                "match_mode": mode,
+                "results": serialized,
+                "count": len(serialized),
+            }
+
+    def get_image_by_metadata_tags(
+        self,
+        tags: Sequence[str] | str,
+        match_mode: str = "any",
+        *,
+        as_json: bool = True,
+    ) -> str | dict[str, Any]:
+        result = self._run(self.get_image_by_metadata_tags_async(tags, match_mode=match_mode))
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def add_tags_to_image_async(
+        self,
+        text_id: str,
+        tags: Sequence[str] | str,
+        *,
+        replace_existing: bool = False,
+    ) -> dict[str, Any]:
+        normalized_tags = self._normalize_tags(tags)
+        with self._tracer.start_as_current_span(
+            "database_image_tool.add_tags_to_image", kind=SpanKind.INTERNAL
+        ) as span:
+            span.set_attribute("image.text_id", text_id)
+            span.set_attribute("tags.input", normalized_tags)
+            span.set_attribute("tags.replace_existing", replace_existing)
+
+            if not normalized_tags:
+                span.set_attribute("result.status", "invalid_tags")
+                return {
+                    "success": False,
+                    "error": "No valid tags provided",
+                    "text_id": text_id,
+                }
+
+            before = await self._store.get_image_by_text_id(text_id, include_data=False)
+            before_tags = before.tags() if before else []
+            updated = await self._store.add_tags(
+                text_id=text_id,
+                tags=normalized_tags,
+                replace_existing=replace_existing,
+            )
+            if updated is None:
+                span.set_attribute("result.status", "not_found")
+                return {
+                    "success": False,
+                    "error": f"Image not found with text_id: {text_id}",
+                    "text_id": text_id,
+                }
+
+            payload = self._record_to_payload(updated, include_data=False)
+            span.set_attribute("result.status", "success")
+            span.set_attribute("result.final_tags_count", len(payload.get("tags", [])))
+            return {
+                "success": True,
+                "text_id": text_id,
+                "previous_tags": before_tags,
+                "final_tags": payload.get("tags", []),
+                "replace_existing": replace_existing,
+                "image": payload,
+            }
+
+    def add_tags_to_image(
+        self,
+        text_id: str,
+        tags: Sequence[str] | str,
+        *,
+        replace_existing: bool = False,
+        as_json: bool = True,
+    ) -> str | dict[str, Any]:
+        result = self._run(
+            self.add_tags_to_image_async(
+                text_id,
+                tags,
+                replace_existing=replace_existing,
+            )
+        )
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def get_all_available_tags_async(self) -> dict[str, Any]:
+        with self._tracer.start_as_current_span(
+            "database_image_tool.get_all_available_tags", kind=SpanKind.INTERNAL
+        ) as span:
+            data = await self._store.get_all_available_tags()
+            span.set_attribute("result.unique_tags", data.get("unique_tags", 0))
+            return {"success": True, **data}
+
+    def get_all_available_tags(self, *, as_json: bool = True) -> str | dict[str, Any]:
+        result = self._run(self.get_all_available_tags_async())
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def search_images_by_metadata_async(self, metadata_query: str) -> dict[str, Any]:
+        with self._tracer.start_as_current_span(
+            "database_image_tool.search_images_by_metadata", kind=SpanKind.INTERNAL
+        ) as span:
+            span.set_attribute("search.query", metadata_query)
+            records = await self._store.search_images(metadata_query, include_data=False)
+            serialized = [self._record_to_payload(record, include_data=False) for record in records]
+            span.set_attribute("result.count", len(serialized))
+            return {
+                "success": True,
+                "query": metadata_query,
+                "results": serialized,
+                "count": len(serialized),
+            }
+
+    def search_images_by_metadata(self, metadata_query: str, *, as_json: bool = True) -> str | dict[str, Any]:
+        result = self._run(self.search_images_by_metadata_async(metadata_query))
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def list_all_images_async(self, include_data: bool = False) -> dict[str, Any]:
+        with self._tracer.start_as_current_span(
+            "database_image_tool.list_all_images", kind=SpanKind.INTERNAL
+        ) as span:
+            records = await self._store.list_images(include_data=include_data)
+            serialized = [self._record_to_payload(record, include_data=include_data) for record in records]
+            span.set_attribute("result.count", len(serialized))
+            return {"success": True, "images": serialized, "total_count": len(serialized)}
+
+    def list_all_images(self, *, include_data: bool = False, as_json: bool = True) -> str | dict[str, Any]:
+        result = self._run(self.list_all_images_async(include_data=include_data))
+        return json.dumps(result, ensure_ascii=False) if as_json else result
+
+    async def extract_image_to_current_session_async(
+        self,
+        text_id: str,
+        *,
+        output_dir: str | Path = "session_images",
+    ) -> dict[str, Any]:
+        with self._tracer.start_as_current_span(
+            "database_image_tool.extract_image", kind=SpanKind.INTERNAL
+        ) as span:
+            span.set_attribute("image.text_id", text_id)
+            saved_path = await self._store.save_image_to_file(text_id=text_id, output_dir=output_dir)
+            if saved_path is None:
+                span.set_attribute("result.status", "not_found")
+                return {
+                    "success": False,
+                    "error": f"Image not found or no data for text_id: {text_id}",
+                    "text_id": text_id,
+                }
+
+            span.set_attribute("result.status", "extracted")
+            span.set_attribute("result.path", str(saved_path))
+            return {"success": True, "text_id": text_id, "image_path": str(saved_path)}
+
+    def extract_image_to_current_session(
+        self,
+        text_id: str,
+        *,
+        output_dir: str | Path = "session_images",
+    ) -> str:
+        result = self._run(
+            self.extract_image_to_current_session_async(text_id, output_dir=output_dir)
+        )
+        return result["image_path"] if result.get("success") else result.get("error", "Unexpected error")
+
+    def _run(self, coro: Awaitable[Any]) -> Any:
+        try:
+            asyncio.get_running_loop()
+        except RuntimeError:
+            return asyncio.run(coro)
+        raise RuntimeError(
+            "DatabaseImageTool synchronous APIs cannot be used while an event loop is running. "
+            "Use the '*_async' coroutine instead."
+        )
+
+    @staticmethod
+    def _normalize_tags(tags: Sequence[str] | str | None) -> list[str]:
+        if tags is None:
+            return []
+        if isinstance(tags, str):
+            candidates = tags.split(",")
+        else:
+            candidates = tags
+
+        normalized: list[str] = []
+        seen: set[str] = set()
+        for candidate in candidates:
+            value = candidate.strip().lower()
+            if not value or value in seen:
+                continue
+            seen.add(value)
+            normalized.append(value)
+        return normalized
+
+    @staticmethod
+    def _record_to_payload(record: ImageRecord, *, include_data: bool) -> dict[str, Any]:
+        payload = record.to_dict(include_data=include_data)
+        if payload.get("metadata") is None:
+            payload["metadata"] = {}
+        payload["tags"] = record.tags()
+        payload["image_path"] = record.image_path
+        return payload
diff --git a/python/samples/getting_started/image_spec_poc/image_database.db b/python/samples/getting_started/image_spec_poc/image_database.db
new file mode 100644
index 0000000000..ab260ed592
Binary files /dev/null and b/python/samples/getting_started/image_spec_poc/image_database.db differ
diff --git a/python/samples/getting_started/image_spec_poc/tracing_with_db_tools.py b/python/samples/getting_started/image_spec_poc/tracing_with_db_tools.py
new file mode 100644
index 0000000000..cc81cdf8b2
--- /dev/null
+++ b/python/samples/getting_started/image_spec_poc/tracing_with_db_tools.py
@@ -0,0 +1,110 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+# Scenario where the DB has a bunch of base64 images stored and the user provides text_id to fetch the image.
+
+import asyncio
+import base64
+import os
+from typing import Annotated
+
+import dotenv
+from agent_framework import ChatAgent, ChatMessage, Role, TextContent
+from agent_framework.observability import get_tracer, enable_instrumentation
+from agent_framework.azure import AzureAIClient
+from azure.ai.projects.aio import AIProjectClient
+from azure.identity.aio import AzureCliCredential
+from opentelemetry.trace import SpanKind
+from opentelemetry.trace.span import format_trace_id
+from pydantic import Field
+from db_setup import SQLiteImageStore
+from database_logic_for_storing_images import create_and_store_base64_encoded_images
+
+dotenv.load_dotenv()
+
+store: SQLiteImageStore | None = None
+
+async def get_image_data(
+    text_id: Annotated[
+        str,
+        Field(description="Fetch stored image metadata, image URI for a given text_id."),
+    ],
+) -> dict[str, object]:
+    if store is None:
+        raise RuntimeError("Image store is not initialized")
+    record = await store.get_image_by_text_id(text_id=text_id, include_data=True)
+    if record is None or record.image_data is None:
+        raise ValueError(f"No image stored for text_id={text_id}")
+    mime_type = record.mime_type or "application/octet-stream"
+    base64_data = base64.b64encode(record.image_data).decode("utf-8")
+    # truncate to keep tool payload small
+    base64_data = base64_data[:256] + "...(truncated)" if len(base64_data) > 256 else base64_data
+    return {
+        "description": record.description or "",
+        "image_uri": f"data:{mime_type};base64,{base64_data}",
+        "query": {
+            "sql": f"SELECT * FROM images WHERE text_id = '{text_id}'",
+        },
+    }
+
+async def main() -> None:
+    """Run image analysis with Azure OpenAI and collect telemetry."""
+    global store
+    store = SQLiteImageStore()
+    async with (
+        AzureCliCredential() as credential,
+        AIProjectClient(endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"], credential=credential) as project,
+        AzureAIClient(project_client=project) as client,
+    ):
+        await client.configure_azure_monitor()
+
+        enable_instrumentation(enable_sensitive_data=True)
+
+        await create_and_store_base64_encoded_images()
+
+        with get_tracer().start_as_current_span(
+            name="Get Image Data with Tool", kind=SpanKind.CLIENT
+        ) as current_span:
+            print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
+
+            agent = ChatAgent(
+                chat_client=client,
+                name="ImageInspector",
+                tools=[get_image_data],
+                instructions=(
+                    "You must call the get_image_data tool using the text_id provided in the user's message before responding. "
+                    "The tool returns a truncated image_uri; do not expand it. "
+                    "After you receive the tool response, output JSON that copies the tool fields exactly (description, image_uri, query). "
+                    "The query field must remain an object with keys sql. Do not convert it to text. "
+                    "Do NOT wrap the JSON in Markdown, code fences, or additional narration."
+                ),
+            )
+
+            message = ChatMessage(
+                role=Role.USER,
+                contents=[
+                    TextContent(
+                        text=(
+                            "Get me the image with the text_id=dog.jpg-20260109T193355\n"
+                            "Return plain text, one field per line, exactly:\n"
+                            "description: \n"
+                            "image_uri: \n"
+                            "sql_query: \n"
+                            "No Markdown or extra text."
+                        )
+                    ),
+                ],
+            )
+
+            async with agent:
+                thread = agent.get_new_thread()
+                response = await agent.run(message, thread=thread, store=True)
+
+            if response.messages:
+                assistant_reply = response.messages[-1]
+                print("Assistant response:")
+                for content in assistant_reply.contents:
+                    print(content)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/python/samples/getting_started/image_spec_poc/tracing_with_db_tools_image_with_prompt.py b/python/samples/getting_started/image_spec_poc/tracing_with_db_tools_image_with_prompt.py
new file mode 100644
index 0000000000..0a1140b865
--- /dev/null
+++ b/python/samples/getting_started/image_spec_poc/tracing_with_db_tools_image_with_prompt.py
@@ -0,0 +1,133 @@
+# Copyright (c) Microsoft. All rights reserved.
# The user passes the prompt + image_uri + text_id to the agent and requests
# the image from the database.

import asyncio
import base64
import os
from datetime import datetime, timezone
from typing import Annotated

import dotenv
from agent_framework import ChatAgent, ChatMessage, DataContent, Role, TextContent
from agent_framework.observability import enable_instrumentation, get_tracer
from agent_framework.azure import AzureAIClient
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import AzureCliCredential
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
from db_setup import SQLiteImageStore

dotenv.load_dotenv()

# Module-level handle: the tool function is invoked by the agent runtime with
# no extra arguments, so it reaches the database opened in main() through this.
store: SQLiteImageStore | None = None

# Maximum base64 characters echoed back to the model; keeps the tool payload
# (and the telemetry spans that capture it) small.
_MAX_BASE64_CHARS = 256


# NOTE(review): this tool is duplicated from the sibling sample
# (tracing_with_db_tools) — consider extracting it to a shared module.
async def get_image_data(
    text_id: Annotated[
        str,
        Field(description="Fetch stored image metadata, image URI for a given text_id."),
    ],
) -> dict[str, object]:
    """Return description, truncated data URI, and lookup query for *text_id*.

    Raises:
        RuntimeError: if main() has not initialized the module-level store.
        ValueError: if no record (or no binary payload) exists for ``text_id``.
    """
    if store is None:
        raise RuntimeError("Image store is not initialized")
    record = await store.get_image_by_text_id(text_id=text_id, include_data=True)
    if record is None or record.image_data is None:
        raise ValueError(f"No image stored for text_id={text_id}")
    mime_type = record.mime_type or "application/octet-stream"
    base64_data = base64.b64encode(record.image_data).decode("utf-8")
    # Truncate to keep the tool payload small.
    if len(base64_data) > _MAX_BASE64_CHARS:
        base64_data = base64_data[:_MAX_BASE64_CHARS] + "...(truncated)"
    return {
        "description": record.description or "",
        "image_uri": f"data:{mime_type};base64,{base64_data}",
        "query": {
            # Display-only SQL. The original emitted an unquoted literal
            # (WHERE text_id = elephant-...), which is invalid SQL; quote it.
            # NOTE(review): this string is interpolated, not parameterized —
            # it must never be executed directly against the database.
            "sql": f"SELECT * FROM images WHERE text_id = '{text_id}'",
        },
    }


def create_sample_image(path: str = "./elephant.jpg") -> tuple[str, str]:
    """Load and base64-encode the sample image at *path*.

    Args:
        path: Image file to read (defaults to the bundled elephant JPEG).

    Returns:
        A ``(data_uri, base64_payload)`` pair for the JPEG bytes.
    """
    with open(path, "rb") as f:
        image_data = f.read()
    image_base64 = base64.b64encode(image_data).decode("utf-8")
    image_uri = f"data:image/jpeg;base64,{image_base64}"
    return image_uri, image_base64


async def main() -> None:
    """Store a sample image, then have the agent fetch it back via the tool."""
    global store
    store = SQLiteImageStore()
    async with (
        AzureCliCredential() as credential,
        AIProjectClient(endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"], credential=credential) as project,
        AzureAIClient(project_client=project) as client,
    ):
        await client.configure_azure_monitor()

        # Sensitive-data capture records prompts/completions in traces;
        # intended for local debugging only.
        enable_instrumentation(enable_sensitive_data=True)

        # Store the encoded image in the database under a timestamped id.
        image_uri, image_base64 = create_sample_image()
        text_id = f"elephant-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}"
        record = await store.add_image_from_base64(
            text_id=text_id,
            base64_data=image_base64,
            image_name="elephant.jpg",
            description="Sample elephant image",
            metadata={"source": "sample", "scenario": "azure_ai_chat_client"},
            tags=["elephant"],
            mime_type="image/jpeg",
        )
        print(f"Stored image in SQLite with id={record.id} text_id={record.text_id}")

        with get_tracer().start_as_current_span(
            name="Get Image from DB Tool", kind=SpanKind.CLIENT
        ) as current_span:
            print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")

            agent = ChatAgent(
                chat_client=client,
                name="ImageFetchingAgent",
                tools=[get_image_data],
                instructions=(
                    "You must call the get_image_data tool using the text_id provided in the user's message before responding. "
                    # Fix: the original had no space between the next two
                    # literals, producing "...do not expand it.Do NOT wrap...".
                    "The tool returns a truncated image_uri; do not expand it. "
                    "Do NOT wrap the JSON in Markdown, code fences, or additional narration."
                ),
            )

            # User asks for the image just stored. Fix: the original
            # hard-coded a stale id (elephant-20260109T191702) that could
            # never match the freshly generated text_id above.
            message = ChatMessage(
                role=Role.USER,
                contents=[
                    TextContent(
                        text=(
                            f"Get me the image with the text_id={text_id}\n"
                            "Return plain text, one field per line, exactly:\n"
                            "description: \n"
                            "image_uri: \n"
                            "sql_query: \n"
                            "No Markdown or extra text."
                        )
                    ),
                    DataContent(uri=image_uri, media_type="image/jpeg"),
                ],
            )

            async with agent:
                thread = agent.get_new_thread()
                response = await agent.run(message, thread=thread, store=True)

            if response.messages:
                assistant_reply = response.messages[-1]
                print("Assistant response:")
                for content in assistant_reply.contents:
                    print(content)


if __name__ == "__main__":
    asyncio.run(main())