191 lines
6.9 KiB
Python
191 lines
6.9 KiB
Python
import logging
|
|
import os
|
|
import time
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
import requests
|
|
|
|
# =========================
|
|
# Configuration
|
|
# =========================
|
|
|
|
# Discord user mentioned at the top of every message this script posts.
DISCORD_USER_ID_TO_PING = "736200712169455656"
DISCORD_MENTION = f"<@{DISCORD_USER_ID_TO_PING}>"

# The endpoint polled is the PRODUCT_URL environment value with ".js" appended.
# When the variable is unset or blank, PRODUCT_URL stays empty so that a
# "missing variable" check can actually fire; unconditionally appending the
# suffix would turn an absent value into the non-empty string ".js".
_PRODUCT_PAGE_URL = os.environ.get("PRODUCT_URL", "").strip().rstrip("/")
PRODUCT_URL = f"{_PRODUCT_PAGE_URL}.js" if _PRODUCT_PAGE_URL else ""

# Kept raw on purpose; it is normalized when the configuration is validated.
TARGET_VARIANT_ID_RAW = os.environ.get("TARGET_VARIANT_ID", "")

# Stripped here because this constant is passed directly to requests.post().
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL", "").strip()

# Seconds between two regular stock checks.
CHECK_INTERVAL_SECONDS = 30

# Seconds to pause after an alert has been sent.
POST_DETECTION_COOLDOWN_SECONDS = 600
|
|
|
|
# =========================
|
|
# Logging Setup
|
|
# =========================
|
|
|
|
# Dedicated named logger for the monitor.
logger = logging.getLogger("FormDStockMonitor")
logger.setLevel(logging.INFO)

# Shared line format for both handlers: "YYYY-MM-DD HH:MM:SS - LEVEL - message".
log_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

# Write logs to a rotating file so disk space is not exhausted
# (5 MiB per file, two backups kept). The encoding is pinned to UTF-8 because
# log messages include text received from remote servers (variant titles,
# HTTP response bodies), which the platform's default encoding may not be
# able to represent.
file_handler = RotatingFileHandler(
    "formd_monitor.log",
    maxBytes=5 * 1024 * 1024,
    backupCount=2,
    encoding="utf-8",
)
file_handler.setFormatter(log_format)
logger.addHandler(file_handler)

# Also show logs directly in the terminal
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
logger.addHandler(console_handler)
|
|
|
|
|
|
def _require_env(name: str, value: str) -> str:
    """Return *value* with surrounding whitespace removed.

    Args:
        name: Name of the environment variable, used only in the error message.
        value: The value read for that variable; ``None`` is treated as empty.

    Raises:
        RuntimeError: If nothing is left of *value* after stripping.
    """
    cleaned = value.strip() if value else ""
    if cleaned:
        return cleaned
    raise RuntimeError(f"Missing required environment variable: {name}")
|
|
|
|
|
|
def _normalize_variant_id(raw: str) -> str:
    """Convert the raw variant id into its canonical decimal string.

    The value is stripped, parsed with ``int()`` and converted back to ``str``,
    so a value such as ``" 007 "`` becomes ``"7"``.

    Args:
        raw: The unprocessed variant id; ``None`` is treated as empty.

    Raises:
        RuntimeError: If the value is empty or cannot be parsed by ``int()``.
    """
    cleaned = raw.strip() if raw else ""
    if cleaned == "":
        raise RuntimeError("TARGET_VARIANT_ID is empty")

    try:
        numeric_id = int(cleaned)
    except ValueError as exc:
        raise RuntimeError("TARGET_VARIANT_ID must be an integer-like value") from exc
    return str(numeric_id)
|
|
|
|
|
|
def validate_config() -> tuple[str, str, str]:
    """Validate required config and return normalized (product_url, variant_id, webhook_url).

    Returns:
        A tuple ``(product_url, variant_id, webhook_url)`` where both URLs are
        stripped of surrounding whitespace and the variant id is a decimal string.

    Raises:
        RuntimeError: If a required value is missing or empty, if the variant id
            is not integer-like, if the product URL does not start with
            ``http://`` or ``https://``, or if the webhook URL does not start
            with ``https://``.
    """
    product_url = _require_env("PRODUCT_URL", PRODUCT_URL)

    # A non-empty value is not enough: the polled URL carries a ".js" suffix,
    # so a value consisting of little more than that suffix would pass the
    # emptiness check and only fail later, on every request. Requiring a scheme
    # rejects such values at startup.
    if not product_url.lower().startswith(("http://", "https://")):
        raise RuntimeError("PRODUCT_URL must start with http:// or https://")

    variant_id = _normalize_variant_id(TARGET_VARIANT_ID_RAW)
    webhook_url = _require_env("DISCORD_WEBHOOK_URL", DISCORD_WEBHOOK_URL)

    if not webhook_url.startswith("https://"):
        raise RuntimeError("DISCORD_WEBHOOK_URL must start with https://")

    return product_url, variant_id, webhook_url
