|
|
"""
|
|
|
fgs-agent — First Gain Stage metal discovery pipeline
|
|
|
Standalone FastAPI service; n8n just POSTs to /api/fgs/run on a schedule.
|
|
|
|
|
|
Endpoints:
|
|
|
POST /api/fgs/run — trigger a discovery run (async, returns run_id)
|
|
|
GET /api/fgs/status — current run status
|
|
|
GET /api/fgs/picks — read metal-picks.json
|
|
|
GET /api/fgs/health — liveness check
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
import json
|
|
|
import logging
|
|
|
import math
|
|
|
import re
|
|
|
import time
|
|
|
import uuid
|
|
|
from datetime import datetime, timezone
|
|
|
from typing import Any, Optional
|
|
|
|
|
|
import aiosqlite
|
|
|
import httpx
|
|
|
import uvicorn
|
|
|
from fastapi import BackgroundTasks, FastAPI
|
|
|
from fastapi.responses import JSONResponse
|
|
|
from pydantic import BaseModel
|
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
|
|
|
|
# Configure logging once at import time: INFO level, "<time> <level> <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Logger shared by the pipeline stages and API handlers in this module.
log = logging.getLogger("fgs-agent")
|
|
|
|
|
|
|
|
|
# ── Config ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class Settings(BaseSettings):
    """Service configuration with built-in defaults.

    ``model_config`` names a UTF-8 ``.env`` file as a settings source.
    NOTE(review): the precedence between environment variables, ``.env`` and
    the defaults below is pydantic-settings behaviour and is not visible in
    this file — confirm before relying on it.
    """

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Last.fm account whose listening history seeds the taste profile.
    lastfm_username: str = "nick2day"
    # Last.fm API key sent with every Last.fm request; empty by default.
    lastfm_api_key: str = ""
    # Base URL of the SearXNG instance; queries go to "<url>/search".
    searxng_url: str = "http://localhost:8080"
    # Base URL of the Ollama server ("/api/embeddings" and "/api/chat").
    ollama_url: str = "http://localhost:11434"
    # JSON file that stores the picks and the recent run history.
    picks_path: str = "/home/nick2day/.n8n/metal-picks.json"
    # SQLite file holding the "seen" dedup table.
    dedup_db: str = "/home/nick2day/fgs-agent/dedup.db"
    # TCP port handed to uvicorn by the __main__ entry point.
    port: int = 8766
|
|
|
|
|
|
|
|
|
# Module-wide settings instance, built once at import time and read by the
# functions below.
settings = Settings()
|
|
|
|
|
|
|
|
|
# ── SEARXNG QUERY BANK ───────────────────────────────────────────────────────
|
|
|
|
|
|
# Subgenre vocabulary for search queries; build_queries() takes a rotating
# window of six entries from this list on each call.
SUBGENRES = [
    "death metal", "black metal", "doom metal", "thrash metal", "sludge metal",
    "progressive metal", "blackened death metal", "post-metal", "funeral doom", "grindcore",
    "melodic death metal", "technical death metal", "atmospheric black metal", "war metal",
    "drone metal", "stoner metal", "gothic metal", "power metal", "viking metal", "brutal death metal",
    "deathcore", "mathcore", "noise rock", "crust punk", "black doom", "death doom",
    "dissonant death metal", "ambient black metal", "depressive black metal", "crossover thrash",
]
|
|
|
|
|
|
# Labels to search on Bandcamp — no time_range, broad results.
# build_queries() turns a rotating window of five of these into
# 'site:bandcamp.com "<label>"' queries.
BANDCAMP_LABELS = [
    "sentient ruin",
    "20 buck spin",
    "prosthetic records",
    "unique leader records",
    "redefining darkness",
    "profound lore",
    "season of mist",
    "century media",
    "nuclear blast",
    "relapse records",
    "metal blade records",
    "dark descent records",
    "iron bonehead",
    "les acteurs de l'ombre",
    "vrasubatlat",
    "me saco un ojo",
    "hells headbangers",
    "invictus productions",
    "blood harvest",
    "chaos records",
    "adagio 830",
    "floga records",
    "sewer rot records",
    "lavadome productions",
    "memento mori",
]
|
|
|
|
|
|
# Review/news sites — searched with a one-year time_range so results stay
# recent. build_queries() uses a rotating window of four domains per call,
# each in 'site:<domain> ...' queries.
REVIEW_SITES = [
    "metalinjection.net",
    "cvltnation.com",
    "decibelmag.com",
    "invisibleoranges.com",
    "brooklynvegan.com",
    "heavyblogisheavy.com",
    "nocturnalcult.com",
    "themetalcrypt.com",
    "angrymetalguy.com",
    "sputnikmusic.com",
    "nocleansinging.com",
    "rateyourmusic.com",
    "terrorizer.com",
    "kerrang.com",
]
|
|
|
|
|
|
|
|
|
def build_queries() -> list[str]:
    """Assemble the list of SearXNG query strings for one discovery run.

    Subgenres, Bandcamp labels and review sites are each sampled as a
    wrapping window whose start index is derived from the wall clock, so
    successive runs cover different slices of each list. The current
    calendar year is interpolated into the year-scoped queries.

    Returns:
        Query strings in a fixed order: subgenre releases, Bandcamp direct,
        label searches, review sites, demo/EP scene, regional, Metal
        Archives, then broad discovery.
    """

    def rotating_window(pool: list[str], period_s: int, width: int) -> list[str]:
        # The start index advances by one every `period_s` seconds and wraps
        # around the end of the pool.
        start = int(time.time() // period_s) % len(pool)
        return [pool[(start + offset) % len(pool)] for offset in range(width)]

    year = datetime.now().year
    genres = rotating_window(SUBGENRES, 14400, 6)           # 4-hour step
    label_batch = rotating_window(BANDCAMP_LABELS, 21600, 5)  # 6-hour step
    site_batch = rotating_window(REVIEW_SITES, 43200, 4)     # 12-hour step
    lead_genres = genres[:2]

    queries: list[str] = []

    # Subgenre releases — first half year-scoped, second half unscoped.
    queries += [f'{sg} new album release {year}' for sg in genres[:3]]
    queries += [f'{sg} new album bandcamp' for sg in genres[3:]]

    # Bandcamp direct.
    for sg in lead_genres:
        queries += [
            f'site:bandcamp.com {sg} album {year}',
            f'site:bandcamp.com {sg} new release',
        ]

    # Label searches.
    queries += [f'site:bandcamp.com "{label}"' for label in label_batch]

    # Review sites with year scope.
    for site in site_batch:
        queries += [
            f'site:{site} album review {year}',
            f'site:{site} new album {year}',
        ]

    # Bandcamp underground / demo scene.
    for sg in lead_genres:
        queries += [
            f'underground {sg} demo {year} bandcamp',
            f'new {sg} EP {year} bandcamp',
        ]

    queries += [
        # Michigan / regional
        f'Michigan metal band new album {year}',
        f'Detroit metal band new release {year}',
        f'Midwest metal new release {year} bandcamp',
        # Metal Archives
        f'site:metal-archives.com album review {year}',
        f'site:metal-archives.com new band {year}',
        # Broad discovery
        f'best underground metal albums {year}',
        f'new metal releases {year} bandcamp full stream',
        f'hidden gem metal album {year}',
        f'obscure metal release {year}',
        f'metal full album stream {year} new band',
    ]
    return queries
|
|
|
|
|
|
|
|
|
# ── NOISE FILTERS ────────────────────────────────────────────────────────────
|
|
|
|
|
|
# Case-insensitive patterns that mark a search hit as noise (encyclopedia
# pages, listicles, announcements, soundtracks, ...). is_noise() searches
# each one against "<title> <url>", so "^" anchors at the start of the title.
NOISE_PATTERNS = [
    re.compile(expr, re.I)
    for expr in (
        r'wikipedia\.org',
        r'\bin heavy metal music\b',
        r'album release calendar',
        r'most anticipated',
        r'\byear in review\b',
        r'^top \d+',
        r'albums you need to hear',
        r'upcoming.*releases',
        r'\bto release\b.*album',
        r'\bannounce[sd]?\b.*album',
        r'\bdelayed\b',
        r'best.*releases',
        r'\d+ metal albums.*20\d\d',
        r'shatner',
        r'mark your calendars',
        r'\bvideo game\b',
        r'\bsoundtrack\b',
        r'\btribute\b',
        r'\bcovers album\b',
        r'heavy metal music - wikipedia',
        r'in \d{4} - wikipedia',
    )
]
|
|
|
|
|
|
|
|
|
def is_noise(title: str, url: str) -> bool:
    """Return True when the title/URL pair matches any noise pattern.

    The title and URL are joined with a single space and each entry of
    NOISE_PATTERNS is searched against the combined text.
    """
    haystack = f"{title} {url}"
    for pattern in NOISE_PATTERNS:
        if pattern.search(haystack):
            return True
    return False
|
|
|
|
|
|
|
|
|
# Matches a trailing segment that starts with "-", "–" or "|" followed by a
# generic label word (bandcamp, records, productions, ...) or one of the
# listed label names, and runs to the end of the string. clean_name() uses it
# to cut that tail off artist/album text. Case-insensitive.
LABEL_SUFFIXES = re.compile(
    r'\s*[-–|]\s*(bandcamp|records|recordings|productions|music|metal|label|distro|'
    r'sentient ruin|dark descent|20 buck spin|relapse|prosthetic|nuclear blast|'
    r'season of mist|century media|profound lore|iron bonehead|hells headbangers|'
    r'blood harvest|invictus productions|metal blade|redefining darkness)\b.*$',
    re.I
)
|
|
|
|
|
|
|
|
|
def clean_name(s: str) -> str:
    """Remove a trailing label suffix, then trim whitespace and stray pipes."""
    without_suffix = LABEL_SUFFIXES.sub('', s)
    trimmed = without_suffix.strip()
    # A leading/trailing "|" can remain after the suffix is cut; drop it and
    # trim again.
    return trimmed.strip('|').strip()
|
|
|
|
|
|
|
|
|
def parse_title(title: str) -> tuple[str, str]:
    """Split a search-result title into ``(artist, album)``.

    Three layouts are tried in order:

    1. ``"Album, by Artist"`` (Bandcamp page titles).
    2. ``"Album | Artist — Label ..."`` (Bandcamp search results); text after
       an em/en dash on the right-hand side is ignored.
    3. ``"Artist - Album"`` / ``"Artist – Album"``.

    When none applies the artist is ``''`` and the cleaned title is returned
    as the album. Both parts go through clean_name().
    """
    by_match = re.match(r'^(.+?),\s+by\s+(.+)$', title, re.I)
    if by_match is not None:
        album_part, artist_part = by_match.groups()
        return clean_name(artist_part), clean_name(album_part)

    pipe_match = re.match(r'^(.+?)\s*\|\s*([^—–]+?)(?:\s*[—–].+)?$', title)
    if pipe_match is not None:
        album_side = clean_name(pipe_match.group(1))
        artist_side = clean_name(pipe_match.group(2))
        # Album comes first, artist second; only accept a plausible
        # (non-empty, short) artist, otherwise fall through to layout 3.
        if artist_side and len(artist_side) < 60:
            return artist_side, album_side

    dash_match = re.match(r'^([^-–|]+?)\s*[-–]\s*(.+)$', title)
    if dash_match is None or len(dash_match.group(1)) >= 60:
        return '', clean_name(title)
    return clean_name(dash_match.group(1)), clean_name(dash_match.group(2))
|
|
|
|
|
|
|
|
|
def artist_from_url(url: str) -> str:
    """Derive an artist name from a ``<slug>.bandcamp.com`` URL.

    Hyphens in the subdomain become spaces and the result is title-cased.
    Returns ``''`` when the URL is not a Bandcamp subdomain URL.
    """
    match = re.match(r'https?://([^.]+)\.bandcamp\.com', url, re.I)
    if match is None:
        return ''
    slug = match.group(1)
    return slug.replace('-', ' ').title()
|
|
|
|
|
|
|
|
|
# Substrings that should not appear in an artist field: label/store words
# ("bandcamp", "recording(s)", "production(s)", "distro") or a literal pipe
# left over from title parsing. Case-insensitive.
_BAD_FIELD = re.compile(
    r'bandcamp|recordings?|productions?|distro|\|', re.I
)
|
|
|
|
|
|
# Known review/blog domains — artist should never be the site name.
# sanitise_picks() searches a pick's URL with this pattern to decide whether
# the artist field needs checking against the site name. Case-insensitive.
_REVIEW_DOMAINS = re.compile(
    r'metalinjection\.net|cvltnation\.com|decibelmag\.com|invisibleoranges\.com|'
    r'brooklynvegan\.com|heavyblogisheavy\.com|angrymetalguy\.com|sputnikmusic\.com|'
    r'nocleansinging\.com|meatmeadmetal\.com|terrorizer\.com|kerrang\.com|'
    r'loudwire\.com|nocturnalcult\.com|themetalcrypt\.com|rateyourmusic\.com',
    re.I
)
|
|
|
|
|
|
|
|
|
def looks_like_bad_pick(p: dict) -> bool:
    """Return True if the pick's artist/album fields are clearly garbage.

    A pick is rejected when both fields are empty, the artist contains a
    label/store word or a pipe (see ``_BAD_FIELD``), the album contains a
    pipe, or either field is implausibly long.

    Picks come from LLM-produced JSON, so a field may be missing, ``null`` or
    a non-string value; such values are coerced to text (``None`` becomes
    ``""``) instead of raising inside the regex search or ``len()``.
    """

    def as_text(value: object) -> str:
        return "" if value is None else str(value)

    artist = as_text(p.get("artist"))
    album = as_text(p.get("album"))

    if not artist and not album:
        return True
    if _BAD_FIELD.search(artist):
        return True
    if "|" in album:
        return True
    if len(artist) > 70 or len(album) > 120:
        return True
    return False
|
|
|
|
|
|
|
|
|
# ── DEDUP ────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def init_dedup_db():
    """Create the ``seen`` table in the dedup database if it does not exist.

    The table has a text primary key ``key`` and an integer ``ts`` column.
    """
    create_sql = "CREATE TABLE IF NOT EXISTS seen (key TEXT PRIMARY KEY, ts INTEGER)"
    async with aiosqlite.connect(settings.dedup_db) as conn:
        await conn.execute(create_sql)
        await conn.commit()
|
|
|
|
|
|
|
|
|
async def filter_unseen(candidates: list[dict]) -> list[dict]:
    """Return the candidates whose dedup key is not yet in the ``seen`` table.

    The key is ``"<artist>::<album>"`` in lower case. When both parts are
    empty the candidate's URL (or, failing that, its album) is used as the
    lookup key instead. Input order is preserved.
    """
    lookup_sql = "SELECT 1 FROM seen WHERE key=?"
    unseen: list[dict] = []
    async with aiosqlite.connect(settings.dedup_db) as conn:
        for cand in candidates:
            artist = cand.get('artist', '').lower()
            album = cand.get('album', '').lower()
            dedup_key = f"{artist}::{album}"
            if dedup_key == '::':
                # Nothing to key on — fall back to the URL.
                dedup_key = cand.get('url', cand.get('album', ''))
            cursor = await conn.execute(lookup_sql, (dedup_key,))
            already_seen = await cursor.fetchone()
            if not already_seen:
                unseen.append(cand)
    return unseen
|
|
|
|
|
|
|
|
|
async def mark_seen(picks: list[dict]):
    """Record picks in the ``seen`` table so later runs skip them.

    Each pick is stored under the lower-cased key ``"<artist>::<album>"``
    with the current Unix time. Picks with neither artist nor album are
    skipped. Existing keys are left untouched (``INSERT OR IGNORE``).

    Picks may originate from LLM-produced JSON, so a ``null`` or non-string
    artist/album is coerced to text rather than raising on ``.lower()``.
    """
    ts = int(time.time())
    rows = []
    for p in picks:
        artist = str(p.get('artist') or '').lower()
        album = str(p.get('album') or '').lower()
        if artist or album:
            rows.append((f"{artist}::{album}", ts))

    async with aiosqlite.connect(settings.dedup_db) as db:
        # One batched statement instead of one round trip per pick.
        await db.executemany(
            "INSERT OR IGNORE INTO seen (key, ts) VALUES (?, ?)", rows
        )
        await db.commit()
|
|
|
|
|
|
|
|
|
# ── PIPELINE STAGES ──────────────────────────────────────────────────────────
|
|
|
|
|
|
async def fetch_lastfm(client: httpx.AsyncClient) -> dict:
    """Fetch the user's top artists and recent tracks from Last.fm.

    Both requests are issued concurrently with a 15-second timeout.

    Returns:
        ``{"top": <json>, "recent": <json>}``; a part is ``{}`` when its
        response status is not 200.
    """
    endpoint = "https://ws.audioscrobbler.com/2.0/"

    def query(method: str, **extra: str) -> dict:
        # Parameters shared by both calls, with the method-specific ones
        # placed between the credentials and the format flag.
        return {
            "method": method,
            "user": settings.lastfm_username,
            "api_key": settings.lastfm_api_key,
            **extra,
            "format": "json",
        }

    # NOTE(review): Last.fm's API reference lists a per-page maximum of 200
    # for user.getRecentTracks — confirm whether limit=1000 is honoured.
    top_resp, recent_resp = await asyncio.gather(
        client.get(endpoint, params=query("user.gettopartists", period="12month", limit="100"), timeout=15),
        client.get(endpoint, params=query("user.getrecenttracks", limit="1000"), timeout=15),
    )

    def payload(resp) -> dict:
        return resp.json() if resp.status_code == 200 else {}

    return {"top": payload(top_resp), "recent": payload(recent_resp)}
|
|
|
|
|
|
|
|
|
def build_taste_profile(lastfm: dict) -> dict:
    """Summarise Last.fm data into the taste profile used for scoring.

    Args:
        lastfm: ``{"top": ..., "recent": ...}`` as returned by fetch_lastfm().

    Returns:
        ``{"topArtists": [{"name", "playcount", "rank"}, ...],
        "recentArtists": [{"name", "plays"}, ...]}`` — at most 100 top
        artists in response order, and the 20 most-played recent artists in
        descending play count (ties keep first-seen order).

    A response section that holds a single object instead of a list is
    treated as a one-element list, and entries that are not objects or lack
    a name are skipped, so an unusual payload yields a smaller profile
    instead of an exception.
    """

    def as_list(value: object) -> list:
        # A lone object stands for a one-item list; anything that is neither
        # a dict nor a list is treated as "no data".
        if isinstance(value, dict):
            return [value]
        return value if isinstance(value, list) else []

    top_artists_raw = as_list(lastfm.get("top", {}).get("topartists", {}).get("artist", []))
    recent_tracks_raw = as_list(lastfm.get("recent", {}).get("recenttracks", {}).get("track", []))

    top_artists = [
        {
            "name": a["name"],
            "playcount": int(a.get("playcount", 0)),
            "rank": int(a.get("@attr", {}).get("rank", 99)),
        }
        for a in top_artists_raw[:100]
        if isinstance(a, dict) and a.get("name")
    ]

    recent_map: dict[str, int] = {}
    for t in recent_tracks_raw:
        if not isinstance(t, dict):
            continue
        artist = t.get("artist")
        name = artist.get("#text", "") if isinstance(artist, dict) else ""
        if name:
            recent_map[name] = recent_map.get(name, 0) + 1
    recent_artists = sorted(recent_map.items(), key=lambda kv: -kv[1])[:20]

    return {
        "topArtists": top_artists,
        "recentArtists": [{"name": n, "plays": p} for n, p in recent_artists],
    }
|
|
|
|
|
|
|
|
|
async def search_candidates(client: httpx.AsyncClient) -> list[dict]:
    """Run all SearXNG queries concurrently and return de-duplicated hits.

    Each query contributes at most its first 25 results. Hits matching a
    noise pattern are dropped; the rest are parsed into candidate dicts with
    ``title``, ``url``, ``artist``, ``album``, ``source`` (first 60 chars of
    the query) and ``content`` (first 200 chars of the snippet). Candidates
    are de-duplicated by URL, and hits without a URL are discarded.

    A failing query is logged and contributes nothing; it never aborts the
    other queries.
    """
    queries = build_queries()
    log.info(f"Running {len(queries)} SearXNG queries")

    def text_field(res: dict, key: str) -> str:
        # Result fields can be absent or null; treat anything that is not a
        # string as empty so one odd hit cannot sink the whole query.
        value = res.get(key)
        return value if isinstance(value, str) else ""

    async def one_query(q: str) -> list[dict]:
        # Restrict to the past year, except for site-restricted Bandcamp and
        # Metal Archives queries, which are left unrestricted for breadth.
        use_time_range = "site:bandcamp.com" not in q and "metal-archives" not in q
        params: dict = {"q": q, "format": "json"}
        if use_time_range:
            params["time_range"] = "year"
        try:
            r = await client.get(
                f"{settings.searxng_url}/search",
                params=params,
                timeout=25,
            )
            if r.status_code != 200:
                log.warning(f"SearXNG query returned HTTP {r.status_code}: {q[:40]}")
                return []
            results = (r.json().get("results") or [])[:25]  # top 25 per query
            out = []
            for res in results:
                if not isinstance(res, dict):
                    continue
                title = text_field(res, "title")
                url = text_field(res, "url")
                if is_noise(title, url):
                    continue
                artist, album = parse_title(title)
                if not artist:
                    artist = artist_from_url(url)
                out.append({
                    "title": title,
                    "url": url,
                    "artist": artist,
                    "album": album,
                    "source": q[:60],
                    "content": text_field(res, "content")[:200],
                })
            return out
        except Exception as e:
            # Best effort: a broken query must not fail the whole search stage.
            log.warning(f"SearXNG query failed: {q[:40]} — {e}")
            return []

    batches = await asyncio.gather(*[one_query(q) for q in queries])

    # Deduplicate by URL, keeping the first occurrence.
    seen_urls: set[str] = set()
    unique = []
    for batch in batches:
        for hit in batch:
            u = hit["url"]
            if u and u not in seen_urls:
                seen_urls.add(u)
                unique.append(hit)

    log.info(f"Collected {len(unique)} unique candidates from search")
    return unique
|
|
|
|
|
|
|
|
|
async def embed_text(client: httpx.AsyncClient, text: str) -> list[float]:
    """Request an embedding for ``text`` from Ollama's ``nomic-embed-text`` model.

    Returns the ``embedding`` list from the response, or ``[]`` when the
    request raises, the status is not 200, or the field is absent. Failures
    that raise are logged as warnings.
    """
    payload = {"model": "nomic-embed-text", "prompt": text}
    try:
        response = await client.post(
            f"{settings.ollama_url}/api/embeddings",
            json=payload,
            timeout=30,
        )
        if response.status_code == 200:
            return response.json().get("embedding", [])
    except Exception as e:
        log.warning(f"Embed failed: {e}")
    return []
|
|
|
|
|
|
|
|
|
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine of the angle between vectors ``a`` and ``b``.

    Yields ``0.0`` when either vector is empty, the lengths differ, or
    either vector has zero magnitude.
    """
    if not a or not b:
        return 0.0
    if len(a) != len(b):
        return 0.0

    def magnitude(vec: list[float]) -> float:
        return math.sqrt(sum(v * v for v in vec))

    dot = sum(x * y for x, y in zip(a, b))
    mag_a = magnitude(a)
    mag_b = magnitude(b)
    if mag_a == 0 or mag_b == 0:
        return 0.0
    return dot / (mag_a * mag_b)
