You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

943 lines
34 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
fgs-agent — First Gain Stage metal discovery pipeline
Standalone FastAPI service; n8n just POSTs to /api/fgs/run on a schedule.
Endpoints:
POST /api/fgs/run — trigger a discovery run (async, returns run_id)
GET /api/fgs/status — current run status
GET /api/fgs/picks — read metal-picks.json
POST /api/fgs/picks/remove — remove a pick and record it in the dedup store
GET /api/fgs/health — liveness check
"""
import asyncio
import json
import logging
import math
import os
import re
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Optional

import aiosqlite
import httpx
import uvicorn
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
# Configure root logging once at import time: INFO level, "timestamp level message" records.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Named logger used throughout this module.
log = logging.getLogger("fgs-agent")
# ── Config ──────────────────────────────────────────────────────────────────
class Settings(BaseSettings):
    """Service configuration with built-in defaults.

    ``model_config`` names a UTF-8 ``.env`` file as a settings source for
    pydantic-settings; each field below carries the value used when no
    override is supplied.
    """

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
    # Last.fm account queried by fetch_lastfm().
    lastfm_username: str = "nick2day"
    # API key sent with every Last.fm request; empty unless overridden.
    lastfm_api_key: str = ""
    # Base URL of the SearXNG instance; search_candidates() appends "/search".
    searxng_url: str = "http://localhost:8080"
    # Base URL of the Ollama server used for embeddings and chat requests.
    ollama_url: str = "http://localhost:11434"
    # JSON file holding picks and run history (read and written by the picks endpoints and write_picks()).
    picks_path: str = "/home/nick2day/.n8n/metal-picks.json"
    # SQLite database containing the "seen" dedup table.
    dedup_db: str = "/home/nick2day/fgs-agent/dedup.db"
    # Port handed to uvicorn by the __main__ entry point.
    port: int = 8766


# Single shared settings instance, created at import time.
settings = Settings()
# ── SEARXNG QUERY BANK ───────────────────────────────────────────────────────
# Pool of subgenre search terms. build_queries() takes a rotating window of six
# consecutive entries (wrapping around the end of the list) for each run.
SUBGENRES = [
    "death metal", "black metal", "doom metal", "thrash metal", "sludge metal",
    "progressive metal", "blackened death metal", "post-metal", "funeral doom", "grindcore",
    "melodic death metal", "technical death metal", "atmospheric black metal", "war metal",
    "drone metal", "stoner metal", "gothic metal", "power metal", "viking metal", "brutal death metal",
    "deathcore", "mathcore", "noise rock", "crust punk", "black doom", "death doom",
    "dissonant death metal", "ambient black metal", "depressive black metal", "crossover thrash",
]
# Labels to search on Bandcamp — no time_range, broad results.
# build_queries() picks a rotating window of five and emits one quoted
# `site:bandcamp.com "<label>"` query per label.
BANDCAMP_LABELS = [
    "sentient ruin",
    "20 buck spin",
    "prosthetic records",
    "unique leader records",
    "redefining darkness",
    "profound lore",
    "season of mist",
    "century media",
    "nuclear blast",
    "relapse records",
    "metal blade records",
    "dark descent records",
    "iron bonehead",
    "les acteurs de l'ombre",
    "vrasubatlat",
    "me saco un ojo",
    "hells headbangers",
    "invictus productions",
    "blood harvest",
    "chaos records",
    "adagio 830",
    "floga records",
    "sewer rot records",
    "lavadome productions",
    "memento mori",
]
# Review/news sites — use time_range year to get fresh but not stale.
# build_queries() picks a rotating window of four and emits two
# `site:<domain> ...` queries per site.
REVIEW_SITES = [
    "metalinjection.net",
    "cvltnation.com",
    "decibelmag.com",
    "invisibleoranges.com",
    "brooklynvegan.com",
    "heavyblogisheavy.com",
    "nocturnalcult.com",
    "themetalcrypt.com",
    "angrymetalguy.com",
    "sputnikmusic.com",
    "nocleansinging.com",
    "rateyourmusic.com",
    "terrorizer.com",
    "kerrang.com",
]
def build_queries() -> list[str]:
    """Assemble the list of search queries for the current run.

    Three rotating windows are taken from the module-level pools: six
    subgenres (start advances every 4 hours), five Bandcamp labels (every
    6 hours) and four review sites (every 12 hours). The queries are emitted
    in a fixed order: subgenre releases, Bandcamp site searches, label
    searches, review sites, underground/demo searches, regional searches,
    Metal Archives searches and finally broad discovery queries.

    Returns:
        The query strings, in emission order.
    """
    year = datetime.now().year

    def rotating_window(pool: list[str], period_seconds: int, count: int) -> list[str]:
        # The start offset moves one slot per period and wraps around the pool.
        offset = int(time.time() // period_seconds) % len(pool)
        return [pool[(offset + step) % len(pool)] for step in range(count)]

    sgs = rotating_window(SUBGENRES, 14400, 6)
    labels = rotating_window(BANDCAMP_LABELS, 21600, 5)
    sites = rotating_window(REVIEW_SITES, 43200, 4)

    queries: list[str] = []

    # Subgenre releases — first three year-scoped, remaining three unscoped.
    queries.extend(f'{sg} new album release {year}' for sg in sgs[:3])
    queries.extend(f'{sg} new album bandcamp' for sg in sgs[3:])

    # Direct Bandcamp site searches for the first two subgenres.
    for sg in sgs[:2]:
        queries.extend((
            f'site:bandcamp.com {sg} album {year}',
            f'site:bandcamp.com {sg} new release',
        ))

    # One quoted label search per selected label.
    queries.extend(f'site:bandcamp.com "{label}"' for label in labels)

    # Two year-scoped searches per selected review site.
    for site in sites:
        queries.extend((
            f'site:{site} album review {year}',
            f'site:{site} new album {year}',
        ))

    # Underground / demo scene for the first two subgenres.
    for sg in sgs[:2]:
        queries.extend((
            f'underground {sg} demo {year} bandcamp',
            f'new {sg} EP {year} bandcamp',
        ))

    # Fixed tail: regional, Metal Archives, then broad discovery queries.
    queries.extend((
        f'Michigan metal band new album {year}',
        f'Detroit metal band new release {year}',
        f'Midwest metal new release {year} bandcamp',
        f'site:metal-archives.com album review {year}',
        f'site:metal-archives.com new band {year}',
        f'best underground metal albums {year}',
        f'new metal releases {year} bandcamp full stream',
        f'hidden gem metal album {year}',
        f'obscure metal release {year}',
        f'metal full album stream {year} new band',
    ))
    return queries
# ── NOISE FILTERS ────────────────────────────────────────────────────────────
# Case-insensitive patterns that mark a search result as noise. is_noise()
# searches each one against the string "<title> <url>", so the `^`-anchored
# pattern can only match at the start of the title.
NOISE_PATTERNS = [
    re.compile(r'wikipedia\.org', re.I),
    re.compile(r'\bin heavy metal music\b', re.I),
    re.compile(r'album release calendar', re.I),
    re.compile(r'most anticipated', re.I),
    re.compile(r'\byear in review\b', re.I),
    re.compile(r'^top \d+', re.I),
    re.compile(r'albums you need to hear', re.I),
    re.compile(r'upcoming.*releases', re.I),
    re.compile(r'\bto release\b.*album', re.I),
    re.compile(r'\bannounce[sd]?\b.*album', re.I),
    re.compile(r'\bdelayed\b', re.I),
    re.compile(r'best.*releases', re.I),
    re.compile(r'\d+ metal albums.*20\d\d', re.I),
    re.compile(r'shatner', re.I),
    re.compile(r'mark your calendars', re.I),
    re.compile(r'\bvideo game\b', re.I),
    re.compile(r'\bsoundtrack\b', re.I),
    re.compile(r'\btribute\b', re.I),
    re.compile(r'\bcovers album\b', re.I),
    re.compile(r'heavy metal music - wikipedia', re.I),
    re.compile(r'in \d{4} - wikipedia', re.I),
]
def is_noise(title: str, url: str) -> bool:
    """Return True when any noise pattern matches the title/URL pair.

    The title and URL are joined with a single space and every entry of
    NOISE_PATTERNS is searched against that combined string.
    """
    haystack = f"{title} {url}"
    for pattern in NOISE_PATTERNS:
        if pattern.search(haystack):
            return True
    return False
# Matches a trailing "- <word>..." or "| <word>..." segment whose first word is a
# generic label/platform term or one of the listed label names, through to the
# end of the string. clean_name() removes whatever this matches.
LABEL_SUFFIXES = re.compile(
    r'\s*[-|]\s*(bandcamp|records|recordings|productions|music|metal|label|distro|'
    r'sentient ruin|dark descent|20 buck spin|relapse|prosthetic|nuclear blast|'
    r'season of mist|century media|profound lore|iron bonehead|hells headbangers|'
    r'blood harvest|invictus productions|metal blade|redefining darkness)\b.*$',
    re.I
)
def clean_name(s: str) -> str:
    """Remove a trailing label suffix, then trim whitespace and edge pipes."""
    without_suffix = LABEL_SUFFIXES.sub('', s)
    trimmed = without_suffix.strip()
    # Drop pipe characters left at either edge, then any whitespace they exposed.
    unpiped = trimmed.strip('|')
    return unpiped.strip()
def parse_title(title: str) -> tuple[str, str]:
    """Split a search-result title into ``(artist, album)``.

    Three layouts are tried in order; the first that applies wins:

    1. ``"Album, by Artist"`` (case-insensitive ``by``).
    2. ``"Album | Artist"``, optionally followed by an em/en-dash segment that
       is ignored. Used only when the cleaned artist is non-empty and shorter
       than 60 characters.
    3. ``"Artist - Album"`` split at the first hyphen, used only when the raw
       artist part is shorter than 60 characters.

    When nothing applies the artist is ``''`` and the cleaned title is
    returned as the album. Every returned part goes through clean_name().
    """
    by_form = re.match(r'^(.+?),\s+by\s+(.+)$', title, re.I)
    if by_form is not None:
        album_raw, artist_raw = by_form.group(1), by_form.group(2)
        return clean_name(artist_raw), clean_name(album_raw)

    # The second group stops at an em/en dash so a label suffix does not bleed in.
    piped = re.match(r'^(.+?)\s*\|\s*([^—–]+?)(?:\s*[—–].+)?$', title)
    if piped is not None:
        album_part = clean_name(piped.group(1))
        artist_part = clean_name(piped.group(2))
        # Album comes first in this layout, artist second.
        if artist_part and len(artist_part) < 60:
            return artist_part, album_part

    dashed = re.match(r'^([^-|]+?)\s*[-]\s*(.+)$', title)
    if dashed is not None and len(dashed.group(1)) < 60:
        artist_raw, album_raw = dashed.group(1), dashed.group(2)
        return clean_name(artist_raw), clean_name(album_raw)

    return '', clean_name(title)
def artist_from_url(url: str) -> str:
    """Derive an artist name from a ``<slug>.bandcamp.com`` URL.

    The subdomain slug has its hyphens replaced by spaces and is title-cased.
    Returns ``''`` when the URL does not start with such a host.
    """
    found = re.match(r'https?://([^.]+)\.bandcamp\.com', url, re.I)
    if found is None:
        return ''
    slug = found.group(1)
    spaced = slug.replace('-', ' ')
    return spaced.title()
# Substrings that mark an artist field as garbage (platform/label words or a
# stray pipe). Searched against artist values by looks_like_bad_pick() and
# sanitise_picks().
_BAD_FIELD = re.compile(
    r'bandcamp|recordings?|productions?|distro|\|', re.I
)
# Known review/blog domains — artist should never be the site name.
# sanitise_picks() searches this against a pick's URL.
_REVIEW_DOMAINS = re.compile(
    r'metalinjection\.net|cvltnation\.com|decibelmag\.com|invisibleoranges\.com|'
    r'brooklynvegan\.com|heavyblogisheavy\.com|angrymetalguy\.com|sputnikmusic\.com|'
    r'nocleansinging\.com|meatmeadmetal\.com|terrorizer\.com|kerrang\.com|'
    r'loudwire\.com|nocturnalcult\.com|themetalcrypt\.com|rateyourmusic\.com',
    re.I
)
def looks_like_bad_pick(p: dict) -> bool:
    """Return True if the pick's artist/album fields are clearly garbage.

    A pick is rejected when:
      * artist or album is present but not a string (``None`` counts as empty);
      * both artist and album are empty;
      * the artist matches ``_BAD_FIELD``;
      * the album contains a pipe character;
      * the artist exceeds 70 characters or the album exceeds 120.

    Args:
        p: Pick dictionary; the values usually come from LLM-produced JSON, so
            they may be null or of an unexpected type.
    """
    artist = p.get("artist")
    album = p.get("album")
    # JSON null is treated like a missing field rather than crashing below.
    artist = "" if artist is None else artist
    album = "" if album is None else album
    if not isinstance(artist, str) or not isinstance(album, str):
        return True
    if not artist and not album:
        return True
    if _BAD_FIELD.search(artist):
        return True
    if "|" in album:
        return True
    return len(artist) > 70 or len(album) > 120
# ── DEDUP ────────────────────────────────────────────────────────────────────
async def init_dedup_db():
    """Create the ``seen`` table in the dedup database if it does not exist."""
    schema = "CREATE TABLE IF NOT EXISTS seen (key TEXT PRIMARY KEY, ts INTEGER)"
    async with aiosqlite.connect(settings.dedup_db) as conn:
        await conn.execute(schema)
        await conn.commit()
async def filter_unseen(candidates: list[dict]) -> list[dict]:
    """Return the candidates whose dedup key is not in the ``seen`` table.

    The key is ``"<artist>::<album>"`` in lower case. When both parts are
    empty the candidate's URL (or, failing that, its album) is looked up
    instead. Input order is preserved.
    """
    unseen: list[dict] = []
    async with aiosqlite.connect(settings.dedup_db) as conn:
        for cand in candidates:
            artist_part = cand.get('artist', '').lower()
            album_part = cand.get('album', '').lower()
            lookup = f"{artist_part}::{album_part}"
            if lookup == '::' or not lookup:
                lookup = cand.get('url', cand.get('album', ''))
            cursor = await conn.execute("SELECT 1 FROM seen WHERE key=?", (lookup,))
            already_seen = await cursor.fetchone()
            if already_seen:
                continue
            unseen.append(cand)
    return unseen
async def mark_seen(picks: list[dict]):
    """Record each pick's ``"<artist>::<album>"`` key in the ``seen`` table.

    Keys are lower-cased. Picks whose artist and album are both empty are
    skipped, and keys already present are left untouched (INSERT OR IGNORE).
    Null or non-string field values are tolerated: ``None`` becomes the empty
    string and other values are converted with ``str()``.

    Args:
        picks: Dictionaries with optional ``artist`` and ``album`` entries.
    """
    ts = int(time.time())
    rows = []
    for p in picks:
        artist = str(p.get("artist") or "").lower()
        album = str(p.get("album") or "").lower()
        if artist or album:
            rows.append((f"{artist}::{album}", ts))
    if not rows:
        # Nothing to record, so do not open the database at all.
        return
    async with aiosqlite.connect(settings.dedup_db) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO seen (key, ts) VALUES (?, ?)", rows
        )
        await db.commit()
# ── PIPELINE STAGES ──────────────────────────────────────────────────────────
async def fetch_lastfm(client: httpx.AsyncClient) -> dict:
    """Fetch top artists and recent tracks from Last.fm.

    Both requests run concurrently and each is best-effort: a non-200
    response, a transport error/timeout or an undecodable body yields ``{}``
    for that half instead of aborting the whole run.

    Args:
        client: Shared HTTP client used for both requests.

    Returns:
        ``{"top": <decoded body or {}>, "recent": <decoded body or {}>}``.
    """
    base = "https://ws.audioscrobbler.com/2.0/"
    params_top = {
        "method": "user.gettopartists",
        "user": settings.lastfm_username,
        "api_key": settings.lastfm_api_key,
        "period": "12month",
        "limit": "100",
        "format": "json",
    }
    # NOTE(review): Last.fm documents a per-page maximum for this method —
    # confirm that a limit of 1000 is honoured rather than clamped.
    params_recent = {
        "method": "user.getrecenttracks",
        "user": settings.lastfm_username,
        "api_key": settings.lastfm_api_key,
        "limit": "1000",
        "format": "json",
    }

    async def fetch(label: str, params: dict) -> dict:
        # One request; any failure is logged and degraded to an empty dict.
        try:
            resp = await client.get(base, params=params, timeout=15)
            if resp.status_code != 200:
                log.warning(f"Last.fm {label} request returned HTTP {resp.status_code}")
                return {}
            payload = resp.json()
        except (httpx.HTTPError, ValueError) as e:
            log.warning(f"Last.fm {label} request failed: {e}")
            return {}
        return payload if isinstance(payload, dict) else {}

    top, recent = await asyncio.gather(
        fetch("top artists", params_top),
        fetch("recent tracks", params_recent),
    )
    return {"top": top, "recent": recent}
def build_taste_profile(lastfm: dict) -> dict:
    """Build the taste profile from the Last.fm responses.

    Args:
        lastfm: ``{"top": ..., "recent": ...}`` as returned by fetch_lastfm().
            Missing or malformed parts are treated as empty.

    Returns:
        ``{"topArtists": [...], "recentArtists": [...]}`` where ``topArtists``
        holds up to 100 ``{"name", "playcount", "rank"}`` entries in input
        order and ``recentArtists`` holds the 20 most frequent recent-track
        artists as ``{"name", "plays"}``, most played first (ties keep
        first-seen order).
    """

    def dig(container: Any, *keys: str) -> Any:
        # Walk nested dicts, yielding None as soon as a level is missing or not a dict.
        for key in keys:
            if not isinstance(container, dict):
                return None
            container = container.get(key)
        return container

    def as_list(value: Any) -> list:
        # A collection with exactly one element may arrive as a bare object
        # instead of a one-item list; normalise both shapes to a list.
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            return [value]
        return []

    def to_int(value: Any, default: int) -> int:
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    top_artists_raw = as_list(dig(lastfm, "top", "topartists", "artist"))
    recent_tracks_raw = as_list(dig(lastfm, "recent", "recenttracks", "track"))

    top_artists = [
        {
            "name": a["name"],
            "playcount": to_int(a.get("playcount", 0), 0),
            "rank": to_int(dig(a, "@attr", "rank"), 99),
        }
        for a in top_artists_raw[:100]
        if isinstance(a, dict) and a.get("name")
    ]

    recent_map: dict[str, int] = {}
    for t in recent_tracks_raw:
        name = dig(t, "artist", "#text")
        if name:
            recent_map[name] = recent_map.get(name, 0) + 1
    recent_artists = sorted(recent_map.items(), key=lambda x: -x[1])[:20]

    return {
        "topArtists": top_artists,
        "recentArtists": [{"name": n, "plays": p} for n, p in recent_artists],
    }
async def search_candidates(client: httpx.AsyncClient) -> list[dict]:
    """Run all SearXNG queries concurrently and collect unique candidates.

    Each query contributes at most its first 25 results. Results flagged by
    is_noise() are dropped; for the rest, artist/album are parsed from the
    title, falling back to the Bandcamp URL slug for the artist. A failed
    query is logged and contributes nothing. The merged list is deduplicated
    by URL (first occurrence wins; results without a URL are dropped).

    Args:
        client: Shared HTTP client.

    Returns:
        Candidate dicts with ``title``, ``url``, ``artist``, ``album``,
        ``source`` (first 60 characters of the query) and ``content`` (first
        200 characters of the result snippet).
    """
    queries = build_queries()
    log.info(f"Running {len(queries)} SearXNG queries")

    async def one_query(q: str) -> list[dict]:
        # time_range=year is applied to every query except Bandcamp site
        # searches and Metal Archives searches, which stay unrestricted to
        # maximise breadth.
        use_time_range = "site:bandcamp.com" not in q and "metal-archives" not in q
        params: dict = {"q": q, "format": "json"}
        if use_time_range:
            params["time_range"] = "year"
        try:
            r = await client.get(
                f"{settings.searxng_url}/search",
                params=params,
                timeout=25,
            )
            if r.status_code != 200:
                log.warning(f"SearXNG query returned HTTP {r.status_code}: {q[:40]}")
                return []
            data = r.json()
            results = (data.get("results") or [])[:25]  # top 25 per query
            out = []
            for res in results:
                if not isinstance(res, dict):
                    continue
                # A null title/url must not discard the rest of this query's results.
                title = res.get("title") or ""
                url = res.get("url") or ""
                if is_noise(title, url):
                    continue
                artist, album = parse_title(title)
                if not artist:
                    artist = artist_from_url(url)
                out.append({
                    "title": title,
                    "url": url,
                    "artist": artist,
                    "album": album,
                    "source": q[:60],
                    "content": (res.get("content") or "")[:200],
                })
            return out
        except Exception as e:
            # Best-effort boundary: one bad query never aborts the search stage.
            log.warning(f"SearXNG query failed: {q[:40]} — {e}")
            return []

    results = await asyncio.gather(*[one_query(q) for q in queries])
    all_results = [r for batch in results for r in batch]

    # Deduplicate by URL, keeping the first occurrence.
    seen_urls: set[str] = set()
    unique = []
    for r in all_results:
        u = r["url"]
        if u and u not in seen_urls:
            seen_urls.add(u)
            unique.append(r)
    log.info(f"Collected {len(unique)} unique candidates from search")
    return unique
async def embed_text(client: httpx.AsyncClient, text: str) -> list[float]:
    """Request an embedding for *text* from Ollama's ``/api/embeddings``.

    Uses the ``nomic-embed-text`` model. Returns the ``embedding`` field of a
    200 response, or ``[]`` on any other status or on any exception (which is
    logged as a warning).
    """
    endpoint = f"{settings.ollama_url}/api/embeddings"
    payload = {"model": "nomic-embed-text", "prompt": text}
    try:
        resp = await client.post(endpoint, json=payload, timeout=30)
        if resp.status_code != 200:
            return []
        return resp.json().get("embedding", [])
    except Exception as err:
        log.warning(f"Embed failed: {err}")
        return []
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity of two equal-length vectors.

    Yields ``0.0`` when either vector is empty, when the lengths differ, or
    when either vector has zero magnitude.
    """
    comparable = bool(a) and bool(b) and len(a) == len(b)
    if not comparable:
        return 0.0
    dot_product = sum(left * right for left, right in zip(a, b))
    norm_a = math.sqrt(sum(value * value for value in a))
    norm_b = math.sqrt(sum(value * value for value in b))
    if not norm_a or not norm_b:
        return 0.0
    return dot_product / (norm_a * norm_b)