|
|
|
|
|
|
# =========================
|
|
# Discord Notification
|
|
# =========================
|
|
|
|
def send_discord_notification(product_name: str, variant_name: str, checkout_link: str) -> None:
    """Sends a stock alert to Discord using a webhook.

    The outcome is only logged: failures (a non-2xx response or any exception
    raised while posting) never propagate to the caller.

    Args:
        product_name: Product title shown in bold in the message.
        variant_name: Title of the variant that came into stock.
        checkout_link: URL placed behind the "ONE-CLICK CHECKOUT" link.
    """
    logger.critical("Stock detected! Sending Discord notification.")

    payload = {
        "content": (
            f"{DISCORD_MENTION}\n"
            "🚨 **RESTOCK ALERT** 🚨\n\n"
            f"**{product_name}**\n"
            f"Variant: {variant_name}\n\n"
            f"👉 [ONE-CLICK CHECKOUT]({checkout_link})"
        )
    }

    try:
        response = requests.post(DISCORD_WEBHOOK_URL, json=payload, timeout=10)
    except Exception:
        # Best effort: a failed alert must not take the caller down.
        logger.exception("Failed to send Discord notification due to an exception.")
        return

    # Any 2xx counts as delivered. Discord answers 204 by default but 200 (with
    # the created message) when the webhook URL carries "?wait=true", so
    # checking for 204 alone would report a successful delivery as a failure.
    if 200 <= response.status_code < 300:
        logger.info("Discord notification sent successfully.")
    else:
        logger.error("Discord notification failed. "
                     f"HTTP status code: {response.status_code}; response: {response.text[:500]}")
|
|
|
|
|
|
def send_startup_test_ping() -> None:
    """Sends a one-time hello-world message to validate webhook + mentions.

    The outcome is only logged: failures (a non-2xx response or any exception
    raised while posting) never propagate to the caller.
    """
    logger.info("Sending startup Discord test ping...")

    payload = {
        "content": (
            f"{DISCORD_MENTION} Hello world! "
            "✅ Webhook connection is working and user mention should ping."
        )
    }

    try:
        response = requests.post(DISCORD_WEBHOOK_URL, json=payload, timeout=10)
    except Exception:
        # Best effort: a failed test ping is reported but does not abort startup.
        logger.exception("Failed to send startup test ping due to an exception.")
        return

    # Any 2xx counts as delivered. Discord answers 204 by default but 200 (with
    # the created message) when the webhook URL carries "?wait=true", so
    # checking for 204 alone would report a successful delivery as a failure.
    if 200 <= response.status_code < 300:
        logger.info("Startup Discord test ping sent successfully.")
    else:
        logger.error("Startup Discord test ping failed. "
                     f"HTTP status code: {response.status_code}; response: {response.text[:500]}")
|
|
|
|
|
|
# =========================
|
|
# Stock Monitoring Logic
|
|
# =========================
|
|
|
|
def _check_stock_once(product_url, target_variant_id, request_headers):
    """Run a single stock check; return True only if an in-stock alert was sent.

    Network and parsing errors are not handled here; they propagate to the caller.
    """
    response = requests.get(product_url, headers=request_headers, timeout=10)

    if response.status_code != 200:
        logger.error("HTTP request failed. Status code: %s", response.status_code)
        return False

    product_data = response.json()
    product_name = product_data.get("title", "FormD T1")
    # "or []" also covers an explicit null, which would otherwise fail on iteration.
    variants = product_data.get("variants") or []

    # Ids are compared as strings because the configured id is a normalized string.
    target_variant = next(
        (variant for variant in variants if str(variant.get("id")) == target_variant_id),
        None,
    )

    if not target_variant:
        logger.error("Variant ID %s was not found in the product data.", target_variant_id)
        return False

    variant_name = target_variant.get("title", "Unknown Variant")

    if not target_variant.get("available", False):
        logger.info("Out of stock. Checked variant: %s", variant_name)
        return False

    logger.warning("Stock available! Variant detected: %s", variant_name)

    # Cart link for one unit of the target variant.
    checkout_link = f"https://formdt1.com/cart/{target_variant_id}:1"
    send_discord_notification(product_name, variant_name, checkout_link)
    return True


def check_stock_continuously():
    """Continuously checks the product JSON endpoint to see if the target variant is available for purchase.

    Every CHECK_INTERVAL_SECONDS the endpoint is fetched and the target variant
    looked up. When it is available a Discord alert is sent and the loop pauses
    for POST_DETECTION_COOLDOWN_SECONDS instead of the regular interval. Errors
    during a check are logged and the loop carries on; this function never
    returns normally.

    Raises:
        RuntimeError: From validate_config() if the configuration is invalid.
    """
    product_url, target_variant_id, _ = validate_config()

    logger.info("Stock monitor started. Watching variant ID: %s", target_variant_id)

    request_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        ),
        "Accept": "application/json",
    }

    while True:
        alert_sent = False
        try:
            alert_sent = _check_stock_once(product_url, target_variant_id, request_headers)

        # Timeout is handled before ConnectionError: requests' ConnectTimeout
        # derives from both, and a connect timeout is best reported as a timeout.
        except requests.exceptions.Timeout:
            logger.warning("Request timed out. The server may be slow or blocking requests.")

        except requests.exceptions.ConnectionError:
            logger.error("Network connection error. Unable to reach the product server.")

        except Exception:
            logger.exception("An unexpected error occurred during the stock check.")

        # Exactly one sleep per iteration: the cooldown replaces the regular
        # interval after an alert rather than being added on top of it.
        if alert_sent:
            logger.info("Cooldown started to prevent duplicate alerts.")
            time.sleep(POST_DETECTION_COOLDOWN_SECONDS)
        else:
            time.sleep(CHECK_INTERVAL_SECONDS)
|
|
|
|
|
|
# =========================
|
|
# Application Entry Point
|
|
# =========================
|
|
|
|
if __name__ == "__main__":
    # Startup sequence, run strictly in this order: validate the configuration
    # (raises on bad config), send the Discord test ping, then enter the
    # monitoring loop.
    for startup_step in (validate_config, send_startup_test_ping, check_stock_continuously):
        startup_step()
|