|
|
|
|
|
|
|
|
|
async def score_candidates(
    client: httpx.AsyncClient, candidates: list[dict], taste: dict
) -> list[dict]:
    """Attach an ``embedScore`` to every candidate and sort best-first.

    The taste centroid is the embedding of a sentence listing the first 30
    top artists. Each candidate is embedded one at a time and scored by
    cosine similarity to the centroid, rounded to four decimals. Candidate
    dicts are modified in place.

    Returns:
        ``[]`` for empty input; the input list itself with every score set
        to ``0.0`` when the centroid embedding fails; otherwise a new list
        sorted by descending ``embedScore``.
    """
    if not candidates:
        return []

    artist_names = [entry["name"] for entry in taste.get("topArtists", [])[:30]]
    centroid_text = "Heavy metal artist: " + ", ".join(artist_names)

    log.info("Embedding taste centroid")
    centroid_vec = await embed_text(client, centroid_text)

    if not centroid_vec:
        log.warning("Centroid embedding failed — scoring by 0")
        for cand in candidates:
            cand["embedScore"] = 0.0
        return candidates

    total = len(candidates)
    log.info(f"Scoring {total} candidates against taste centroid")

    ranked = []
    # Embeddings are requested sequentially, one candidate per call.
    for position, cand in enumerate(candidates):
        if cand.get("artist"):
            text = f"Metal album: {cand['artist']} - {cand['album']}"
        else:
            text = f"Metal album: {cand['album']}"
        vec = await embed_text(client, text)
        if vec:
            cand["embedScore"] = round(cosine_similarity(centroid_vec, vec), 4)
        else:
            cand["embedScore"] = 0.0
        if position % 20 == 0:
            log.info(f" scored {position}/{total}")
        ranked.append(cand)

    ranked.sort(key=lambda item: -item.get("embedScore", 0))
    return ranked
