Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions app/auth/user_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from fastapi import APIRouter, Depends, HTTPException, status
from .firebase_auth import firebase_auth
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The firebase_auth import is unused in this file and can be removed to improve code cleanliness.

from .dependencies import get_current_user, require_admin
from typing import Dict, Any, List
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To enable strong typing for the action field in BulkUserRequest, Literal should be imported from the typing module.

Suggested change
from typing import Dict, Any, List
from typing import Dict, Any, List, Literal

from pydantic import BaseModel, EmailStr
from firebase_admin import auth

router = APIRouter(prefix="/users", tags=["user-management"])


class UserUpdateRequest(BaseModel):
email: EmailStr
first_name: str
last_name: str


class BulkUserRequest(BaseModel):
user_ids: List[str]
action: str
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a plain string for the action field is prone to errors. By using Literal['disable', 'delete'], you make the API contract explicit and leverage FastAPI's built-in validation to ensure only valid actions are submitted.

Suggested change
action: str
action: Literal["disable", "delete"]

Comment on lines +17 to +19
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate the action field to prevent unintended operations.

The action field accepts any string value without validation, which is a security risk. Only "disable" and "delete" are handled in the code, but invalid actions would silently succeed without performing any operation.

🔎 Proposed fix using Literal for action validation
+from typing import Dict, Any, List, Literal
-from typing import Dict, Any, List
 
 class BulkUserRequest(BaseModel):
     user_ids: List[str]
-    action: str
+    action: Literal["disable", "delete"]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class BulkUserRequest(BaseModel):
user_ids: List[str]
action: str
from typing import Dict, Any, List, Literal
class BulkUserRequest(BaseModel):
user_ids: List[str]
action: Literal["disable", "delete"]
🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 17 to 19, the BulkUserRequest.model
currently allows any string for action which can silently accept unsupported
operations; change the type of action to a constrained type (e.g.,
typing.Literal["disable","delete"] or a small Enum) so Pydantic will validate
input and reject invalid actions, update the import (from typing import Literal
or add Enum) and adjust any callers/tests if needed to use the new typed values;
ensure validation errors are surfaced to callers rather than allowing no-op
behavior.



@router.get("/profile")
async def get_user_profile(current_user: Dict[str, Any] = Depends(get_current_user)):
"""Get detailed user profile information"""
try:
user_record = auth.get_user(current_user["id"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current_user dictionary returned by the get_current_user dependency uses 'uid' as the key for the user ID, not 'id'. Accessing current_user["id"] will result in a KeyError and cause the endpoint to fail.

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The current user dict coming from the authentication dependency exposes a uid field (not id), so indexing current_user["id"] will raise a KeyError at runtime and prevent the profile from being fetched; use current_user["uid"] instead to look up the Firebase user. [logic error]

Severity Level: Minor ⚠️

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])
Why it matters? ⭐

The dependency get_current_user returns the dict produced by firebase_auth.verify_token, which contains "uid" (see app/auth/firebase_auth.py: lines returning {"uid": ...}) and does not provide an "id" key. Indexing current_user["id"] will raise a KeyError at runtime. Changing to current_user["uid"] fixes a real runtime bug.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 26:26
**Comment:**
	*Logic Error: The current user dict coming from the authentication dependency exposes a `uid` field (not `id`), so indexing `current_user["id"]` will raise a KeyError at runtime and prevent the profile from being fetched; use `current_user["uid"]` instead to look up the Firebase user.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
Comment on lines +31 to +32
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing custom claims directly with [] can cause a KeyError if a claim is not set for a user. It's safer to use the .get() method to avoid this and provide a graceful fallback (it will return null in the JSON if the key is missing).

Suggested change
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"first_name": custom_claims.get("first_name"),
"last_name": custom_claims.get("last_name"),

Comment on lines +31 to +32
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Custom Claims Null Safety

Direct dictionary access on custom_claims without null checking causes KeyError when claims are empty or missing. Service crashes with 500 error when users lack first_name/last_name claims, making authentication service unreliable for users without complete profile data.

            "first_name": custom_claims.get("first_name", "") if custom_claims else "",
            "last_name": custom_claims.get("last_name", "") if custom_claims else "",
