From 6d1c7eb3c5c38f92aaced7ca0fd291e15607b591 Mon Sep 17 00:00:00 2001 From: Albert Date: Mon, 25 May 2026 10:37:08 -0400 Subject: [PATCH] feat: add forward-geocode to scheduler pipeline - Wire enrich-with-forward-geocode.ts as scheduler job type - Add geocode-enrichment pipeline group (500/cycle, post-imports) - Harden transfer script: skip churches at (0,0) coordinates - Rewrite dedup-mass-schedules.ts with raw SQL to avoid Prisma 7 stack overflow Co-Authored-By: Claude Opus 4.7 --- scripts/dedup-mass-schedules.ts | 83 +++++++++++++++------------- scripts/scheduler.ts | 13 +++++ scripts/transfer-enriched-to-neon.ts | 3 +- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/scripts/dedup-mass-schedules.ts b/scripts/dedup-mass-schedules.ts index b94fa24..3b81895 100644 --- a/scripts/dedup-mass-schedules.ts +++ b/scripts/dedup-mass-schedules.ts @@ -3,12 +3,13 @@ import dotenv from 'dotenv'; import path from 'path'; dotenv.config({ path: path.resolve(process.cwd(), '.env.local') }); import { Pool } from 'pg'; -import { PrismaPg } from '@prisma/adapter-pg'; -import { PrismaClient } from '@prisma/client'; const pool = new Pool({ connectionString: process.env.DATABASE_URL }); -const adapter = new PrismaPg(pool); -const prisma = new PrismaClient({ adapter }); + +interface CountResult { + churches_with_dups: string; + duplicate_rows: string; +} async function main() { const dryRun = !process.argv.includes('--execute'); @@ -17,44 +18,52 @@ async function main() { console.log('DRY RUN - pass --execute to actually delete duplicates\n'); } - const churches = await prisma.church.findMany({ - where: { massSchedules: { some: { isActive: true } } }, - include: { massSchedules: { where: { isActive: true }, orderBy: { createdAt: 'asc' } } }, - }); + const client = await pool.connect(); - let churchesFixed = 0; - let rowsDeleted = 0; + try { + const countResult = await client.query(` + WITH ranked AS ( + SELECT church_id, + ROW_NUMBER() OVER ( + PARTITION BY church_id, day_of_week, time, language + ORDER BY created_at ASC + ) AS rn + FROM mass_schedules + WHERE is_active = true + ) + SELECT COUNT(DISTINCT church_id) AS churches_with_dups, + COUNT(*) AS duplicate_rows + FROM ranked + WHERE rn > 1; + `); - for (const church of churches) { - const seen = new Map(); - const toDelete: string[] = []; + const { churches_with_dups, duplicate_rows } = countResult.rows[0]; + console.log(`Churches with duplicate schedules: ${churches_with_dups}`); + console.log(`Duplicate rows to ${dryRun ? 'delete' : 'delete'}: ${duplicate_rows}\n`); - for (const m of church.massSchedules) { - const key = `${m.dayOfWeek}:${m.time}:${m.language}`; - if (seen.has(key)) { - toDelete.push(m.id); - } else { - seen.set(key, m.id); - } - } - - if (toDelete.length > 0) { - churchesFixed++; - rowsDeleted += toDelete.length; - - if (!dryRun) { - await prisma.massSchedule.deleteMany({ - where: { id: { in: toDelete } }, - }); - } + if (!dryRun && Number(duplicate_rows) > 0) { + console.log('Deleting duplicates (keeping oldest by created_at)...'); + + const deleteResult = await client.query(` + WITH ranked AS ( + SELECT id, + ROW_NUMBER() OVER ( + PARTITION BY church_id, day_of_week, time, language + ORDER BY created_at ASC + ) AS rn + FROM mass_schedules + WHERE is_active = true + ) + DELETE FROM mass_schedules + WHERE id IN (SELECT id FROM ranked WHERE rn > 1); + `); + + console.log(`Deleted ${deleteResult.rowCount} duplicate mass schedule rows.`); } + } finally { + client.release(); + await pool.end(); } - - console.log(`Churches with duplicates: ${churchesFixed}`); - console.log(`Duplicate rows ${dryRun ? 'found' : 'deleted'}: ${rowsDeleted}`); - - await prisma.$disconnect(); - await pool.end(); } main().catch((err) => { diff --git a/scripts/scheduler.ts b/scripts/scheduler.ts index 7e06c90..3f9076b 100644 --- a/scripts/scheduler.ts +++ b/scripts/scheduler.ts @@ -59,6 +59,13 @@ const PIPELINE_GROUPS: PipelineGroup[] = [ { name: 'masstimes-api-import', type: 'masstimes-api-import', config: {} }, ], }, + { + name: 'geocode-enrichment', + mode: 'sequential', + phases: [ + { name: 'forward-geocode', type: 'forward-geocode-enrichment', config: { limit: 500 } }, + ], + }, { name: 'scrapers-batch-1', mode: 'parallel', @@ -138,6 +145,12 @@ function getJobCommand(type: string, language?: string | null, config?: Record