|
|
|
|
|
|
|
|
|
async def prefilter_batch(
    client: httpx.AsyncClient, batch: list[dict], batch_offset: int
) -> list[dict]:
    """Ask the LLM which items in one batch are real metal releases.

    Args:
        client: Shared HTTP client.
        batch: Candidate dicts; the prompt numbers them by position in this
            batch, starting at 0.
        batch_offset: Position of the batch within the full candidate list.
            It is used only to identify the batch in log messages.

    Returns:
        Copies of the kept candidates, with ``artist``/``album`` replaced by
        the model's values when it supplied them and a ``prefilter`` dict
        (``subgenre``, ``confidence``) attached. If the request fails or no
        JSON array can be extracted, the batch is returned unchanged.

    Model output is untrusted: entries that are not objects, whose ``index``
    is not an integer within the batch, or that repeat an index already
    handled are skipped rather than raising or mapping to the wrong item.
    """
    items_for_prompt = [
        {"index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "url": c.get("url", "")}
        for i, c in enumerate(batch)
    ]
    prompt = f"""You are a metal music expert. Review these {len(items_for_prompt)} items from web searches.
Return ONLY a JSON array of items that are actual metal album or EP releases (not news, not Wikipedia, not lists, not tour dates).
For each kept item include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0).
Be inclusive — keep anything that looks like it could be a real release even if you're not certain.

Items:
{json.dumps(items_for_prompt, indent=2)}

Return ONLY valid JSON array starting with ["""
    body = {
        "model": "mistral-nemo:latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 6000},
    }
    try:
        r = await client.post(f"{settings.ollama_url}/api/chat", json=body, timeout=150)
        raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
    except Exception as e:
        log.warning(f"Prefilter batch failed: {e}")
        return batch  # pass through on error

    parsed = extract_json_array(raw)
    if not parsed:
        # NOTE(review): an explicit empty array from the model is
        # indistinguishable from a parse failure here, so it also passes the
        # whole batch through.
        log.warning(f"Prefilter batch {batch_offset}: no JSON — passing through")
        return batch

    enriched = []
    used_indexes: set[int] = set()
    for p in parsed:
        if not isinstance(p, dict):
            continue
        idx = p.get("index")
        # Accept "3" and 3.0 as 3; reject everything else that is not an int.
        if isinstance(idx, str):
            try:
                idx = int(idx)
            except ValueError:
                continue
        elif isinstance(idx, float) and idx.is_integer():
            idx = int(idx)
        if isinstance(idx, bool) or not isinstance(idx, int):
            continue
        # Negative values would silently index from the end of the batch.
        if not 0 <= idx < len(batch) or idx in used_indexes:
            continue
        used_indexes.add(idx)

        orig = batch[idx].copy()
        orig["artist"] = p.get("artist") or orig.get("artist", "")
        orig["album"] = p.get("album") or orig.get("album", "")
        orig["prefilter"] = {
            "subgenre": p.get("subgenre", "unknown"),
            "confidence": p.get("confidence", 0.5),
        }
        enriched.append(orig)
    return enriched
|
|
|
|
|
|
|
|
|
async def prefilter_candidates(
    client: httpx.AsyncClient, candidates: list[dict]
) -> list[dict]:
    """Run the LLM prefilter over the candidates in batches of 35.

    Only the first 200 candidates are considered; the rest are dropped and
    the truncation is logged. Batches are sent in groups of up to three
    concurrent requests, and results are concatenated in batch order.

    Returns:
        The candidates kept (and enriched) by prefilter_batch().
    """
    if not candidates:
        return []

    BATCH_SIZE = 35
    MAX_CANDIDATES = 200
    MAX_CONCURRENT = 3

    # Cap before batching so the limit is exact; batching first would let the
    # final batch run past it.
    considered = candidates[:MAX_CANDIDATES]
    if len(considered) < len(candidates):
        log.info(
            f"Prefilter capped at {MAX_CANDIDATES}: skipping "
            f"{len(candidates) - len(considered)} of {len(candidates)} candidates"
        )

    batches = [considered[i:i + BATCH_SIZE] for i in range(0, len(considered), BATCH_SIZE)]
    log.info(f"Prefiltering {len(considered)} candidates in {len(batches)} parallel batches")

    # Run a few batches at a time to limit the load on Ollama.
    all_enriched = []
    for chunk_start in range(0, len(batches), MAX_CONCURRENT):
        chunk = batches[chunk_start:chunk_start + MAX_CONCURRENT]
        results = await asyncio.gather(*[
            prefilter_batch(client, b, (chunk_start + i) * BATCH_SIZE)
            for i, b in enumerate(chunk)
        ])
        for kept in results:
            all_enriched.extend(kept)

    log.info(f"Prefilter kept {len(all_enriched)}/{len(considered)} candidates")
    return all_enriched
|
|
|
|
|
|
|
|
|
def _validate_curated_picks(picks: list, top: list[dict]) -> list[dict]:
    """Keep curator picks that score at least 50 and trace back to an input item."""
    # Known URLs keyed without their trailing slash, mapped to the exact
    # search-result URL.
    canonical_urls = {
        c["url"].rstrip("/"): c["url"]
        for c in top
        if isinstance(c.get("url"), str) and c["url"]
    }
    validated = []
    for p in picks:
        if not isinstance(p, dict):
            continue
        try:
            score = float(p.get("score", 0))
        except (TypeError, ValueError):
            # One unparseable score drops only that pick, not the response.
            continue
        if score < 50:
            continue

        idx = p.get("index")
        if isinstance(idx, float) and idx.is_integer():
            idx = int(idx)
        if isinstance(idx, int) and not isinstance(idx, bool) and 0 <= idx < len(top):
            # Always use the original URL from the search result.
            p["url"] = top[idx].get("url", p.get("url", ""))
            validated.append(p)
            continue

        url = p.get("url")
        matched = canonical_urls.get(url.rstrip("/")) if isinstance(url, str) and url else None
        if matched:
            # Same URL apart from a trailing slash.
            p["url"] = matched
            validated.append(p)
        else:
            log.debug(f"Dropped hallucinated pick: {p.get('artist')} — {p.get('album')}")
    return validated


async def curate_picks(
    client: httpx.AsyncClient, candidates: list[dict], taste: dict
) -> list[dict]:
    """Use mistral-nemo to produce final scored picks with full metadata.

    The 30 candidates with the highest ``embedScore`` are presented to the
    model together with the taste profile. The response is parsed as a JSON
    array and each pick is validated: it must score at least 50 and refer to
    an input item, either by ``index`` or by a URL equal to an input URL
    (ignoring a trailing slash). Validated picks carry the original
    search-result URL.

    The request is attempted up to three times, pausing two seconds between
    attempts, until at least one validated pick is obtained.

    Returns:
        The validated picks, or ``[]`` when there are no candidates or every
        attempt fails.
    """
    if not candidates:
        return []

    top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:30]
    top_artists = [a["name"] for a in taste.get("topArtists", [])[:20]]
    recent_artists = [a["name"] for a in taste.get("recentArtists", [])[:10]]
    year = datetime.now().year

    items_for_prompt = [
        {
            "index": i,
            "artist": c.get("artist", ""),
            "album": c.get("album", ""),
            "subgenre": (c.get("prefilter") or {}).get("subgenre", "unknown"),
            "url": c.get("url", ""),
            "source": c.get("source", ""),
        }
        for i, c in enumerate(top)
    ]

    prompt = f"""You are a heavy metal curator reviewing NEW {year} releases found via web search.

IMPORTANT: Only score albums from the list below. Do NOT recommend albums not in this list.
Do NOT recommend classic albums. These must be actual {year} releases found at the provided URLs.
If you are not sure an item is a real new release, skip it.

nick2day's taste profile:
- Top artists (12-month): {', '.join(top_artists)}
- Recent listens: {', '.join(recent_artists)}

New {year} releases to evaluate (use ONLY these):
{json.dumps(items_for_prompt, indent=2)}

For each item worth recommending, output a JSON object with:
index (integer, from above), artist, album, score (0-100, based on fit to taste profile),
subgenre, obscurity (underground/indie/major), similar_to (2-3 artists from taste profile),
why (1 sentence), review_angle (unique angle), tags (string array), url (copy from above)

Skip items without a clear artist + album. Skip classics or non-{year} releases.
Return ONLY a JSON array starting with ["""

    body = {
        "model": "mistral-nemo:latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 8000},
    }

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            r = await client.post(
                f"{settings.ollama_url}/api/chat",
                json=body,
                timeout=180,
            )
            raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
            picks = extract_json_array(raw)
            if picks:
                validated = _validate_curated_picks(picks, top)
                log.info(f"Curator returned {len(validated)} validated picks (attempt {attempt})")
                if validated:
                    return validated
        except Exception as e:
            log.error(f"Curator attempt {attempt} failed: {e}")
        if attempt < max_attempts:
            # Pause only when another attempt follows.
            await asyncio.sleep(2)

    log.warning("Curator failed all attempts")
    return []