Commitable Suggestion
Suggested change
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"first_name": custom_claims.get("first_name", "") if custom_claims else "",
"last_name": custom_claims.get("last_name", "") if custom_claims else "",
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling
Context References
  • get_user_profile function - no defensive checks for missing custom claims in app/auth/user_management.py

Comment on lines +27 to +32
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: auth.get_custom_user_claims can return None or omit first_name/last_name for some users, and indexing like custom_claims["first_name"] will raise a KeyError; normalizing to an empty dict and using .get(..., "") avoids runtime failures for users without these claims. [possible bug]

Severity Level: Critical 🚨

Suggested change
custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
custom_claims = auth.get_custom_user_claims(user_record.uid) or {}
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name", ""),
"last_name": custom_claims.get("last_name", ""),
Why it matters? ⭐

firebase_admin.auth.get_custom_user_claims can return None or omit keys. The current code indexes custom_claims["first_name"] / ["last_name"] directly, which will raise a KeyError (or TypeError if custom_claims is None). Normalizing to {} and using .get(..., "") prevents runtime errors for users without those claims. This is a defensive fix that prevents 500s for legitimate users.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 27:32
**Comment:**
	*Possible Bug: `auth.get_custom_user_claims` can return `None` or omit `first_name`/`last_name` for some users, and indexing like `custom_claims["first_name"]` will raise a KeyError; normalizing to an empty dict and using `.get(..., "")` avoids runtime failures for users without these claims.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
Comment on lines +25 to +35
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix incorrect Firebase Admin SDK usage and handle missing custom claims.

Two critical issues:

  1. Line 27: auth.get_custom_user_claims() does not exist in the Firebase Admin SDK. Custom claims are accessed via user_record.custom_claims.

  2. Lines 31-32: Direct dictionary access will raise KeyError if custom claims are None or if first_name/last_name keys don't exist.

🔎 Proposed fix
     try:
         user_record = auth.get_user(current_user["id"])