async def score_candidates(
    client: httpx.AsyncClient, candidates: list[dict], taste: dict
) -> list[dict]:
    """Attach an ``embedScore`` to every candidate and rank by it.

    The taste centroid is the embedding of a sentence listing the first 30
    top-artist names. Each candidate is embedded one at a time and scored by
    cosine similarity to that centroid, rounded to four decimals (``0.0`` when
    its embedding is unavailable). Candidates are modified in place.

    Returns:
        ``[]`` for empty input; the candidates in their original order with
        ``embedScore`` 0.0 when the centroid embedding fails; otherwise the
        candidates sorted by descending ``embedScore``.
    """
    if not candidates:
        return []

    favourites = taste.get("topArtists", [])[:30]
    centroid_prompt = "Heavy metal artist: " + ", ".join(entry["name"] for entry in favourites)
    log.info("Embedding taste centroid")
    centroid = await embed_text(client, centroid_prompt)
    if not centroid:
        log.warning("Centroid embedding failed — scoring by 0")
        for cand in candidates:
            cand["embedScore"] = 0.0
        return candidates

    total = len(candidates)
    log.info(f"Scoring {total} candidates against taste centroid")
    ranked = []
    # Embeddings are requested sequentially, one candidate per request.
    for position, cand in enumerate(candidates):
        if cand.get("artist"):
            description = f"Metal album: {cand['artist']} - {cand['album']}"
        else:
            description = f"Metal album: {cand['album']}"
        embedding = await embed_text(client, description)
        if embedding:
            cand["embedScore"] = round(cosine_similarity(centroid, embedding), 4)
        else:
            cand["embedScore"] = 0.0
        if position % 20 == 0:
            log.info(f" scored {position}/{total}")
        ranked.append(cand)
    ranked.sort(key=lambda item: -item.get("embedScore", 0))
    return ranked