|
|
|
|
|
|
|
|
|
def sanitise_picks(picks: list[dict], source_candidates: list[dict]) -> list[dict]:
    """Final cleanup pass over curator picks.

    For each pick this:

    * coerces ``artist``/``album`` to stripped strings and ``url`` to a
      string (``None`` and other non-text values would otherwise crash the
      checks below, since picks come from LLM-produced JSON);
    * normalises ``obscurity`` to ``"high"``, ``"medium"`` or ``"low"``;
    * when the URL is on a known review site and the artist is the site
      name, restores artist/album from the matching source candidate, or
      drops the pick if that is not possible;
    * when the artist contains a label/store word, re-derives it from the
      source candidate's title or Bandcamp URL;
    * drops picks that still look bad per looks_like_bad_pick().

    Args:
        picks: Curator output; the dicts are modified in place.
        source_candidates: Search candidates, matched to picks by URL.

    Returns:
        The picks that survive cleanup, in input order.
    """

    def as_text(value: object) -> str:
        return "" if value is None else str(value).strip()

    # URL → candidate map for fallback artist extraction.
    url_map = {c.get("url", ""): c for c in source_candidates if c.get("url")}

    clean = []
    for p in picks:
        if not isinstance(p, dict):
            continue
        p["artist"] = as_text(p.get("artist"))
        p["album"] = as_text(p.get("album"))
        # The URL is a lookup key, so it is not stripped or otherwise altered.
        url = p.get("url") if isinstance(p.get("url"), str) else ""
        p["url"] = url

        # Normalise obscurity to the three values used downstream.
        obs = as_text(p.get("obscurity")).lower()
        if "underground" in obs or obs in ("high", "demo", "diy"):
            p["obscurity"] = "high"
        elif "indie" in obs or "cult" in obs or obs == "medium":
            p["obscurity"] = "medium"
        else:
            p["obscurity"] = "low"

        # If the URL is a review article, the artist shouldn't be the site name.
        if _REVIEW_DOMAINS.search(url):
            domain_slug = re.sub(r'https?://(www\.)?', '', url).split('/')[0].split('.')[0]
            if domain_slug.lower() in p["artist"].lower().replace(' ', ''):
                orig = url_map.get(url)
                if orig and orig.get("artist"):
                    p["artist"] = orig["artist"]
                    p["album"] = orig.get("album") or p["album"]
                else:
                    # Can't recover — drop this pick.
                    continue

        # If the artist looks bad, try to recover it from the search result.
        if _BAD_FIELD.search(p["artist"]):
            orig = url_map.get(url)
            if orig:
                artist_fallback, album_fallback = parse_title(orig.get("title", ""))
                if not artist_fallback:
                    artist_fallback = artist_from_url(orig.get("url", ""))
                p["artist"] = artist_fallback or orig.get("artist", p["artist"])
                if not p["album"]:
                    p["album"] = album_fallback

        if looks_like_bad_pick(p):
            log.debug(f"Dropping bad pick after cleanup: {p.get('artist')} / {p.get('album')}")
            continue
        clean.append(p)

    return clean
