Files
derpibooru-telegram-bot/app.py
2025-08-23 09:40:39 +03:00

177 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import aiohttp
import logging
import random
import json
from datetime import datetime, timedelta
from telegram import Bot
from pathlib import Path
import sys
# === Настройки ===
TELEGRAM_TOKEN = "************************"
CHANNEL_ID = -100*********** # числовой ID канала
DERPIBOORU_TOKEN = "токен от дерпибуру"
TAGS_LIST = [
["тег1"],
["тег2"],
["тег3"],
["тег4"]
]
DERPYBOORU_API_SEARCH = "https://derpibooru.org/api/v1/json/search/images"
SENT_IMAGES_FILE = Path("sent_images.json")
FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения
# Логи
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("bot.log"),
logging.StreamHandler()
]
)
bot = Bot(token=TELEGRAM_TOKEN)
# Загрузка уже отправленных изображений
if SENT_IMAGES_FILE.exists():
try:
with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f:
data = f.read().strip()
sent_images = set(json.loads(data)) if data else set()
except json.JSONDecodeError:
sent_images = set()
else:
sent_images = set()
async def fetch_image(session, tags):
params = {
"q": " ".join(tags),
"per_page": 50,
"page": 1,
"key": DERPIBOORU_TOKEN
}
if FILTER_ID:
params["filter_id"] = FILTER_ID
backoff = 1
while True:
try:
async with session.get(DERPYBOORU_API_SEARCH, params=params) as response:
if response.status == 200:
data = await response.json()
images = data.get("images", [])
if not images:
logging.warning("Ничего не найдено по тегам: %s", tags)
return None
random.shuffle(images)
for img in images:
reps = img.get("representations", {})
url = reps.get("full") or reps.get("large") or reps.get("medium")
if not url:
continue
if url not in sent_images:
author = img.get("uploader")
source = img.get("view_url")
img_tags = img.get("tags", [])
logging.info("Выбрано изображение: %s", url)
return url, author, source, img_tags
logging.info("Все найденные изображения уже отправлены для тегов: %s", tags)
return None
elif response.status in [500, 501]:
logging.warning("Ошибка %s от Derpibooru. Ждем %s секунд перед повтором...", response.status, backoff)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
else:
logging.error("Ошибка API Derpibooru: %s", response.status)
return None
except aiohttp.ClientError as e:
logging.error("Ошибка запроса к Derpibooru: %s", e)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
async def post_image(tags=None):
async with aiohttp.ClientSession() as session:
if tags is None:
tags = random.choice(TAGS_LIST)
result = await fetch_image(session, tags)
if result:
url, author, source, img_tags = result
caption = f"Автор: {author}\nИсточник: {source}\nТеги: {', '.join(img_tags)}"
try:
await bot.send_photo(chat_id=CHANNEL_ID, photo=url, caption=caption)
logging.info("Отправлено изображение: %s", url)
sent_images.add(url)
with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f:
json.dump(list(sent_images), f)
except Exception as e:
logging.error("Ошибка при отправке изображения: %s", e)
else:
logging.info("Изображение не найдено для тегов: %s", tags)
async def scheduler():
while True:
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующей 00 минуты
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds)
await asyncio.sleep(wait_seconds)
# Публикуем по расписанию
await post_image()
async def command_listener():
loop = asyncio.get_event_loop()
while True:
cmd = await loop.run_in_executor(None, sys.stdin.readline)
cmd = cmd.strip().lower()
if cmd == "postnow" or cmd == ">> postnow":
logging.info("Команда postnow получена, публикуем случайное изображение...")
await post_image()
elif cmd.startswith("posttags "):
tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена, публикуем изображение с тегами: %s", tags)
await post_image(tags)
elif cmd.startswith("timetopost"):
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующей 00 минуты
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Времени до публикации: %s", wait_seconds)
else:
logging.info("Неизвестная команда: %s", cmd)
async def main():
await asyncio.gather(
scheduler(),
command_listener()
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Бот остановлен вручную")