diff --git a/blog/helpers.py b/blog/helpers.py
index 1ea5c18f8..8535cb93f 100644
--- a/blog/helpers.py
+++ b/blog/helpers.py
@@ -1,12 +1,14 @@
 import os
 import enum
 
-GIT_REPO = 'https://github.com/superlinked/VectorHub'
+GIT_REPO = "https://github.com/superlinked/VectorHub"
+
 
 class ItemType(enum.Enum):
     FOLDER = "folder"
     FILE = "file"
 
+
 class Item:
     def __init__(self, type, name, path, has_blogs=False, children=None):
         self.type = type
@@ -36,7 +38,7 @@ def from_dict(cls, data):
             name=data.get("name", ""),
             path=data.get("path", ""),
             has_blogs=data.get("has_blogs", False),
-            children=data.get("children", [])
+            children=data.get("children", []),
         )
 
     def to_dict(self):
@@ -62,27 +64,38 @@ def __init__(self, content, filepath, last_updated):
         self.title = self.get_title()
 
     def get_title(self) -> str:
-        lines = self.content.split('\n')
+        lines = self.content.split("\n")
         first_line = str(lines[0]).strip()
-        if first_line.startswith('# '):
-            self.content = '\n'.join(lines[1:])
+        if first_line.startswith("# "):
+            self.content = "\n".join(lines[1:])
             self.content = self.content.strip()
-            return first_line.replace('# ', '').strip()
+            return first_line.replace("# ", "").strip()
         else:
-            return os.path.basename(self.filepath).replace('-', ' ').replace('_', ' ').replace('.md', '')
+            return (
+                os.path.basename(self.filepath)
+                .replace("-", " ")
+                .replace("_", " ")
+                .replace(".md", "")
+            )
 
     def __str__(self) -> str:
         return self.title
 
     def get_github_url(self):
-        return f'{GIT_REPO}/blob/main/{self.filepath}'
-
+        return f"{GIT_REPO}/blob/main/{self.filepath}"
+
     def get_filepath(self):
-        return self.filepath.replace('&', '').replace('--', '-').replace('__', '_')
-
+        return self.filepath.replace("&", "").replace("--", "-").replace("__", "_")
+
     def get_slug(self):
         if not self.slug_url:
-            slug = self.get_filepath().replace('.md', '').replace('_', '-').replace(' ', '-').replace('docs/', '')
+            slug = (
+                self.get_filepath()
+                .replace(".md", "")
+                .replace("_", "-")
+                .replace(" ", "-")
+                .replace("docs/", "")
+            )
             self.slug_url = slug.lower()
 
         return self.slug_url
@@ -94,14 +107,15 @@ def set_slug_url(self, slug_url):
 
     def get_json(self):
         return {
-            "github_url": self.get_github_url(),
-            "content": self.content,
-            "github_last_updated_date": self.last_updated,
-            "title": self.title,
-            "slug_url": self.get_slug(),
-            "publishedAt": self.publishedAt,
-            "filepath": self.get_filepath()
-        }
+            "github_url": self.get_github_url(),
+            "content": self.content,
+            "github_last_updated_date": self.last_updated,
+            "title": self.title,
+            "slug_url": self.get_slug(),
+            "publishedAt": self.publishedAt,
+            "filepath": self.get_filepath(),
+            "meta_desc": self.meta_desc,
+        }
 
     def get_post_json(self, is_draft=False):
         return {"data": self.get_json()}
diff --git a/blog/main.py b/blog/main.py
index 1f19019ee..d9e10fe2b 100644
--- a/blog/main.py
+++ b/blog/main.py
@@ -11,69 +11,77 @@ args = None
 
-BASE_URL = os.getenv('STRAPI_URL', "")
-API_KEY = os.getenv('STRAPI_API_KEY', "")
+BASE_URL = os.getenv("STRAPI_URL", "")
+API_KEY = os.getenv("STRAPI_API_KEY", "")
 
 paths_to_search = []
 existing_filepaths_discovered = {}
-headers = {
-    'Authorization': f'Bearer {API_KEY}',
-    'Content-Type': 'application/json'
-}
+headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
+
 
 def arg_parse():
     global args
     parser = argparse.ArgumentParser(description="VectorHub Strapi Upload")
