add admin settings command

This commit is contained in:
2024-12-23 14:06:50 +07:00
parent a642b5de39
commit 993775eaad
2 changed files with 371 additions and 3 deletions
+365 -3
View File
@@ -1,11 +1,72 @@
from aiogram import Bot, Router
from aiogram.enums import ChatMemberStatus
from aiogram.filters import JOIN_TRANSITION, ChatMemberUpdatedFilter
from aiogram.types import ChatMemberUpdated
from aiogram.filters import (
JOIN_TRANSITION,
ChatMemberUpdatedFilter,
Command,
BaseFilter
)
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import StatesGroup, State
from aiogram.types import (
ChatMemberUpdated,
InlineKeyboardMarkup,
InlineKeyboardButton,
Message,
CallbackQuery
)
from bot.utils.db import UseDB
from bot.utils.logger import get_logger
router = Router()
logger = get_logger(__name__)
db = UseDB("telegram_bot")
class IgnoreState(StatesGroup):
waiting_for_user_id = State()
waiting_for_channel_id = State()
async def is_admin(message: Message) -> bool:
"""
Проверяет, является ли пользователь администратором в текущем чате.
:param message: Объект сообщения
:return: True, если пользователь администратор, иначе False
"""
return (
(await message.chat.get_member(message.from_user.id)) in
(await message.chat.get_administrators())
)
@router.callback_query()
async def callback_query_handler(callback_query: CallbackQuery, state: FSMContext):
"""
Главный обработчик всех callback_data.
:param callback_query: Объект обратного вызова
:param state: FSMContext - Контекст состояния
"""
data = callback_query.data
if data == "set_language":
await set_language_callback(callback_query)
elif data == "set_action":
await set_action_callback(callback_query)
elif data.startswith("set_action_"):
if "_delete" in data or "_mute" in data or "_ban" in data:
await save_action_callback(callback_query)
else:
await set_action_category_callback(callback_query)
elif data == "set_ignore":
await set_ignore_callback(callback_query)
elif data == "ignore_user":
await start_ignore_user(callback_query, state)
elif data == "ignore_channel":
await start_ignore_channel(callback_query, state)
@router.my_chat_member(ChatMemberUpdatedFilter(member_status_changed=JOIN_TRANSITION))
@@ -40,7 +101,6 @@ async def bot_added(event: ChatMemberUpdated, bot: Bot):
parse_mode="HTML"
)
db = UseDB("telegram_bot")
if not db.find_document({"chat_id": event.chat.id}):
db.insert_document(
{
@@ -72,3 +132,305 @@ async def bot_added(event: ChatMemberUpdated, bot: Bot):
"logs": []
}
)
@router.message(Command("settings"))
async def settings_handler(message: Message):
"""
Обработчик команды /settings.
Показывает текущие настройки и кнопки для редактирования.
:param message: Объект сообщения
"""
if not await is_admin(message):
await message.answer(
"⛔ У вас нет прав администратора для выполнения этой команды."
)
return
chat_id = str(message.chat.id)
settings = db.find_document({"chat_id": chat_id})[0] or {}
keyboard = InlineKeyboardMarkup(row_width=1, inline_keyboard=[
[
InlineKeyboardButton(
text="🌍 Переключить язык",
callback_data="set_language"
),
InlineKeyboardButton(
text="🚫 Настроить действия",
callback_data="set_action"
),
InlineKeyboardButton(
text="🙅 Игнорируемые объекты",
callback_data="set_ignore"
)
]
])
response = (
f"⚙️ <b>Настройки чата:</b>\n"
f"🌍 <b>Язык:</b> {settings.get('language', 'ru')}\n"
f"🚫 <b>Действия на спам:</b> {
settings.get('actions', {}).get('spam', 'Не настроено')
}\n"
f"💬 <b>Действия на маты:</b> {
settings.get('actions', {}).get('mat', 'Не настроено')
}\n"
f"🔞 <b>Действия на NSFW:</b> {
settings.get('actions', {}).get('nsfw', 'Не настроено')
}\n"
)
await message.answer(response, reply_markup=keyboard, parse_mode="HTML")
async def set_language_callback(callback_query: CallbackQuery):
"""
Переключение языка чата.
:param callback_query: Объект обратного вызова
"""
if not await is_admin(callback_query.message):
await callback_query.answer(
"⛔ Только администраторы могут переключать язык.",
show_alert=True
)
return
chat_id = str(callback_query.message.chat.id)
settings = db.find_document({"chat_id": chat_id})[0] or {}
current_language = settings.get("language", "ru")
new_language = "en" if current_language == "ru" else "ru"
db.update_document(
{"chat_id": chat_id},
{"language": new_language}
)
await callback_query.message.edit_text(
f"✨ Язык успешно переключён на {new_language.upper()}!",
reply_markup=None
)
async def set_action_callback(callback_query: CallbackQuery):
"""
Обработчик настройки действий.
:param callback_query: Объект обратного вызова
"""
if not await is_admin(callback_query.message):
await callback_query.answer(
"⛔ Только администраторы могут настраивать действия.",
show_alert=True
)
return
keyboard = InlineKeyboardMarkup(row_width=1, inline_keyboard=[
[
InlineKeyboardButton(
text="Настроить спам",
callback_data="set_action_spam"
),
InlineKeyboardButton(
text="Настроить маты",
callback_data="set_action_mat"
),
InlineKeyboardButton(
text="Настроить NSFW",
callback_data="set_action_nsfw"
)
]
])
await callback_query.message.edit_text(
"Выберите категорию для настройки:",
reply_markup=keyboard
)
async def set_ignore_callback(callback_query: CallbackQuery):
"""
Обработчик настройки игнорируемых объектов.
:param callback_query: Объект обратного вызова
"""
if not await is_admin(callback_query.message):
await callback_query.answer(
"⛔ Только администраторы могут настраивать игнорируемые объекты.",
show_alert=True
)
return
keyboard = InlineKeyboardMarkup(row_width=1, inline_keyboard=[
[
InlineKeyboardButton(
text="Добавить пользователя",
callback_data="ignore_user"
),
InlineKeyboardButton(
text="Добавить канал",
callback_data="ignore_channel"
)
]
])
await callback_query.message.edit_text(
"Выберите, кого или что игнорировать:",
reply_markup=keyboard
)
async def set_action_category_callback(callback_query: CallbackQuery):
"""
Настройка определённой категории действий.
:param callback_query: Объект обратного вызова
"""
category = callback_query.data.split("_")[-1]
keyboard = InlineKeyboardMarkup(row_width=1, inline_keyboard=[
[
InlineKeyboardButton(
text="Удаление",
callback_data=f"set_action_{category}_delete"
),
InlineKeyboardButton(
text="Мут",
callback_data=f"set_action_{category}_mute"
),
InlineKeyboardButton(
text="Бан",
callback_data=f"set_action_{category}_ban"
)
]
])
await callback_query.message.edit_text(
f"Выберите действие для категории {category.upper()}:",
reply_markup=keyboard
)
async def save_action_callback(callback_query: CallbackQuery):
"""
Сохраняет выбранное действие в базе данных.
:param callback_query: Объект обратного вызова
"""
data = callback_query.data.split("_")
category = data[2]
action = data[3]
chat_id = str(callback_query.message.chat.id)
settings = db.find_document({"chat_id": chat_id})[0] or {}
actions = settings.get("actions", {})
if category not in actions:
actions[category] = []
actions[category]["actions"].append(action)
db.update_document(
{"chat_id": chat_id},
{"actions": actions}
)
await callback_query.answer(
f"Действие '{action}' добавлено в категорию '{category}'.",
show_alert=True
)
async def start_ignore_user(callback_query: CallbackQuery, state: FSMContext):
"""
Запускает процесс добавления пользователя в игнор.
:param callback_query: Объект обратного вызова
:param state: Состояние FSM
"""
await callback_query.message.answer(
"Введите ID или тег пользователя для игнорирования:"
)
await state.set_state(IgnoreState.waiting_for_user_id)
async def start_ignore_channel(callback_query: CallbackQuery, state: FSMContext):
"""
Запускает процесс добавления канала в игнор.
:param callback_query: Объект обратного вызова
:param state: Состояние FSM
"""
await callback_query.message.answer("Введите ID канала для игнорирования:")
await state.set_state(IgnoreState.waiting_for_channel_id)
async def ignore_user_input(message: Message, state: FSMContext):
"""
Обрабатывает ввод пользователя для добавления в игнор.
:param message: Объект сообщения
:param state: Состояние FSM
"""
user_id = message.text.strip()
chat_id = str(message.chat.id)
settings = db.find_document({"chat_id": chat_id})[0] or {}
ignored_users = settings.get("ignored_users", [])
ignored_users.append(user_id)
db.update_document(
{"chat_id": chat_id},
{"ignored_users": ignored_users}
)
await message.answer(f"Пользователь {user_id} добавлен в игнорируемые.")
await state.clear()
async def ignore_channel_input(message: Message, state: FSMContext):
"""
Обрабатывает ввод канала для добавления в игнор.
:param message: Объект сообщения
:param state: Состояние FSM
"""
channel_id = message.text.strip()
chat_id = str(message.chat.id)
settings = db.find_document({"chat_id": chat_id})[0] or {}
ignored_channels = settings.get("ignored_channels", [])
ignored_channels.append(channel_id)
db.update_document(
{"chat_id": chat_id},
{"ignored_channels": ignored_channels}
)
await message.answer(f"Канал {channel_id} добавлен в игнорируемые.")
await state.clear()
class MultiStateFilter(BaseFilter):
async def __call__(self, message: Message, state: FSMContext):
current_state = await state.get_state()
return current_state in IgnoreState
@router.message(MultiStateFilter())
async def message_handler(message: Message, state: FSMContext):
"""
Обработчик сообщений для состояний FSM.
:param message: Объект сообщения
:param state: FSMContext - Контекст состояния
"""
current_state = await state.get_state()
if current_state == IgnoreState.waiting_for_user_id.state:
await ignore_user_input(message, state)
elif current_state == IgnoreState.waiting_for_channel_id.state:
await ignore_channel_input(message, state)
else:
return
+6
View File
@@ -10,6 +10,12 @@ router = Router()
async def is_admin(message: types.Message) -> bool:
"""
Проверяет, является ли пользователь администратором в текущем чате.
:param message: Объект сообщения
:return: True, если пользователь администратор, иначе False
"""
return (
(await message.chat.get_member(message.from_user.id)) in
(await message.chat.get_administrators())