|
|
|
|
|
|
|
|
|
def extract_json_array(text: str) -> list:
    """Pull a JSON array out of LLM output, tolerating surrounding noise.

    Markdown code fences are removed first. The remaining text is parsed as
    JSON: a list is returned as-is, and for an object the first list-valued
    member is returned. Failing that, the span from the first ``[`` to the
    last ``]`` is parsed and returned if it is a list. Anything else yields
    ``[]``.
    """
    if not text:
        return []

    body = re.sub(r'```(?:json)?\s*', '', text).strip()

    def loads_or_none(fragment: str):
        # None doubles as "could not parse": it is neither a list nor a dict,
        # so it falls through the checks below exactly like a failure.
        try:
            return json.loads(fragment)
        except Exception:
            return None

    whole = loads_or_none(body)
    if isinstance(whole, list):
        return whole
    if isinstance(whole, dict):
        for value in whole.values():
            if isinstance(value, list):
                return value

    first = body.find('[')
    last = body.rfind(']')
    if first != -1 and last > first:
        inner = loads_or_none(body[first:last + 1])
        if isinstance(inner, list):
            return inner
    return []
|
|
|
|
|
|
|
|
|
async def write_picks(new_picks: list[dict], run_stats: dict):
    """Merge new picks into the picks JSON file and record the run.

    Existing picks are never removed. A new pick is added only if its
    lower-cased ``(artist, album)`` pair is not already stored and has not
    already been added from this batch. New picks are placed before the
    existing ones, and the run history keeps the ten most recent runs.

    A missing file starts an empty store. A file that exists but cannot be
    read or parsed is moved aside (``<path>.unreadable-<unix time>``) before
    a fresh store is started, so its contents are not silently overwritten.
    The result is written to a temporary sibling file and renamed into
    place, so an interrupted write cannot leave a truncated picks file.

    Args:
        new_picks: Picks produced by this run.
        run_stats: ``candidates`` and ``filtered`` counts for the run record.
    """
    import os  # local import: only needed for the renames below

    def pick_key(p: dict) -> tuple[str, str]:
        # None-safe: stored or new picks may carry null fields.
        return (str(p.get("artist") or "").lower(), str(p.get("album") or "").lower())

    path = settings.picks_path
    store = None
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            raise ValueError("top-level JSON value is not an object")
        store = loaded
    except FileNotFoundError:
        pass
    except (OSError, ValueError) as e:
        backup = f"{path}.unreadable-{int(time.time())}"
        log.warning(f"Could not read {path} ({e}) — moving it to {backup}")
        try:
            os.replace(path, backup)
        except OSError as move_err:
            # Best effort only: continue with a fresh store either way.
            log.warning(f"Could not back up {path}: {move_err}")
    if store is None:
        store = {"allPicks": [], "runs": [], "lastUpdated": ""}

    # Append-only — never remove existing picks; dedup by artist+album key.
    existing = store.get("allPicks") or []
    known_keys = {pick_key(p) for p in existing if isinstance(p, dict)}
    truly_new = []
    for p in new_picks:
        key = pick_key(p)
        if key not in known_keys:
            known_keys.add(key)
            truly_new.append(p)

    run = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": "fgs-agent",
        "picksAdded": len(truly_new),
        "candidatesSearched": run_stats.get("candidates", 0),
        "candidatesFiltered": run_stats.get("filtered", 0),
    }
    store["runs"] = ([run] + (store.get("runs") or []))[:10]
    store["allPicks"] = truly_new + existing
    store["lastUpdated"] = run["timestamp"]

    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(store, f, indent=2)
    os.replace(tmp_path, path)

    log.info(f"Wrote {len(truly_new)} new picks to {path}")