-    parser.add_argument('--directories', help='Path to json which describes the directories to parse')
+    parser.add_argument(
+        "--directories", help="Path to json which describes the directories to parse"
+    )
    args = parser.parse_args()
 
+
 def load_items_from_json(directories: str) -> list:
     if os.path.exists(directories):
-        items = []
         try:
-            with open(directories, 'r') as file:
+            with open(directories, "r") as file:
                 data = json.load(file)
-                for item_data in data:
-                    items.append(Item.from_dict(item_data))
-        except JSONDecodeError as e:
-            print('JSON Structure is invalid.')
+                return [Item.from_dict(item_data) for item_data in data]
+        except JSONDecodeError:
+            print("❌ Invalid JSON structure.")
             exit(1)
         except Exception as e:
-            print('Unknown error occured.')
+            print("❌ Unknown error while reading directory JSON:")
             print(e)
             exit(1)
-        return items
     else:
         print(f"{directories} does not exist.")
         exit(1)
 
 
 def load_existing_blogs(page_num=1):
+    """Loads all blogs currently in Strapi."""
     global existing_filepaths_discovered
 
-    base_url = urljoin(BASE_URL, 'api/blogs')
-    search_url = base_url + f"?pagination[page]={page_num}&publicationState=preview"
-    session = requests.Session()
+    base_url = urljoin(BASE_URL, "api/blogs")
+    search_url = f"{base_url}?pagination[page]={page_num}&pagination[pageSize]=100"
+    session = requests.Session()
 
     response = session.get(search_url, headers=headers)
+
     if response.status_code == 200:
-        data = json.loads(response.text)['data']
-        if len(data) > 0:
-            for item in data:
-                existing_filepaths_discovered[item['attributes']['filepath']] = {'discovered': False, 'id': item['id']}
-            load_existing_blogs(page_num+1)
+        data = response.json().get("data", [])
+        if not data:
+            return
+        for item in data:
+            filepath = item.get("filepath")
+            if filepath:
+                existing_filepaths_discovered[filepath] = {
+                    "discovered": False,
+                    "id": item["id"],
+                }
+        load_existing_blogs(page_num + 1)
+    else:
+        print(f"⚠️ Failed to load blogs: {response.status_code} {response.text}")
 
 
 def fetch_paths(node: Item, current_path=""):
+    """Recursively collect directories containing blogs."""
     global paths_to_search
 
-    # Update the current path with dthe node's path
+
     current_path = f"{current_path}/{node.path}" if current_path else node.path
 
-    # If the node has children, recurse on each child
     if node.has_blogs:
         paths_to_search.append(current_path)
 
-    if node.children and len(node.children) > 0:
+    if node.children:
         for child in node.children:
             fetch_paths(child, current_path)
@@ -85,78 +93,91 @@ def find_files_to_upload(items: list):
         fetch_paths(item)
 
     files = []
+    extension = "md"
 
-    extension = 'md'
-
     for path in paths_to_search:
         folder_path = Path(path)
-        folder_files = folder_path.glob(f"*.{extension}")
-        for file in folder_files:
-            if 'readme.md' not in str(file).lower():
-                files.append({
-                    'path': str(file),
-                    'time': datetime.fromtimestamp(os.path.getmtime(file)).strftime("%Y-%m-%d")
-                })
-
+        for file in folder_path.glob(f"*.{extension}"):
+            if "readme.md" not in str(file).lower():
+                files.append(
+                    {
+                        "path": str(file),
+                        "time": datetime.fromtimestamp(os.path.getmtime(file)).strftime(
+                            "%Y-%m-%d"
+                        ),
+                    }
+                )
     return files
 
 
 def build_blog_object(file_obj: dict) -> StrapiBlog:
-    filepath = file_obj['path']
-    with open(filepath, 'r') as file:
+    filepath = file_obj["path"]
+    with open(filepath, "r") as file:
         content = file.read()
-    blog = StrapiBlog(content, filepath, file_obj['time'])
-    return blog
+    return StrapiBlog(content, filepath, file_obj["time"])
+
 
 def upload_blog(blog: StrapiBlog):