async def prefilter_batch(
    client: httpx.AsyncClient, batch: list[dict], batch_offset: int
) -> list[dict]:
    """Ask the chat model which items of one batch are real metal releases.

    Args:
        client: Shared HTTP client.
        batch: Candidates to review; the prompt numbers them 0..len(batch)-1.
        batch_offset: Position of this batch in the full candidate list. It
            is used only to label log messages.

    Returns:
        Copies of the kept candidates, with artist/album replaced by the
        model's non-empty string values and a ``prefilter`` entry holding
        ``subgenre`` and ``confidence``. The batch is returned unchanged when
        the request fails or no non-empty JSON array can be extracted from the
        reply (an empty array is indistinguishable from "no JSON" here).
        Returned items that are not objects, or whose index is not an integer
        within the batch, are ignored.
    """
    items_for_prompt = [
        {"index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "url": c.get("url", "")}
        for i, c in enumerate(batch)
    ]
    prompt = f"""You are a metal music expert. Review these {len(items_for_prompt)} items from web searches.
Return ONLY a JSON array of items that are actual metal album or EP releases (not news, not Wikipedia, not lists, not tour dates).
For each kept item include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0).
Be inclusive — keep anything that looks like it could be a real release even if you're not certain.
Items:
{json.dumps(items_for_prompt, indent=2)}
Return ONLY valid JSON array starting with ["""
    body = {
        "model": "mistral-nemo:latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 6000},
    }
    try:
        r = await client.post(f"{settings.ollama_url}/api/chat", json=body, timeout=150)
        raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
    except Exception as e:
        log.warning(f"Prefilter batch failed: {e}")
        return batch  # pass through on error

    parsed = extract_json_array(raw)
    if not parsed:
        log.warning(f"Prefilter batch {batch_offset}: no JSON — passing through")
        return batch

    enriched = []
    for p in parsed:
        if not isinstance(p, dict):
            continue
        try:
            idx = int(p.get("index"))
        except (TypeError, ValueError):
            continue
        # Reject negatives too: a negative list index would silently pick the wrong item.
        if not 0 <= idx < len(batch):
            continue
        orig = batch[idx].copy()
        for field in ("artist", "album"):
            suggestion = p.get(field)
            # Only trust the model's value when it is a non-empty string.
            if isinstance(suggestion, str) and suggestion:
                orig[field] = suggestion
            else:
                orig[field] = orig.get(field, "")
        orig["prefilter"] = {
            "subgenre": p.get("subgenre", "unknown"),
            "confidence": p.get("confidence", 0.5),
        }
        enriched.append(orig)
    return enriched
async def prefilter_candidates(
    client: httpx.AsyncClient, candidates: list[dict]
) -> list[dict]:
    """Run the prefilter over the leading candidates in batches.

    Only the first 200 candidates are considered. They are split into batches
    of 35, and at most three batches are in flight at once so Ollama is not
    overloaded.

    Args:
        client: Shared HTTP client.
        candidates: Candidates in priority order.

    Returns:
        The concatenated prefilter_batch() results, in batch order.
    """
    if not candidates:
        return []
    BATCH_SIZE = 35
    MAX_CANDIDATES = 200
    MAX_PARALLEL = 3

    # Slice first so the cap is exact; stepping the batch starts up to the cap
    # would let the final batch run past it.
    considered = candidates[:MAX_CANDIDATES]
    if len(candidates) > len(considered):
        log.info(f"Prefilter limited to first {len(considered)} of {len(candidates)} candidates")
    batches = [considered[i:i + BATCH_SIZE] for i in range(0, len(considered), BATCH_SIZE)]
    log.info(f"Prefiltering {len(considered)} candidates in {len(batches)} parallel batches")

    all_enriched = []
    for chunk_start in range(0, len(batches), MAX_PARALLEL):
        chunk = batches[chunk_start:chunk_start + MAX_PARALLEL]
        results = await asyncio.gather(*[
            prefilter_batch(client, b, (chunk_start + i) * BATCH_SIZE)
            for i, b in enumerate(chunk)
        ])
        for r in results:
            all_enriched.extend(r)
    log.info(f"Prefilter kept {len(all_enriched)}/{len(considered)} candidates")
    return all_enriched
async def curate_picks(
    client: httpx.AsyncClient, candidates: list[dict], taste: dict
) -> list[dict]:
    """Use mistral-nemo to produce final scored picks with full metadata.

    The 30 candidates with the highest ``embedScore`` are sent to the chat
    model together with the taste profile. The request is tried up to three
    times (two seconds apart) until at least one pick survives validation.

    A returned pick is kept when its score is a number of at least 50 and it
    either references a valid input index (its URL is then overwritten with
    the original search URL) or its URL starts with the first 40 characters of
    a known input URL. Items that are not objects, or whose score cannot be
    read as a number, are skipped individually.

    Args:
        client: Shared HTTP client.
        candidates: Prefiltered candidates.
        taste: Taste profile with ``topArtists`` and ``recentArtists``.

    Returns:
        The validated picks, or ``[]`` when every attempt fails.
    """
    if not candidates:
        return []
    top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:30]
    top_artists = [a["name"] for a in taste.get("topArtists", [])[:20]]
    recent_artists = [a["name"] for a in taste.get("recentArtists", [])[:10]]
    year = datetime.now().year
    items_for_prompt = [
        {
            "index": i,
            "artist": c.get("artist", ""),
            "album": c.get("album", ""),
            "subgenre": (c.get("prefilter") or {}).get("subgenre", "unknown"),
            "url": c.get("url", ""),
            "source": c.get("source", ""),
        }
        for i, c in enumerate(top)
    ]
    prompt = f"""You are a heavy metal curator reviewing NEW {year} releases found via web search.
IMPORTANT: Only score albums from the list below. Do NOT recommend albums not in this list.
Do NOT recommend classic albums. These must be actual {year} releases found at the provided URLs.
If you are not sure an item is a real new release, skip it.
nick2day's taste profile:
- Top artists (12-month): {', '.join(top_artists)}
- Recent listens: {', '.join(recent_artists)}
New {year} releases to evaluate (use ONLY these):
{json.dumps(items_for_prompt, indent=2)}
For each item worth recommending, output a JSON object with:
index (integer, from above), artist, album, score (0-100, based on fit to taste profile),
subgenre, obscurity (underground/indie/major), similar_to (2-3 artists from taste profile),
why (1 sentence), review_angle (unique angle), tags (string array), url (copy from above)
Skip items without a clear artist + album. Skip classics or non-{year} releases.
Return ONLY a JSON array starting with ["""
    body = {
        "model": "mistral-nemo:latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 8000},
    }
    # Build index → candidate map for provenance validation
    top_by_index = {i: c for i, c in enumerate(top)}
    known_urls = {c.get("url", "") for c in top if c.get("url")}

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            r = await client.post(
                f"{settings.ollama_url}/api/chat",
                json=body,
                timeout=180,
            )
            raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
            picks = extract_json_array(raw)
            if picks:
                # Validate provenance: keep picks that reference a real input item
                validated = []
                for p in picks:
                    if not isinstance(p, dict):
                        continue
                    # The model may emit the score as a string or null; one
                    # unreadable score must not abort the whole attempt.
                    try:
                        score = float(p.get("score", 0))
                    except (TypeError, ValueError):
                        continue
                    if score < 50:
                        continue
                    if not isinstance(p.get("score"), (int, float)):
                        p["score"] = score

                    idx = p.get("index")
                    if isinstance(idx, str) and idx.strip().isdigit():
                        idx = int(idx)
                    elif isinstance(idx, float) and idx.is_integer():
                        idx = int(idx)
                    url = p.get("url")
                    if not isinstance(url, str):
                        url = ""

                    if isinstance(idx, int) and not isinstance(idx, bool) and idx in top_by_index:
                        # Always use original URL from search result
                        p["url"] = top_by_index[idx].get("url", url)
                        validated.append(p)
                    elif url and any(url.startswith(ku[:40]) for ku in known_urls):
                        # URL prefix match (handles trailing-slash variants)
                        validated.append(p)
                    else:
                        log.debug(f"Dropped hallucinated pick: {p.get('artist')} — {p.get('album')}")
                log.info(f"Curator returned {len(validated)} validated picks (attempt {attempt+1})")
                if validated:
                    return validated
        except Exception as e:
            log.error(f"Curator attempt {attempt+1} failed: {e}")
        # Pause between attempts only; there is nothing to wait for after the last one.
        if attempt < max_attempts - 1:
            await asyncio.sleep(2)
    log.warning("Curator failed all attempts")
    return []
