""" fgs-agent — First Gain Stage metal discovery pipeline Standalone FastAPI service; n8n just POSTs to /api/fgs/run on a schedule. Endpoints: POST /api/fgs/run — trigger a discovery run (async, returns run_id) GET /api/fgs/status — current run status GET /api/fgs/picks — read metal-picks.json GET /api/fgs/health — liveness check """ import asyncio import json import logging import math import re import time import uuid from datetime import datetime, timezone from typing import Any, Optional import aiosqlite import httpx import uvicorn from fastapi import BackgroundTasks, FastAPI from fastapi.responses import JSONResponse from pydantic import BaseModel from pydantic_settings import BaseSettings, SettingsConfigDict logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("fgs-agent") # ── Config ────────────────────────────────────────────────────────────────── class Settings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") lastfm_username: str = "nick2day" lastfm_api_key: str = "" searxng_url: str = "http://localhost:8080" ollama_url: str = "http://localhost:11434" picks_path: str = "/home/nick2day/.n8n/metal-picks.json" dedup_db: str = "/home/nick2day/fgs-agent/dedup.db" port: int = 8766 settings = Settings() # ── SEARXNG QUERY BANK ─────────────────────────────────────────────────────── # All 29 queries — dynamic year, rotating subgenre index SUBGENRES = [ "death metal", "black metal", "doom metal", "thrash metal", "sludge metal", "progressive metal", "blackened death metal", "post-metal", "funeral doom", "grindcore", "melodic death metal", "technical death metal", "atmospheric black metal", "war metal", "drone metal", "stoner metal", "gothic metal", "power metal", "viking metal", "brutal death metal", ] def build_queries() -> list[str]: year = datetime.now().year # rotate subgenre every 6 hours so successive runs hit different niches idx = int(time.time() // 21600) % len(SUBGENRES) sg = SUBGENRES[idx] sg2 = SUBGENRES[(idx + 1) % len(SUBGENRES)] sg3 = SUBGENRES[(idx + 2) % len(SUBGENRES)] return [ # Subgenre new releases f'{sg} new album release {year}', f'{sg2} new album {year}', f'{sg3} full album stream {year}', # Bandcamp label searches f'site:bandcamp.com "sentient ruin" new {year}', f'site:bandcamp.com "20 buck spin" new {year}', f'site:bandcamp.com "prosthetic records" {year}', f'site:bandcamp.com "unique leader records" {year}', f'site:bandcamp.com "redefining darkness" {year}', f'site:bandcamp.com "profound lore" {year}', f'site:bandcamp.com "season of mist" {year}', f'site:bandcamp.com "century media" {year}', f'site:bandcamp.com "nuclear blast" {year}', # Bandcamp genre pages f'site:bandcamp.com {sg} album {year}', f'site:bandcamp.com {sg2} album {year}', # Metal reviews / coverage f'site:metal-archives.com new album review {year}', f'site:metalinjection.net new album {year}', f'site:cvltnation.com new album {year}', f'site:decibelmag.com new album stream {year}', f'site:invisibleoranges.com album review {year}', f'site:brooklynvegan.com {sg} new album {year}', # Obscure/underground f'underground {sg} demo release {year}', f'independent {sg} album bandcamp {year}', f'new {sg} EP release {year}', # Michigan / regional bias f'Michigan metal band new album {year}', f'Detroit metal band new release {year}', # Best-of / roundup (useful for discovery) f'best {sg} albums {year}', f'new {sg} releases {year} bandcamp', # Last.fm tag pages f'site:last.fm {sg} new {year}', # Full-album YouTube f'"{sg}" "full album" {year} new band', ] # ── NOISE FILTERS ──────────────────────────────────────────────────────────── NOISE_PATTERNS = [ re.compile(r'wikipedia\.org', re.I), re.compile(r'\bin heavy metal music\b', re.I), re.compile(r'album release calendar', re.I), re.compile(r'most anticipated', re.I), re.compile(r'\byear in review\b', re.I), re.compile(r'^top \d+', re.I), re.compile(r'albums you need to hear', re.I), re.compile(r'upcoming.*releases', re.I), re.compile(r'\bto release\b.*album', re.I), re.compile(r'\bannounce[sd]?\b.*album', re.I), re.compile(r'\bdelayed\b', re.I), re.compile(r'best.*releases', re.I), re.compile(r'\d+ metal albums.*20\d\d', re.I), re.compile(r'shatner', re.I), re.compile(r'mark your calendars', re.I), re.compile(r'\bvideo game\b', re.I), re.compile(r'\bsoundtrack\b', re.I), re.compile(r'\btribute\b', re.I), re.compile(r'\bcovers album\b', re.I), re.compile(r'heavy metal music - wikipedia', re.I), re.compile(r'in \d{4} - wikipedia', re.I), ] def is_noise(title: str, url: str) -> bool: text = f"{title} {url}" return any(p.search(text) for p in NOISE_PATTERNS) def parse_title(title: str) -> tuple[str, str]: """Extract (artist, album) from common title formats.""" # "Album Title, by Artist Name" — Bandcamp format m = re.match(r'^(.+?),\s+by\s+(.+)$', title, re.I) if m: return m.group(2).strip(), m.group(1).strip() # "Artist - Album" or "Artist – Album" m = re.match(r'^([^-–]+?)\s*[-–]\s*(.+)$', title) if m and len(m.group(1)) < 60: return m.group(1).strip(), m.group(2).strip() return '', title.strip() def artist_from_url(url: str) -> str: """Extract artist slug from Bandcamp URL.""" m = re.match(r'https?://([^.]+)\.bandcamp\.com', url, re.I) if m: return m.group(1).replace('-', ' ') return '' # ── DEDUP ──────────────────────────────────────────────────────────────────── async def init_dedup_db(): async with aiosqlite.connect(settings.dedup_db) as db: await db.execute( "CREATE TABLE IF NOT EXISTS seen (key TEXT PRIMARY KEY, ts INTEGER)" ) await db.commit() async def filter_unseen(candidates: list[dict]) -> list[dict]: """Remove already-seen albums. Returns only fresh candidates.""" async with aiosqlite.connect(settings.dedup_db) as db: fresh = [] for c in candidates: key = f"{c.get('artist','').lower()}::{c.get('album','').lower()}" if not key or key == '::': key = c.get('url', c.get('album', '')) row = await db.execute("SELECT 1 FROM seen WHERE key=?", (key,)) if not await row.fetchone(): fresh.append(c) return fresh async def mark_seen(picks: list[dict]): async with aiosqlite.connect(settings.dedup_db) as db: ts = int(time.time()) for p in picks: key = f"{p.get('artist','').lower()}::{p.get('album','').lower()}" if key and key != '::': await db.execute( "INSERT OR IGNORE INTO seen (key, ts) VALUES (?, ?)", (key, ts) ) await db.commit() # ── PIPELINE STAGES ────────────────────────────────────────────────────────── async def fetch_lastfm(client: httpx.AsyncClient) -> dict: """Fetch top artists + recent tracks from Last.fm.""" base = "https://ws.audioscrobbler.com/2.0/" params_top = { "method": "user.gettopartists", "user": settings.lastfm_username, "api_key": settings.lastfm_api_key, "period": "12month", "limit": "100", "format": "json", } params_recent = { "method": "user.getrecenttracks", "user": settings.lastfm_username, "api_key": settings.lastfm_api_key, "limit": "1000", "format": "json", } top_r, recent_r = await asyncio.gather( client.get(base, params=params_top, timeout=15), client.get(base, params=params_recent, timeout=15), ) return { "top": top_r.json() if top_r.status_code == 200 else {}, "recent": recent_r.json() if recent_r.status_code == 200 else {}, } def build_taste_profile(lastfm: dict) -> dict: """Build taste centroid from Last.fm data.""" top_artists_raw = lastfm.get("top", {}).get("topartists", {}).get("artist", []) recent_tracks_raw = lastfm.get("recent", {}).get("recenttracks", {}).get("track", []) top_artists = [ {"name": a["name"], "playcount": int(a.get("playcount", 0)), "rank": int(a.get("@attr", {}).get("rank", 99))} for a in top_artists_raw[:100] ] recent_map: dict[str, int] = {} for t in recent_tracks_raw: name = (t.get("artist") or {}).get("#text", "") if name: recent_map[name] = recent_map.get(name, 0) + 1 recent_artists = sorted(recent_map.items(), key=lambda x: -x[1])[:20] return { "topArtists": top_artists, "recentArtists": [{"name": n, "plays": p} for n, p in recent_artists], } async def search_candidates(client: httpx.AsyncClient) -> list[dict]: """Run all SearXNG queries concurrently, collect and deduplicate results.""" queries = build_queries() log.info(f"Running {len(queries)} SearXNG queries") async def one_query(q: str) -> list[dict]: try: r = await client.get( f"{settings.searxng_url}/search", params={"q": q, "format": "json", "time_range": "month"}, timeout=20, ) if r.status_code != 200: return [] data = r.json() results = data.get("results", [])[:15] # top 15 per query out = [] for res in results: title = res.get("title", "") url = res.get("url", "") if is_noise(title, url): continue artist, album = parse_title(title) if not artist: artist = artist_from_url(url) out.append({ "title": title, "url": url, "artist": artist, "album": album, "source": q[:60], "content": (res.get("content") or "")[:200], }) return out except Exception as e: log.warning(f"SearXNG query failed: {q[:40]} — {e}") return [] results = await asyncio.gather(*[one_query(q) for q in queries]) all_results = [r for batch in results for r in batch] # Deduplicate by URL seen_urls: set[str] = set() unique = [] for r in all_results: u = r["url"] if u and u not in seen_urls: seen_urls.add(u) unique.append(r) log.info(f"Collected {len(unique)} unique candidates from search") return unique async def embed_text(client: httpx.AsyncClient, text: str) -> list[float]: try: r = await client.post( f"{settings.ollama_url}/api/embeddings", json={"model": "nomic-embed-text", "prompt": text}, timeout=30, ) if r.status_code == 200: return r.json().get("embedding", []) except Exception as e: log.warning(f"Embed failed: {e}") return [] def cosine_similarity(a: list[float], b: list[float]) -> float: if not a or not b or len(a) != len(b): return 0.0 dot = sum(x * y for x, y in zip(a, b)) mag_a = math.sqrt(sum(x * x for x in a)) mag_b = math.sqrt(sum(x * x for x in b)) if mag_a == 0 or mag_b == 0: return 0.0 return dot / (mag_a * mag_b) async def score_candidates( client: httpx.AsyncClient, candidates: list[dict], taste: dict ) -> list[dict]: """Embed candidates and score against taste centroid.""" if not candidates: return [] # Build taste centroid from top artists top_names = [a["name"] for a in taste.get("topArtists", [])[:30]] centroid_text = "Heavy metal artist: " + ", ".join(top_names) log.info("Embedding taste centroid") centroid_vec = await embed_text(client, centroid_text) if not centroid_vec: log.warning("Centroid embedding failed — scoring by 0") for c in candidates: c["embedScore"] = 0.0 return candidates log.info(f"Scoring {len(candidates)} candidates against taste centroid") scored = [] # Batch embed in groups to avoid hammering Ollama for i, c in enumerate(candidates): text = f"Metal album: {c['artist']} - {c['album']}" if c.get("artist") else f"Metal album: {c['album']}" vec = await embed_text(client, text) c["embedScore"] = round(cosine_similarity(centroid_vec, vec), 4) if vec else 0.0 if i % 20 == 0: log.info(f" scored {i}/{len(candidates)}") scored.append(c) scored.sort(key=lambda x: -x.get("embedScore", 0)) return scored async def prefilter_candidates( client: httpx.AsyncClient, candidates: list[dict] ) -> list[dict]: """Use mistral-nemo to prefilter — keep actual album releases, drop noise.""" if not candidates: return [] # Send in batches of 40 batch = candidates[:80] items_for_prompt = [ {"index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "url": c.get("url", "")} for i, c in enumerate(batch) ] prompt = f"""You are a metal music expert. Review these {len(items_for_prompt)} items from web searches. Return ONLY a JSON array of items that are actual metal album or EP releases (not news articles, not Wikipedia, not lists). For each item kept, include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0). Items: {json.dumps(items_for_prompt, indent=2)} Return ONLY valid JSON array starting with [""" body = { "model": "mistral-nemo:latest", "messages": [{"role": "user", "content": prompt}], "stream": False, "options": {"temperature": 0.1, "num_predict": 4000}, } try: r = await client.post( f"{settings.ollama_url}/api/chat", json=body, timeout=120, ) raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else "" except Exception as e: log.error(f"Prefilter failed: {e}") return candidates[:40] # fall through with top candidates # Extract JSON from response parsed = extract_json_array(raw) if not parsed: log.warning("Prefilter returned no parseable JSON — using all candidates") return candidates[:40] log.info(f"Prefilter kept {len(parsed)}/{len(batch)} candidates") # Map back to full candidate objects with prefilter data enriched = [] for p in parsed: idx = p.get("index") if idx is None or idx >= len(batch): continue orig = batch[idx].copy() orig["artist"] = p.get("artist") or orig.get("artist", "") orig["album"] = p.get("album") or orig.get("album", "") orig["prefilter"] = { "subgenre": p.get("subgenre", "unknown"), "confidence": p.get("confidence", 0.5), } enriched.append(orig) return enriched async def curate_picks( client: httpx.AsyncClient, candidates: list[dict], taste: dict ) -> list[dict]: """Use mistral-nemo to produce final scored picks with full metadata.""" if not candidates: return [] top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:20] top_artists = [a["name"] for a in taste.get("topArtists", [])[:20]] recent_artists = [a["name"] for a in taste.get("recentArtists", [])[:10]] year = datetime.now().year items_for_prompt = [ { "index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "subgenre": (c.get("prefilter") or {}).get("subgenre", "unknown"), "url": c.get("url", ""), "source": c.get("source", ""), } for i, c in enumerate(top) ] prompt = f"""You are a heavy metal curator reviewing NEW {year} releases found via web search. IMPORTANT: Only score albums from the list below. Do NOT recommend albums not in this list. Do NOT recommend classic albums. These must be actual {year} releases found at the provided URLs. If you are not sure an item is a real new release, skip it. nick2day's taste profile: - Top artists (12-month): {', '.join(top_artists)} - Recent listens: {', '.join(recent_artists)} New {year} releases to evaluate (use ONLY these): {json.dumps(items_for_prompt, indent=2)} For each item worth recommending, output a JSON object with: index (integer, from above), artist, album, score (0-100, based on fit to taste profile), subgenre, obscurity (underground/indie/major), similar_to (2-3 artists from taste profile), why (1 sentence), review_angle (unique angle), tags (string array), url (copy from above) Skip items without a clear artist + album. Skip classics or non-{year} releases. Return ONLY a JSON array starting with [""" body = { "model": "mistral-nemo:latest", "messages": [{"role": "user", "content": prompt}], "stream": False, "options": {"temperature": 0.3, "num_predict": 8000}, } # Build index → candidate map for provenance validation top_by_index = {i: c for i, c in enumerate(top)} known_urls = {c.get("url", "") for c in top if c.get("url")} raw = "" for attempt in range(3): try: r = await client.post( f"{settings.ollama_url}/api/chat", json=body, timeout=180, ) raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else "" picks = extract_json_array(raw) if picks: # Validate provenance: url must match a search result OR index must be valid validated = [] for p in picks: if p.get("score", 0) < 60: continue idx = p.get("index") url = p.get("url", "") if idx is not None and idx in top_by_index: # Merge URL from original candidate if LLM dropped it if not url: p["url"] = top_by_index[idx].get("url", "") validated.append(p) elif url and url in known_urls: validated.append(p) else: log.debug(f"Dropped hallucinated pick: {p.get('artist')} — {p.get('album')}") log.info(f"Curator returned {len(validated)} validated picks (attempt {attempt+1})") if validated: return validated except Exception as e: log.error(f"Curator attempt {attempt+1} failed: {e}") await asyncio.sleep(2) log.warning("Curator failed all attempts") return [] def extract_json_array(text: str) -> list: """Robustly extract a JSON array from LLM output.""" if not text: return [] # Strip markdown fences cleaned = re.sub(r'```(?:json)?\s*', '', text).strip() # Try direct parse try: data = json.loads(cleaned) if isinstance(data, list): return data if isinstance(data, dict): for v in data.values(): if isinstance(v, list): return v except Exception: pass # Find first [ to last ] start = cleaned.find('[') end = cleaned.rfind(']') if start != -1 and end > start: try: data = json.loads(cleaned[start:end+1]) if isinstance(data, list): return data except Exception: pass return [] async def write_picks(new_picks: list[dict], run_stats: dict): """Append new picks to metal-picks.json.""" try: with open(settings.picks_path, "r") as f: store = json.load(f) except Exception: store = {"allPicks": [], "runs": [], "lastUpdated": ""} # Add run record run = { "timestamp": datetime.now(timezone.utc).isoformat(), "source": "fgs-agent", "picksAdded": len(new_picks), "candidatesSearched": run_stats.get("candidates", 0), "candidatesFiltered": run_stats.get("filtered", 0), } store["runs"] = ([run] + store.get("runs", []))[:10] store["allPicks"] = (new_picks + store.get("allPicks", []))[:200] store["lastUpdated"] = run["timestamp"] with open(settings.picks_path, "w") as f: json.dump(store, f, indent=2) log.info(f"Wrote {len(new_picks)} new picks to {settings.picks_path}") # ── RUN STATE ──────────────────────────────────────────────────────────────── run_state: dict[str, Any] = { "status": "idle", "run_id": None, "started": None, "finished": None, "picks_added": 0, "candidates_found": 0, "error": None, "log": [], } _run_lock = asyncio.Lock() async def pipeline_run(run_id: str): global run_state def log_step(msg: str): log.info(msg) run_state["log"].append({"ts": datetime.now(timezone.utc).isoformat(), "msg": msg}) async with _run_lock: run_state.update({ "status": "running", "run_id": run_id, "started": datetime.now(timezone.utc).isoformat(), "finished": None, "picks_added": 0, "candidates_found": 0, "error": None, "log": [], }) try: async with httpx.AsyncClient() as client: log_step("Fetching Last.fm data") lastfm = await fetch_lastfm(client) taste = build_taste_profile(lastfm) log_step(f"Taste profile: {len(taste['topArtists'])} top artists, {len(taste['recentArtists'])} recent") log_step("Searching candidates") candidates = await search_candidates(client) run_state["candidates_found"] = len(candidates) if not candidates: log_step("No candidates found — ending run") run_state["status"] = "done" run_state["finished"] = datetime.now(timezone.utc).isoformat() return log_step("Filtering seen albums") fresh = await filter_unseen(candidates) log_step(f"{len(fresh)} fresh candidates (of {len(candidates)} total)") if not fresh: log_step("All candidates already seen — ending run") run_state["status"] = "done" run_state["finished"] = datetime.now(timezone.utc).isoformat() return log_step("Scoring by embedding similarity") scored = await score_candidates(client, fresh, taste) log_step("Prefiltering with Mistral") prefiltered = await prefilter_candidates(client, scored) log_step(f"{len(prefiltered)} candidates passed prefilter") log_step("Curating picks with Mistral") picks = await curate_picks(client, prefiltered, taste) log_step(f"{len(picks)} picks scored ≥60") if picks: # Mark all prefiltered candidates as seen (not just picks) to avoid re-processing await mark_seen(prefiltered) await write_picks(picks, { "candidates": len(candidates), "filtered": len(prefiltered), }) run_state.update({ "status": "done", "finished": datetime.now(timezone.utc).isoformat(), "picks_added": len(picks), }) log_step(f"Run complete — {len(picks)} new picks added") except Exception as e: log.exception(f"Pipeline run failed: {e}") run_state.update({ "status": "error", "finished": datetime.now(timezone.utc).isoformat(), "error": str(e), }) # ── API ─────────────────────────────────────────────────────────────────────── app = FastAPI(title="fgs-agent", description="First Gain Stage metal discovery") @app.on_event("startup") async def startup(): await init_dedup_db() log.info(f"fgs-agent started on port {settings.port}") @app.post("/api/fgs/run") async def trigger_run(background_tasks: BackgroundTasks): if run_state["status"] == "running": return JSONResponse({"status": "already_running", "run_id": run_state["run_id"]}, status_code=409) run_id = str(uuid.uuid4())[:8] background_tasks.add_task(pipeline_run, run_id) return {"status": "started", "run_id": run_id} @app.get("/api/fgs/status") async def get_status(): return run_state @app.get("/api/fgs/picks") async def get_picks(): try: with open(settings.picks_path, "r") as f: return json.load(f) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.get("/api/fgs/health") async def health(): return {"status": "ok", "agent": "fgs-agent"} # ── Entry point ─────────────────────────────────────────────────────────────── if __name__ == "__main__": uvicorn.run("agent:app", host="0.0.0.0", port=settings.port, reload=False)