|
|
|
|
|
|
|
|
|
# ── RUN STATE ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# In-memory record of the current / most recent run. pipeline_run() resets and
# updates it; GET /api/fgs/status returns it as-is. It lives only in this
# process, so it starts again from these defaults on every restart.
run_state: dict[str, Any] = {
    "status": "idle",        # pipeline_run sets "running", "done" or "error"
    "run_id": None,          # short id handed out by POST /api/fgs/run
    "started": None,         # ISO-8601 UTC timestamps
    "finished": None,
    "picks_added": 0,
    "candidates_found": 0,
    "error": None,           # str(exception) when status is "error"
    "log": [],               # [{"ts": ..., "msg": ...}] appended per step
}
# Held for the whole of pipeline_run(), so runs execute one at a time.
_run_lock = asyncio.Lock()
|
|
|
|
|
|
|
|
|
async def pipeline_run(run_id: str):
    """Execute one full discovery run and record its progress in ``run_state``.

    Stages, in order: fetch Last.fm data and build the taste profile, search
    for candidates, drop already-seen albums, score by embedding similarity,
    prefilter with the LLM, curate final picks, sanitise them, persist them.

    Picks are written to the picks file first and marked as seen only after
    that succeeds, so a failed write leaves them eligible for the next run
    instead of hiding them permanently.

    The run holds ``_run_lock`` throughout. Any exception is logged and
    stored in ``run_state`` (status ``"error"``) rather than propagated.

    Args:
        run_id: Identifier reported through the status endpoint.
    """

    def now_iso() -> str:
        return datetime.now(timezone.utc).isoformat()

    def log_step(msg: str):
        log.info(msg)
        run_state["log"].append({"ts": now_iso(), "msg": msg})

    def finish(status: str, **extra: Any):
        run_state.update({"status": status, "finished": now_iso(), **extra})

    async with _run_lock:
        run_state.update({
            "status": "running",
            "run_id": run_id,
            "started": now_iso(),
            "finished": None,
            "picks_added": 0,
            "candidates_found": 0,
            "error": None,
            "log": [],
        })

        try:
            async with httpx.AsyncClient() as client:
                log_step("Fetching Last.fm data")
                lastfm = await fetch_lastfm(client)
                taste = build_taste_profile(lastfm)
                log_step(f"Taste profile: {len(taste['topArtists'])} top artists, {len(taste['recentArtists'])} recent")

                log_step("Searching candidates")
                candidates = await search_candidates(client)
                run_state["candidates_found"] = len(candidates)

                if not candidates:
                    log_step("No candidates found — ending run")
                    finish("done")
                    return

                log_step("Filtering seen albums")
                fresh = await filter_unseen(candidates)
                log_step(f"{len(fresh)} fresh candidates (of {len(candidates)} total)")

                if not fresh:
                    log_step("All candidates already seen — ending run")
                    finish("done")
                    return

                log_step("Scoring by embedding similarity")
                scored = await score_candidates(client, fresh, taste)

                log_step("Prefiltering with Mistral")
                prefiltered = await prefilter_candidates(client, scored)
                log_step(f"{len(prefiltered)} candidates passed prefilter")

                log_step("Curating picks with Mistral")
                picks = await curate_picks(client, prefiltered, taste)
                picks = sanitise_picks(picks, scored)
                log_step(f"{len(picks)} picks after cleanup")

                if picks:
                    # Persist first; only then mark as seen. Unselected
                    # candidates are never marked, so they stay eligible.
                    await write_picks(picks, {
                        "candidates": len(candidates),
                        "filtered": len(prefiltered),
                    })
                    await mark_seen(picks)

                finish("done", picks_added=len(picks))
                log_step(f"Run complete — {len(picks)} new picks added")

        except Exception as e:
            log.exception(f"Pipeline run failed: {e}")
            finish("error", error=str(e))