def sanitise_picks(picks: list[dict], source_candidates: list[dict]) -> list[dict]:
    """Final cleanup pass over curated picks.

    For every pick, in order:
      1. Non-object picks are dropped; null artist/album/url values become
         ``""``; picks whose artist, album or url is any other non-string
         value are dropped.
      2. ``obscurity`` is normalised to ``high``/``medium``/``low``.
      3. If the URL is on a known review domain and the artist contains the
         site's name, artist/album are restored from the matching source
         candidate, or the pick is dropped when that is impossible.
      4. If the artist matches ``_BAD_FIELD``, it is re-derived from the
         source candidate's title or URL.
      5. Picks still failing looks_like_bad_pick() are dropped.

    Picks are modified in place.

    Args:
        picks: Picks as returned by the curator (LLM-produced JSON objects).
        source_candidates: Search candidates used for URL-based recovery.

    Returns:
        The surviving picks, in input order.
    """
    # Build URL → candidate map for fallback artist extraction
    url_map = {c.get("url", ""): c for c in source_candidates if c.get("url")}
    text_fields = ("artist", "album", "url")
    clean = []
    for p in picks:
        if not isinstance(p, dict):
            continue
        # JSON null is equivalent to "missing"; any other non-string value
        # cannot be repaired and would crash the regex checks below.
        for field in text_fields:
            if field in p and p[field] is None:
                p[field] = ""
        if not all(isinstance(p.get(field, ""), str) for field in text_fields):
            log.debug(f"Dropping pick with non-text fields: {p.get('artist')} / {p.get('album')}")
            continue

        # Normalise obscurity to values the dashboard understands
        obs = str(p.get("obscurity") or "").lower()
        if "underground" in obs or obs in ("high", "demo", "diy"):
            p["obscurity"] = "high"
        elif "indie" in obs or "cult" in obs or obs == "medium":
            p["obscurity"] = "medium"
        else:
            p["obscurity"] = "low"

        # If URL is a review article, artist shouldn't be the site/blog name
        url = p.get("url", "")
        if _REVIEW_DOMAINS.search(url):
            domain_slug = re.sub(r'https?://(www\.)?', '', url).split('/')[0].split('.')[0]
            if domain_slug.lower() in p.get("artist", "").lower().replace(' ', ''):
                # artist IS the domain — try to recover from prefilter data
                orig = url_map.get(url)
                if orig and orig.get("artist"):
                    p["artist"] = orig["artist"]
                    p["album"] = orig.get("album") or p.get("album", "")
                else:
                    # Can't recover — drop this pick
                    continue

        # If artist looks bad, try to recover from search result
        if _BAD_FIELD.search(p.get("artist", "")):
            orig = url_map.get(url)
            if orig:
                artist_fallback, album_fallback = parse_title(orig.get("title", ""))
                if not artist_fallback:
                    artist_fallback = artist_from_url(orig.get("url", ""))
                p["artist"] = artist_fallback or orig.get("artist", p["artist"])
                if not p.get("album"):
                    p["album"] = album_fallback

        if looks_like_bad_pick(p):
            log.debug(f"Dropping bad pick after cleanup: {p.get('artist')} / {p.get('album')}")
            continue
        clean.append(p)
    return clean
