Fix pipeline yield: dedup, query expansion, parallel prefilter

- Dedup: mark only accepted picks as seen (not all prefiltered
  candidates) — unselected items stay eligible for re-evaluation,
  preventing pool exhaustion across runs
- Queries: expanded from 29 to 37+ with rotating 30-subgenre list,
  25 label targets, 14 review sites; Bandcamp/MA queries skip
  time_range for broader results; review sites use time_range:year
- Results per query: 15 → 25
- Prefilter: parallel batches of 35 (up to 3 concurrent), processes
  all fresh candidates instead of just top 80; be-inclusive prompt
- Curator: cap 20 → 30, score floor 60 → 50, URL prefix matching
  in provenance check instead of exact match

Result: 405 candidates/run vs 146 before; 88 passing prefilter vs 10;
pool stays at ~400 fresh on consecutive runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
main
nick2day 3 months ago
parent 05bb4193ad
commit 39d6051a1f

@ -49,62 +49,120 @@ settings = Settings()
# ── SEARXNG QUERY BANK ───────────────────────────────────────────────────────
# All 29 queries — dynamic year, rotating subgenre index
SUBGENRES = [
"death metal", "black metal", "doom metal", "thrash metal", "sludge metal",
"progressive metal", "blackened death metal", "post-metal", "funeral doom", "grindcore",
"melodic death metal", "technical death metal", "atmospheric black metal", "war metal",
"drone metal", "stoner metal", "gothic metal", "power metal", "viking metal", "brutal death metal",
"deathcore", "mathcore", "noise rock", "crust punk", "black doom", "death doom",
"dissonant death metal", "ambient black metal", "depressive black metal", "crossover thrash",
]
# Labels to search on Bandcamp — no time_range, broad results
BANDCAMP_LABELS = [
"sentient ruin",
"20 buck spin",
"prosthetic records",
"unique leader records",
"redefining darkness",
"profound lore",
"season of mist",
"century media",
"nuclear blast",
"relapse records",
"metal blade records",
"dark descent records",
"iron bonehead",
"les acteurs de l'ombre",
"vrasubatlat",
"me saco un ojo",
"hells headbangers",
"invictus productions",
"blood harvest",
"chaos records",
"adagio 830",
"floga records",
"sewer rot records",
"lavadome productions",
"memento mori",
]
# Review/news sites — use time_range year to get fresh but not stale
REVIEW_SITES = [
"metalinjection.net",
"cvltnation.com",
"decibelmag.com",
"invisibleoranges.com",
"brooklynvegan.com",
"heavyblogisheavy.com",
"nocturnalcult.com",
"themetalcrypt.com",
"angrymetalguy.com",
"sputnikmusic.com",
"nocleansinging.com",
"rateyourmusic.com",
"terrorizer.com",
"kerrang.com",
]
def build_queries() -> list[str]:
year = datetime.now().year
# rotate subgenre every 6 hours so successive runs hit different niches
idx = int(time.time() // 21600) % len(SUBGENRES)
sg = SUBGENRES[idx]
sg2 = SUBGENRES[(idx + 1) % len(SUBGENRES)]
sg3 = SUBGENRES[(idx + 2) % len(SUBGENRES)]
return [
# Subgenre new releases
f'{sg} new album release {year}',
f'{sg2} new album {year}',
f'{sg3} full album stream {year}',
# Bandcamp label searches
f'site:bandcamp.com "sentient ruin" new {year}',
f'site:bandcamp.com "20 buck spin" new {year}',
f'site:bandcamp.com "prosthetic records" {year}',
f'site:bandcamp.com "unique leader records" {year}',
f'site:bandcamp.com "redefining darkness" {year}',
f'site:bandcamp.com "profound lore" {year}',
f'site:bandcamp.com "season of mist" {year}',
f'site:bandcamp.com "century media" {year}',
f'site:bandcamp.com "nuclear blast" {year}',
# Bandcamp genre pages
f'site:bandcamp.com {sg} album {year}',
f'site:bandcamp.com {sg2} album {year}',
# Metal reviews / coverage
f'site:metal-archives.com new album review {year}',
f'site:metalinjection.net new album {year}',
f'site:cvltnation.com new album {year}',
f'site:decibelmag.com new album stream {year}',
f'site:invisibleoranges.com album review {year}',
f'site:brooklynvegan.com {sg} new album {year}',
# Obscure/underground
f'underground {sg} demo release {year}',
f'independent {sg} album bandcamp {year}',
f'new {sg} EP release {year}',
# Michigan / regional bias
f'Michigan metal band new album {year}',
f'Detroit metal band new release {year}',
# Best-of / roundup (useful for discovery)
f'best {sg} albums {year}',
f'new {sg} releases {year} bandcamp',
# Last.fm tag pages
f'site:last.fm {sg} new {year}',
# Full-album YouTube
f'"{sg}" "full album" {year} new band',
]
# Rotate through 6 subgenres per run (changes every 4 hours)
idx = int(time.time() // 14400) % len(SUBGENRES)
sgs = [SUBGENRES[(idx + i) % len(SUBGENRES)] for i in range(6)]
# Rotate through label batches (every 6 hours, 5 labels at a time)
label_idx = int(time.time() // 21600) % len(BANDCAMP_LABELS)
labels = [BANDCAMP_LABELS[(label_idx + i) % len(BANDCAMP_LABELS)] for i in range(5)]
# Rotate review sites (every 12 hours, 4 sites at a time)
site_idx = int(time.time() // 43200) % len(REVIEW_SITES)
sites = [REVIEW_SITES[(site_idx + i) % len(REVIEW_SITES)] for i in range(4)]
queries = []
# Subgenre releases — mix of year-scoped and unscoped for breadth
for sg in sgs[:3]:
queries.append(f'{sg} new album release {year}')
for sg in sgs[3:]:
queries.append(f'{sg} new album bandcamp')
# Bandcamp direct — no time_range, catches more
for sg in sgs[:2]:
queries.append(f'site:bandcamp.com {sg} album {year}')
queries.append(f'site:bandcamp.com {sg} new release')
# Label searches — broad, no time restriction
for label in labels:
queries.append(f'site:bandcamp.com "{label}"')
# Review sites with year scope
for site in sites:
queries.append(f'site:{site} album review {year}')
queries.append(f'site:{site} new album {year}')
# Bandcamp underground / demo scene
for sg in sgs[:2]:
queries.append(f'underground {sg} demo {year} bandcamp')
queries.append(f'new {sg} EP {year} bandcamp')
# Michigan / regional
queries.append(f'Michigan metal band new album {year}')
queries.append(f'Detroit metal band new release {year}')
queries.append(f'Midwest metal new release {year} bandcamp')
# Metal Archives new reviews (no time_range — MA is evergreen)
queries.append(f'site:metal-archives.com album review {year}')
queries.append(f'site:metal-archives.com new band {year}')
# Broad discovery
queries.append(f'best underground metal albums {year}')
queries.append(f'new metal releases {year} bandcamp full stream')
queries.append(f'hidden gem metal album {year}')
queries.append(f'obscure metal release {year}')
queries.append(f'metal full album stream {year} new band')
return queries
# ── NOISE FILTERS ────────────────────────────────────────────────────────────
@ -255,16 +313,22 @@ async def search_candidates(client: httpx.AsyncClient) -> list[dict]:
log.info(f"Running {len(queries)} SearXNG queries")
async def one_query(q: str) -> list[dict]:
# Use year time_range only for queries that include the current year literal;
# skip time_range for Bandcamp label/genre queries to maximise breadth
use_time_range = "site:bandcamp.com" not in q and "metal-archives" not in q
params: dict = {"q": q, "format": "json"}
if use_time_range:
params["time_range"] = "year"
try:
r = await client.get(
f"{settings.searxng_url}/search",
params={"q": q, "format": "json", "time_range": "month"},
timeout=20,
params=params,
timeout=25,
)
if r.status_code != 200:
return []
data = r.json()
results = data.get("results", [])[:15] # top 15 per query
results = data.get("results", [])[:25] # top 25 per query
out = []
for res in results:
title = res.get("title", "")
@ -363,56 +427,41 @@ async def score_candidates(
return scored
async def prefilter_candidates(
client: httpx.AsyncClient, candidates: list[dict]
async def prefilter_batch(
client: httpx.AsyncClient, batch: list[dict], batch_offset: int
) -> list[dict]:
"""Use mistral-nemo to prefilter — keep actual album releases, drop noise."""
if not candidates:
return []
# Send in batches of 40
batch = candidates[:80]
"""Run prefilter on one batch. batch_offset adjusts index for provenance mapping."""
items_for_prompt = [
{"index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "url": c.get("url", "")}
for i, c in enumerate(batch)
]
prompt = f"""You are a metal music expert. Review these {len(items_for_prompt)} items from web searches.
Return ONLY a JSON array of items that are actual metal album or EP releases (not news articles, not Wikipedia, not lists).
For each item kept, include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0).
Return ONLY a JSON array of items that are actual metal album or EP releases (not news, not Wikipedia, not lists, not tour dates).
For each kept item include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0).
Be inclusive keep anything that looks like it could be a real release even if you're not certain.
Items:
{json.dumps(items_for_prompt, indent=2)}
Return ONLY valid JSON array starting with ["""
body = {
"model": "mistral-nemo:latest",
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 4000},
"options": {"temperature": 0.1, "num_predict": 6000},
}
try:
r = await client.post(
f"{settings.ollama_url}/api/chat",
json=body,
timeout=120,
)
r = await client.post(f"{settings.ollama_url}/api/chat", json=body, timeout=150)
raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
except Exception as e:
log.error(f"Prefilter failed: {e}")
return candidates[:40] # fall through with top candidates
log.warning(f"Prefilter batch failed: {e}")
return batch # pass through on error
# Extract JSON from response
parsed = extract_json_array(raw)
if not parsed:
log.warning("Prefilter returned no parseable JSON — using all candidates")
return candidates[:40]
log.info(f"Prefilter kept {len(parsed)}/{len(batch)} candidates")
log.warning(f"Prefilter batch {batch_offset}: no JSON — passing through")
return batch
# Map back to full candidate objects with prefilter data
enriched = []
for p in parsed:
idx = p.get("index")
@ -426,10 +475,35 @@ Return ONLY valid JSON array starting with ["""
"confidence": p.get("confidence", 0.5),
}
enriched.append(orig)
return enriched
async def prefilter_candidates(
client: httpx.AsyncClient, candidates: list[dict]
) -> list[dict]:
"""Run prefilter on all candidates in parallel batches of 35."""
if not candidates:
return []
BATCH_SIZE = 35
batches = [candidates[i:i+BATCH_SIZE] for i in range(0, min(len(candidates), 200), BATCH_SIZE)]
log.info(f"Prefiltering {len(candidates)} candidates in {len(batches)} parallel batches")
# Run up to 3 batches concurrently to avoid OOMing Ollama
all_enriched = []
for chunk_start in range(0, len(batches), 3):
chunk = batches[chunk_start:chunk_start+3]
results = await asyncio.gather(*[
prefilter_batch(client, b, chunk_start * BATCH_SIZE + i * BATCH_SIZE)
for i, b in enumerate(chunk)
])
for r in results:
all_enriched.extend(r)
log.info(f"Prefilter kept {len(all_enriched)}/{len(candidates)} candidates")
return all_enriched
async def curate_picks(
client: httpx.AsyncClient, candidates: list[dict], taste: dict
) -> list[dict]:
@ -437,7 +511,7 @@ async def curate_picks(
if not candidates:
return []
top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:20]
top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:30]
top_artists = [a["name"] for a in taste.get("topArtists", [])[:20]]
recent_artists = [a["name"] for a in taste.get("recentArtists", [])[:10]]
year = datetime.now().year
@ -497,19 +571,19 @@ Return ONLY a JSON array starting with ["""
raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else ""
picks = extract_json_array(raw)
if picks:
# Validate provenance: url must match a search result OR index must be valid
# Validate provenance: keep picks that reference a real input item
validated = []
for p in picks:
if p.get("score", 0) < 60:
if p.get("score", 0) < 50:
continue
idx = p.get("index")
url = p.get("url", "")
if idx is not None and idx in top_by_index:
# Merge URL from original candidate if LLM dropped it
if not url:
p["url"] = top_by_index[idx].get("url", "")
# Always use original URL from search result
p["url"] = top_by_index[idx].get("url", url)
validated.append(p)
elif url and url in known_urls:
elif url and any(url.startswith(ku[:40]) for ku in known_urls if ku):
# URL prefix match (handles trailing-slash variants)
validated.append(p)
else:
log.debug(f"Dropped hallucinated pick: {p.get('artist')}{p.get('album')}")
@ -650,11 +724,11 @@ async def pipeline_run(run_id: str):
log_step("Curating picks with Mistral")
picks = await curate_picks(client, prefiltered, taste)
log_step(f"{len(picks)} picks scored ≥60")
log_step(f"{len(picks)} picks scored ≥50")
if picks:
# Mark all prefiltered candidates as seen (not just picks) to avoid re-processing
await mark_seen(prefiltered)
# Mark only actual picks as seen — unselected candidates stay eligible for re-eval
await mark_seen(picks)
await write_picks(picks, {
"candidates": len(candidates),
"filtered": len(prefiltered),

Loading…
Cancel
Save