|
|
|
|
|
|
|
|
|
# ── API ───────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
# ASGI application object; the route decorators below register on it.
app = FastAPI(title="fgs-agent", description="First Gain Stage metal discovery")
|
|
|
|
|
|
|
|
|
# NOTE(review): FastAPI's documentation marks on_event as deprecated in favour
# of lifespan handlers — confirm against the installed version.
@app.on_event("startup")
async def startup():
    """Create the dedup table if needed, then log that the service is up."""
    await init_dedup_db()
    log.info(f"fgs-agent started on port {settings.port}")
|
|
|
|
|
|
|
|
|
@app.post("/api/fgs/run")
async def trigger_run(background_tasks: BackgroundTasks):
    """Schedule a discovery run in the background.

    Returns:
        ``{"status": "started", "run_id": ...}`` with a fresh 8-character
        run id, or a 409 response carrying the active run id when
        ``run_state`` already reports a running pipeline.
    """
    already_running = run_state["status"] == "running"
    if already_running:
        conflict = {"status": "already_running", "run_id": run_state["run_id"]}
        return JSONResponse(conflict, status_code=409)

    # First eight characters of a random UUID serve as the run id.
    run_id = str(uuid.uuid4())[:8]
    background_tasks.add_task(pipeline_run, run_id)
    return {"status": "started", "run_id": run_id}