def extract_json_array(text: str) -> list:
    """Robustly extract a JSON array from LLM output.

    Strategy, in order:
      1. Strip markdown code fences and parse the whole text. A list is
         returned as-is; for an object, its first list-valued member is
         returned.
      2. Otherwise decode a JSON value starting at each ``[`` in turn and
         return the first one that is a list. Surrounding prose, including
         trailing text that contains brackets, is therefore tolerated.

    Args:
        text: Raw model output.

    Returns:
        The extracted list, or ``[]`` when no array can be decoded.
    """
    if not text:
        return []
    # Strip markdown fences
    cleaned = re.sub(r'```(?:json)?\s*', '', text).strip()

    # Try direct parse
    try:
        data = json.loads(cleaned)
    except (ValueError, RecursionError):
        data = None
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for v in data.values():
            if isinstance(v, list):
                return v

    # raw_decode stops at the end of the first complete value, so text after
    # the closing bracket cannot break the parse.
    decoder = json.JSONDecoder()
    pos = cleaned.find('[')
    while pos != -1:
        try:
            candidate, _ = decoder.raw_decode(cleaned, pos)
        except (ValueError, RecursionError):
            candidate = None
        if isinstance(candidate, list):
            return candidate
        pos = cleaned.find('[', pos + 1)
    return []
