diff --git a/cogs/utility.py b/cogs/utility.py index d14aa97baa..0f9230f9a5 100644 --- a/cogs/utility.py +++ b/cogs/utility.py @@ -1382,7 +1382,7 @@ def _parse_level(name): @permissions.command(name="override") @checks.has_permissions(PermissionLevel.OWNER) - async def permissions_override(self, ctx, command_name: str.lower, *, level_name: str): + async def permissions_override(self, ctx, command_name: str.lower, *, level_name: str = None): """ Change a permission level for a specific command. @@ -1396,8 +1396,16 @@ async def permissions_override(self, ctx, command_name: str.lower, *, level_name - `{prefix}perms remove override reply` - `{prefix}perms remove override plugin enabled` + You can also override multiple commands at once using: + - `{prefix}perms override bulk` + You can retrieve a single or all command level override(s), see`{prefix}help permissions get`. """ + if command_name == "bulk": + return await self._bulk_override_flow(ctx) + + if level_name is None: + raise commands.MissingRequiredArgument(DummyParam("level_name")) command = self.bot.get_command(command_name) if command is None: @@ -1432,6 +1440,188 @@ async def permissions_override(self, ctx, command_name: str.lower, *, level_name ) return await ctx.send(embed=embed) + async def _bulk_override_flow(self, ctx): + embed = discord.Embed( + title="Bulk Override", + description=( + "Please list the commands you want to override. " + "You can list multiple commands separated by spaces or newlines.\n" + "Example: `reply, block, unblock`." + ), + color=self.bot.main_color, + ) + await ctx.send(embed=embed) + + try: + msg = await self.bot.wait_for( + "message", + check=lambda m: m.author == ctx.author and m.channel == ctx.channel, + timeout=120.0, + ) + + except asyncio.TimeoutError: + return await ctx.send( + embed=discord.Embed(title="Error", description="Timed out.", color=self.bot.error_color) + ) + + raw_commands = msg.content.replace(",", " ").replace("\n", " ").split(" ") + # Filter empty strings from split + raw_commands = [c for c in raw_commands if c.strip()] + + if self.bot.prefix: + # Strip prefix from commands if present + raw_commands = [ + c[len(self.bot.prefix) :] if c.startswith(self.bot.prefix) else c for c in raw_commands + ] + + found_commands = [] + invalid_commands = [] + + for cmd_name in raw_commands: + cmd = self.bot.get_command(cmd_name) + if cmd: + found_commands.append(cmd) + else: + invalid_commands.append(cmd_name) + + if invalid_commands: + embed = discord.Embed( + title="Invalid Commands Found", + description=f"The following commands were not found:\n`{', '.join(invalid_commands)}`\n\n" + "Do you want to continue with the valid commands? (y/n)", + color=self.bot.error_color, + ) + await ctx.send(embed=embed) + try: + msg = await self.bot.wait_for( + "message", + check=lambda m: m.author == ctx.author and m.channel == ctx.channel, + timeout=60.0, + ) + if msg.content.lower() not in ("y", "yes"): + return await ctx.send( + embed=discord.Embed( + title="Operation Aborted", + description="No changes have been applied.", + color=self.bot.error_color, + ) + ) + except asyncio.TimeoutError: + return await ctx.send( + embed=discord.Embed(title="Error", description="Timed out.", color=self.bot.error_color) + ) + + if not found_commands: + return await ctx.send( + embed=discord.Embed( + title="Error", + description="No valid commands provided. Aborting.", + color=self.bot.error_color, + ) + ) + + # Expand subcommands + final_commands = set() + + def add_command_recursive(cmd): + final_commands.add(cmd) + if hasattr(cmd, "commands"): + for sub in cmd.commands: + add_command_recursive(sub) + + for cmd in found_commands: + add_command_recursive(cmd) + + embed = discord.Embed( + title="Select Permission Level", + description=( + f"Found {len(final_commands)} commands (including subcommands).\n" + "What permission level should these commands be set to? (e.g. `Owner`, `Admin`, `Moderator`, `Supporter`, `User`)" + ), + color=self.bot.main_color, + ) + await ctx.send(embed=embed) + + try: + msg = await self.bot.wait_for( + "message", + check=lambda m: m.author == ctx.author and m.channel == ctx.channel, + timeout=60.0, + ) + + except asyncio.TimeoutError: + return await ctx.send( + embed=discord.Embed(title="Error", description="Timed out.", color=self.bot.error_color) + ) + + level_name = msg.content + level = self._parse_level(level_name) + if level == PermissionLevel.INVALID: + return await ctx.send( + embed=discord.Embed( + title="Error", + description=f"Invalid permission level: `{level_name}`. Aborting.", + color=self.bot.error_color, + ) + ) + + # Confirmation + command_list_str = ", ".join( + f"`{c.qualified_name}`" for c in sorted(final_commands, key=lambda x: x.qualified_name) + ) + + # Truncate if too long for embed description + if len(command_list_str) > 2000: + command_list_str = command_list_str[:1997] + "..." + + embed = discord.Embed( + title="Confirm Bulk Override", + description=f"**Level:** {level.name}\n\n**Commands:**\n{command_list_str}\n\n" + "Type `confirm` to apply these changes or `cancel` to abort.", + color=self.bot.main_color, + ) + await ctx.send(embed=embed) + + try: + msg = await self.bot.wait_for( + "message", + check=lambda m: m.author == ctx.author + and m.channel == ctx.channel + and m.content.lower() in ("confirm", "cancel"), + timeout=30.0, + ) + except asyncio.TimeoutError: + return await ctx.send( + embed=discord.Embed( + title="Error", description="Timed out. No changes applied.", color=self.bot.error_color + ) + ) + + if msg.content.lower() == "cancel": + return await ctx.send( + embed=discord.Embed( + title="Operation Aborted", + description="No changes have been applied.", + color=self.bot.error_color, + ) + ) + + # Apply changes + count = 0 + for cmd in final_commands: + self.bot.config["override_command_level"][cmd.qualified_name] = level.name + count += 1 + + await self.bot.config.update() + + await ctx.send( + embed=discord.Embed( + title="Success", + description=f"Successfully updated permissions for {count} commands.", + color=self.bot.main_color, + ) + ) + @permissions.command(name="add", usage="[command/level] [name] [user/role]") @checks.has_permissions(PermissionLevel.OWNER) async def permissions_add(