furry_giffer_bot_mastodon/giffer_bot.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import io
import os
import re
import time
import random
import logging
import socket
import tempfile
import subprocess
import json
from logging.handlers import RotatingFileHandler
from typing import Optional, Tuple, Dict, Any, Callable, TypeVar, List
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
import requests
from requests.adapters import HTTPAdapter
try:
    from urllib3.util.retry import Retry
except Exception:
    Retry = None  # type: ignore
try:
    import imageio_ffmpeg
except Exception:
    imageio_ffmpeg = None  # type: ignore
# =======================
# ENV loader (Windows-friendly)
# =======================
def load_env_file(path: str = "config.env") -> None:
    """Load KEY=VALUE pairs from a simple .env-style file into os.environ (without overriding existing vars)."""
    if not os.path.exists(path):
        return
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or line.startswith(";"):
                continue
            if "=" not in line:
                continue
            k, v = line.split("=", 1)
            k = k.strip()
            v = v.strip()
            if v and v[0] not in "\"'":
                # strip a trailing inline comment from unquoted values
                m = re.search(r"\s+#", v)
                if m:
                    v = v[: m.start()].rstrip()
            v = v.strip().strip('"').strip("'")
            if k:
                os.environ.setdefault(k, v)
load_env_file()
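# For reference, a config.env that this loader accepts might look like the sketch
# below (placeholder values, not real credentials; the variable names mirror the
# CONFIG block that follows):
#
#     MASTODON_ACCESS_TOKEN=xxxxxxxxxxxxxxxx
#     MASTODON_BASE_URL=https://bronyfurry.com
#     FURBOORU_API_KEY="optional-key"
#     CHECK_INTERVAL=30                 # inline comments after unquoted values are stripped
#     NSFW_VISIBILITY=unlisted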
# =======================
# CONFIG
# =======================
MASTODON_BASE_URL = os.getenv("MASTODON_BASE_URL", "https://bronyfurry.com").strip().rstrip("/")
MASTODON_ACCESS_TOKEN = os.getenv("MASTODON_ACCESS_TOKEN", "").strip()
FURBOORU_BASE_URL = os.getenv("FURBOORU_BASE_URL", "https://furbooru.org").strip().rstrip("/")
FURBOORU_API_KEY = os.getenv("FURBOORU_API_KEY", "").strip()
SAFE_FILTER_ID = os.getenv("SAFE_FILTER_ID", "").strip()
NSFW_FILTER_ID = os.getenv("NSFW_FILTER_ID", "").strip()
USER_AGENT = os.getenv("USER_AGENT", "giffer-bot/3.3 (by @giffer@bronyfurry.com)").strip()
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "30"))
# Rate-limit / anti-spam
USER_COOLDOWN_SEC = int(os.getenv("USER_COOLDOWN_SEC", "20"))
GLOBAL_RATE_PER_SEC = float(os.getenv("GLOBAL_RATE_PER_SEC", "1.0"))
GLOBAL_BURST = int(os.getenv("GLOBAL_BURST", "3"))
# Network hardening
CONNECT_TIMEOUT = float(os.getenv("CONNECT_TIMEOUT", "5"))
READ_TIMEOUT = float(os.getenv("READ_TIMEOUT", "20"))
SOCKET_DEFAULT_TIMEOUT = float(os.getenv("SOCKET_DEFAULT_TIMEOUT", "25"))
# Watchdog for Mastodon calls
MASTO_CALL_TIMEOUT = float(os.getenv("MASTO_CALL_TIMEOUT", "25"))
# Media download safety
MAX_GIF_BYTES = int(os.getenv("MAX_GIF_BYTES", str(25 * 1024 * 1024))) # 25MB
DOWNLOAD_TIMEOUT = float(os.getenv("DOWNLOAD_TIMEOUT", "40"))
# Wait for Mastodon media processing
MEDIA_PROCESS_MAX_WAIT = float(os.getenv("MEDIA_PROCESS_MAX_WAIT", "60"))
NSFW_VISIBILITY = os.getenv("NSFW_VISIBILITY", "public").strip() # public/unlisted/private/direct
# Logging
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_FILE = os.getenv("LOG_FILE", "giffer.log").strip() or "giffer.log"
# State (for "do not reply twice")
STATE_FILE = os.getenv("STATE_FILE", "giffer_state.json").strip() or "giffer_state.json"
PROCESSED_CACHE_MAX = int(os.getenv("PROCESSED_CACHE_MAX", "800")) # how many status_ids to remember
# =======================
# LOGGING
# =======================
logger = logging.getLogger("giffer")
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
logger.handlers.clear()
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(fmt)
logger.addHandler(ch)
fh = RotatingFileHandler(LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8")
fh.setFormatter(fmt)
logger.addHandler(fh)
# =======================
# GLOBAL NETWORK SETTINGS
# =======================
socket.setdefaulttimeout(SOCKET_DEFAULT_TIMEOUT)
def make_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({"User-Agent": USER_AGENT})
    if Retry is not None:
        retry = Retry(
            total=3,
            connect=3,
            read=2,
            status=2,
            backoff_factor=0.6,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=("GET", "POST"),
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry, pool_connections=10, pool_maxsize=10)
    else:
        adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
    s.mount("https://", adapter)
    s.mount("http://", adapter)
    return s
http = make_session()
_executor = ThreadPoolExecutor(max_workers=4)
T = TypeVar("T")
def run_with_timeout(fn: Callable[[], T], timeout: float, what: str) -> Optional[T]:
    fut = _executor.submit(fn)
    try:
        return fut.result(timeout=timeout)
    except FuturesTimeoutError:
        logger.error("Timeout: %s (>%ss). Continuing.", what, timeout)
        return None
    except Exception as e:
        logger.error("Error in %s: %s", what, e)
        return None
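# Illustrative use: run_with_timeout(lambda: http.get(some_url), 10.0, "fetch url")
# returns the response, or None if the call raises or takes longer than 10 seconds.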
# =======================
# STATE (persist last_seen + processed status_ids)
# =======================
def load_state() -> Dict[str, Any]:
    if not os.path.exists(STATE_FILE):
        return {"last_seen_notif_id": None, "processed_status_ids": []}
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "last_seen_notif_id" not in data:
            data["last_seen_notif_id"] = None
        if "processed_status_ids" not in data or not isinstance(data["processed_status_ids"], list):
            data["processed_status_ids"] = []
        return data
    except Exception as e:
        logger.warning("Failed to read state file (%s). Starting fresh.", e)
        return {"last_seen_notif_id": None, "processed_status_ids": []}
def save_state(state: Dict[str, Any]) -> None:
    try:
        tmp = STATE_FILE + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False)
        os.replace(tmp, STATE_FILE)
    except Exception as e:
        logger.warning("Failed to write state file: %s", e)
def remember_processed(state: Dict[str, Any], status_id: int) -> None:
    arr = state.get("processed_status_ids", [])
    arr.append(int(status_id))
    # keep last N
    if len(arr) > PROCESSED_CACHE_MAX:
        arr = arr[-PROCESSED_CACHE_MAX:]
    state["processed_status_ids"] = arr
def is_processed(state: Dict[str, Any], status_id: int) -> bool:
    try:
        return int(status_id) in set(state.get("processed_status_ids", []))
    except Exception:
        return False
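# The state file is plain JSON; given load_state()/save_state() above it looks
# roughly like this (IDs invented for illustration):
#
#     {"last_seen_notif_id": 123456, "processed_status_ids": [111, 112, 113]}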
# =======================
# RATE LIMIT HELPERS
# =======================
_user_last: Dict[str, float] = {}
_global_tokens = float(GLOBAL_BURST)
_global_last = time.monotonic()
def user_allowed(acct: str) -> bool:
    now = time.time()
    last = _user_last.get(acct, 0.0)
    if now - last < USER_COOLDOWN_SEC:
        logger.info("Cooldown hit for user=%s (wait %.1fs)", acct, USER_COOLDOWN_SEC - (now - last))
        return False
    _user_last[acct] = now
    return True
def global_wait_if_needed() -> None:
    global _global_tokens, _global_last
    now = time.monotonic()
    elapsed = now - _global_last
    _global_last = now
    _global_tokens = min(float(GLOBAL_BURST), _global_tokens + elapsed * GLOBAL_RATE_PER_SEC)
    if _global_tokens >= 1.0:
        _global_tokens -= 1.0
        return
    need = (1.0 - _global_tokens) / max(GLOBAL_RATE_PER_SEC, 0.001)
    logger.info("Global rate-limit wait %.2fs", need)
    time.sleep(max(0.0, need))
    _global_tokens = 0.0
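# Token-bucket sketch with the defaults (GLOBAL_RATE_PER_SEC=1.0, GLOBAL_BURST=3):
# the bucket starts full, so three calls pass immediately; a fourth call made in the
# same instant finds ~0 tokens and sleeps about (1.0 - tokens) / rate ≈ 1 second.
# Numbers are illustrative, not measured.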
# =======================
# TEXT HELPERS
# =======================
def strip_html(html: str) -> str:
    text = re.sub(r"<[^>]+>", " ", html)
    return re.sub(r"\s+", " ", text).strip()
def parse_query(content_html: str) -> Tuple[str, bool]:
    text = strip_html(content_html).lower()
    text = re.sub(r"@\s*[a-z0-9_]+(?:@[a-z0-9\.\-]+)?", " ", text, flags=re.I)
    text = re.sub(r"\s+", " ", text).strip()
    is_nsfw = bool(re.search(r"\bnsfw\b", text))
    text = re.sub(r"\bnsfw\b", " ", text).strip()
    text = re.sub(r"\s+", " ", text).strip()
    return text, is_nsfw
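# Example (illustrative): parse_query('<p>@giffer@bronyfurry.com nsfw red fox</p>')
# strips the HTML and the mention, detects the "nsfw" keyword, and returns
# ("red fox", True).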
def split_tags(query: str) -> List[str]:
    q = (query or "").strip()
    if not q:
        return []
    q = q.replace(",", " ")
    parts = re.findall(r'"([^"]+)"|(\S+)', q)
    tokens = []
    for a, b in parts:
        t = (a or b).strip()
        if t:
            tokens.append(t)
    tokens = [t for t in tokens if t.lower() not in {"random", "rnd"}]
    return tokens
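# Example (illustrative): split_tags('fox "open mouth" random') keeps quoted phrases
# whole and drops the "random"/"rnd" keywords, yielding ["fox", "open mouth"].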
def safe_visibility(v: str) -> str:
    v = (v or "public").strip().lower()
    return v if v in {"public", "unlisted", "private", "direct"} else "public"
def make_alt_text(img: Dict[str, Any], query: str, is_nsfw: bool) -> str:
    tags = img.get("tags", [])
    if isinstance(tags, list):
        raw = [str(t).strip() for t in tags]
    elif isinstance(tags, str):
        raw = [t.strip() for t in tags.split(",")]
    else:
        raw = []
    block = {"safe", "questionable", "explicit", "nsfw", "sfw", "animated", "gif"}
    cleaned = []
    for t in raw:
        if not t:
            continue
        t2 = t.lower().strip()
        if t2 in block:
            continue
        cleaned.append(t2.replace("_", " "))
    seen = set()
    uniq = []
    for t in cleaned:
        if t in seen:
            continue
        seen.add(t)
        uniq.append(t)
    prefix = "NSFW animated GIF" if is_nsfw else "Animated GIF"
    if uniq:
        return (f"{prefix}: " + ", ".join(uniq[:20]))[:900]
    if query:
        return f"{prefix} (from query): {query[:900]}"
    return prefix
def source_link(img: Dict[str, Any]) -> str:
    vu = img.get("view_url")
    if isinstance(vu, str) and vu.startswith("http"):
        return vu
    img_id = img.get("id")
    if img_id is not None:
        return f"{FURBOORU_BASE_URL}/images/{img_id}"
    return FURBOORU_BASE_URL
# =======================
# FURBOORU
# =======================
def furbooru_search_gif(query: str, nsfw: bool) -> Optional[Dict[str, Any]]:
    global_wait_if_needed()
    url = f"{FURBOORU_BASE_URL}/api/v1/json/search/images"
    base_tags = ["animated", "gif"] if nsfw else ["safe", "animated", "gif"]
    user_tags = split_tags(query)
    all_tags = user_tags + base_tags
    q = ", ".join(all_tags) if all_tags else ", ".join(base_tags)
    params: Dict[str, Any] = {"q": q, "per_page": 50, "page": 1}
    if FURBOORU_API_KEY:
        params["key"] = FURBOORU_API_KEY
    if nsfw and NSFW_FILTER_ID:
        params["filter_id"] = NSFW_FILTER_ID
    if (not nsfw) and SAFE_FILTER_ID:
        params["filter_id"] = SAFE_FILTER_ID
    backoff = 1.0
    for attempt in range(4):
        logger.info("Furbooru search attempt=%d q=%s", attempt + 1, q)
        try:
            r = http.get(url, params=params, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
        except requests.RequestException as e:
            logger.error("Furbooru request error: %s", e)
            return None
        if r.status_code == 429:
            logger.warning("Furbooru 429 Too Many Requests. Backoff %.1fs", backoff)
            time.sleep(backoff)
            backoff *= 2
            continue
        if r.status_code >= 400:
            logger.error("Furbooru HTTP %s: %r", r.status_code, (r.text or "")[:200])
            return None
        try:
            data = r.json()
        except Exception as e:
            logger.error("Furbooru JSON parse error: %s", e)
            return None
        images = data.get("images", [])
        candidates = []
        for img in images:
            if img.get("format") != "gif":
                continue
            if not img.get("thumbnails_generated", False):
                continue
            reps = img.get("representations") or {}
            if not any(reps.get(k) for k in ("full", "large", "medium", "small", "thumb")):
                continue
            candidates.append(img)
        logger.info("Furbooru found=%d candidates=%d", len(images), len(candidates))
        return random.choice(candidates) if candidates else None
    return None
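# For an SFW mention asking for "fox dancing", the query string sent to the search
# endpoint above is "fox, dancing, safe, animated, gif"; the handler only relies on
# the response's "images" list and each item's "format", "thumbnails_generated" and
# "representations" fields (see the Furbooru API docs for the full schema).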
def representation_candidates(img: Dict[str, Any]) -> List[str]:
    reps = img.get("representations") or {}
    keys = ["full", "large", "medium", "small", "thumb"]
    out: List[str] = []
    for k in keys:
        u = reps.get(k)
        if isinstance(u, str) and u.startswith("http") and u not in out:
            out.append(u)
    return out
def download_bytes(url: str, max_bytes: int) -> Optional[bytes]:
    logger.info("Downloading %s", url)
    try:
        with http.get(url, timeout=(CONNECT_TIMEOUT, DOWNLOAD_TIMEOUT), stream=True) as r:
            if r.status_code >= 400:
                logger.error("Download HTTP %s", r.status_code)
                return None
            total = 0
            chunks = []
            for chunk in r.iter_content(chunk_size=64 * 1024):
                if not chunk:
                    continue
                total += len(chunk)
                if total > max_bytes:
                    raise ValueError(f"File too large (> {max_bytes} bytes)")
                chunks.append(chunk)
            logger.info("Downloaded bytes=%d", total)
            return b"".join(chunks)
    except ValueError:
        raise
    except requests.RequestException as e:
        logger.error("Download error: %s", e)
        return None
# =======================
# GIF -> MP4 (ffmpeg)
# =======================
def find_ffmpeg_exe() -> Optional[str]:
    for cmd in ("ffmpeg", "ffmpeg.exe"):
        try:
            p = subprocess.run([cmd, "-version"], capture_output=True, text=True)
            if p.returncode == 0:
                return cmd
        except Exception:
            pass
    if imageio_ffmpeg is not None:
        try:
            return imageio_ffmpeg.get_ffmpeg_exe()
        except Exception:
            return None
    return None
def gif_bytes_to_mp4(gif_bytes: bytes) -> Optional[bytes]:
    ffmpeg = find_ffmpeg_exe()
    if not ffmpeg:
        logger.error("FFmpeg not found. Install ffmpeg or `pip install imageio-ffmpeg`.")
        return None
    with tempfile.TemporaryDirectory() as td:
        in_gif = os.path.join(td, "in.gif")
        out_mp4 = os.path.join(td, "out.mp4")
        with open(in_gif, "wb") as f:
            f.write(gif_bytes)
        cmd = [
            ffmpeg, "-y",
            "-i", in_gif,
            "-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2",
            "-movflags", "+faststart",
            "-pix_fmt", "yuv420p",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "28",
            "-an",
            out_mp4
        ]
        try:
            p = subprocess.run(cmd, capture_output=True, text=True)
        except Exception as e:
            logger.error("FFmpeg execution error: %s", e)
            return None
        if p.returncode != 0:
            logger.error("FFmpeg failed: %s", (p.stderr or "")[-400:])
            return None
        try:
            with open(out_mp4, "rb") as f:
                mp4 = f.read()
            logger.info("Converted GIF->MP4 bytes=%d", len(mp4))
            return mp4
        except Exception as e:
            logger.error("Reading MP4 failed: %s", e)
            return None
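# The conversion above is roughly equivalent to running:
#
#     ffmpeg -y -i in.gif -vf "scale=trunc(iw/2)*2:trunc(ih/2)*2" \
#            -movflags +faststart -pix_fmt yuv420p -c:v libx264 \
#            -preset veryfast -crf 28 -an out.mp4
#
# The scale filter forces even dimensions, which libx264 needs for yuv420p output.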
# =======================
# MASTODON
# =======================
def init_mastodon():
    if not MASTODON_ACCESS_TOKEN:
        raise SystemExit("MASTODON_ACCESS_TOKEN is empty. Put it into config.env or env vars.")
    from mastodon import Mastodon
    return Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url=MASTODON_BASE_URL,
        user_agent=USER_AGENT,
        request_timeout=max(CONNECT_TIMEOUT + READ_TIMEOUT, 10.0),
    )
def upload_media(mastodon, data: bytes, mime: str, alt_text: str) -> Optional[int]:
    bio = io.BytesIO(data)
    bio.name = "giffer." + ("mp4" if mime == "video/mp4" else "gif")
    logger.info("Uploading media mime=%s (alt_len=%d)", mime, len(alt_text))

    def _call():
        return mastodon.media_post(media_file=bio, mime_type=mime, description=alt_text)

    media = run_with_timeout(_call, MASTO_CALL_TIMEOUT, f"mastodon.media_post({mime})")
    if not media:
        return None
    return int(media["id"])
def wait_media_ready(mastodon, media_id: int, max_wait: float) -> bool:
    start = time.time()
    delay = 0.6
    while time.time() - start < max_wait:
        att = run_with_timeout(lambda: mastodon.media(media_id), MASTO_CALL_TIMEOUT, "mastodon.media(get)")
        if att and att.get("url"):
            return True
        time.sleep(delay)
        delay = min(delay * 1.4, 3.0)
    return False
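# Polling delays above grow geometrically (0.6s, 0.84s, ~1.18s, ... capped at 3.0s)
# until the attachment reports a URL or MEDIA_PROCESS_MAX_WAIT elapses.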
def post_reply_safe(mastodon, status_id: int, text: str, visibility: str) -> None:
    def _call():
        return mastodon.status_post(text, in_reply_to_id=status_id, visibility=visibility)

    run_with_timeout(_call, MASTO_CALL_TIMEOUT, "mastodon.status_post(reply)")
def post_status_with_media(mastodon, status_id: int, text: str, media_id: int,
                           visibility: str, nsfw: bool) -> bool:
    if nsfw:
        def _call():
            return mastodon.status_post(
                text,
                in_reply_to_id=status_id,
                media_ids=[media_id],
                sensitive=True,
                spoiler_text="NSFW",
                visibility=visibility,
            )

        res = run_with_timeout(_call, MASTO_CALL_TIMEOUT, "mastodon.status_post(nsfw)")
        return res is not None
    else:
        def _call():
            return mastodon.status_post(
                text,
                in_reply_to_id=status_id,
                media_ids=[media_id],
                visibility=visibility,
            )

        res = run_with_timeout(_call, MASTO_CALL_TIMEOUT, "mastodon.status_post(sfw)")
        return res is not None
def is_mastodon_422_unsupported(err_msg: str) -> bool:
    s = (err_msg or "").lower()
    return (" 422" in s or "unprocessable" in s) and ("not supported" in s or "gif" in s or "supported" in s)
def upload_gif_then_mp4_fallback(mastodon, img: Dict[str, Any], alt: str) -> Tuple[Optional[int], str]:
    urls = representation_candidates(img)
    if not urls:
        return None, ""
    last_err = ""
    # GIF attempts (big -> small)
    for u in urls:
        try:
            gif_bytes = download_bytes(u, MAX_GIF_BYTES)
            if not gif_bytes:
                last_err = "download failed"
                continue
            mid = upload_media(mastodon, gif_bytes, "image/gif", alt)
            if mid is not None:
                return mid, "image/gif"
            last_err = "upload timeout/none"
        except Exception as e:
            last_err = str(e)
            if is_mastodon_422_unsupported(last_err):
                logger.warning("GIF rejected (422) for %s: %s. Trying smaller...", u, last_err)
                continue
            logger.error("GIF upload failed hard: %s", last_err)
            break
    # MP4 fallback (use smallest downloadable gif)
    logger.info("Trying MP4 fallback...")
    gif_bytes = None
    for u in reversed(urls):
        b = download_bytes(u, MAX_GIF_BYTES)
        if b:
            gif_bytes = b
            break
    if not gif_bytes:
        logger.error("Cannot download any GIF for MP4 conversion. Last error: %s", last_err)
        return None, ""
    mp4 = gif_bytes_to_mp4(gif_bytes)
    if not mp4:
        return None, ""
    mid = upload_media(mastodon, mp4, "video/mp4", alt)
    if mid is None:
        return None, ""
    return mid, "video/mp4"
# =======================
# MAIN LOOP
# =======================
def reply_text(prefix: str, query: str, src: str, used_mime: str) -> str:
    q = query if query else "random"
    note = "\n(converted to MP4 because of instance limitations)" if used_mime == "video/mp4" else ""
    return f"{prefix} for query: `{q}`{note}\nSource: {src}"
def main():
    state = load_state()
    last_seen_id = state.get("last_seen_notif_id", None)
    # Convert to int if it's stored as string
    try:
        if last_seen_id is not None:
            last_seen_id = int(last_seen_id)
    except Exception:
        last_seen_id = None
    processed_set = set(int(x) for x in state.get("processed_status_ids", []) if isinstance(x, int) or str(x).isdigit())
    # normalize back into state list
    state["processed_status_ids"] = list(processed_set)[-PROCESSED_CACHE_MAX:]
    save_state(state)
    mastodon = init_mastodon()
    from mastodon import MastodonError
    logger.info("Starting giffer bot on %s (Furbooru: %s)", MASTODON_BASE_URL, FURBOORU_BASE_URL)
    logger.info("Loaded state: last_seen_notif_id=%s processed_cache=%d", last_seen_id, len(state["processed_status_ids"]))
    while True:
        logger.info("Polling mentions... since_id=%s", last_seen_id)

        def _fetch():
            return mastodon.notifications(types=["mention"], since_id=last_seen_id)

        notifs = run_with_timeout(_fetch, MASTO_CALL_TIMEOUT, "mastodon.notifications(mention)")
        if notifs is None:
            time.sleep(CHECK_INTERVAL)
            continue
        logger.info("Got %d mention notifications", len(notifs))
        for notif in reversed(notifs):
            notif_id = notif.get("id")
            if notif_id is not None:
                try:
                    last_seen_id = int(notif_id)
                    state["last_seen_notif_id"] = last_seen_id
                    save_state(state)  # <- important: persist immediately
                except Exception:
                    pass
            acct = notif.get("account", {}).get("acct", "unknown")
            status = notif.get("status") or {}
            status_id = status.get("id")
            if not status_id:
                continue
            try:
                status_id_int = int(status_id)
            except Exception:
                continue
            # ---- NEW: don't reply twice ----
            if status_id_int in processed_set:
                logger.info("Skipping already processed status_id=%s", status_id_int)
                continue
            # mark as processed early to prevent double-processing if we crash mid-way
            processed_set.add(status_id_int)
            remember_processed(state, status_id_int)
            save_state(state)
            if not user_allowed(acct):
                continue
            query, is_nsfw = parse_query(status.get("content", ""))
            logger.info("Mention from=%s nsfw=%s query=%r status_id=%s", acct, is_nsfw, query, status_id_int)
            try:
                img = furbooru_search_gif(query, is_nsfw)
                if not img:
                    post_reply_safe(
                        mastodon,
                        status_id_int,
                        f"Found nothing for query: `{query or 'random'}` 😿",
                        safe_visibility(status.get("visibility", "public")),
                    )
                    continue
                src = source_link(img)
                alt = make_alt_text(img, query, is_nsfw)
                media_id, used_mime = upload_gif_then_mp4_fallback(mastodon, img, alt)
                if not media_id:
                    post_reply_safe(
                        mastodon,
                        status_id_int,
                        "Could not upload it (the instance rejects GIF/video, or there are network problems). Try another query 🙏",
                        safe_visibility(status.get("visibility", "public")),
                    )
                    continue
                logger.info("Uploaded media_id=%s mime=%s. Waiting processing...", media_id, used_mime)
                if not wait_media_ready(mastodon, media_id, max_wait=MEDIA_PROCESS_MAX_WAIT):
                    post_reply_safe(
                        mastodon,
                        status_id_int,
                        "The media uploaded, but Mastodon is taking too long to process it. Try again in a minute 🙏",
                        safe_visibility(status.get("visibility", "public")),
                    )
                    continue
                visibility = safe_visibility(NSFW_VISIBILITY if is_nsfw else status.get("visibility", "public"))
                text = reply_text("NSFW" if is_nsfw else "GIF", query, src, used_mime)
                posted = post_status_with_media(mastodon, status_id_int, text, media_id, visibility, nsfw=is_nsfw)
                if posted:
                    logger.info("Posted successfully for user=%s", acct)
                else:
                    logger.error("Failed to post status for user=%s (status_id=%s)", acct, status_id_int)
            except ValueError as e:
                post_reply_safe(
                    mastodon,
                    status_id_int,
                    f"Can't load that one: {e}",
                    safe_visibility(status.get("visibility", "public")),
                )
            except requests.RequestException as e:
                post_reply_safe(
                    mastodon,
                    status_id_int,
                    f"Network error: {e}",
                    safe_visibility(status.get("visibility", "public")),
                )
            except MastodonError as e:
                logger.error("Mastodon error: %s", e)
            except Exception as e:
                logger.exception("Unexpected error: %s", e)
                post_reply_safe(
                    mastodon,
                    status_id_int,
                    f"Unexpected error: {e}",
                    safe_visibility(status.get("visibility", "public")),
                )
        time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
    main()
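# Running it (a minimal sketch, assuming config.env sits next to this file and the
# dependencies are installed):
#
#     pip install Mastodon.py requests imageio-ffmpeg
#     python giffer_bot.py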