-    base_url = urljoin(BASE_URL, 'api/blogs')
+    """Uploads or updates a blog to Strapi v5."""
+    base_url = urljoin(BASE_URL, "api/blogs")
     filepath = blog.get_filepath()
-    search_url = base_url + f"?filters[filepath][$eqi]={filepath}&publicationState=preview"
+    search_url = f"{base_url}?filters[filepath][$eqi]={filepath}"
+
     session = requests.Session()
     if filepath in existing_filepaths_discovered:
-        existing_filepaths_discovered[filepath]['discovered'] = True
+        existing_filepaths_discovered[filepath]["discovered"] = True
 
     response = session.get(search_url, headers=headers)
+    if response.status_code != 200:
+        print(f"❌ Error fetching blog {filepath}: {response.text}")
+        return
+
+    existing = response.json().get("data", [])
+    print(f"📤 Uploading filepath: {filepath}")
+
+    if existing:
+        # Blog already exists
+        blog_id = existing[0]["documentId"]
+        blog.set_slug_url(existing[0].get("slug_url"))
+        blog.set_published_at(existing[0].get("publishedAt"))
+        meta_desc = existing[0].get("meta_desc")
+        if meta_desc:
+            blog.meta_desc = meta_desc
+        else:
+            blog.meta_desc = blog.title
 
-    if response.status_code == 200:
-        responses = json.loads(response.text)['data']
-        print(f'Uploading filepath: {blog.get_filepath()}')
-        if len(responses) > 0:
-            # Blog already exists at this filepath
-            id = json.loads(response.text)['data'][0]['id']
-
-            blog.set_slug_url(json.loads(response.text)['data'][0]['attributes']['slug_url'])
-            blog.set_published_at(json.loads(response.text)['data'][0]['attributes']['publishedAt'])
+        url = f"{base_url}/{blog_id}"
+        create_response = session.put(
+            url, headers=headers, data=json.dumps(blog.get_post_json())
+        )
+    else:
+        # New blog
+        create_response = session.post(
+            base_url, headers=headers, data=json.dumps(blog.get_post_json())
+        )
 
-            url = f"{base_url}/{id}"
-            create_response = session.put(url, headers=headers, data=json.dumps(blog.get_post_json()))
-        else:
-            # Its a new blog
-            url = base_url
-            create_response = session.post(url, headers=headers, data=json.dumps(blog.get_post_json()))
+    if create_response.status_code not in (200, 201):
+        print(f"❌ Failed to upload blog: {filepath}", create_response.text)
+        exit(1)
 
-        if not create_response.status_code == 200:
-            print(f'Error in parsing blog: {filepath}')
-            print(create_response.text)
-            exit(1)
 
 
 def delete_old_blogs():
-    global existing_filepaths_discovered, BASE_URL
+    """Deletes blogs that were not re-uploaded."""
+    global existing_filepaths_discovered
 
-    base_url = urljoin(BASE_URL, 'api/blogs')
+    base_url = urljoin(BASE_URL, "api/blogs")
     session = requests.Session()
-    for filepath in existing_filepaths_discovered:
-        if not existing_filepaths_discovered[filepath]['discovered']:
-            print(f"Deleting filepath: {filepath}")
-            id = existing_filepaths_discovered[filepath]['id']
-            if id > 0:
-                url = f"{base_url}/{id}"
+    for filepath, info in existing_filepaths_discovered.items():
+        if not info["discovered"]:
+            print(f"🗑️ Deleting filepath: {filepath}")
+            blog_id = info["id"]
+            if blog_id:
+                url = f"{base_url}/{blog_id}"
                 response = session.delete(url, headers=headers)
-                if response.status_code != 200:
-                    print(f'Error in deleting blog: {filepath}')
-                    print(response.text)
+                if response.status_code not in (200, 204):
+                    print(f"⚠️ Error deleting {filepath}: {response.text}")
 
 
 if __name__ == "__main__":
@@ -167,10 +188,10 @@ def delete_old_blogs():
 
     files = find_files_to_upload(items)
 
-    print('Uploading blogs')
+    print("📦 Uploading blogs...")
     for file in tqdm(files):
         blog = build_blog_object(file)
         upload_blog(blog)
 
-    print('Deleting blogs')
+    print("🧹 Cleaning up deleted blogs...")
    delete_old_blogs()