add moderator command
This commit is contained in:
@@ -1,11 +1,10 @@
|
||||
from aiogram import Bot, Router
|
||||
from aiogram.enums import ChatMemberStatus
|
||||
from aiogram import Router, Bot
|
||||
from aiogram.filters import ChatMemberUpdatedFilter, JOIN_TRANSITION
|
||||
from aiogram.filters import JOIN_TRANSITION, ChatMemberUpdatedFilter
|
||||
from aiogram.types import ChatMemberUpdated
|
||||
|
||||
from bot.utils.db import UseDB
|
||||
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,254 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from aiogram import Router, exceptions, types
|
||||
from aiogram.filters import Command
|
||||
|
||||
from bot.utils.logger import get_logger, log_event
|
||||
|
||||
logger = get_logger(__name__)
|
||||
router = Router()
|
||||
|
||||
|
||||
async def is_admin(message: types.Message) -> bool:
|
||||
return (
|
||||
(await message.chat.get_member(message.from_user.id)) in
|
||||
(await message.chat.get_administrators())
|
||||
)
|
||||
|
||||
|
||||
@router.message(Command("mute"))
|
||||
async def mute_user(message: types.Message):
|
||||
"""
|
||||
Мутит пользователя на указанное количество минут.
|
||||
|
||||
:param message: Объект сообщения Telegram.
|
||||
"""
|
||||
try:
|
||||
if not message.reply_to_message:
|
||||
await message.reply(
|
||||
"Эта команда должна быть ответом на сообщение пользователя."
|
||||
)
|
||||
return
|
||||
|
||||
if not await is_admin(message):
|
||||
await message.reply("❌ У вас нет прав для мута участников.")
|
||||
logger.warning(
|
||||
f"Пользователь {message.from_user.mention} попытался "
|
||||
f"использовать команду mute без прав."
|
||||
)
|
||||
return
|
||||
|
||||
args = message.text.split()
|
||||
if len(args) < 1:
|
||||
await message.reply("Укажите длительность мута в минутах.")
|
||||
return
|
||||
|
||||
try:
|
||||
duration = int(args[1])
|
||||
except ValueError or IndexError:
|
||||
await message.reply("Длительность должна быть числом.")
|
||||
return
|
||||
|
||||
reason = " ".join(args[2:]) or "Причина не указана"
|
||||
until_date = timedelta(minutes=duration)
|
||||
|
||||
try:
|
||||
await message.bot.restrict_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=message.reply_to_message.from_user.id,
|
||||
permissions=types.ChatPermissions(can_send_messages=False),
|
||||
until_date=until_date
|
||||
)
|
||||
except exceptions.TelegramAPIError:
|
||||
await message.reply("❌ У меня нет прав для выполнения этой команды.")
|
||||
return
|
||||
|
||||
await message.answer(
|
||||
f"🔇 Пользователь {message.reply_to_message.from_user.first_name} был "
|
||||
f"замучен на **{duration} минут**.\n**Причина:** {reason}"
|
||||
)
|
||||
logger.info(
|
||||
f"Пользователь {message.reply_to_message.from_user.first_name} замучен "
|
||||
f"модератором {message.from_user.first_name}."
|
||||
)
|
||||
await log_event(
|
||||
bot=message.bot,
|
||||
chat=message.chat,
|
||||
user=message.reply_to_message.from_user,
|
||||
reason=reason,
|
||||
performed_by=message.from_user,
|
||||
event_description="Пользователь замучен"
|
||||
)
|
||||
await message.delete()
|
||||
|
||||
except Exception as e:
|
||||
await message.reply(f"❌ Произошла ошибка: {e}")
|
||||
logger.error(f"❌ Произошла ошибка: {e}")
|
||||
|
||||
|
||||
@router.message(Command("unmute"))
|
||||
async def unmute_user(message: types.Message):
|
||||
"""
|
||||
Снимает мут с пользователя.
|
||||
|
||||
:param message: Объект сообщения Telegram.
|
||||
"""
|
||||
try:
|
||||
if not message.reply_to_message:
|
||||
await message.reply(
|
||||
"Эта команда должна быть ответом на сообщение пользователя."
|
||||
)
|
||||
return
|
||||
|
||||
if not await is_admin(message):
|
||||
await message.reply("❌ У вас нет прав для снятия мута с участников.")
|
||||
return
|
||||
|
||||
try:
|
||||
await message.bot.restrict_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=message.reply_to_message.from_user.id,
|
||||
permissions=types.ChatPermissions(can_send_messages=True)
|
||||
)
|
||||
except exceptions.TelegramAPIError:
|
||||
await message.reply("❌ У меня нет прав для выполнения этой команды.")
|
||||
return
|
||||
|
||||
await message.answer(
|
||||
f"✅ Тайм-аут успешно снят с пользователя "
|
||||
f"{message.reply_to_message.from_user.first_name}."
|
||||
)
|
||||
logger.info(
|
||||
f"Пользователь {message.reply_to_message.from_user.first_name} размучен "
|
||||
f"модератором {message.from_user.first_name}."
|
||||
)
|
||||
await log_event(
|
||||
bot=message.bot,
|
||||
chat=message.chat,
|
||||
user=message.reply_to_message.from_user,
|
||||
reason="Отсутствует",
|
||||
performed_by=message.from_user,
|
||||
event_description="Пользователь размучен"
|
||||
)
|
||||
await message.delete()
|
||||
|
||||
except Exception as e:
|
||||
await message.reply(f"❌ Произошла ошибка: {e}")
|
||||
logger.error(f"❌ Произошла ошибка: {e}")
|
||||
|
||||
|
||||
@router.message(Command("ban"))
|
||||
async def ban_user(message: types.Message):
|
||||
"""
|
||||
Банит пользователя.
|
||||
|
||||
:param message: Объект сообщения Telegram.
|
||||
"""
|
||||
try:
|
||||
if not message.reply_to_message:
|
||||
await message.reply(
|
||||
"Эта команда должна быть ответом на сообщение пользователя."
|
||||
)
|
||||
return
|
||||
|
||||
if not await is_admin(message):
|
||||
await message.reply("❌ У вас нет прав для бана участников.")
|
||||
logger.warning(
|
||||
f"Пользователь {message.from_user.mention} попытался "
|
||||
f"использовать команду ban без прав."
|
||||
)
|
||||
return
|
||||
|
||||
reason = message.text[1:] or "Причина не указана"
|
||||
|
||||
try:
|
||||
await message.bot.ban_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=message.reply_to_message.from_user.id
|
||||
)
|
||||
except exceptions.TelegramAPIError:
|
||||
await message.reply("❌ У меня нет прав для выполнения этой команды.")
|
||||
return
|
||||
|
||||
await message.answer(
|
||||
f"🔨 Пользователь {message.reply_to_message.from_user.first_name} был "
|
||||
f"заблокирован.\n**Причина:** {reason}"
|
||||
)
|
||||
logger.info(
|
||||
f"Пользователь {message.reply_to_message.from_user.first_name} забанен "
|
||||
f"модератором {message.from_user.first_name}."
|
||||
)
|
||||
await log_event(
|
||||
bot=message.bot,
|
||||
chat=message.chat,
|
||||
user=message.reply_to_message.from_user,
|
||||
reason=reason,
|
||||
performed_by=message.from_user,
|
||||
event_description="Пользователь забанен"
|
||||
)
|
||||
await message.delete()
|
||||
|
||||
except Exception as e:
|
||||
await message.reply(f"❌ Произошла ошибка: {e}")
|
||||
logger.error(f"❌ Произошла ошибка: {e}")
|
||||
|
||||
|
||||
@router.message(Command("unban"))
|
||||
async def unban_user(message: types.Message):
|
||||
"""
|
||||
Разбан пользователя.
|
||||
|
||||
:param message: Объект сообщения Telegram.
|
||||
"""
|
||||
try:
|
||||
args = message.text.split()
|
||||
if len(args) < 2:
|
||||
await message.reply("Укажите имя пользователя для разбана.")
|
||||
return
|
||||
|
||||
user_name = args[1]
|
||||
try:
|
||||
banned_users = await message.bot.get_chat_administrators(message.chat.id)
|
||||
except exceptions.TelegramAPIError:
|
||||
await message.reply("❌ У меня нет прав для выполнения этой команды.")
|
||||
return
|
||||
|
||||
for ban_entry in banned_users:
|
||||
if ban_entry.user.username == user_name:
|
||||
try:
|
||||
await message.bot.unban_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=ban_entry.user.id
|
||||
)
|
||||
except exceptions.TelegramAPIError:
|
||||
await message.reply(
|
||||
"❌ У меня нет прав для выполнения этой команды."
|
||||
)
|
||||
return
|
||||
|
||||
await message.answer(
|
||||
f"✅ Пользователь {ban_entry.user.first_name} успешно "
|
||||
f"разблокирован."
|
||||
)
|
||||
logger.info(
|
||||
f"Пользователь {ban_entry.user.first_name} разбанен "
|
||||
f"модератором {message.from_user.first_name}."
|
||||
)
|
||||
await log_event(
|
||||
bot=message.bot,
|
||||
chat=message.chat,
|
||||
user=ban_entry.user,
|
||||
reason="Отсутствует",
|
||||
performed_by=message.from_user,
|
||||
event_description="Пользователь разбанен"
|
||||
)
|
||||
await message.delete()
|
||||
return
|
||||
|
||||
await message.reply(
|
||||
"❌ Пользователь с таким именем не найден в списке банов."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
await message.reply(f"❌ Произошла ошибка: {e}")
|
||||
logger.error(f"❌ Произошла ошибка: {e}")
|
||||
Reference in New Issue
Block a user