#!/usr/bin/env tsx /** * Enrich churches with city/state/zip via Nominatim reverse geocoding (OSM) * * Usage: * npx tsx scripts/enrich-with-reverse-geocode.ts --country FR --limit 10 --dry-run * npx tsx scripts/enrich-with-reverse-geocode.ts --country FR --continuous * npx tsx scripts/enrich-with-reverse-geocode.ts --continuous * * Rate limit: 1 request/second (Nominatim usage policy — mandatory). * Full pass of ~193K churches in ~2 days. */ import dotenv from 'dotenv'; import path from 'path'; dotenv.config({ path: path.resolve(process.cwd(), '.env') }); import { Pool } from 'pg'; import { PrismaPg } from '@prisma/adapter-pg'; import { PrismaClient } from '@prisma/client'; import axios from 'axios'; // Fresh DB connection (not cached singleton) const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const adapter = new PrismaPg(pool); const prisma = new PrismaClient({ adapter }); const NOMINATIM_URL = 'https://nominatim.openstreetmap.org/reverse'; const RATE_LIMIT_MS = 1100; // Slightly over 1s to stay safe const BATCH_SIZE = 50; const PROGRESS_INTERVAL = 10; // --- Job Tracking --- async function createOrResumeJob(args: string[]): Promise { const jobIdIndex = args.indexOf('--job-id'); if (jobIdIndex !== -1) { const jobId = args[jobIdIndex + 1]; await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: 'running', startedAt: new Date() }, }); return jobId; } return null; } async function createNewJob(config: Record): Promise { const job = await prisma.backgroundJob.create({ data: { type: 'reverse-geocode-enrichment', status: 'running', startedAt: new Date(), config, }, }); return job.id; } async function updateJobProgress(jobId: string, stats: EnrichmentStats, totalItems: number): Promise { await prisma.backgroundJob.update({ where: { id: jobId }, data: { processed: stats.processed, succeeded: stats.enriched, failed: stats.errors, itemsFound: stats.enriched, totalItems, }, }); } async function checkJobStopping(jobId: string): Promise { const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } }); return job?.status === 'stopping'; } async function completeJob(jobId: string, error?: string): Promise { await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: error ? 'failed' : 'completed', error, completedAt: new Date(), }, }); } // --- Types --- interface ChurchRecord { id: string; name: string; address: string | null; city: string | null; state: string | null; zip: string | null; country: string; latitude: number; longitude: number; } interface NominatimAddress { house_number?: string; road?: string; city?: string; town?: string; village?: string; municipality?: string; hamlet?: string; suburb?: string; neighbourhood?: string; state?: string; province?: string; postcode?: string; country_code?: string; } interface NominatimResponse { display_name?: string; address?: NominatimAddress; error?: string; } interface EnrichmentStats { processed: number; enriched: number; noCity: number; errors: number; skippedExisting: number; cycles: number; startTime: number; } // --- Circuit Breaker --- class CircuitBreaker { private failures = 0; private isOpen = false; private backoffMs = 60000; // Start at 60s for Nominatim private readonly maxBackoffMs = 300000; // 5 minutes private readonly threshold = 5; async checkAndWait(): Promise { if (!this.isOpen) return true; log(`Circuit breaker open. Waiting ${Math.round(this.backoffMs / 1000)}s before retry...`); await sleep(this.backoffMs); // Try a test request try { const resp = await axios.get(NOMINATIM_URL, { params: { lat: 48.8566, lon: 2.3522, format: 'json' }, headers: { 'User-Agent': 'NearestMass/1.0 (privacy@nearestmass.com)' }, timeout: 10000, }); if (resp.status === 200) { this.reset(); log('Circuit breaker closed: Nominatim is back'); return true; } } catch { // Still down } this.backoffMs = Math.min(this.backoffMs * 2, this.maxBackoffMs); return false; } recordFailure() { this.failures++; if (this.failures >= this.threshold && !this.isOpen) { this.isOpen = true; this.backoffMs = 60000; log(`Circuit breaker OPEN after ${this.failures} consecutive failures`); } } reset() { if (this.failures > 0 || this.isOpen) { this.failures = 0; this.isOpen = false; this.backoffMs = 60000; } } get opened() { return this.isOpen; } } // --- Helpers --- let shuttingDown = false; function log(msg: string) { console.log(`[${new Date().toISOString()}] ${msg}`); } function logError(msg: string) { console.error(`[${new Date().toISOString()}] ${msg}`); } function sleep(ms: number): Promise { return new Promise(resolve => { const timer = setTimeout(resolve, ms); const check = setInterval(() => { if (shuttingDown) { clearTimeout(timer); clearInterval(check); resolve(); } }, 1000); setTimeout(() => clearInterval(check), ms + 100); }); } // --- Nominatim API --- async function reverseGeocode(lat: number, lng: number): Promise { const response = await axios.get(NOMINATIM_URL, { params: { lat, lon: lng, format: 'json', zoom: 16, addressdetails: 1, }, headers: { 'User-Agent': 'NearestMass/1.0 (privacy@nearestmass.com)', 'Accept-Language': 'en', }, timeout: 15000, }); return response.data; } function extractCity(address: NominatimAddress): string | null { return address.city || address.town || address.village || address.municipality || address.hamlet || null; } function extractState(address: NominatimAddress): string | null { return address.state || address.province || null; } function extractAddress(address: NominatimAddress): string | null { const parts: string[] = []; if (address.house_number) parts.push(address.house_number); if (address.road) parts.push(address.road); if (parts.length === 0) return null; return parts.join(' '); } // --- Database Queries --- async function getNextBatch( batchSize: number, countryCode?: string, ): Promise { return prisma.church.findMany({ where: { city: null, latitude: { not: undefined }, longitude: { not: undefined }, reverseGeocodedAt: null, ...(countryCode ? { country: countryCode } : {}), }, select: { id: true, name: true, address: true, city: true, state: true, zip: true, country: true, latitude: true, longitude: true, }, take: batchSize, orderBy: [ { country: 'asc' }, { createdAt: 'asc' }, ], }); } async function getTotalRemaining(countryCode?: string): Promise { return prisma.church.count({ where: { city: null, latitude: { not: undefined }, longitude: { not: undefined }, reverseGeocodedAt: null, ...(countryCode ? { country: countryCode } : {}), }, }); } // --- Main Processing --- async function processChurch( church: ChurchRecord, stats: EnrichmentStats, dryRun: boolean, ): Promise { const label = `${church.name} (${church.country})`; try { const result = await reverseGeocode(church.latitude, church.longitude); if (result.error || !result.address) { log(` - [${stats.processed}] ${label} => no address data`); stats.noCity++; if (!dryRun) { await prisma.church.update({ where: { id: church.id }, data: { reverseGeocodedAt: new Date() }, }); } return; } const address = extractAddress(result.address); const city = extractCity(result.address); const state = extractState(result.address); const zip = result.address.postcode || null; if (city) { const addrStr = address ? `${address}, ` : ''; log(` + [${stats.processed}] ${label} => ${addrStr}${city}, ${state || '?'}`); stats.enriched++; } else { log(` - [${stats.processed}] ${label} => no city in response`); stats.noCity++; } if (!dryRun) { const updateData: Record = { reverseGeocodedAt: new Date(), }; // Only update fields that are currently null if (address && !church.address) updateData.address = address; if (city && !church.city) updateData.city = city; if (state && !church.state) updateData.state = state; if (zip && !church.zip) updateData.zip = zip; // Update country if currently unknown (XX) and Nominatim returned one const countryCodeResult = result.address.country_code?.toUpperCase(); if (church.country === 'XX' && countryCodeResult && countryCodeResult !== 'XX') { updateData.country = countryCodeResult; } await prisma.church.update({ where: { id: church.id }, data: updateData, }); } } catch (error: any) { stats.errors++; // Handle rate limiting (429) if (error.response?.status === 429) { logError(` ! [${stats.processed}] ${label} => rate limited (429), backing off...`); await sleep(5000); // Extra 5s backoff throw error; } // Handle server errors (5xx) if (error.response?.status >= 500) { logError(` ! [${stats.processed}] ${label} => server error (${error.response.status})`); throw error; } logError(` ! [${stats.processed}] ${label} => ${error.message}`); // Don't throw for non-retriable errors (just mark as attempted) if (!dryRun) { await prisma.church.update({ where: { id: church.id }, data: { reverseGeocodedAt: new Date() }, }); } } } async function runSinglePass( stats: EnrichmentStats, countryCode?: string, limit?: number, dryRun: boolean = false, jobId?: string | null, ): Promise { let totalProcessed = 0; const circuitBreaker = new CircuitBreaker(); while (!shuttingDown) { if (limit && totalProcessed >= limit) break; // Circuit breaker check if (circuitBreaker.opened) { const ok = await circuitBreaker.checkAndWait(); if (!ok) continue; } const batchLimit = limit ? Math.min(BATCH_SIZE, limit - totalProcessed) : BATCH_SIZE; const churches = await getNextBatch(batchLimit, countryCode); if (churches.length === 0) break; for (const church of churches) { if (shuttingDown) break; if (limit && totalProcessed >= limit) break; stats.processed++; totalProcessed++; try { await processChurch(church, stats, dryRun); circuitBreaker.reset(); } catch (error: any) { circuitBreaker.recordFailure(); // Already logged in processChurch } // Rate limit: 1 request per second if (!shuttingDown) { await sleep(RATE_LIMIT_MS); } // Job tracking: update progress every PROGRESS_INTERVAL items if (jobId && stats.processed % PROGRESS_INTERVAL === 0) { await updateJobProgress(jobId, stats, 0); const stopping = await checkJobStopping(jobId); if (stopping) { log('Job stop requested via admin dashboard.'); shuttingDown = true; break; } } // Progress logging if (stats.processed % 100 === 0) { const elapsed = (Date.now() - stats.startTime) / 1000; const rate = Math.round((stats.processed / elapsed) * 3600); const enrichRate = stats.processed > 0 ? ((stats.enriched / stats.processed) * 100).toFixed(1) : '0.0'; log(`Progress: ${stats.processed} processed, ${stats.enriched} enriched, ${stats.noCity} no-city, ${stats.errors} errors`); log(` Enrich rate: ${enrichRate}%, Rate: ~${rate}/hour`); } } } } async function runContinuous( stats: EnrichmentStats, countryCode?: string, jobId?: string | null, ): Promise { log('Running in continuous mode. Press Ctrl+C to stop.'); const circuitBreaker = new CircuitBreaker(); while (!shuttingDown) { stats.cycles++; log(`--- Cycle ${stats.cycles} ---`); let processedInCycle = 0; while (!shuttingDown) { // Circuit breaker check if (circuitBreaker.opened) { const ok = await circuitBreaker.checkAndWait(); if (!ok) continue; } const churches = await getNextBatch(BATCH_SIZE, countryCode); if (churches.length === 0) break; for (const church of churches) { if (shuttingDown) break; stats.processed++; processedInCycle++; try { await processChurch(church, stats, false); circuitBreaker.reset(); } catch { circuitBreaker.recordFailure(); } // Rate limit if (!shuttingDown) { await sleep(RATE_LIMIT_MS); } // Job tracking if (jobId && stats.processed % PROGRESS_INTERVAL === 0) { await updateJobProgress(jobId, stats, 0); const stopping = await checkJobStopping(jobId); if (stopping) { log('Job stop requested via admin dashboard.'); shuttingDown = true; break; } } // Progress logging if (stats.processed % 100 === 0) { const elapsed = (Date.now() - stats.startTime) / 1000; const rate = Math.round((stats.processed / elapsed) * 3600); log(`Progress: ${stats.processed} processed, ${stats.enriched} enriched, ${stats.noCity} no-city, ${stats.errors} errors (~${rate}/hour)`); } } } if (shuttingDown) break; if (processedInCycle === 0) { log('No churches needing reverse geocoding. Waiting 1 hour...'); for (let i = 0; i < 360 && !shuttingDown; i++) { await sleep(10000); } } else { log(`Cycle ${stats.cycles} complete. ${processedInCycle} churches processed. Brief pause...`); await sleep(10000); } } } // --- Main --- async function main() { const args = process.argv.slice(2); const countryIndex = args.indexOf('--country'); const limitIndex = args.indexOf('--limit'); const dryRun = args.includes('--dry-run'); const continuous = args.includes('--continuous'); const countryCode = countryIndex !== -1 ? args[countryIndex + 1] : undefined; const limit = limitIndex !== -1 ? parseInt(args[limitIndex + 1]) : undefined; // Graceful shutdown process.on('SIGTERM', () => { log('Received SIGTERM, finishing current request...'); shuttingDown = true; }); process.on('SIGINT', () => { log('Received SIGINT, finishing current request...'); shuttingDown = true; }); log('============================================================'); log('Nominatim Reverse Geocode Enrichment'); log('============================================================'); log(`Mode: ${continuous ? 'Continuous' : 'Single pass'}`); log(`Country: ${countryCode || 'All'}`); log(`Limit: ${limit || 'No limit'}`); log(`Dry run: ${dryRun ? 'Yes' : 'No'}`); log(`Rate limit: ${RATE_LIMIT_MS}ms between requests`); log('============================================================'); // Count remaining const remaining = await getTotalRemaining(countryCode); log(`Churches needing reverse geocoding: ${remaining}`); const estimatedHours = (remaining * RATE_LIMIT_MS / 1000 / 3600).toFixed(1); log(`Estimated time: ~${estimatedHours} hours @ 1 req/sec`); if (remaining === 0) { log('Nothing to do!'); await prisma.$disconnect(); await pool.end(); return; } // Job tracking let jobId = await createOrResumeJob(args); if (!jobId) { jobId = await createNewJob({ countryCode, limit, continuous, dryRun }); } log(`Job ID: ${jobId}`); const stats: EnrichmentStats = { processed: 0, enriched: 0, noCity: 0, errors: 0, skippedExisting: 0, cycles: 0, startTime: Date.now(), }; if (continuous) { await runContinuous(stats, countryCode, jobId); } else { await runSinglePass(stats, countryCode, limit, dryRun, jobId); } // Complete job if (jobId) { await updateJobProgress(jobId, stats, 0); await completeJob(jobId); } // Print summary const elapsed = ((Date.now() - stats.startTime) / 1000).toFixed(1); const enrichRate = stats.processed > 0 ? ((stats.enriched / stats.processed) * 100).toFixed(1) : '0.0'; log(''); log('============================================================'); log('Reverse Geocode Enrichment Summary'); log('============================================================'); log(`Churches processed: ${stats.processed}`); log(`Cities found: ${stats.enriched}`); log(`No city in response: ${stats.noCity}`); log(`Errors: ${stats.errors}`); log(`Enrich rate: ${enrichRate}%`); log(`Elapsed: ${elapsed}s`); if (stats.cycles > 0) { log(`Cycles completed: ${stats.cycles}`); } log('============================================================'); await prisma.$disconnect(); await pool.end(); } main().catch((error) => { logError(`Fatal error: ${error.message}`); process.exit(1); });