Files
DarkShyMW 16ee57b158 Add tests
2025-08-23 09:54:40 +03:00

204 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import aiohttp
import logging
import random
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
from telegram import Bot
# === Настройки ===
TELEGRAM_TOKEN = "токен_вашегог_аккаунта" # Токен от Telegram
CHANNEL_ID = -100000000000000 # числовой ID канала
DERPIBOORU_TOKEN = "xxxxxxxxxxxxxxxx" # Токен от DerpiBooru
TAGS_LIST = [
["gay"],
["female"],
["vulva"],
["creampie"],
]
DERPIBOORU_API = "https://derpibooru.org/api/v1/json/search/images"
SENT_IMAGES_FILE = Path("sent_images.json")
FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения
# === Логи ===
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("bot.log"),
logging.StreamHandler()
]
)
bot = Bot(token=TELEGRAM_TOKEN)
# === Загружаем уже отправленные изображения ===
if SENT_IMAGES_FILE.exists():
try:
with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f:
data = f.read().strip()
sent_images = set(json.loads(data)) if data else set()
except json.JSONDecodeError:
sent_images = set()
else:
sent_images = set()
async def fetch_image(session, tags):
"""Запрос изображения с Derpibooru по тегам."""
params = {
"q": " ".join(tags),
"per_page": 50,
"page": 1,
"key": DERPIBOORU_TOKEN,
}
if FILTER_ID:
params["filter_id"] = FILTER_ID
backoff = 1
while True:
try:
async with session.get(DERPIBOORU_API, params=params) as response:
if response.status == 200:
data = await response.json()
images = data.get("images", [])
if not images:
logging.warning("Ничего не найдено по тегам: %s", tags)
return None
random.shuffle(images)
for img in images:
reps = img.get("representations", {})
url = reps.get("full") or reps.get("large") or reps.get("medium")
if not url:
continue
if url not in sent_images:
author = img.get("uploader")
source = img.get("view_url")
img_tags = img.get("tags", [])
logging.info("Выбрано изображение: %s", url)
return url, author, source, img_tags
logging.info("Все найденные изображения уже отправлены: %s", tags)
return None
elif response.status in (500, 501):
logging.warning(
"Ошибка %s от Derpibooru. Ждем %s секунд...",
response.status, backoff
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
else:
logging.error("Ошибка API Derpibooru: %s", response.status)
return None
except aiohttp.ClientError as e:
logging.error("Ошибка запроса к Derpibooru: %s", e)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
async def post_image(tags=None):
"""Публикация изображения в канал."""
async with aiohttp.ClientSession() as session:
if tags is None:
tags = random.choice(TAGS_LIST)
result = await fetch_image(session, tags)
if result:
url, author, source, img_tags = result
caption = (
f"Автор: {author}\n"
f"Источник: {source}\n"
f"Теги: {', '.join(img_tags)}"
)
try:
await bot.send_photo(
chat_id=CHANNEL_ID,
photo=url,
caption=caption
)
logging.info("Отправлено изображение: %s", url)
sent_images.add(url)
with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f:
json.dump(list(sent_images), f)
except Exception as e:
logging.error("Ошибка при отправке изображения: %s", e)
else:
logging.info("Изображение не найдено для тегов: %s", tags)
async def scheduler():
"""Публикует изображение каждый час (в начале часа)."""
while True:
now = datetime.now()
# Если бот запущен не в начале часа — публикуем сразу
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующего часа
next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds)
await asyncio.sleep(wait_seconds)
# Публикуем по расписанию
await post_image()
async def command_listener():
"""Слушает команды из stdin (postnow, posttags, timetopost)."""
loop = asyncio.get_event_loop()
while True:
cmd = await loop.run_in_executor(None, sys.stdin.readline)
cmd = cmd.strip().lower()
if cmd in {"postnow", ">> postnow"}:
logging.info("Команда postnow получена — публикуем...")
await post_image()
elif cmd.startswith("posttags "):
tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена: %s", tags)
await post_image(tags)
elif cmd.startswith("timetopost"):
now = datetime.now()
next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Времени до публикации: %s", wait_seconds)
else:
logging.info("Неизвестная команда: %s", cmd)
async def main():
"""Главный запуск: планировщик + команды."""
await asyncio.gather(
scheduler(),
command_listener()
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Бот остановлен вручную")