async def write_picks(new_picks: list[dict], run_stats: dict):
    """Append new picks to metal-picks.json.

    The store is append-only: existing picks are never removed. New picks are
    deduplicated by lower-cased (artist, album), both against the stored picks
    and against each other, and are placed ahead of the existing ones. A run
    record is prepended to ``runs``, which keeps only the ten most recent.

    A missing file starts an empty store. A file that cannot be decoded is
    moved aside to ``<picks_path>.corrupt-<unix time>`` before a fresh store
    is started, so its contents are never silently overwritten. The file is
    replaced atomically via a temporary file.

    Args:
        new_picks: Picks produced by this run.
        run_stats: Counters for the run record; ``candidates`` and
            ``filtered`` are read.

    Raises:
        OSError: If the picks file cannot be read (other than being missing)
            or written.
    """
    empty_store = {"allPicks": [], "runs": [], "lastUpdated": ""}
    try:
        with open(settings.picks_path, "r", encoding="utf-8") as f:
            store = json.load(f)
        if not isinstance(store, dict):
            raise ValueError("picks store is not a JSON object")
    except FileNotFoundError:
        store = empty_store
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        backup_path = f"{settings.picks_path}.corrupt-{int(time.time())}"
        os.replace(settings.picks_path, backup_path)
        log.error(f"Picks file unreadable ({e}); moved to {backup_path}")
        store = empty_store

    def pick_key(p: dict) -> tuple[str, str]:
        # Tolerates null / non-string fields coming from LLM-produced picks.
        return (str(p.get("artist") or "").lower(), str(p.get("album") or "").lower())

    # Append-only — never remove existing picks; dedup by artist+album key
    existing = store.get("allPicks") or []
    known_keys = {pick_key(p) for p in existing}
    truly_new = []
    for p in new_picks:
        key = pick_key(p)
        if key in known_keys:
            continue
        known_keys.add(key)
        truly_new.append(p)

    # Add run record
    run = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "fgs-agent",
        "picksAdded": len(truly_new),
        "candidatesSearched": run_stats.get("candidates", 0),
        "candidatesFiltered": run_stats.get("filtered", 0),
    }
    store["runs"] = ([run] + (store.get("runs") or []))[:10]
    store["allPicks"] = truly_new + existing
    store["lastUpdated"] = run["timestamp"]

    # Write to a sibling temp file and swap it in, so a crash mid-write cannot
    # leave a truncated picks file behind.
    tmp_path = f"{settings.picks_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(store, f, indent=2)
    os.replace(tmp_path, settings.picks_path)
    log.info(f"Wrote {len(truly_new)} new picks to {settings.picks_path}")
