Create app.py

This commit is contained in:
DarkShy
2025-08-23 03:54:47 +03:00
committed by GitHub
parent a781ca68d0
commit 700f1fb52d

164
app.py Normal file
View File

@@ -0,0 +1,164 @@
import asyncio
import aiohttp
import logging
import random
import json
from datetime import datetime, timedelta
from telegram import Bot
from pathlib import Path
import sys
# === Настройки ===
TELEGRAM_TOKEN = "************************"
CHANNEL_ID = -100*********** # числовой ID канала
DERPIBOORU_TOKEN = "токен от дерпибуру"
TAGS_LIST = [
["тег1"],
["тег2"],
["тег3"],
["тег4"]
]
DERPYBOORU_API_SEARCH = "https://derpibooru.org/api/v1/json/search/images"
SENT_IMAGES_FILE = Path("sent_images.json")
FILTER_ID = 56027 # ID фильтра, разрешающего NSFW изображения
# Логи
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("bot.log"),
logging.StreamHandler()
]
)
bot = Bot(token=TELEGRAM_TOKEN)
# Загрузка уже отправленных изображений
if SENT_IMAGES_FILE.exists():
try:
with SENT_IMAGES_FILE.open("r", encoding="utf-8") as f:
data = f.read().strip()
sent_images = set(json.loads(data)) if data else set()
except json.JSONDecodeError:
sent_images = set()
else:
sent_images = set()
async def fetch_image(session, tags):
params = {
"q": " ".join(tags),
"per_page": 50,
"page": 1,
"key": DERPIBOORU_TOKEN
}
if FILTER_ID:
params["filter_id"] = FILTER_ID
backoff = 1
while True:
try:
async with session.get(DERPYBOORU_API_SEARCH, params=params) as response:
if response.status == 200:
data = await response.json()
images = data.get("images", [])
if not images:
logging.warning("Ничего не найдено по тегам: %s", tags)
return None
random.shuffle(images)
for img in images:
reps = img.get("representations", {})
url = reps.get("full") or reps.get("large") or reps.get("medium")
if not url:
continue
if url not in sent_images:
author = img.get("uploader")
source = img.get("view_url")
img_tags = img.get("tags", [])
logging.info("Выбрано изображение: %s", url)
return url, author, source, img_tags
logging.info("Все найденные изображения уже отправлены для тегов: %s", tags)
return None
elif response.status in [500, 501]:
logging.warning("Ошибка %s от Derpibooru. Ждем %s секунд перед повтором...", response.status, backoff)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
else:
logging.error("Ошибка API Derpibooru: %s", response.status)
return None
except aiohttp.ClientError as e:
logging.error("Ошибка запроса к Derpibooru: %s", e)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 900)
async def post_image(tags=None):
async with aiohttp.ClientSession() as session:
if tags is None:
tags = random.choice(TAGS_LIST)
result = await fetch_image(session, tags)
if result:
url, author, source, img_tags = result
caption = f"Автор: {author}\nИсточник: {source}\nТеги: {', '.join(img_tags)}"
try:
await bot.send_photo(chat_id=CHANNEL_ID, photo=url, caption=caption)
logging.info("Отправлено изображение: %s", url)
sent_images.add(url)
with SENT_IMAGES_FILE.open("w", encoding="utf-8") as f:
json.dump(list(sent_images), f)
except Exception as e:
logging.error("Ошибка при отправке изображения: %s", e)
else:
logging.info("Изображение не найдено для тегов: %s", tags)
async def scheduler():
while True:
now = datetime.now()
# Публикуем догоняющее изображение, если запуск не в начале часа
if now.minute != 0 or now.second != 0:
logging.info("Пропущенный час — публикуем сразу")
await post_image()
# Ждем до следующей 00 минуты
next_run = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
wait_seconds = (next_run - datetime.now()).total_seconds()
logging.info("Ждем %.0f секунд до следующей публикации", wait_seconds)
await asyncio.sleep(wait_seconds)
# Публикуем по расписанию
await post_image()
async def command_listener():
loop = asyncio.get_event_loop()
while True:
cmd = await loop.run_in_executor(None, sys.stdin.readline)
cmd = cmd.strip().lower()
if cmd == "postnow" or cmd == ">> postnow":
logging.info("Команда postnow получена, публикуем случайное изображение...")
await post_image()
elif cmd.startswith("posttags "):
tags = cmd[len("posttags "):].split()
logging.info("Команда posttags получена, публикуем изображение с тегами: %s", tags)
await post_image(tags)
else:
logging.info("Неизвестная команда: %s", cmd)
async def main():
await asyncio.gather(
scheduler(),
command_listener()
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Бот остановлен вручную")