Add tests

This commit is contained in:
DarkShyMW
2025-08-23 09:54:40 +03:00
parent 176dd968d5
commit 16ee57b158
6 changed files with 170 additions and 36 deletions

95
app.py
View File

@@ -3,26 +3,28 @@ import aiohttp
import logging
import random
import json
from datetime import datetime, timedelta
from telegram import Bot
from pathlib import Path
import sys
from datetime import datetime, timedelta
from pathlib import Path
from telegram import Bot
# === Настройки ===
TELEGRAM_TOKEN = "************************"
CHANNEL_ID = -100*********** # числовой ID канала
DERPIBOORU_TOKEN = "токен от дерпибуру"
TELEGRAM_TOKEN = "токен_вашегог_аккаунта" # Токен от Telegram
CHANNEL_ID = -100000000000000 # числовой ID канала
DERPIBOORU_TOKEN = "xxxxxxxxxxxxxxxx" # Токен от DerpiBooru
TAGS_LIST = [
["тег1"],
["тег2"],
["тег3"],
["тег4"]
["gay"],
["female"],
["vulva"],
["creampie"],
]
DERPYBOORU_API_SEARCH = "https://derpibooru.org/api/v1/json/search/images"
DERPIBOORU_API = "https://derpibooru.org/api/v1/json/search/images"
SENT_IMAGES_FILE = Path("sent_images.json")
FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения
# Логи
# === Логи ===
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
@@ -34,7 +36,7 @@ logging.basicConfig(
bot = Bot(token=TELEGRAM_TOKEN)
# Загрузка уже отправленных изображений
# === Загружаем уже отправленные изображения ===
if SENT_IMAGES_FILE.exists():
try:
with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f:
@@ -47,11 +49,12 @@ else:
async def fetch_image(session, tags):
"""Запрос изображения с Derpibooru по тегам."""
params = {
"q": " ".join(tags),
"per_page": 50,
"page": 1,
"key": DERPIBOORU_TOKEN
"key": DERPIBOORU_TOKEN,
}
if FILTER_ID:
params["filter_id"] = FILTER_ID
@@ -59,10 +62,11 @@ async def fetch_image(session, tags):
backoff = 1
while True:
try:
async with session.get(DERPYBOORU_API_SEARCH, params=params) as response:
async with session.get(DERPIBOORU_API, params=params) as response:
if response.status == 200:
data = await response.json()
images = data.get("images", [])
if not images:
logging.warning("Ничего не найдено по тегам: %s", tags)
return None
@@ -71,6 +75,7 @@ async def fetch_image(session, tags):
for img in images:
reps = img.get("representations", {})
url = reps.get("full") or reps.get("large") or reps.get("medium")
if not url:
continue
if url not in sent_images:
@@ -80,16 +85,21 @@ async def fetch_image(session, tags):
logging.info("Выбрано изображение: %s", url)
return url, author, source, img_tags
logging.info("Все найденные изображения уже отправлены для тегов: %s", tags)
logging.info("Все найденные изображения уже отправлены: %s", tags)
return None
elif response.status in [500, 501]:
logging.warning("Ошибка %s от Derpibooru. Ждем %s секунд перед повтором...", response.status, backoff)
elif response.status in (500, 501):
logging.warning(
"Ошибка %s от Derpibooru. Ждем %s секунд...",
response.status, backoff
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
else:
logging.error("Ошибка API Derpibooru: %s", response.status)
return None
except aiohttp.ClientError as e:
logging.error("Ошибка запроса к Derpibooru: %s", e)
await asyncio.sleep(backoff)
@@ -97,19 +107,31 @@ async def fetch_image(session, tags):
async def post_image(tags=None):
"""Публикация изображения в канал."""
async with aiohttp.ClientSession() as session:
if tags is None:
tags = random.choice(TAGS_LIST)
result = await fetch_image(session, tags)
if result:
url, author, source, img_tags = result
caption = f"Автор: {author}\nИсточник: {source}\nТеги: {', '.join(img_tags)}"
caption = (
f"Автор: {author}\n"
f"Источник: {source}\n"
f"Теги: {', '.join(img_tags)}"
)
try:
await bot.send_photo(chat_id=CHANNEL_ID, photo=url, caption=caption)
await bot.send_photo(
chat_id=CHANNEL_ID,
photo=url,
caption=caption
)
logging.info("Отправлено изображение: %s", url)
sent_images.add(url)
with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f:
json.dump(list(sent_images), f)
except Exception as e:
logging.error("Ошибка при отправке изображения: %s", e)
else:
@@ -117,15 +139,19 @@ async def post_image(tags=None):
async def scheduler():
"""Публикует изображение каждый час (в начале часа)."""
while True:
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
# Если бот запущен не в начале часа — публикуем сразу
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующей 00 минуты
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
# Ждем до следующего часа
next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds)
await asyncio.sleep(wait_seconds)
@@ -135,34 +161,35 @@ async def scheduler():
async def command_listener():
"""Слушает команды из stdin (postnow, posttags, timetopost)."""
loop = asyncio.get_event_loop()
while True:
cmd = await loop.run_in_executor(None, sys.stdin.readline)
cmd = cmd.strip().lower()
if cmd == "postnow" or cmd == ">> postnow":
logging.info("Команда postnow получена, публикуем случайное изображение...")
if cmd in {"postnow", ">> postnow"}:
logging.info("Команда postnow получена — публикуем...")
await post_image()
elif cmd.startswith("posttags "):
tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена, публикуем изображение с тегами: %s", tags)
logging.info("Команда posttags получена: %s", tags)
await post_image(tags)
elif cmd.startswith("timetopost"):
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующей 00 минуты
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Времени до публикации: %s", wait_seconds)
else:
logging.info("Неизвестная команда: %s", cmd)
async def main():
"""Главный запуск: планировщик + команды."""
await asyncio.gather(
scheduler(),
command_listener()