# ── RUN STATE ────────────────────────────────────────────────────────────────
# In-memory status of the current or most recent run. pipeline_run() resets
# and updates it, and GET /api/fgs/status returns it as-is.
run_state: dict[str, Any] = {
    "status": "idle",       # "idle" here; pipeline_run() sets "running", "done" or "error"
    "run_id": None,         # id passed to pipeline_run()
    "started": None,        # UTC ISO-8601 timestamp set when a run begins
    "finished": None,       # UTC ISO-8601 timestamp set when a run ends
    "picks_added": 0,       # number of picks after cleanup in the last completed run
    "candidates_found": 0,  # number of unique search candidates in the last run
    "error": None,          # str(exception) when a run fails
    "log": [],              # per-step {"ts", "msg"} entries, cleared at run start
}
# Held for the whole of pipeline_run() so that runs never overlap.
_run_lock = asyncio.Lock()
async def pipeline_run(run_id: str):
    """Execute one full discovery run and record its progress in ``run_state``.

    Stages: Last.fm fetch → taste profile → search → dedup filter → embedding
    score → prefilter → curation → cleanup → persist. The run ends early with
    status ``done`` when the search yields nothing or everything was already
    seen. Any exception is logged and stored in ``run_state`` with status
    ``error``.

    Picks are written to the picks file before they are marked as seen, so a
    failed write cannot leave picks flagged as seen without ever having been
    stored.

    Args:
        run_id: Identifier reported through the status endpoint.
    """

    def now_iso() -> str:
        return datetime.now(timezone.utc).isoformat()

    def log_step(msg: str):
        log.info(msg)
        run_state["log"].append({"ts": now_iso(), "msg": msg})

    async with _run_lock:
        run_state.update({
            "status": "running",
            "run_id": run_id,
            "started": now_iso(),
            "finished": None,
            "picks_added": 0,
            "candidates_found": 0,
            "error": None,
            "log": [],
        })
        try:
            async with httpx.AsyncClient() as client:
                log_step("Fetching Last.fm data")
                lastfm = await fetch_lastfm(client)
                taste = build_taste_profile(lastfm)
                log_step(f"Taste profile: {len(taste['topArtists'])} top artists, {len(taste['recentArtists'])} recent")

                log_step("Searching candidates")
                candidates = await search_candidates(client)
                run_state["candidates_found"] = len(candidates)
                if not candidates:
                    log_step("No candidates found — ending run")
                    run_state["status"] = "done"
                    run_state["finished"] = now_iso()
                    return

                log_step("Filtering seen albums")
                fresh = await filter_unseen(candidates)
                log_step(f"{len(fresh)} fresh candidates (of {len(candidates)} total)")
                if not fresh:
                    log_step("All candidates already seen — ending run")
                    run_state["status"] = "done"
                    run_state["finished"] = now_iso()
                    return

                log_step("Scoring by embedding similarity")
                scored = await score_candidates(client, fresh, taste)

                log_step("Prefiltering with Mistral")
                prefiltered = await prefilter_candidates(client, scored)
                log_step(f"{len(prefiltered)} candidates passed prefilter")

                log_step("Curating picks with Mistral")
                picks = await curate_picks(client, prefiltered, taste)
                picks = sanitise_picks(picks, scored)
                log_step(f"{len(picks)} picks after cleanup")

                if picks:
                    # Persist first, then mark as seen. Only actual picks are
                    # marked — unselected candidates stay eligible for re-eval.
                    await write_picks(picks, {
                        "candidates": len(candidates),
                        "filtered": len(prefiltered),
                    })
                    await mark_seen(picks)

            run_state.update({
                "status": "done",
                "finished": now_iso(),
                "picks_added": len(picks),
            })
            log_step(f"Run complete — {len(picks)} new picks added")
        except Exception as e:
            # Top-level boundary of the background task: record and report, never raise.
            log.exception(f"Pipeline run failed: {e}")
            run_state.update({
                "status": "error",
                "finished": now_iso(),
                "error": str(e),
            })
