import asyncio import aiohttp import logging import random import json import sys from datetime import datetime, timedelta from pathlib import Path from telegram import Bot # === Настройки === TELEGRAM_TOKEN = "токен_вашего_тг_аккаунта" # Токен от Telegram CHANNEL_ID = -100000000000000 # числовой ID канала DERPIBOORU_TOKEN = "xxxxxxxxxxxxxxxx" # Токен от DerpiBooru TAGS_LIST = [ ["gay"], ["female"], ["vulva"], ["creampie"], ] DERPIBOORU_API = "https://derpibooru.org/api/v1/json/search/images" SENT_IMAGES_FILE = Path("sent_images.json") FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения # === Логи === logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("bot.log"), logging.StreamHandler() ] ) bot = Bot(token=TELEGRAM_TOKEN) # === Загружаем уже отправленные изображения === if SENT_IMAGES_FILE.exists(): try: with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f: data = f.read().strip() sent_images = set(json.loads(data)) if data else set() except json.JSONDecodeError: sent_images = set() else: sent_images = set() async def fetch_image(session, tags): """Запрос изображения с Derpibooru по тегам.""" params = { "q": " ".join(tags), "per_page": 50, "page": 1, "key": DERPIBOORU_TOKEN, } if FILTER_ID: params["filter_id"] = FILTER_ID backoff = 1 while True: try: async with session.get(DERPIBOORU_API, params=params) as response: if response.status == 200: data = await response.json() images = data.get("images", []) if not images: logging.warning("Ничего не найдено по тегам: %s", tags) return None random.shuffle(images) for img in images: reps = img.get("representations", {}) url = reps.get("full") or reps.get("large") or reps.get("medium") if not url: continue if url not in sent_images: author = img.get("uploader") source = img.get("view_url") img_tags = img.get("tags", []) logging.info("Выбрано изображение: %s", url) return url, author, source, img_tags logging.info("Все найденные изображения уже отправлены: %s", tags) return None elif response.status in (500, 501): logging.warning( "Ошибка %s от Derpibooru. Ждем %s секунд...", response.status, backoff ) await asyncio.sleep(backoff) backoff = min(backoff * 2, 900) else: logging.error("Ошибка API Derpibooru: %s", response.status) return None except aiohttp.ClientError as e: logging.error("Ошибка запроса к Derpibooru: %s", e) await asyncio.sleep(backoff) backoff = min(backoff * 2, 900) async def post_image(tags=None): """Публикация изображения в канал.""" async with aiohttp.ClientSession() as session: if tags is None: tags = random.choice(TAGS_LIST) result = await fetch_image(session, tags) if result: url, author, source, img_tags = result caption = ( f"Автор: {author}\n" f"Источник: {source}\n" f"Теги: {', '.join(img_tags)}" ) try: await bot.send_photo( chat_id=CHANNEL_ID, photo=url, caption=caption ) logging.info("Отправлено изображение: %s", url) sent_images.add(url) with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f: json.dump(list(sent_images), f) except Exception as e: logging.error("Ошибка при отправке изображения: %s", e) else: logging.info("Изображение не найдено для тегов: %s", tags) async def scheduler(): """Публикует изображение каждый час (в начале часа).""" while True: now = datetime.now() # Если бот запущен не в начале часа — публикуем сразу if now.minute != 0 or now.second != 0: logging.info("Пропущенный час — публикуем сразу") await post_image() # Ждем до следующего часа next_run = (now + timedelta(hours=1)).replace( minute=0, second=0, microsecond=0 ) wait_seconds = (next_run - datetime.now()).total_seconds() logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds) await asyncio.sleep(wait_seconds) # Публикуем по расписанию await post_image() async def command_listener(): """Слушает команды из stdin (postnow, posttags, timetopost).""" loop = asyncio.get_event_loop() while True: cmd = await loop.run_in_executor(None, sys.stdin.readline) cmd = cmd.strip().lower() if cmd in {"postnow", ">> postnow"}: logging.info("Команда postnow получена — публикуем...") await post_image() elif cmd.startswith("posttags "): tags = cmd[len("posttags "):].split() logging.info("Команда posttags получена: %s", tags) await post_image(tags) elif cmd.startswith("timetopost"): now = datetime.now() next_run = (now + timedelta(hours=1)).replace( minute=0, second=0, microsecond=0 ) wait_seconds = (next_run - datetime.now()).total_seconds() logging.info("Времени до публикации: %s", wait_seconds) else: logging.info("Неизвестная команда: %s", cmd) async def main(): """Главный запуск: планировщик + команды.""" await asyncio.gather( scheduler(), command_listener() ) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Бот остановлен вручную")