From 39d6051a1f14cf74944bfba3f2f78e32d35a6061 Mon Sep 17 00:00:00 2001 From: nick2day Date: Sun, 22 Mar 2026 01:50:28 -0400 Subject: [PATCH] Fix pipeline yield: dedup, query expansion, parallel prefilter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Dedup: mark only accepted picks as seen (not all prefiltered candidates) — unselected items stay eligible for re-evaluation, preventing pool exhaustion across runs - Queries: expanded from 29 to 37+ with rotating 30-subgenre list, 25 label targets, 14 review sites; Bandcamp/MA queries skip time_range for broader results; review sites use time_range:year - Results per query: 15 → 25 - Prefilter: parallel batches of 35 (up to 3 concurrent), processes all fresh candidates instead of just top 80; be-inclusive prompt - Curator: cap 20 → 30, score floor 60 → 50, URL prefix matching in provenance check instead of exact match Result: 405 candidates/run vs 146 before; 88 passing prefilter vs 10; pool stays at ~400 fresh on consecutive runs. Co-Authored-By: Claude Sonnet 4.6 --- agent.py | 248 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 161 insertions(+), 87 deletions(-) diff --git a/agent.py b/agent.py index 51a15d1..65dfe83 100644 --- a/agent.py +++ b/agent.py @@ -49,62 +49,120 @@ settings = Settings() # ── SEARXNG QUERY BANK ─────────────────────────────────────────────────────── -# All 29 queries — dynamic year, rotating subgenre index SUBGENRES = [ "death metal", "black metal", "doom metal", "thrash metal", "sludge metal", "progressive metal", "blackened death metal", "post-metal", "funeral doom", "grindcore", "melodic death metal", "technical death metal", "atmospheric black metal", "war metal", "drone metal", "stoner metal", "gothic metal", "power metal", "viking metal", "brutal death metal", + "deathcore", "mathcore", "noise rock", "crust punk", "black doom", "death doom", + "dissonant death metal", "ambient black metal", "depressive black metal", "crossover thrash", ] +# Labels to search on Bandcamp — no time_range, broad results +BANDCAMP_LABELS = [ + "sentient ruin", + "20 buck spin", + "prosthetic records", + "unique leader records", + "redefining darkness", + "profound lore", + "season of mist", + "century media", + "nuclear blast", + "relapse records", + "metal blade records", + "dark descent records", + "iron bonehead", + "les acteurs de l'ombre", + "vrasubatlat", + "me saco un ojo", + "hells headbangers", + "invictus productions", + "blood harvest", + "chaos records", + "adagio 830", + "floga records", + "sewer rot records", + "lavadome productions", + "memento mori", +] + +# Review/news sites — use time_range year to get fresh but not stale +REVIEW_SITES = [ + "metalinjection.net", + "cvltnation.com", + "decibelmag.com", + "invisibleoranges.com", + "brooklynvegan.com", + "heavyblogisheavy.com", + "nocturnalcult.com", + "themetalcrypt.com", + "angrymetalguy.com", + "sputnikmusic.com", + "nocleansinging.com", + "rateyourmusic.com", + "terrorizer.com", + "kerrang.com", +] + + def build_queries() -> list[str]: year = datetime.now().year - # rotate subgenre every 6 hours so successive runs hit different niches - idx = int(time.time() // 21600) % len(SUBGENRES) - sg = SUBGENRES[idx] - sg2 = SUBGENRES[(idx + 1) % len(SUBGENRES)] - sg3 = SUBGENRES[(idx + 2) % len(SUBGENRES)] - return [ - # Subgenre new releases - f'{sg} new album release {year}', - f'{sg2} new album {year}', - f'{sg3} full album stream {year}', - # Bandcamp label searches - f'site:bandcamp.com "sentient ruin" new {year}', - f'site:bandcamp.com "20 buck spin" new {year}', - f'site:bandcamp.com "prosthetic records" {year}', - f'site:bandcamp.com "unique leader records" {year}', - f'site:bandcamp.com "redefining darkness" {year}', - f'site:bandcamp.com "profound lore" {year}', - f'site:bandcamp.com "season of mist" {year}', - f'site:bandcamp.com "century media" {year}', - f'site:bandcamp.com "nuclear blast" {year}', - # Bandcamp genre pages - f'site:bandcamp.com {sg} album {year}', - f'site:bandcamp.com {sg2} album {year}', - # Metal reviews / coverage - f'site:metal-archives.com new album review {year}', - f'site:metalinjection.net new album {year}', - f'site:cvltnation.com new album {year}', - f'site:decibelmag.com new album stream {year}', - f'site:invisibleoranges.com album review {year}', - f'site:brooklynvegan.com {sg} new album {year}', - # Obscure/underground - f'underground {sg} demo release {year}', - f'independent {sg} album bandcamp {year}', - f'new {sg} EP release {year}', - # Michigan / regional bias - f'Michigan metal band new album {year}', - f'Detroit metal band new release {year}', - # Best-of / roundup (useful for discovery) - f'best {sg} albums {year}', - f'new {sg} releases {year} bandcamp', - # Last.fm tag pages - f'site:last.fm {sg} new {year}', - # Full-album YouTube - f'"{sg}" "full album" {year} new band', - ] + # Rotate through 6 subgenres per run (changes every 4 hours) + idx = int(time.time() // 14400) % len(SUBGENRES) + sgs = [SUBGENRES[(idx + i) % len(SUBGENRES)] for i in range(6)] + # Rotate through label batches (every 6 hours, 5 labels at a time) + label_idx = int(time.time() // 21600) % len(BANDCAMP_LABELS) + labels = [BANDCAMP_LABELS[(label_idx + i) % len(BANDCAMP_LABELS)] for i in range(5)] + # Rotate review sites (every 12 hours, 4 sites at a time) + site_idx = int(time.time() // 43200) % len(REVIEW_SITES) + sites = [REVIEW_SITES[(site_idx + i) % len(REVIEW_SITES)] for i in range(4)] + + queries = [] + + # Subgenre releases — mix of year-scoped and unscoped for breadth + for sg in sgs[:3]: + queries.append(f'{sg} new album release {year}') + for sg in sgs[3:]: + queries.append(f'{sg} new album bandcamp') + + # Bandcamp direct — no time_range, catches more + for sg in sgs[:2]: + queries.append(f'site:bandcamp.com {sg} album {year}') + queries.append(f'site:bandcamp.com {sg} new release') + + # Label searches — broad, no time restriction + for label in labels: + queries.append(f'site:bandcamp.com "{label}"') + + # Review sites with year scope + for site in sites: + queries.append(f'site:{site} album review {year}') + queries.append(f'site:{site} new album {year}') + + # Bandcamp underground / demo scene + for sg in sgs[:2]: + queries.append(f'underground {sg} demo {year} bandcamp') + queries.append(f'new {sg} EP {year} bandcamp') + + # Michigan / regional + queries.append(f'Michigan metal band new album {year}') + queries.append(f'Detroit metal band new release {year}') + queries.append(f'Midwest metal new release {year} bandcamp') + + # Metal Archives new reviews (no time_range — MA is evergreen) + queries.append(f'site:metal-archives.com album review {year}') + queries.append(f'site:metal-archives.com new band {year}') + + # Broad discovery + queries.append(f'best underground metal albums {year}') + queries.append(f'new metal releases {year} bandcamp full stream') + queries.append(f'hidden gem metal album {year}') + queries.append(f'obscure metal release {year}') + queries.append(f'metal full album stream {year} new band') + + return queries # ── NOISE FILTERS ──────────────────────────────────────────────────────────── @@ -255,16 +313,22 @@ async def search_candidates(client: httpx.AsyncClient) -> list[dict]: log.info(f"Running {len(queries)} SearXNG queries") async def one_query(q: str) -> list[dict]: + # Use year time_range only for queries that include the current year literal; + # skip time_range for Bandcamp label/genre queries to maximise breadth + use_time_range = "site:bandcamp.com" not in q and "metal-archives" not in q + params: dict = {"q": q, "format": "json"} + if use_time_range: + params["time_range"] = "year" try: r = await client.get( f"{settings.searxng_url}/search", - params={"q": q, "format": "json", "time_range": "month"}, - timeout=20, + params=params, + timeout=25, ) if r.status_code != 200: return [] data = r.json() - results = data.get("results", [])[:15] # top 15 per query + results = data.get("results", [])[:25] # top 25 per query out = [] for res in results: title = res.get("title", "") @@ -363,56 +427,41 @@ async def score_candidates( return scored -async def prefilter_candidates( - client: httpx.AsyncClient, candidates: list[dict] +async def prefilter_batch( + client: httpx.AsyncClient, batch: list[dict], batch_offset: int ) -> list[dict]: - """Use mistral-nemo to prefilter — keep actual album releases, drop noise.""" - if not candidates: - return [] - - # Send in batches of 40 - batch = candidates[:80] + """Run prefilter on one batch. batch_offset adjusts index for provenance mapping.""" items_for_prompt = [ {"index": i, "artist": c.get("artist", ""), "album": c.get("album", ""), "url": c.get("url", "")} for i, c in enumerate(batch) ] - prompt = f"""You are a metal music expert. Review these {len(items_for_prompt)} items from web searches. -Return ONLY a JSON array of items that are actual metal album or EP releases (not news articles, not Wikipedia, not lists). -For each item kept, include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0). +Return ONLY a JSON array of items that are actual metal album or EP releases (not news, not Wikipedia, not lists, not tour dates). +For each kept item include: index (integer), artist (string), album (string), subgenre (string), confidence (0.0-1.0). +Be inclusive — keep anything that looks like it could be a real release even if you're not certain. Items: {json.dumps(items_for_prompt, indent=2)} Return ONLY valid JSON array starting with [""" - body = { "model": "mistral-nemo:latest", "messages": [{"role": "user", "content": prompt}], "stream": False, - "options": {"temperature": 0.1, "num_predict": 4000}, + "options": {"temperature": 0.1, "num_predict": 6000}, } - try: - r = await client.post( - f"{settings.ollama_url}/api/chat", - json=body, - timeout=120, - ) + r = await client.post(f"{settings.ollama_url}/api/chat", json=body, timeout=150) raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else "" except Exception as e: - log.error(f"Prefilter failed: {e}") - return candidates[:40] # fall through with top candidates + log.warning(f"Prefilter batch failed: {e}") + return batch # pass through on error - # Extract JSON from response parsed = extract_json_array(raw) if not parsed: - log.warning("Prefilter returned no parseable JSON — using all candidates") - return candidates[:40] + log.warning(f"Prefilter batch {batch_offset}: no JSON — passing through") + return batch - log.info(f"Prefilter kept {len(parsed)}/{len(batch)} candidates") - - # Map back to full candidate objects with prefilter data enriched = [] for p in parsed: idx = p.get("index") @@ -426,10 +475,35 @@ Return ONLY valid JSON array starting with [""" "confidence": p.get("confidence", 0.5), } enriched.append(orig) - return enriched +async def prefilter_candidates( + client: httpx.AsyncClient, candidates: list[dict] +) -> list[dict]: + """Run prefilter on all candidates in parallel batches of 35.""" + if not candidates: + return [] + + BATCH_SIZE = 35 + batches = [candidates[i:i+BATCH_SIZE] for i in range(0, min(len(candidates), 200), BATCH_SIZE)] + log.info(f"Prefiltering {len(candidates)} candidates in {len(batches)} parallel batches") + + # Run up to 3 batches concurrently to avoid OOMing Ollama + all_enriched = [] + for chunk_start in range(0, len(batches), 3): + chunk = batches[chunk_start:chunk_start+3] + results = await asyncio.gather(*[ + prefilter_batch(client, b, chunk_start * BATCH_SIZE + i * BATCH_SIZE) + for i, b in enumerate(chunk) + ]) + for r in results: + all_enriched.extend(r) + + log.info(f"Prefilter kept {len(all_enriched)}/{len(candidates)} candidates") + return all_enriched + + async def curate_picks( client: httpx.AsyncClient, candidates: list[dict], taste: dict ) -> list[dict]: @@ -437,7 +511,7 @@ async def curate_picks( if not candidates: return [] - top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:20] + top = sorted(candidates, key=lambda x: -x.get("embedScore", 0))[:30] top_artists = [a["name"] for a in taste.get("topArtists", [])[:20]] recent_artists = [a["name"] for a in taste.get("recentArtists", [])[:10]] year = datetime.now().year @@ -497,19 +571,19 @@ Return ONLY a JSON array starting with [""" raw = r.json().get("message", {}).get("content", "") if r.status_code == 200 else "" picks = extract_json_array(raw) if picks: - # Validate provenance: url must match a search result OR index must be valid + # Validate provenance: keep picks that reference a real input item validated = [] for p in picks: - if p.get("score", 0) < 60: + if p.get("score", 0) < 50: continue idx = p.get("index") url = p.get("url", "") if idx is not None and idx in top_by_index: - # Merge URL from original candidate if LLM dropped it - if not url: - p["url"] = top_by_index[idx].get("url", "") + # Always use original URL from search result + p["url"] = top_by_index[idx].get("url", url) validated.append(p) - elif url and url in known_urls: + elif url and any(url.startswith(ku[:40]) for ku in known_urls if ku): + # URL prefix match (handles trailing-slash variants) validated.append(p) else: log.debug(f"Dropped hallucinated pick: {p.get('artist')} — {p.get('album')}") @@ -650,11 +724,11 @@ async def pipeline_run(run_id: str): log_step("Curating picks with Mistral") picks = await curate_picks(client, prefiltered, taste) - log_step(f"{len(picks)} picks scored ≥60") + log_step(f"{len(picks)} picks scored ≥50") if picks: - # Mark all prefiltered candidates as seen (not just picks) to avoid re-processing - await mark_seen(prefiltered) + # Mark only actual picks as seen — unselected candidates stay eligible for re-eval + await mark_seen(picks) await write_picks(picks, { "candidates": len(candidates), "filtered": len(prefiltered),