# ── API ───────────────────────────────────────────────────────────────────────
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI(title="fgs-agent", description="First Gain Stage metal discovery")


# NOTE(review): recent FastAPI releases deprecate on_event in favour of lifespan
# handlers — confirm against the pinned FastAPI version before migrating.
@app.on_event("startup")
async def startup():
    """Ensure the dedup table exists, then log the configured port."""
    await init_dedup_db()
    log.info(f"fgs-agent started on port {settings.port}")
@app.post("/api/fgs/run")
async def trigger_run(background_tasks: BackgroundTasks):
    """Start a discovery run in the background.

    Returns:
        ``{"status": "started", "run_id": ...}`` when a run was scheduled, or
        a 409 response carrying the active run's id when one is already
        running.
    """
    if run_state["status"] == "running":
        return JSONResponse({"status": "already_running", "run_id": run_state["run_id"]}, status_code=409)
    run_id = str(uuid.uuid4())[:8]
    # Claim the slot immediately. The background task only starts after this
    # response has been sent, so without this a second request arriving in
    # between would also pass the check above. There is no await between the
    # check and this update, so the pair is atomic on the event loop.
    run_state.update({"status": "running", "run_id": run_id, "error": None})
    background_tasks.add_task(pipeline_run, run_id)
    return {"status": "started", "run_id": run_id}
@app.get("/api/fgs/status")
async def get_status():
    """Return the module-level ``run_state`` dictionary as the response body."""
    return run_state
@app.get("/api/fgs/picks")
async def get_picks():
    """Return the decoded contents of the picks file.

    Any failure to open or decode the file produces a 500 response whose body
    is ``{"error": <message>}``.
    """
    try:
        with open(settings.picks_path, "r") as handle:
            payload = json.loads(handle.read())
    except Exception as exc:
        return JSONResponse({"error": str(exc)}, status_code=500)
    return payload
class RemoveRequest(BaseModel):
    """Request body for POST /api/fgs/picks/remove."""

    # Artist of the pick to remove; remove_pick() compares it case-insensitively.
    artist: str
    # Album of the pick to remove; remove_pick() compares it case-insensitively.
    album: str
@app.post("/api/fgs/picks/remove")
async def remove_pick(req: RemoveRequest):
    """User-initiated pick removal. Writes the pick key to dedup so it won't return.

    Every stored pick whose artist and album equal the request's values
    (case-insensitively) is removed. The picks file is rewritten, atomically,
    only when something was actually removed. The key is recorded in the dedup
    store either way.

    Returns:
        ``{"removed": <count>, "total": <remaining picks>}``, or a 500
        response with ``{"error": ...}`` when the picks file cannot be read.
    """
    try:
        with open(settings.picks_path, "r", encoding="utf-8") as f:
            store = json.load(f)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    key_a = req.artist.lower()
    key_b = req.album.lower()

    def matches(p: dict) -> bool:
        # Stored picks may carry null / non-string fields (LLM output).
        return (
            str(p.get("artist") or "").lower() == key_a
            and str(p.get("album") or "").lower() == key_b
        )

    picks = store.get("allPicks") or []
    kept = [p for p in picks if not matches(p)]
    removed = len(picks) - len(kept)

    if removed:
        store["allPicks"] = kept
        # Write to a sibling temp file and swap it in, so a crash mid-write
        # cannot leave a truncated picks file behind.
        tmp_path = f"{settings.picks_path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(store, f, indent=2)
        os.replace(tmp_path, settings.picks_path)

    # Also add to dedup so it won't come back
    await mark_seen([{"artist": req.artist, "album": req.album}])
    return {"removed": removed, "total": len(kept)}
@app.get("/api/fgs/health")
async def health():
    """Liveness probe: always reports ``ok`` together with the agent name."""
    payload = dict(status="ok", agent="fgs-agent")
    return payload
# ── Entry point ───────────────────────────────────────────────────────────────
# Direct execution: serve the app with uvicorn on all interfaces, without auto-reload.
if __name__ == "__main__":
    # NOTE(review): the "agent:app" import string presumes this module is
    # importable as `agent` — confirm against the actual filename.
    uvicorn.run("agent:app", host="0.0.0.0", port=settings.port, reload=False)