diff --git a/README.md b/README.md index 9452c88..e2084c5 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,17 @@ command line. - Multi-channel profiles: manage several channels from one machine and switch per command. - Comments moderation: list, reply, and moderate from the CLI. - Channel analytics queries via the YouTube Analytics API. +- Playlists: bulk-add by search and reorder by views with one command. + +## Power-user moves + +```bash +# Bulk-add to a playlist from a search query (dry-run by default) +ytstudio playlists add PL... --from-search "shorts compilation" -n 20 --execute + +# Reorder a playlist by view count, descending +ytstudio playlists reorder PL... --by views --execute +``` ## Documentation diff --git a/docs/playlists.md b/docs/playlists.md new file mode 100644 index 0000000..ec056cf --- /dev/null +++ b/docs/playlists.md @@ -0,0 +1,110 @@ +# Playlists + +Bulk operations on YouTube playlists from the terminal. YouTube Studio is fine +for tweaking one playlist; this set of commands is for the cases where you +want to add a search query's worth of videos at once, reorder by views, or +sync a manifest across many channels. + +## List and inspect + +```bash +ytstudio playlists list # your playlists +ytstudio playlists list -n 200 -o json # 200, JSON +ytstudio playlists list -o csv > playlists.csv # export + +ytstudio playlists show PL_xxx # one playlist +ytstudio playlists show PL_xxx --items # plus the first 50 items +ytstudio playlists items PL_xxx # full item listing +ytstudio playlists items PL_xxx -n 500 -o csv # paginated CSV +``` + +`list` sorts locally with `--sort date|title|count`. `items` shows the +playlist item id (`PLPLI...`) you need for `remove --item` and `reorder`. + +## Create, update, delete + +```bash +ytstudio playlists create --title "Best of 2026" --privacy public --execute +ytstudio playlists update PL_xxx --title "Best of 2026 (final)" --execute +ytstudio playlists delete PL_xxx --execute # asks to confirm +ytstudio playlists delete PL_xxx --execute --yes # no prompt (CI / agents) +``` + +Mutations are **dry-run by default**; add `--execute` to apply. `update` only +sends the fields you pass; everything else is re-specified from the current +snippet so YouTube does not silently nuke a field by omitting it. + +The channel's auto-uploads playlist (`UU...` belonging to your channel) is +always refused. The check resolves the canonical uploads id via +`channels.list(mine=True).contentDetails.relatedPlaylists.uploads`, so a +random other playlist id that happens to start with `UU` is not blocked. + +## Bulk add + +`add` accepts explicit video ids, a search query, or both. The combined batch +is capped by `--limit`. + +```bash +ytstudio playlists add PL_xxx -v vid_a -v vid_b --execute +ytstudio playlists add PL_xxx --from-search "shorts compilation" -n 20 --execute +ytstudio playlists add PL_xxx -v vid_a --from-search "topic" -n 10 --execute +ytstudio playlists add PL_xxx -v vid_a -v vid_b --position 5 --execute +``` + +`--from-search` calls `search.list(forMine=True, type=video, q=...)`. +`--position N` inserts the first video at position `N`, the second at `N+1`, +and so on, so the on-playlist order matches the CLI order. `--note "text"` +attaches a per-item note. + +!!! warning "Quota cost" + + `playlistItems.insert` costs 50 quota units per video. The default daily + quota (10 000 units) covers ~200 inserts. `search.list` costs an + additional 100 units per call. See [API quota](api-quota.md). + +## Bulk remove + +```bash +ytstudio playlists remove PL_xxx --item PLPLI_a --item PLPLI_b --execute +ytstudio playlists remove PL_xxx --video vid_a --execute +``` + +`--video` resolves to every playlist item for that video in the playlist and +removes all occurrences; pass the same id once even if it appears multiple +times. + +## Reorder + +`reorder` sorts the playlist by views, likes, publish date, or title. + +```bash +ytstudio playlists reorder PL_xxx --by views --execute +ytstudio playlists reorder PL_xxx --by title --order asc --execute +``` + +The playlist must be set to **Manual** sort in YouTube Studio +(Settings -> Sort by -> Manual). If it is not, YouTube returns +`manualSortRequired` and the command stops with an actionable message. + +Writes that become no-ops once earlier moves have shifted neighbouring items +are skipped, so a full reverse on N items costs at most `N - 1` writes +instead of `N`. + +## Output formats + +All read commands take `--output table|json|csv`. CSV uses Python's +`csv.writer`, so titles with commas, newlines, or quotes survive round-trips +through Excel and `pandas.read_csv`. + +## Power moves + +```bash +# Curate a "watch later" from a search +ytstudio playlists add PL_watchlater --from-search "rust async" -n 30 --execute + +# Move the all-time top by views to the front +ytstudio playlists reorder PL_pinned --by views --execute + +# Promote a single video to the top of an existing playlist +ytstudio playlists add PL_pinned -v --position 0 --execute +``` diff --git a/mkdocs.yml b/mkdocs.yml index 0b1b57b..e3b3e8c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -96,6 +96,7 @@ nav: - Multi-channel profiles: profiles.md - Live broadcasts: livestreams.md - Videos (bulk update, upload): videos.md + - Playlists: playlists.md - Analytics: analytics.md - Comments: comments.md - Reference: diff --git a/src/ytstudio/commands/playlists.py b/src/ytstudio/commands/playlists.py new file mode 100644 index 0000000..fbda088 --- /dev/null +++ b/src/ytstudio/commands/playlists.py @@ -0,0 +1,919 @@ +import csv +import json +import sys +from dataclasses import asdict, dataclass, field + +import typer +from google.auth.exceptions import RefreshError +from googleapiclient.errors import HttpError +from rich.prompt import Confirm + +from ytstudio.api import api, handle_api_error +from ytstudio.services import get_data_service +from ytstudio.ui import ( + console, + create_kv_table, + create_table, + dim, + format_number, + success_message, + truncate, +) + +app = typer.Typer(help="Playlist management commands") + + +@dataclass +class Playlist: + id: str + title: str + description: str + published_at: str + item_count: int + privacy: str + default_language: str | None = None + localizations: dict = field(default_factory=dict) + + +@dataclass +class PlaylistItem: + id: str + video_id: str + title: str + position: int + added_at: str + note: str = "" + + +# Uploads playlists are channel-owned and cannot be mutated through the +# playlists or playlistItems APIs. Resolve the canonical id once per service. +_uploads_id_cache: dict[int, str | None] = {} + + +def _resolve_uploads_id(service) -> str | None: + sid = id(service) + if sid not in _uploads_id_cache: + response = api(service.channels().list(part="contentDetails", mine=True)) or {} + items = response.get("items") or [] + uploads = "" + if items: + related = (items[0].get("contentDetails") or {}).get("relatedPlaylists") or {} + uploads = related.get("uploads") or "" + _uploads_id_cache[sid] = uploads or None + return _uploads_id_cache[sid] + + +def _refuse_uploads_playlist(service, playlist_id: str) -> None: + if playlist_id == _resolve_uploads_id(service): + console.print( + "[red]Cannot modify the channel uploads playlist. " + "Manage video privacy or delete videos instead.[/red]" + ) + raise typer.Exit(1) + + +def _execute_or_session_exit(request): + """Run the request and translate a revoked OAuth token into a friendly exit.""" + try: + return request.execute() + except RefreshError: + console.print( + "\n[red]Session expired or revoked.[/red] " + "Run [bold]ytstudio login[/bold] to re-authenticate." + ) + raise typer.Exit(1) from None + + +def _http_reason(error: HttpError) -> str: + detail = error.error_details[0] if getattr(error, "error_details", None) else {} + if isinstance(detail, dict): + return detail.get("reason", "") or "" + return "" + + +def _parse_playlist(item: dict) -> Playlist: + snippet = item.get("snippet") or {} + content = item.get("contentDetails") or {} + status = item.get("status") or {} + return Playlist( + id=str(item["id"]), + title=snippet.get("title", ""), + description=snippet.get("description", ""), + published_at=snippet.get("publishedAt", ""), + item_count=int(content.get("itemCount", 0) or 0), + privacy=status.get("privacyStatus", "unknown"), + default_language=snippet.get("defaultLanguage"), + localizations=item.get("localizations", {}) or {}, + ) + + +def _parse_playlist_item(item: dict) -> PlaylistItem: + snippet = item.get("snippet") or {} + content = item.get("contentDetails") or {} + resource = snippet.get("resourceId") or {} + return PlaylistItem( + id=str(item.get("id", "")), + video_id=resource.get("videoId") or content.get("videoId", ""), + title=snippet.get("title", ""), + position=int(snippet.get("position", 0) or 0), + added_at=snippet.get("publishedAt", ""), + note=content.get("note", "") or "", + ) + + +def _fetch_playlist(service, playlist_id: str) -> Playlist | None: + response = api( + service.playlists().list( + part="snippet,contentDetails,status,localizations", + id=playlist_id, + ) + ) + items = (response or {}).get("items", []) + if not items: + return None + return _parse_playlist(items[0]) + + +def _fetch_all_items(service, playlist_id: str, limit: int | None = None) -> list[PlaylistItem]: + """Paginate through playlistItems, capping at `limit` if provided.""" + items: list[PlaylistItem] = [] + page_token: str | None = None + + while True: + remaining = None if limit is None else max(0, limit - len(items)) + if remaining == 0: + break + batch_size = 50 if remaining is None else min(50, remaining) + + response = ( + api( + service.playlistItems().list( + part="snippet,contentDetails", + playlistId=playlist_id, + maxResults=batch_size, + pageToken=page_token, + ) + ) + or {} + ) + + for raw in response.get("items", []): + items.append(_parse_playlist_item(raw)) + + page_token = response.get("nextPageToken") + if not page_token: + break + + return items + + +def _list_items_page(service, playlist_id: str, max_results: int, page_token: str | None) -> dict: + response = ( + api( + service.playlistItems().list( + part="snippet,contentDetails", + playlistId=playlist_id, + maxResults=max_results, + pageToken=page_token, + ) + ) + or {} + ) + parsed = [_parse_playlist_item(raw) for raw in response.get("items", [])] + return { + "items": parsed, + "next_page_token": response.get("nextPageToken"), + "total_results": (response.get("pageInfo") or {}).get("totalResults", 0), + } + + +def _print_playlists_table(playlists: list[Playlist]) -> None: + table = create_table() + table.add_column("ID", style="yellow") + table.add_column("Title", style="cyan") + table.add_column("Items", justify="right") + table.add_column("Privacy") + table.add_column("Updated") + + for p in playlists: + table.add_row( + p.id, + truncate(p.title), + format_number(p.item_count), + p.privacy, + p.published_at[:10], + ) + console.print(table) + + +@app.command("list") +def list_playlists( + limit: int = typer.Option(50, "--limit", "-n", help="Number of playlists to list"), + page_token: str = typer.Option(None, "--page-token", "-p", help="Page token for pagination"), + sort: str = typer.Option("date", "--sort", "-s", help="Sort by: date, title, count"), + output: str = typer.Option("table", "--output", "-o", help="Output format: table, json, csv"), +): + """List your playlists.""" + service = get_data_service() + + all_playlists: list[Playlist] = [] + current_token = page_token + next_page_token: str | None = None + total_results = 0 + + while len(all_playlists) < limit: + batch = min(limit - len(all_playlists), 50) + response = ( + api( + service.playlists().list( + part="snippet,contentDetails,status", + mine=True, + maxResults=batch, + pageToken=current_token, + ) + ) + or {} + ) + + if not total_results: + total_results = (response.get("pageInfo") or {}).get("totalResults", 0) + + for raw in response.get("items", []): + all_playlists.append(_parse_playlist(raw)) + + next_page_token = response.get("nextPageToken") + if not next_page_token: + break + current_token = next_page_token + + if sort == "title": + all_playlists.sort(key=lambda p: p.title.lower()) + elif sort == "count": + all_playlists.sort(key=lambda p: p.item_count, reverse=True) + + if output == "json": + print( + json.dumps( + { + "playlists": [asdict(p) for p in all_playlists], + "next_page_token": next_page_token, + "total_results": total_results, + }, + indent=2, + ) + ) + return + + if output == "csv": + writer = csv.writer(sys.stdout, lineterminator="\n") + writer.writerow(["id", "title", "items", "privacy", "published_at"]) + for p in all_playlists: + writer.writerow([p.id, p.title, p.item_count, p.privacy, p.published_at]) + return + + _print_playlists_table(all_playlists) + + if next_page_token: + console.print(f"\nNext page: --page-token {next_page_token}") + + +@app.command() +def show( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + output: str = typer.Option("table", "--output", "-o", help="Output format: table, json"), + items: bool = typer.Option( + False, "--items", "-i", help="Also fetch and render the first 50 items" + ), +): + """Show details for a specific playlist.""" + service = get_data_service() + playlist = _fetch_playlist(service, playlist_id) + + if not playlist: + console.print(f"[red]Playlist not found: {playlist_id}[/red]") + raise typer.Exit(1) + + rendered_items: list[PlaylistItem] = [] + if items: + rendered_items = _fetch_all_items(service, playlist_id, limit=50) + + if output == "json": + payload: dict = {"playlist": asdict(playlist)} + if items: + payload["items"] = [asdict(it) for it in rendered_items] + print(json.dumps(payload, indent=2)) + return + + console.print(f"\n[bold]{playlist.title}[/bold]\n") + + table = create_kv_table() + table.add_column("field", style="dim") + table.add_column("value") + table.add_row("title", playlist.title) + table.add_row("description", playlist.description or "-") + table.add_row("items", format_number(playlist.item_count)) + table.add_row("privacy", playlist.privacy) + table.add_row("published", playlist.published_at[:10] if playlist.published_at else "-") + table.add_row("language", playlist.default_language or "-") + console.print(table) + + if not items: + return + + if not rendered_items: + console.print(dim("\nPlaylist is empty.")) + return + + console.print() + items_table = create_table() + items_table.add_column("Position", justify="right") + items_table.add_column("Video ID", style="yellow") + items_table.add_column("Title", style="cyan") + items_table.add_column("Added") + for it in rendered_items: + items_table.add_row( + str(it.position), + it.video_id, + truncate(it.title), + it.added_at[:10] if it.added_at else "-", + ) + console.print(items_table) + + +@app.command() +def create( + title: str = typer.Option(..., "--title", "-t", help="Playlist title"), + description: str = typer.Option("", "--description", "-d", help="Playlist description"), + privacy: str = typer.Option("private", "--privacy", help="Privacy: private, public, unlisted"), + language: str = typer.Option(None, "--language", help="Default language tag (e.g. en, nl)"), + execute: bool = typer.Option(False, "--execute", help="Create the playlist (default dry-run)"), +): + """Create a new playlist.""" + if privacy not in {"private", "public", "unlisted"}: + console.print("[red]Invalid --privacy. Use: private, public, unlisted[/red]") + raise typer.Exit(2) + + snippet: dict = {"title": title, "description": description} + if language: + snippet["defaultLanguage"] = language + body = {"snippet": snippet, "status": {"privacyStatus": privacy}} + + if not execute: + console.print("[bold]Preview new playlist:[/bold]\n") + preview = create_kv_table() + preview.add_column("field", style="dim") + preview.add_column("value") + preview.add_row("title", title) + preview.add_row("description", description or "-") + preview.add_row("privacy", privacy) + preview.add_row("language", language or "-") + console.print(preview) + console.print("\nRun with --execute to create.") + return + + service = get_data_service() + response = api(service.playlists().insert(part="snippet,status", body=body)) or {} + playlist_id = response.get("id", "unknown") + success_message(f"Created: {title} ({playlist_id})") + + +@app.command() +def update( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + title: str = typer.Option(None, "--title", "-t", help="New title"), + description: str = typer.Option(None, "--description", "-d", help="New description"), + privacy: str = typer.Option(None, "--privacy", help="New privacy: private, public, unlisted"), + language: str = typer.Option(None, "--language", help="New default language"), + execute: bool = typer.Option(False, "--execute", help="Apply changes (default is dry-run)"), +): + """Update a playlist's metadata.""" + if all(v is None for v in (title, description, privacy, language)): + console.print( + "[yellow]Nothing to update. Provide --title, --description, --privacy, " + "or --language[/yellow]" + ) + raise typer.Exit(1) + + if privacy is not None and privacy not in {"private", "public", "unlisted"}: + console.print("[red]Invalid --privacy. Use: private, public, unlisted[/red]") + raise typer.Exit(2) + + service = get_data_service() + _refuse_uploads_playlist(service, playlist_id) + current = _fetch_playlist(service, playlist_id) + if not current: + console.print(f"[red]Playlist not found: {playlist_id}[/red]") + raise typer.Exit(1) + + new_title = title if title is not None else current.title + new_description = description if description is not None else current.description + new_privacy = privacy if privacy is not None else current.privacy + new_language = language if language is not None else current.default_language + + if not execute: + console.print("[bold]Preview changes:[/bold]\n") + preview = create_kv_table() + preview.add_column("field", style="dim") + preview.add_column("current") + preview.add_column("new", style="green") + if title is not None: + preview.add_row("title", current.title, new_title) + if description is not None: + preview.add_row("description", current.description or "-", new_description or "-") + if privacy is not None: + preview.add_row("privacy", current.privacy, new_privacy) + if language is not None: + preview.add_row("language", current.default_language or "-", new_language or "-") + console.print(preview) + console.print("\nRun with --execute to apply.") + return + + # snippet PUTs replace the whole part, so we always re-specify the current + # title and description even when only changing one field. + snippet: dict = {"title": new_title, "description": new_description} + if new_language: + snippet["defaultLanguage"] = new_language + body = { + "id": playlist_id, + "snippet": snippet, + "status": {"privacyStatus": new_privacy}, + } + + api(service.playlists().update(part="snippet,status", body=body)) + success_message(f"Updated: {new_title}") + + +@app.command() +def delete( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + execute: bool = typer.Option(False, "--execute", help="Apply deletion (default is dry-run)"), + yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), +): + """Delete a playlist.""" + service = get_data_service() + _refuse_uploads_playlist(service, playlist_id) + current = _fetch_playlist(service, playlist_id) + if not current: + console.print(f"[red]Playlist not found: {playlist_id}[/red]") + raise typer.Exit(1) + + if not execute: + console.print( + f"[yellow]Would delete:[/yellow] {current.title} ({current.item_count} items)" + ) + console.print("\nRun with --execute to apply.") + return + + if not yes and not Confirm.ask(f"Delete playlist '{current.title}'?", default=False): + console.print(dim("Aborted.")) + return + + api(service.playlists().delete(id=playlist_id)) + success_message(f"Deleted: {current.title}") + + +@app.command() +def items( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + limit: int = typer.Option(50, "--limit", "-n", help="Number of items to list"), + page_token: str = typer.Option(None, "--page-token", "-p", help="Page token for pagination"), + output: str = typer.Option("table", "--output", "-o", help="Output format: table, json, csv"), +): + """List the items in a playlist.""" + service = get_data_service() + + all_items: list[PlaylistItem] = [] + current_token = page_token + next_page_token: str | None = None + total_results = 0 + + while len(all_items) < limit: + batch = min(limit - len(all_items), 50) + result = _list_items_page(service, playlist_id, batch, current_token) + if not total_results: + total_results = result["total_results"] + all_items.extend(result["items"]) + next_page_token = result["next_page_token"] + if not next_page_token: + break + current_token = next_page_token + + if output == "json": + print( + json.dumps( + { + "items": [asdict(it) for it in all_items], + "next_page_token": next_page_token, + "total_results": total_results, + }, + indent=2, + ) + ) + return + + if output == "csv": + writer = csv.writer(sys.stdout, lineterminator="\n") + writer.writerow(["item_id", "video_id", "position", "title", "added_at"]) + for it in all_items: + writer.writerow([it.id, it.video_id, it.position, it.title, it.added_at]) + return + + if not all_items: + console.print(dim("Playlist is empty.")) + return + + table = create_table() + table.add_column("Position", justify="right") + table.add_column("Item ID", style="yellow") + table.add_column("Video ID") + table.add_column("Title", style="cyan") + table.add_column("Added") + for it in all_items: + table.add_row( + str(it.position), + it.id, + it.video_id, + truncate(it.title), + it.added_at[:10] if it.added_at else "-", + ) + console.print(table) + + if next_page_token: + console.print(f"\nNext page: --page-token {next_page_token}") + + +def _resolve_search_video_ids(service, query: str, limit: int) -> list[tuple[str, str]]: + """Return [(video_id, title)] from search().list, capped at `limit`.""" + results: list[tuple[str, str]] = [] + page_token: str | None = None + + while len(results) < limit: + batch = min(50, limit - len(results)) + response = ( + api( + service.search().list( + part="snippet", + forMine=True, + type="video", + q=query, + maxResults=batch, + pageToken=page_token, + ) + ) + or {} + ) + + for item in response.get("items", []): + vid_field = item.get("id") or {} + video_id = vid_field.get("videoId") if isinstance(vid_field, dict) else None + if not video_id: + continue + title = (item.get("snippet") or {}).get("title", "") + results.append((video_id, title)) + if len(results) >= limit: + break + + page_token = response.get("nextPageToken") + if not page_token: + break + + return results[:limit] + + +@app.command() +def add( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + video: list[str] = typer.Option(None, "--video", "-v", help="Video ID to add (repeatable)"), + from_search: str = typer.Option( + None, "--from-search", help="Add the top results from a search of your videos" + ), + limit: int = typer.Option( + 50, "--limit", "-n", help="Max videos to add per invocation (search mode)" + ), + position: int = typer.Option(None, "--position", help="Insert at this position"), + note: str = typer.Option(None, "--note", help="Set contentDetails.note on each item"), + execute: bool = typer.Option(False, "--execute", help="Apply changes (default is dry-run)"), +): + """Add videos to a playlist by ID or search query. + + If --video is passed more than once, every ID is added. --from-search runs + search().list(forMine=True, type=video, q=...) and adds up to --limit hits. + Quota is 50 units per inserted video; a running counter is printed. + """ + if not video and not from_search: + console.print("[red]Pass at least one --video or --from-search[/red]") + raise typer.Exit(2) + + service = get_data_service() + _refuse_uploads_playlist(service, playlist_id) + + candidates: list[tuple[str, str]] = [] + if video: + candidates.extend((vid, "") for vid in video) + if from_search: + # Reserve any limit headroom not already taken by explicit --video entries + # so the combined batch stays under --limit. + headroom = max(0, limit - len(candidates)) + found = _resolve_search_video_ids(service, from_search, headroom) if headroom else [] + if not found and not video: + console.print("[yellow]No videos matched search[/yellow]") + raise typer.Exit(0) + candidates.extend(found) + + candidates = candidates[:limit] + + table = create_table() + table.add_column("Video ID", style="yellow") + table.add_column("Title", style="cyan") + table.add_column("Position", justify="right") + for i, (vid, title) in enumerate(candidates): + pos_str = str(position + i) if position is not None else "-" + table.add_row(vid, truncate(title) if title else "-", pos_str) + + if not execute: + console.print(dim(f"Pending {len(candidates)} adds\n")) + console.print(table) + console.print("\n[dim]Run with --execute to apply changes[/dim]") + return + + console.print(dim(f"Adding {len(candidates)} videos\n")) + console.print(table) + console.print() + + added = 0 + failed = 0 + current_position = position + for idx, (vid, _title) in enumerate(candidates, start=1): + snippet: dict = { + "playlistId": playlist_id, + "resourceId": {"kind": "youtube#video", "videoId": vid}, + } + if current_position is not None: + snippet["position"] = current_position + body: dict = {"snippet": snippet} + if note is not None: + body["contentDetails"] = {"note": note} + + try: + _execute_or_session_exit( + service.playlistItems().insert(part="snippet,contentDetails", body=body) + ) + console.print(f"[green]Added[/green] {idx}/{len(candidates)} {vid}") + added += 1 + if current_position is not None: + current_position += 1 + except HttpError as e: + reason = _http_reason(e) + if reason == "quotaExceeded": + console.print(f"\n[bold]Partial progress:[/bold] {added} added, {failed} failed") + handle_api_error(e) + if reason == "playlistContainsMaximumNumberOfVideos": + console.print( + f"[red]Playlist reached the 5000-item limit.[/red] Stopped at {added} added." + ) + raise typer.Exit(1) from None + if reason == "videoNotFound": + console.print(f"[red]Not found[/red] {vid}") + failed += 1 + continue + console.print(f"[red]Failed[/red] {vid}: {e}") + failed += 1 + + console.print(f"\n[bold]Done:[/bold] {added} added, {failed} failed") + + +def _resolve_video_to_item_ids(service, playlist_id: str, video_id: str) -> list[str]: + """Return every playlistItem id for `video_id` in this playlist.""" + found: list[str] = [] + page_token: str | None = None + while True: + response = ( + api( + service.playlistItems().list( + part="snippet", + playlistId=playlist_id, + videoId=video_id, + maxResults=50, + pageToken=page_token, + ) + ) + or {} + ) + for raw in response.get("items", []): + if raw.get("id"): + found.append(raw["id"]) + page_token = response.get("nextPageToken") + if not page_token: + break + return found + + +@app.command() +def remove( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + item: list[str] = typer.Option( + None, "--item", "-i", help="Playlist item ID (PLPLI...) to remove (repeatable)" + ), + video: list[str] = typer.Option( + None, + "--video", + "-v", + help=( + "Video ID to remove (repeatable). Resolves to all playlist items for that video; " + "duplicates are all removed." + ), + ), + execute: bool = typer.Option(False, "--execute", help="Apply changes (default is dry-run)"), +): + """Remove items from a playlist by item id or video id.""" + if not item and not video: + console.print("[red]Pass at least one --item or --video[/red]") + raise typer.Exit(2) + + service = get_data_service() + _refuse_uploads_playlist(service, playlist_id) + + targets: list[str] = list(item) if item else [] + if video: + for vid in video: + resolved = _resolve_video_to_item_ids(service, playlist_id, vid) + if not resolved: + console.print(f"[yellow]Not in playlist: {vid}[/yellow]") + continue + targets.extend(resolved) + + if not targets: + console.print("[yellow]Nothing to remove[/yellow]") + raise typer.Exit(0) + + table = create_table() + table.add_column("Item ID", style="yellow") + for item_id in targets: + table.add_row(item_id) + + if not execute: + console.print(dim(f"Pending {len(targets)} removals\n")) + console.print(table) + console.print("\n[dim]Run with --execute to apply changes[/dim]") + return + + console.print(dim(f"Removing {len(targets)} items\n")) + console.print(table) + console.print() + + removed = 0 + failed = 0 + for idx, item_id in enumerate(targets, start=1): + try: + _execute_or_session_exit(service.playlistItems().delete(id=item_id)) + console.print(f"[green]Removed[/green] {idx}/{len(targets)} {item_id}") + removed += 1 + except HttpError as e: + reason = _http_reason(e) + if reason == "quotaExceeded": + console.print( + f"\n[bold]Partial progress:[/bold] {removed} removed, {failed} failed" + ) + handle_api_error(e) + console.print(f"[red]Failed[/red] {item_id}: {e}") + failed += 1 + + console.print(f"\n[bold]Done:[/bold] {removed} removed, {failed} failed") + + +def _hydrate_sort_keys(service, video_ids: list[str]) -> dict[str, dict]: + """Batch-fetch statistics+snippet for `video_ids` in chunks of 50.""" + out: dict[str, dict] = {} + for start in range(0, len(video_ids), 50): + chunk = video_ids[start : start + 50] + response = ( + api( + service.videos().list( + part="statistics,snippet", + id=",".join(chunk), + ) + ) + or {} + ) + for vid_item in response.get("items", []): + out[vid_item["id"]] = vid_item + return out + + +def _sort_key(item: PlaylistItem, hydrated: dict, by: str): + """Compose (primary, video_id) sort tuple; video_id keeps ties stable.""" + data = hydrated.get(item.video_id, {}) + snippet = data.get("snippet", {}) + stats = data.get("statistics", {}) + if by == "views": + primary = int(stats.get("viewCount", 0) or 0) + elif by == "likes": + primary = int(stats.get("likeCount", 0) or 0) + elif by == "published": + primary = snippet.get("publishedAt", "") + elif by == "title": + primary = snippet.get("title", "").lower() + else: + primary = 0 + return (primary, item.video_id) + + +@app.command() +def reorder( + playlist_id: str = typer.Argument(..., help="Playlist ID"), + by: str = typer.Option("views", "--by", help="Sort by: views, likes, published, title"), + order: str = typer.Option("desc", "--order", help="Order: asc, desc"), + execute: bool = typer.Option(False, "--execute", help="Apply changes (default is dry-run)"), +): + """Reorder a playlist by one of views, likes, published, title.""" + if by not in {"views", "likes", "published", "title"}: + console.print("[red]Invalid --by. Use: views, likes, published, title[/red]") + raise typer.Exit(2) + if order not in {"asc", "desc"}: + console.print("[red]Invalid --order. Use: asc, desc[/red]") + raise typer.Exit(2) + + service = get_data_service() + _refuse_uploads_playlist(service, playlist_id) + + items = _fetch_all_items(service, playlist_id) + if not items: + console.print(dim("Playlist is empty.")) + return + + hydrated = _hydrate_sort_keys(service, [it.video_id for it in items]) + + reverse = order == "desc" + sorted_items = sorted(items, key=lambda it: _sort_key(it, hydrated, by), reverse=reverse) + + moves: list[tuple[PlaylistItem, int, int]] = [] + for new_pos, it in enumerate(sorted_items): + if it.position != new_pos: + moves.append((it, it.position, new_pos)) + + if not moves: + console.print("[yellow]No changes; playlist already sorted.[/yellow]") + return + + moves.sort(key=lambda m: m[2]) + + table = create_table() + table.add_column("Item ID", style="yellow") + table.add_column("Title", style="cyan") + table.add_column("Current", justify="right") + table.add_column("Target", justify="right") + for it, cur, tgt in moves: + table.add_row(it.id, truncate(it.title), str(cur), str(tgt)) + + if not execute: + console.print(dim(f"Pending {len(moves)} moves\n")) + console.print(table) + console.print("\n[dim]Run with --execute to apply changes[/dim]") + return + + console.print(dim(f"Applying {len(moves)} moves\n")) + console.print(table) + console.print() + + # Track the playlist order locally so we can skip writes that would be no-ops + # after prior moves shift positions. Each successful update is mirrored here. + live = [it.id for it in items] + updated = 0 + skipped = 0 + failed = 0 + for idx, (it, _cur, tgt) in enumerate(moves, start=1): + if live.index(it.id) == tgt: + skipped += 1 + continue + body = { + "id": it.id, + "snippet": { + "playlistId": playlist_id, + "resourceId": {"kind": "youtube#video", "videoId": it.video_id}, + "position": tgt, + }, + } + try: + _execute_or_session_exit(service.playlistItems().update(part="snippet", body=body)) + console.print(f"[green]Moved[/green] {idx}/{len(moves)} {it.id} -> {tgt}") + updated += 1 + live.remove(it.id) + live.insert(tgt, it.id) + except HttpError as e: + reason = _http_reason(e) + if reason == "manualSortRequired": + console.print( + "[red]Playlist is not set to Manual sort. " + "Open it in YouTube Studio -> Sort by -> Manual, then retry.[/red]" + ) + raise typer.Exit(1) from None + if reason == "quotaExceeded": + console.print(f"\n[bold]Partial progress:[/bold] {updated} moved, {failed} failed") + handle_api_error(e) + console.print(f"[red]Failed[/red] {it.id}: {e}") + failed += 1 + + summary = f"\n[bold]Done:[/bold] {updated} moved, {failed} failed" + if skipped: + summary += f", {skipped} already in place" + console.print(summary) diff --git a/src/ytstudio/main.py b/src/ytstudio/main.py index 57339a3..6de2269 100644 --- a/src/ytstudio/main.py +++ b/src/ytstudio/main.py @@ -4,7 +4,7 @@ from rich.console import Console from ytstudio.api import authenticate, get_status -from ytstudio.commands import analytics, comments, livestreams, profile, videos +from ytstudio.commands import analytics, comments, livestreams, playlists, profile, videos from ytstudio.config import migrate_legacy_credentials, setup_credentials from ytstudio.version import get_current_version, is_update_available @@ -21,6 +21,7 @@ app.add_typer(analytics.app, name="analytics") app.add_typer(comments.app, name="comments") app.add_typer(livestreams.app, name="livestreams") +app.add_typer(playlists.app, name="playlists") app.add_typer(profile.app, name="profile") diff --git a/tests/conftest.py b/tests/conftest.py index 836cb91..c43ef39 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,8 @@ import pytest +from ytstudio.commands import playlists as _playlists_module + MOCK_CHANNEL = { "id": "UC_test_channel_id", "snippet": { @@ -96,6 +98,29 @@ }, } +MOCK_PLAYLIST = { + "id": "PL_test_123", + "snippet": { + "title": "Test Playlist", + "description": "desc", + "publishedAt": "2026-01-01T00:00:00Z", + "defaultLanguage": "en", + }, + "contentDetails": {"itemCount": 3}, + "status": {"privacyStatus": "private"}, +} + +MOCK_PLAYLIST_ITEM_FULL = { + "id": "PLPLI_test_item_1", + "snippet": { + "title": "Item title", + "publishedAt": "2026-01-02T00:00:00Z", + "position": 0, + "resourceId": {"kind": "youtube#video", "videoId": "test_video_123"}, + }, + "contentDetails": {"videoId": "test_video_123", "note": ""}, +} + def create_mock_service(): service = MagicMock() @@ -133,6 +158,38 @@ def create_mock_service(): } service.search.return_value.list.return_value = search_list + playlists_list = MagicMock() + playlists_list.execute.return_value = { + "items": [MOCK_PLAYLIST], + "nextPageToken": None, + "pageInfo": {"totalResults": 1}, + } + service.playlists.return_value.list.return_value = playlists_list + + playlists_insert = MagicMock() + playlists_insert.execute.return_value = MOCK_PLAYLIST + service.playlists.return_value.insert.return_value = playlists_insert + + playlists_update = MagicMock() + playlists_update.execute.return_value = MOCK_PLAYLIST + service.playlists.return_value.update.return_value = playlists_update + + playlists_delete = MagicMock() + playlists_delete.execute.return_value = "" + service.playlists.return_value.delete.return_value = playlists_delete + + playlist_items_insert = MagicMock() + playlist_items_insert.execute.return_value = MOCK_PLAYLIST_ITEM_FULL + service.playlistItems.return_value.insert.return_value = playlist_items_insert + + playlist_items_update = MagicMock() + playlist_items_update.execute.return_value = MOCK_PLAYLIST_ITEM_FULL + service.playlistItems.return_value.update.return_value = playlist_items_update + + playlist_items_delete = MagicMock() + playlist_items_delete.execute.return_value = "" + service.playlistItems.return_value.delete.return_value = playlist_items_delete + return service @@ -141,6 +198,13 @@ def mock_service(): return create_mock_service() +@pytest.fixture(autouse=True) +def _clear_playlists_caches(): + _playlists_module._uploads_id_cache.clear() + yield + _playlists_module._uploads_id_cache.clear() + + @pytest.fixture def mock_auth(mock_service): mock_creds = MagicMock() diff --git a/tests/test_playlists.py b/tests/test_playlists.py new file mode 100644 index 0000000..2c9d23e --- /dev/null +++ b/tests/test_playlists.py @@ -0,0 +1,705 @@ +import csv +import io +import json +from unittest.mock import MagicMock, patch + +from google.auth.exceptions import RefreshError +from googleapiclient.errors import HttpError +from typer.testing import CliRunner + +from tests.conftest import MOCK_PLAYLIST, MOCK_PLAYLIST_ITEM_FULL +from ytstudio.main import app + +runner = CliRunner() + + +def _set_playlists_list(mock_auth, payload): + list_mock = MagicMock() + list_mock.execute.return_value = payload + mock_auth.playlists.return_value.list.return_value = list_mock + + +def _set_playlist_items_list(mock_auth, payload): + list_mock = MagicMock() + list_mock.execute.return_value = payload + mock_auth.playlistItems.return_value.list.return_value = list_mock + + +def _set_playlist_items_list_side_effect(mock_auth, payloads): + list_mock = MagicMock() + list_mock.execute.side_effect = payloads + mock_auth.playlistItems.return_value.list.return_value = list_mock + + +def _set_videos_list(mock_auth, payload): + videos_mock = MagicMock() + videos_mock.execute.return_value = payload + mock_auth.videos.return_value.list.return_value = videos_mock + + +def _set_search_list(mock_auth, items): + search_mock = MagicMock() + search_mock.execute.return_value = {"items": items} + mock_auth.search.return_value.list.return_value = search_mock + + +def _http_error(status: int, reason: str) -> HttpError: + resp = MagicMock() + resp.status = status + err = HttpError(resp=resp, content=b"{}") + err.error_details = [{"reason": reason}] + return err + + +class TestPlaylistsList: + def test_list_renders_table(self, mock_auth): + result = runner.invoke(app, ["playlists", "list"]) + assert result.exit_code == 0 + assert "Test Playlist" in result.stdout + assert "PL_test_123" in result.stdout + + def test_list_json_output(self, mock_auth): + result = runner.invoke(app, ["playlists", "list", "-o", "json"]) + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["playlists"][0]["id"] == "PL_test_123" + + def test_list_csv_output(self, mock_auth): + result = runner.invoke(app, ["playlists", "list", "-o", "csv"]) + assert result.exit_code == 0 + lines = [ln for ln in result.stdout.strip().splitlines() if ln] + assert lines[0] == "id,title,items,privacy,published_at" + assert any("PL_test_123" in ln for ln in lines[1:]) + + def test_list_sort_count_local(self, mock_auth): + second = { + **MOCK_PLAYLIST, + "id": "PL_other", + "snippet": {**MOCK_PLAYLIST["snippet"], "title": "Big Playlist"}, + "contentDetails": {"itemCount": 10}, + } + _set_playlists_list( + mock_auth, + { + "items": [MOCK_PLAYLIST, second], + "nextPageToken": None, + "pageInfo": {"totalResults": 2}, + }, + ) + + result = runner.invoke(app, ["playlists", "list", "--sort", "count"]) + assert result.exit_code == 0 + out = result.stdout + assert out.index("Big Playlist") < out.index("Test Playlist") + + def test_list_pagination_prints_token(self, mock_auth): + _set_playlists_list( + mock_auth, + { + "items": [MOCK_PLAYLIST], + "nextPageToken": "next_tok", + "pageInfo": {"totalResults": 1}, + }, + ) + result = runner.invoke(app, ["playlists", "list", "-n", "1"]) + assert result.exit_code == 0 + assert "--page-token next_tok" in result.stdout + + +class TestPlaylistsShow: + def test_show_renders_kv_table(self, mock_auth): + result = runner.invoke(app, ["playlists", "show", "PL_test_123"]) + assert result.exit_code == 0 + assert "Test Playlist" in result.stdout + assert "private" in result.stdout + + def test_show_not_found_exits_1(self, mock_auth): + _set_playlists_list(mock_auth, {"items": []}) + result = runner.invoke(app, ["playlists", "show", "PL_missing"]) + assert result.exit_code == 1 + assert "Playlist not found" in result.stdout + + def test_show_with_items_flag_renders_items(self, mock_auth): + second_item = { + **MOCK_PLAYLIST_ITEM_FULL, + "id": "PLPLI_test_item_2", + "snippet": { + **MOCK_PLAYLIST_ITEM_FULL["snippet"], + "title": "Second item", + "position": 1, + "resourceId": {"kind": "youtube#video", "videoId": "vid_2"}, + }, + "contentDetails": {"videoId": "vid_2", "note": ""}, + } + _set_playlist_items_list( + mock_auth, + { + "items": [MOCK_PLAYLIST_ITEM_FULL, second_item], + "nextPageToken": None, + "pageInfo": {"totalResults": 2}, + }, + ) + + result = runner.invoke(app, ["playlists", "show", "PL_test_123", "--items"]) + assert result.exit_code == 0 + assert "Item title" in result.stdout + assert "Second item" in result.stdout + + def test_show_json(self, mock_auth): + result = runner.invoke(app, ["playlists", "show", "PL_test_123", "-o", "json"]) + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["playlist"]["id"] == "PL_test_123" + + +class TestPlaylistsCreate: + def test_create_dry_run_shows_preview(self, mock_auth): + result = runner.invoke(app, ["playlists", "create", "-t", "Brand new"]) + assert result.exit_code == 0 + assert "Preview" in result.stdout + mock_auth.playlists.return_value.insert.assert_not_called() + + def test_create_execute_calls_insert(self, mock_auth): + result = runner.invoke( + app, + ["playlists", "create", "-t", "Brand new", "--execute"], + ) + assert result.exit_code == 0 + call = mock_auth.playlists.return_value.insert.call_args + assert call.kwargs["part"] == "snippet,status" + assert call.kwargs["body"]["snippet"]["title"] == "Brand new" + assert call.kwargs["body"]["status"]["privacyStatus"] == "private" + + def test_create_privacy_choice(self, mock_auth): + result = runner.invoke( + app, + ["playlists", "create", "-t", "Brand new", "--execute"], + ) + assert result.exit_code == 0 + call = mock_auth.playlists.return_value.insert.call_args + assert call.kwargs["body"]["status"]["privacyStatus"] == "private" + + def test_create_requires_title(self, mock_auth): + result = runner.invoke(app, ["playlists", "create"]) + assert result.exit_code == 2 + + +class TestPlaylistsUpdate: + def test_update_dry_run_diff_table(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "update", + "PL_test_123", + "-t", + "Renamed", + "--privacy", + "public", + ], + ) + assert result.exit_code == 0 + assert "Renamed" in result.stdout + assert "public" in result.stdout + mock_auth.playlists.return_value.update.assert_not_called() + + def test_update_execute_merges_existing_fields(self, mock_auth): + result = runner.invoke( + app, + ["playlists", "update", "PL_test_123", "-t", "New title", "--execute"], + ) + assert result.exit_code == 0 + call = mock_auth.playlists.return_value.update.call_args + body = call.kwargs["body"] + assert body["snippet"]["title"] == "New title" + # Re-spec rule: description from current GET is kept. + assert body["snippet"]["description"] == MOCK_PLAYLIST["snippet"]["description"] + + def test_update_no_changes_exits_with_message(self, mock_auth): + result = runner.invoke(app, ["playlists", "update", "PL_test_123"]) + assert result.exit_code == 1 + assert "Nothing to update" in result.stdout + + +class TestPlaylistsDelete: + def test_delete_dry_run_shows_count(self, mock_auth): + result = runner.invoke(app, ["playlists", "delete", "PL_test_123"]) + assert result.exit_code == 0 + assert "Would delete" in result.stdout + assert "3" in result.stdout + + def test_delete_execute_calls_delete(self, mock_auth): + result = runner.invoke(app, ["playlists", "delete", "PL_test_123", "--execute", "--yes"]) + assert result.exit_code == 0 + mock_auth.playlists.return_value.delete.assert_called_with(id="PL_test_123") + + def test_delete_aborts_when_prompt_denied(self, mock_auth): + with patch("ytstudio.commands.playlists.Confirm.ask", return_value=False): + result = runner.invoke(app, ["playlists", "delete", "PL_test_123", "--execute"]) + assert result.exit_code == 0 + mock_auth.playlists.return_value.delete.assert_not_called() + + +class TestPlaylistsItems: + def test_items_table_columns(self, mock_auth): + _set_playlist_items_list( + mock_auth, + { + "items": [MOCK_PLAYLIST_ITEM_FULL], + "nextPageToken": None, + "pageInfo": {"totalResults": 1}, + }, + ) + result = runner.invoke(app, ["playlists", "items", "PL_test_123"]) + assert result.exit_code == 0 + assert "Position" in result.stdout + assert "Video ID" in result.stdout + assert "Title" in result.stdout + + def test_items_pagination_prints_token(self, mock_auth): + _set_playlist_items_list( + mock_auth, + { + "items": [MOCK_PLAYLIST_ITEM_FULL], + "nextPageToken": "next_tok", + "pageInfo": {"totalResults": 1}, + }, + ) + result = runner.invoke(app, ["playlists", "items", "PL_test_123", "-n", "1"]) + assert result.exit_code == 0 + assert "--page-token next_tok" in result.stdout + + +class TestPlaylistsAdd: + def test_add_video_dry_run_shows_preview(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "-v", + "vid_b", + ], + ) + assert result.exit_code == 0 + assert "vid_a" in result.stdout + assert "vid_b" in result.stdout + mock_auth.playlistItems.return_value.insert.assert_not_called() + + def test_add_video_execute_calls_insert_per_video(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "-v", + "vid_b", + "--execute", + ], + ) + assert result.exit_code == 0 + calls = mock_auth.playlistItems.return_value.insert.call_args_list + assert len(calls) == 2 + ids = [c.kwargs["body"]["snippet"]["resourceId"]["videoId"] for c in calls] + assert ids == ["vid_a", "vid_b"] + + def test_add_from_search_uses_search_api(self, mock_auth): + _set_search_list( + mock_auth, + [{"id": {"videoId": f"vid_{i}"}, "snippet": {"title": f"Hit {i}"}} for i in range(5)], + ) + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "--from-search", + "test query", + "-n", + "3", + "--execute", + ], + ) + assert result.exit_code == 0 + calls = mock_auth.playlistItems.return_value.insert.call_args_list + assert len(calls) == 3 + + def test_add_position_sets_snippet_position(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "--position", + "2", + "--execute", + ], + ) + assert result.exit_code == 0 + call = mock_auth.playlistItems.return_value.insert.call_args + assert call.kwargs["body"]["snippet"]["position"] == 2 + + def test_add_quota_exceeded_stops_and_reports_partial(self, mock_auth): + insert_mock = MagicMock() + insert_mock.execute.side_effect = [ + MOCK_PLAYLIST_ITEM_FULL, + _http_error(403, "quotaExceeded"), + ] + mock_auth.playlistItems.return_value.insert.return_value = insert_mock + + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "-v", + "vid_b", + "--execute", + ], + ) + assert result.exit_code == 1 + assert "1 added" in result.stdout + + +class TestPlaylistsRemove: + def test_remove_by_item_id_execute(self, mock_auth): + result = runner.invoke( + app, + ["playlists", "remove", "PL_test_123", "-i", "PLPLI_1", "--execute"], + ) + assert result.exit_code == 0 + mock_auth.playlistItems.return_value.delete.assert_called_with(id="PLPLI_1") + + def test_remove_by_video_resolves_to_item_id(self, mock_auth): + _set_playlist_items_list( + mock_auth, + { + "items": [ + { + "id": "PLPLI_resolved_1", + "snippet": { + "title": "x", + "publishedAt": "2026-01-02T00:00:00Z", + "position": 0, + "resourceId": {"kind": "youtube#video", "videoId": "VID_A"}, + }, + "contentDetails": {"videoId": "VID_A"}, + }, + ], + "nextPageToken": None, + }, + ) + result = runner.invoke( + app, + ["playlists", "remove", "PL_test_123", "-v", "VID_A", "--execute"], + ) + assert result.exit_code == 0 + list_call = mock_auth.playlistItems.return_value.list.call_args + assert list_call.kwargs["videoId"] == "VID_A" + mock_auth.playlistItems.return_value.delete.assert_called_with(id="PLPLI_resolved_1") + + def test_remove_dry_run_no_delete_calls(self, mock_auth): + result = runner.invoke(app, ["playlists", "remove", "PL_test_123", "-i", "PLPLI_1"]) + assert result.exit_code == 0 + mock_auth.playlistItems.return_value.delete.assert_not_called() + + +def _make_item(item_id: str, video_id: str, position: int) -> dict: + return { + "id": item_id, + "snippet": { + "title": f"Title {video_id}", + "publishedAt": "2026-01-02T00:00:00Z", + "position": position, + "resourceId": {"kind": "youtube#video", "videoId": video_id}, + }, + "contentDetails": {"videoId": video_id}, + } + + +def _make_video_stats(video_id: str, views: int) -> dict: + return { + "id": video_id, + "snippet": {"title": f"Title {video_id}", "publishedAt": "2026-01-02T00:00:00Z"}, + "statistics": {"viewCount": str(views)}, + } + + +class TestPlaylistsReorder: + def test_reorder_by_views_descending_dry_run(self, mock_auth): + items = [ + _make_item("PLPLI_a", "vid_a", 0), + _make_item("PLPLI_b", "vid_b", 1), + _make_item("PLPLI_c", "vid_c", 2), + ] + _set_playlist_items_list( + mock_auth, + {"items": items, "nextPageToken": None, "pageInfo": {"totalResults": 3}}, + ) + _set_videos_list( + mock_auth, + { + "items": [ + _make_video_stats("vid_a", 100), + _make_video_stats("vid_b", 500), + _make_video_stats("vid_c", 50), + ] + }, + ) + result = runner.invoke(app, ["playlists", "reorder", "PL_test_123", "--by", "views"]) + assert result.exit_code == 0 + # vid_b (PLPLI_b) was at position 1, should move to 0. + assert "PLPLI_b" in result.stdout + mock_auth.playlistItems.return_value.update.assert_not_called() + + def test_reorder_execute_calls_update_with_required_snippet_fields(self, mock_auth): + items = [ + _make_item("PLPLI_a", "vid_a", 0), + _make_item("PLPLI_b", "vid_b", 1), + ] + _set_playlist_items_list( + mock_auth, + {"items": items, "nextPageToken": None, "pageInfo": {"totalResults": 2}}, + ) + _set_videos_list( + mock_auth, + { + "items": [ + _make_video_stats("vid_a", 10), + _make_video_stats("vid_b", 500), + ] + }, + ) + result = runner.invoke( + app, ["playlists", "reorder", "PL_test_123", "--by", "views", "--execute"] + ) + assert result.exit_code == 0 + calls = mock_auth.playlistItems.return_value.update.call_args_list + assert calls + for call in calls: + body = call.kwargs["body"] + assert body["snippet"]["playlistId"] == "PL_test_123" + assert body["snippet"]["resourceId"]["kind"] == "youtube#video" + assert "videoId" in body["snippet"]["resourceId"] + assert "position" in body["snippet"] + + def test_reorder_skips_unchanged_positions(self, mock_auth): + items = [ + _make_item("PLPLI_a", "vid_a", 0), + _make_item("PLPLI_b", "vid_b", 1), + ] + _set_playlist_items_list( + mock_auth, + {"items": items, "nextPageToken": None, "pageInfo": {"totalResults": 2}}, + ) + _set_videos_list( + mock_auth, + { + "items": [ + _make_video_stats("vid_a", 500), + _make_video_stats("vid_b", 100), + ] + }, + ) + result = runner.invoke(app, ["playlists", "reorder", "PL_test_123", "--by", "views"]) + assert result.exit_code == 0 + assert "No changes" in result.stdout + mock_auth.playlistItems.return_value.update.assert_not_called() + + def test_reorder_handles_manual_sort_required_error(self, mock_auth): + items = [ + _make_item("PLPLI_a", "vid_a", 0), + _make_item("PLPLI_b", "vid_b", 1), + ] + _set_playlist_items_list( + mock_auth, + {"items": items, "nextPageToken": None, "pageInfo": {"totalResults": 2}}, + ) + _set_videos_list( + mock_auth, + { + "items": [ + _make_video_stats("vid_a", 10), + _make_video_stats("vid_b", 500), + ] + }, + ) + update_mock = MagicMock() + update_mock.execute.side_effect = _http_error(400, "manualSortRequired") + mock_auth.playlistItems.return_value.update.return_value = update_mock + + result = runner.invoke( + app, + ["playlists", "reorder", "PL_test_123", "--by", "views", "--execute"], + ) + assert result.exit_code == 1 + assert "Manual sort" in result.stdout or "Manual" in result.stdout + + +class TestPlaylistsReviewFixes: + """Regression tests for the fixes applied after PR review.""" + + def test_add_position_increments_per_insert(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "-v", + "vid_b", + "-v", + "vid_c", + "--position", + "5", + "--execute", + ], + ) + assert result.exit_code == 0 + calls = mock_auth.playlistItems.return_value.insert.call_args_list + positions = [c.kwargs["body"]["snippet"]["position"] for c in calls] + assert positions == [5, 6, 7] + + def test_add_combines_video_and_search_respects_limit(self, mock_auth): + _set_search_list( + mock_auth, + [{"id": {"videoId": f"hit_{i}"}, "snippet": {"title": f"Hit {i}"}} for i in range(5)], + ) + result = runner.invoke( + app, + [ + "playlists", + "add", + "PL_test_123", + "-v", + "vid_a", + "-v", + "vid_b", + "--from-search", + "topic", + "-n", + "3", + "--execute", + ], + ) + assert result.exit_code == 0 + calls = mock_auth.playlistItems.return_value.insert.call_args_list + # 2 explicit videos + 1 search hit = 3, capped by --limit/-n + assert len(calls) == 3 + + def test_uploads_check_refuses_canonical_channel_uploads_id(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "UU_test_uploads_playlist", + "-v", + "vid_a", + "--execute", + ], + ) + assert result.exit_code == 1 + assert "uploads playlist" in result.stdout + mock_auth.playlistItems.return_value.insert.assert_not_called() + + def test_uploads_check_allows_non_uploads_uu_prefix(self, mock_auth): + result = runner.invoke( + app, + [ + "playlists", + "add", + "UU_unrelated_id_starting_with_UU", + "-v", + "vid_a", + "--execute", + ], + ) + # Canonical check resolves uploads to "UU_test_uploads_playlist"; a + # different UU-prefixed id is not refused. + assert result.exit_code == 0 + mock_auth.playlistItems.return_value.insert.assert_called() + + def test_reorder_skips_writes_that_become_no_ops_after_prior_moves(self, mock_auth): + # Full reverse on 4 items: targets become [0, 1, 2, 3] for the new order + # [d, c, b, a]. Applied in target-ascending order, the last write would + # land on the position the item already occupies after prior shifts. + items = [ + _make_item("PLPLI_a", "vid_a", 0), + _make_item("PLPLI_b", "vid_b", 1), + _make_item("PLPLI_c", "vid_c", 2), + _make_item("PLPLI_d", "vid_d", 3), + ] + _set_playlist_items_list( + mock_auth, + {"items": items, "nextPageToken": None, "pageInfo": {"totalResults": 4}}, + ) + _set_videos_list( + mock_auth, + { + "items": [ + _make_video_stats("vid_a", 1), + _make_video_stats("vid_b", 2), + _make_video_stats("vid_c", 3), + _make_video_stats("vid_d", 4), + ] + }, + ) + result = runner.invoke( + app, + ["playlists", "reorder", "PL_test_123", "--by", "views", "--execute"], + ) + assert result.exit_code == 0 + # 4 logical moves planned, but the last one is a no-op after prior shifts. + calls = mock_auth.playlistItems.return_value.update.call_args_list + assert len(calls) == 3 + assert "already in place" in result.stdout + + def test_list_csv_quotes_titles_with_special_chars(self, mock_auth): + nasty = { + **MOCK_PLAYLIST, + "id": "PL_nasty", + "snippet": { + **MOCK_PLAYLIST["snippet"], + "title": 'has, a comma and a "quote" and a\nnewline', + }, + } + _set_playlists_list( + mock_auth, + {"items": [nasty], "nextPageToken": None, "pageInfo": {"totalResults": 1}}, + ) + result = runner.invoke(app, ["playlists", "list", "-o", "csv"]) + assert result.exit_code == 0 + rows = list(csv.reader(io.StringIO(result.stdout))) + # Header + one data row, even with embedded comma/newline in the title. + assert rows[0] == ["id", "title", "items", "privacy", "published_at"] + assert rows[1][0] == "PL_nasty" + assert rows[1][1] == 'has, a comma and a "quote" and a\nnewline' + + def test_add_session_expired_exits_friendly(self, mock_auth): + insert_mock = MagicMock() + insert_mock.execute.side_effect = RefreshError("revoked") + mock_auth.playlistItems.return_value.insert.return_value = insert_mock + + result = runner.invoke( + app, + ["playlists", "add", "PL_test_123", "-v", "vid_a", "--execute"], + ) + assert result.exit_code == 1 + assert "Session expired" in result.stdout + assert "ytstudio login" in result.stdout