Compare commits

4 Commits
1.0.0 ... main

Author SHA1 Message Date
aabcfdff88 Обновить README.md 2026-02-16 20:58:08 +03:00
DarkShyMW
c9a3d40458 тесты 2025-08-23 10:11:53 +03:00
DarkShyMW
d01d58cc7f Merge branch 'main' of https://github.com/DarkShyMW/derpibooru-telegram-bot 2025-08-23 09:54:43 +03:00
DarkShyMW
16ee57b158 Add tests 2025-08-23 09:54:40 +03:00
8 changed files with 164 additions and 37 deletions

33
.github/workflows/tests.yml vendored Normal file
View File

@@ -0,0 +1,33 @@
name: Python application
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.11]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest
- name: Run selected tests
run: |
pytest tests/test_fetch.py tests/test_post.py --maxfail=1 --disable-warnings -q

7
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -1,5 +1,4 @@
# 🐴 Derpibooru Telegram Bot # 🐴 Derpibooru Telegram Bot
Telegram-бот, который автоматически публикует изображения с [Derpibooru](https://derpibooru.org) по заданным тегам. Telegram-бот, который автоматически публикует изображения с [Derpibooru](https://derpibooru.org) по заданным тегам.
Особенности: Особенности:

101
app.py
View File

@@ -3,26 +3,28 @@ import aiohttp
import logging import logging
import random import random
import json import json
from datetime import datetime, timedelta
from telegram import Bot
from pathlib import Path
import sys import sys
from datetime import datetime, timedelta
from pathlib import Path
from telegram import Bot
# === Настройки === # === Настройки ===
TELEGRAM_TOKEN = "************************" TELEGRAM_TOKEN = "токен_вашегог_аккаунта" # Токен от Telegram
CHANNEL_ID = -100*********** # числовой ID канала CHANNEL_ID = -100000000000000 # числовой ID канала
DERPIBOORU_TOKEN = "токен от дерпибуру" DERPIBOORU_TOKEN = "xxxxxxxxxxxxxxxx" # Токен от DerpiBooru
TAGS_LIST = [ TAGS_LIST = [
["тег1"], ["gay"],
["тег2"], ["female"],
["тег3"], ["vulva"],
["тег4"] ["creampie"],
] ]
DERPYBOORU_API_SEARCH = "https://derpibooru.org/api/v1/json/search/images"
DERPIBOORU_API = "https://derpibooru.org/api/v1/json/search/images"
SENT_IMAGES_FILE = Path("sent_images.json") SENT_IMAGES_FILE = Path("sent_images.json")
FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения
# Логи # === Логи ===
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s", format="%(asctime)s [%(levelname)s] %(message)s",
@@ -34,7 +36,7 @@ logging.basicConfig(
bot = Bot(token=TELEGRAM_TOKEN) bot = Bot(token=TELEGRAM_TOKEN)
# Загрузка уже отправленных изображений # === Загружаем уже отправленные изображения ===
if SENT_IMAGES_FILE.exists(): if SENT_IMAGES_FILE.exists():
try: try:
with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f: with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f:
@@ -47,11 +49,12 @@ else:
async def fetch_image(session, tags): async def fetch_image(session, tags):
"""Запрос изображения с Derpibooru по тегам."""
params = { params = {
"q": " ".join(tags), "q": " ".join(tags),
"per_page": 50, "per_page": 50,
"page": 1, "page": 1,
"key": DERPIBOORU_TOKEN "key": DERPIBOORU_TOKEN,
} }
if FILTER_ID: if FILTER_ID:
params["filter_id"] = FILTER_ID params["filter_id"] = FILTER_ID
@@ -59,10 +62,11 @@ async def fetch_image(session, tags):
backoff = 1 backoff = 1
while True: while True:
try: try:
async with session.get(DERPYBOORU_API_SEARCH, params=params) as response: async with session.get(DERPIBOORU_API, params=params) as response:
if response.status == 200: if response.status == 200:
data = await response.json() data = await response.json()
images = data.get("images", []) images = data.get("images", [])
if not images: if not images:
logging.warning("Ничего не найдено по тегам: %s", tags) logging.warning("Ничего не найдено по тегам: %s", tags)
return None return None
@@ -71,6 +75,7 @@ async def fetch_image(session, tags):
for img in images: for img in images:
reps = img.get("representations", {}) reps = img.get("representations", {})
url = reps.get("full") or reps.get("large") or reps.get("medium") url = reps.get("full") or reps.get("large") or reps.get("medium")
if not url: if not url:
continue continue
if url not in sent_images: if url not in sent_images:
@@ -80,16 +85,21 @@ async def fetch_image(session, tags):
logging.info("Выбрано изображение: %s", url) logging.info("Выбрано изображение: %s", url)
return url, author, source, img_tags return url, author, source, img_tags
logging.info("Все найденные изображения уже отправлены для тегов: %s", tags) logging.info("Все найденные изображения уже отправлены: %s", tags)
return None return None
elif response.status in [500, 501]: elif response.status in (500, 501):
logging.warning("Ошибка %s от Derpibooru. Ждем %s секунд перед повтором...", response.status, backoff) logging.warning(
"Ошибка %s от Derpibooru. Ждем %s секунд...",
response.status, backoff
)
await asyncio.sleep(backoff) await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900) backoff = min(backoff * 2, 900)
else: else:
logging.error("Ошибка API Derpibooru: %s", response.status) logging.error("Ошибка API Derpibooru: %s", response.status)
return None return None
except aiohttp.ClientError as e: except aiohttp.ClientError as e:
logging.error("Ошибка запроса к Derpibooru: %s", e) logging.error("Ошибка запроса к Derpibooru: %s", e)
await asyncio.sleep(backoff) await asyncio.sleep(backoff)
@@ -97,19 +107,31 @@ async def fetch_image(session, tags):
async def post_image(tags=None): async def post_image(tags=None):
"""Публикация изображения в канал."""
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
if tags is None: if tags is None:
tags = random.choice(TAGS_LIST) tags = random.choice(TAGS_LIST)
result = await fetch_image(session, tags) result = await fetch_image(session, tags)
if result: if result:
url, author, source, img_tags = result url, author, source, img_tags = result
caption = f"Автор: {author}\nИсточник: {source}\nТеги: {', '.join(img_tags)}" caption = (
f"Автор: {author}\n"
f"Источник: {source}\n"
f"Теги: {', '.join(img_tags)}"
)
try: try:
await bot.send_photo(chat_id=CHANNEL_ID, photo=url, caption=caption) await bot.send_photo(
chat_id=CHANNEL_ID,
photo=url,
caption=caption
)
logging.info("Отправлено изображение: %s", url) logging.info("Отправлено изображение: %s", url)
sent_images.add(url) sent_images.add(url)
with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f: with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f:
json.dump(list(sent_images), f) json.dump(list(sent_images), f)
except Exception as e: except Exception as e:
logging.error("Ошибка при отправке изображения: %s", e) logging.error("Ошибка при отправке изображения: %s", e)
else: else:
@@ -117,15 +139,19 @@ async def post_image(tags=None):
async def scheduler(): async def scheduler():
"""Публикует изображение каждый час (в начале часа)."""
while True: while True:
now = datetime.now() now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
# Если бот запущен не в начале часа — публикуем сразу
if now.minute != 0 or now.second != 0: if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу") logging.info("Пропущенный час — публикуем сразу")
await post_image() await post_image()
# Ждем до следующей 00 минуты # Ждем до следующего часа
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds() wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds) logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds)
await asyncio.sleep(wait_seconds) await asyncio.sleep(wait_seconds)
@@ -135,26 +161,26 @@ async def scheduler():
async def command_listener(): async def command_listener():
"""Слушает команды из stdin (postnow, posttags, timetopost)."""
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
while True: while True:
cmd = await loop.run_in_executor(None, sys.stdin.readline) cmd = await loop.run_in_executor(None, sys.stdin.readline)
cmd = cmd.strip().lower() cmd = cmd.strip().lower()
if cmd == "postnow" or cmd == ">> postnow":
logging.info("Команда postnow получена, публикуем случайное изображение...") if cmd in {"postnow", ">> postnow"}:
await post_image() logging.info("Команда postnow получена — публикуем...")
elif cmd.startswith("posttags "):
tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена, публикуем изображение с тегами: %s", tags)
await post_image(tags)
elif cmd.startswith("timetopost"):
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image() await post_image()
# Ждем до следующей 00 минуты elif cmd.startswith("posttags "):
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена: %s", tags)
await post_image(tags)
elif cmd.startswith("timetopost"):
now = datetime.now()
next_run = (now + timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_seconds = (next_run - datetime.now()).total_seconds() wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Времени до публикации: %s", wait_seconds) logging.info("Времени до публикации: %s", wait_seconds)
@@ -163,6 +189,7 @@ async def command_listener():
async def main(): async def main():
"""Главный запуск: планировщик + команды."""
await asyncio.gather( await asyncio.gather(
scheduler(), scheduler(),
command_listener() command_listener()

View File

@@ -1,2 +1,4 @@
python-telegram-bot==20.5 pytest
aiohttp==3.9.5 pytest-asyncio
aiohttp
aioresponses

0
tests/__init__.py Normal file
View File

37
tests/test_fetch.py Normal file
View File

@@ -0,0 +1,37 @@
from unittest.mock import AsyncMock, MagicMock
import pytest
import app
app.sent_images = set() # сброс уже отправленных изображений
@pytest.mark.asyncio
async def test_fetch_image_success():
fake_tags = ["gay"]
# Ответ от API
mock_response_data = {
"images": [
{
"representations": {"full": "http://example.com/img.jpg"},
"uploader": "tester",
"view_url": "http://derpibooru.org/img/1",
"tags": ["gay", "pony"]
}
]
}
# Мок объекта ответа, поддерживающий async with
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value=mock_response_data)
# Мок объекта session.get, который возвращает mock_response через __aenter__
mock_session = MagicMock()
mock_session.get.return_value.__aenter__.return_value = mock_response
url, author, source, img_tags = await app.fetch_image(mock_session, fake_tags)
assert url == "http://example.com/img.jpg"
assert author == "tester"
assert source == "http://derpibooru.org/img/1"
assert "gay" in img_tags

22
tests/test_post.py Normal file
View File

@@ -0,0 +1,22 @@
# tests/test_post.py
import pytest
from unittest.mock import AsyncMock, patch
import app
@pytest.mark.asyncio
async def test_post_image_success():
# Мокаем fetch_image
with patch("app.fetch_image", new_callable=AsyncMock) as mock_fetch:
mock_fetch.return_value = (
"http://example.com/img.jpg",
"tester",
"http://derpibooru.org/img/1",
["gay", "pony"]
)
# Заменяем весь объект bot на мок
mock_bot = AsyncMock()
with patch.object(app, "bot", new=mock_bot):
await app.post_image()
mock_bot.send_photo.assert_awaited_once()