|
|
|
|
|
|
|
|
|
@app.get("/api/fgs/status")
async def get_status():
    """Return the module-level ``run_state`` dict for the current or last run."""
    return run_state
|
|
|
|
|
|
|
|
|
@app.get("/api/fgs/picks")
async def get_picks():
    """Return the parsed contents of the picks JSON file.

    Any failure to open or parse the file yields a 500 response whose body
    is ``{"error": <message>}``.
    """
    try:
        with open(settings.picks_path, "r") as f:
            payload = json.load(f)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    return payload
|
|
|
|
|
|
|
|
|
class RemoveRequest(BaseModel):
    """Request body for POST /api/fgs/picks/remove."""

    # Artist and album of the pick to remove; remove_pick() compares both
    # case-insensitively.
    artist: str
    album: str
|
|
|
|
|
|
|
|
|
@app.post("/api/fgs/picks/remove")
async def remove_pick(req: RemoveRequest):
    """User-initiated pick removal.

    Removes every stored pick whose artist and album match the request
    case-insensitively, rewrites the picks file, and records the pair in the
    dedup database so later runs do not bring it back.

    Stored picks with ``null`` or non-string artist/album fields are compared
    as text instead of raising. The file is rewritten through a temporary
    sibling file and an atomic rename, so an interrupted write cannot leave
    a truncated picks file.

    Returns:
        ``{"removed": <count>, "total": <remaining>}``, or a 500 response
        with ``{"error": ...}`` when the picks file cannot be read.
    """
    import os  # local import: only needed for the atomic rename below

    try:
        with open(settings.picks_path, "r") as f:
            store = json.load(f)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    def pick_key(p: dict) -> tuple[str, str]:
        return (str(p.get("artist") or "").lower(), str(p.get("album") or "").lower())

    target = (req.artist.lower(), req.album.lower())
    picks = store.get("allPicks", [])
    kept = [p for p in picks if pick_key(p) != target]
    store["allPicks"] = kept

    tmp_path = f"{settings.picks_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(store, f, indent=2)
    os.replace(tmp_path, settings.picks_path)

    # Also add to dedup so it won't come back.
    await mark_seen([{"artist": req.artist, "album": req.album}])

    return {"removed": len(picks) - len(kept), "total": len(kept)}
|
|
|
|
|
|
|
|
|
@app.get("/api/fgs/health")
async def health():
    """Liveness check: returns a fixed payload without touching any dependency."""
    return {"status": "ok", "agent": "fgs-agent"}
|
|
|
|
|
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
if __name__ == "__main__":
    # Serve on all interfaces at the configured port, without auto-reload.
    # NOTE(review): the "agent:app" import string assumes this file is
    # importable as module `agent` — confirm the filename.
    uvicorn.run("agent:app", host="0.0.0.0", port=settings.port, reload=False)
|