-        custom_claims = auth.get_custom_user_claims(user_record.uid)
+        custom_claims = user_record.custom_claims or {}
         return {
             "id": user_record.uid,
             "email": user_record.email,
-            "first_name": custom_claims["first_name"],
-            "last_name": custom_claims["last_name"],
+            "first_name": custom_claims.get("first_name"),
+            "last_name": custom_claims.get("last_name"),
             "is_active": not user_record.disabled,
             "created_at": str(user_record.user_metadata.creation_timestamp)
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
user_record = auth.get_user(current_user["id"])
custom_claims = auth.get_custom_user_claims(user_record.uid)
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims["first_name"],
"last_name": custom_claims["last_name"],
"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
try:
user_record = auth.get_user(current_user["id"])
custom_claims = user_record.custom_claims or {}
return {
"id": user_record.uid,
"email": user_record.email,
"first_name": custom_claims.get("first_name"),
"last_name": custom_claims.get("last_name"),
"is_active": not user_record.disabled,
"created_at": str(user_record.user_metadata.creation_timestamp)
}
🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 25 to 35, replace the incorrect call
to auth.get_custom_user_claims(...) with reading user_record.custom_claims and
guard against None or missing keys; after fetching user_record =
auth.get_user(current_user["id"]), read claims = user_record.custom_claims or {}
and use claims.get("first_name") and claims.get("last_name") (or provide safe
defaults like None or empty string) when building the return dict, so the code
never raises KeyError if custom claims are absent.

except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
Comment on lines +36 to +37
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Catching a generic Exception is too broad and can hide underlying issues. It's better to catch specific exceptions, like auth.UserNotFoundError. Additionally, exposing raw exception messages via str(e) can be a security risk. Provide a generic error message for unexpected server errors.

Suggested change
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
except auth.UserNotFoundError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
except Exception:
# It's good practice to log the actual error here for debugging purposes.
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred.")



@router.put("/profile")
async def update_user_profile(
update_data: UserUpdateRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Update current user's profile information"""
try:
user_record = auth.get_user(current_user["id"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Similar to the profile retrieval endpoint, the current_user dictionary uses 'uid' as the key for the user ID, not 'id'. This will cause a KeyError.

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: As with the profile GET handler, the current user data produced by the auth dependency contains a uid field rather than id, so using current_user["id"] here will raise a KeyError and break profile updates; it should reference current_user["uid"] instead. [logic error]

Severity Level: Minor ⚠️

Suggested change
user_record = auth.get_user(current_user["id"])
user_record = auth.get_user(current_user["uid"])
Why it matters? ⭐

Same root cause as suggestion 1: get_current_user returns a dict with "uid" (from verify_token) not "id". The update_user_profile function uses current_user["id"] and will raise KeyError. Switching to current_user["uid"] resolves a real logic/runtime error.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 47:47
**Comment:**
	*Logic Error: As with the profile GET handler, the current user data produced by the auth dependency contains a `uid` field rather than `id`, so using `current_user["id"]` here will raise a KeyError and break profile updates; it should reference `current_user["uid"]` instead.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

if update_data.email != user_record.email:
auth.update_user(user_record.uid, email=update_data.email)
auth.set_custom_user_claims(user_record.uid, {
"first_name": update_data.first_name,
"last_name": update_data.last_name,
"role": current_user.get("role", "user")
})
Comment on lines +50 to +54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Custom Claims Data Loss

Complete custom claims replacement overwrites existing claims beyond first_name, last_name, and role. User loses additional custom claims causing feature degradation. Authorization and personalization data permanently lost during profile updates.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Postconditions
Context References
  • update_user_profile function - set_custom_user_claims replaces all claims in app/auth/user_management.py

Comment on lines +46 to +54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Risk of overwriting existing custom claims.

The set_custom_user_claims call replaces all custom claims with only first_name, last_name, and role. If other custom claims exist (e.g., permissions, metadata), they will be lost.

🔎 Proposed fix to preserve existing claims
     try:
         user_record = auth.get_user(current_user["id"])
         if update_data.email != user_record.email:
             auth.update_user(user_record.uid, email=update_data.email)
+        
+        # Preserve existing custom claims
+        existing_claims = user_record.custom_claims or {}
+        updated_claims = {
+            **existing_claims,
+            "first_name": update_data.first_name,
+            "last_name": update_data.last_name,
+            "role": current_user.get("role", "user")
+        }
-        auth.set_custom_user_claims(user_record.uid, {
-            "first_name": update_data.first_name,
-            "last_name": update_data.last_name,
-            "role": current_user.get("role", "user")
-        })
+        auth.set_custom_user_claims(user_record.uid, updated_claims)
         return {"message": "Profile updated successfully"}
🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 46 to 54, the call to
set_custom_user_claims unconditionally overwrites all existing custom claims;
instead, fetch the current custom claims from user_record (or auth.get_user(...)
if not present), merge them with the new keys ("first_name", "last_name",
"role") so existing keys like "permissions" or "metadata" are preserved, and
then call set_custom_user_claims with the merged dictionary; ensure you handle
cases where user_record.custom_claims is None and validate/normalize types
before setting.

return {"message": "Profile updated successfully"}
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
Comment on lines +56 to +57
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The exception handling is too broad. Catching specific exceptions like auth.UserNotFoundError would provide more accurate error responses. Also, returning HTTP_400_BAD_REQUEST for any server-side failure might be misleading; a 5xx error would be more appropriate for internal errors. Avoid exposing raw exception details.

Suggested change
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except auth.UserNotFoundError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
except Exception:
# It's good practice to log the actual error here for debugging purposes.
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to update profile.")



@router.post("/bulk-action")
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Perform bulk actions on multiple users (admin only)"""
if current_user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
Comment on lines +61 to +67
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This endpoint performs a manual authorization check for the admin role. It's better to use the require_admin dependency, which is already imported. This approach centralizes authorization logic, makes the endpoint's intent clearer, and is more idiomatic for FastAPI.

Suggested change
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Perform bulk actions on multiple users (admin only)"""
if current_user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
async def bulk_user_action(
request: BulkUserRequest,
_: Dict[str, Any] = Depends(require_admin)
):
"""Perform bulk actions on multiple users (admin only)"""

Comment on lines +60 to +67
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Authorization Race Condition

Authorization check occurs after function execution begins rather than using dependency injection. This creates a race condition where unauthorized users can trigger function execution before authorization validation, potentially allowing timing attacks or resource consumption before being blocked.

@router.post("/bulk-action")
async def bulk_user_action(
    request: BulkUserRequest,
    current_user: Dict[str, Any] = Depends(require_admin)
):
    """Perform bulk actions on multiple users (admin only)"""
Commitable Suggestion
Suggested change
@router.post("/bulk-action")
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""Perform bulk actions on multiple users (admin only)"""
if current_user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
@router.post("/bulk-action")
async def bulk_user_action(
request: BulkUserRequest,
current_user: Dict[str, Any] = Depends(require_admin)
):
"""Perform bulk actions on multiple users (admin only)"""
Standards
  • CWE-862
  • OWASP-A01
  • NIST-SSDF-PW.1
Context References
  • require_admin dependency exists but not used - should replace inline check in app/auth/dependencies.py

Comment on lines +60 to +67
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use the require_admin dependency instead of manual role check.

The require_admin dependency is already imported but not used. Replace the manual role check with the dependency for consistency and better separation of concerns.

🔎 Proposed refactor
 @router.post("/bulk-action")
 async def bulk_user_action(
     request: BulkUserRequest,
-    current_user: Dict[str, Any] = Depends(get_current_user)
+    current_user: Dict[str, Any] = Depends(require_admin)
 ):
     """Perform bulk actions on multiple users (admin only)"""
-    if current_user.get("role") != "admin":
-        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
     results = []
🧰 Tools
🪛 Ruff (0.14.10)

63-63: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 60 to 67, replace the manual admin
role check with the existing require_admin dependency: remove the current_user
role-if-check and change the route signature to inject current_user via
Depends(require_admin) so the dependency enforces admin access; ensure the
function parameter uses the correct type and name (e.g., current_user: Dict[str,
Any] = Depends(require_admin)) and remove the now-unneeded HTTPException branch.

results = []
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The bulk action endpoint silently accepts any action string and just returns an empty results list when the action is unsupported, which is misleading and hides misuse of the API; it should explicitly reject invalid actions with a 400 error before processing. [logic error]

Severity Level: Minor ⚠️

Suggested change
results = []
if request.action not in {"disable", "delete"}:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unsupported bulk action"
)
Why it matters? ⭐

As written, unsupported actions are silently ignored and callers get an empty/partial results list which is misleading. Validating request.action early and returning HTTP 400 for unsupported actions is a correctness improvement and prevents clients from thinking the call succeeded when it did nothing.

Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** app/auth/user_management.py
**Line:** 68:68
**Comment:**
	*Logic Error: The bulk action endpoint silently accepts any `action` string and just returns an empty `results` list when the action is unsupported, which is misleading and hides misuse of the API; it should explicitly reject invalid actions with a 400 error before processing.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

for user_id in request.user_ids:
try:
if request.action == "disable":
auth.update_user(user_id, disabled=True)
results.append({"user_id": user_id, "status": "disabled"})
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
Comment on lines +71 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Action Type Validation

Bulk action logic only handles 'disable' and 'delete' actions but lacks validation for invalid action types. Unrecognized actions silently skip processing without error indication, creating incomplete business rule enforcement.

Standards
  • Business-Rule-Input-Validation
  • Logic-Verification-Complete-Coverage

except Exception as e:
results.append({"user_id": user_id, "status": "error", "message": str(e)})
Comment on lines +77 to +78
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The exception handling here is too broad. Catching specific exceptions like auth.UserNotFoundError will provide more meaningful error messages in the bulk operation results. Also, avoid exposing raw exception details in the response.

Suggested change
except Exception as e:
results.append({"user_id": user_id, "status": "error", "message": str(e)})
except auth.UserNotFoundError:
results.append({"user_id": user_id, "status": "error", "message": "User not found"})
except Exception:
results.append({"user_id": user_id, "status": "error", "message": "An unexpected error occurred."})

Comment on lines +69 to +78
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sequential User Processing Bottleneck

Sequential Firebase auth operations in loop create O(n) blocking I/O pattern. Each user operation waits for Firebase API response before processing next user. Bulk operations with hundreds of users will cause significant response delays and poor scalability.

    import asyncio
    
    async def process_user(user_id: str, action: str):
        try:
            if action == "disable":
                auth.update_user(user_id, disabled=True)
                return {"user_id": user_id, "status": "disabled"}
            elif action == "delete":
                auth.delete_user(user_id)
                return {"user_id": user_id, "status": "deleted"}
        except Exception as e:
            return {"user_id": user_id, "status": "error", "message": str(e)}
    
    tasks = [process_user(user_id, request.action) for user_id in request.user_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
Commitable Suggestion
Suggested change
for user_id in request.user_ids:
try:
if request.action == "disable":
auth.update_user(user_id, disabled=True)
results.append({"user_id": user_id, "status": "disabled"})
elif request.action == "delete":
auth.delete_user(user_id)
results.append({"user_id": user_id, "status": "deleted"})
except Exception as e:
results.append({"user_id": user_id, "status": "error", "message": str(e)})
import asyncio
async def process_user(user_id: str, action: str):
try:
if action == "disable":
auth.update_user(user_id, disabled=True)
return {"user_id": user_id, "status": "disabled"}
elif action == "delete":
auth.delete_user(user_id)
return {"user_id": user_id, "status": "deleted"}
except Exception as e:
return {"user_id": user_id, "status": "error", "message": str(e)}
tasks = [process_user(user_id, request.action) for user_id in request.user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Async-Processing
  • Algorithmic-Complexity-Linear-Optimization

return {"results": results}
Comment on lines +68 to +79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add safeguards for bulk operations.

Several concerns with the current implementation:

  1. No limit on bulk size: Processing unlimited users could cause timeouts or resource exhaustion.
  2. User deletion is permanent: No confirmation, soft-delete option, or audit trail for this destructive operation.
  3. Bare exception catching: Loses error context for debugging.
🔎 Proposed improvements

Add a limit check at the beginning:

    # Add after the docstring
    MAX_BULK_SIZE = 100
    if len(request.user_ids) > MAX_BULK_SIZE:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, 
            detail=f"Cannot process more than {MAX_BULK_SIZE} users at once"
        )

Consider more specific exception handling in the loop:

         try:
             if request.action == "disable":
                 auth.update_user(user_id, disabled=True)
                 results.append({"user_id": user_id, "status": "disabled"})
             elif request.action == "delete":
                 auth.delete_user(user_id)
                 results.append({"user_id": user_id, "status": "deleted"})
+        except auth.UserNotFoundError as e:
+            results.append({"user_id": user_id, "status": "error", "message": "User not found"})
         except Exception as e:
             results.append({"user_id": user_id, "status": "error", "message": str(e)})

For the deletion operation, consider implementing an audit log or requiring additional confirmation.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.10)

77-77: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In app/auth/user_management.py around lines 68 to 79, the bulk user operation
lacks safeguards: add a MAX_BULK_SIZE constant (e.g., 100) and check
len(request.user_ids) immediately to raise HTTPException 400 when exceeded;
replace the broad try/except with more specific exception handling (catch
auth-specific exceptions and fallback to Exception only to log full traceback)
and ensure errors are logged with context (user_id and exception) rather than
just returning str(e); change delete behavior to a safer approach by either
implementing soft-delete (marking a deleted flag), requiring an explicit
confirmation flag on the request for permanent deletes, and record audit entries
for deletes (user_id, actor, action, timestamp) before performing deletion;
ensure the response still collects per-user results but that destructive actions
enforce confirmation/audit and that failures are logged with full error context.