Reset local main to gitea/master (new source of truth) and restored local-only files: web scrapers, admin dashboard, ChromaDB integration, debug scripts, and utility libraries that aren't tracked in Gitea. Gitea master adds: discovermass, buscarmisas-network, hk-parishes, bohosluzby, kerknet, gottesdienstzeiten, miserend importers, ClaimRequest model, forward geocoding, heartbeat healthcheck. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
625 lines
17 KiB
TypeScript
625 lines
17 KiB
TypeScript
#!/usr/bin/env tsx
|
|
/**
 * Enrich churches with city/state/zip via Nominatim reverse geocoding (OSM)
 *
 * Usage:
 *   npx tsx scripts/enrich-with-reverse-geocode.ts --country FR --limit 10 --dry-run
 *   npx tsx scripts/enrich-with-reverse-geocode.ts --country FR --continuous
 *   npx tsx scripts/enrich-with-reverse-geocode.ts --continuous
 *
 * Rate limit: 1 request/second (Nominatim usage policy — mandatory).
 * Full pass of ~193K churches in ~2 days.
 */
|
|
|
|
import dotenv from 'dotenv';
|
|
import path from 'path';
|
|
dotenv.config({ path: path.resolve(process.cwd(), '.env') });
|
|
|
|
import { Pool } from 'pg';
|
|
import { PrismaPg } from '@prisma/adapter-pg';
|
|
import { PrismaClient } from '@prisma/client';
|
|
import axios from 'axios';
|
|
|
|
// Fresh DB connection (not cached singleton): the script builds its own pg pool
// and Prisma client on top of it, and closes both when it finishes.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const adapter = new PrismaPg(pool);
const prisma = new PrismaClient({ adapter });

/** Nominatim reverse-geocoding endpoint. */
const NOMINATIM_URL = 'https://nominatim.openstreetmap.org/reverse';
/** Pause between consecutive requests; slightly over 1s to stay safe. */
const RATE_LIMIT_MS = 1100;
/** Number of churches fetched from the database per query. */
const BATCH_SIZE = 50;
/** Job progress is persisted every time this many churches have been processed. */
const PROGRESS_INTERVAL = 10;
|
|
|
|
// --- Job Tracking ---
|
|
|
|
/**
 * Resume an existing background job when `--job-id <id>` is present in `args`.
 *
 * The job row is switched to `running` and its `startedAt` is set to now.
 *
 * @param args Command-line arguments (without the node/script prefix).
 * @returns The resumed job id, or `null` when `--job-id` was not supplied.
 * @throws Error when `--job-id` is given without a value.
 */
async function createOrResumeJob(args: string[]): Promise<string | null> {
  const jobIdIndex = args.indexOf('--job-id');
  if (jobIdIndex === -1) return null;

  const jobId = args[jobIdIndex + 1];
  // A missing value (flag is last) or another flag in its place would otherwise
  // reach the database as an undefined/garbage id.
  if (!jobId || jobId.startsWith('--')) {
    throw new Error('--job-id requires a job id value');
  }

  await prisma.backgroundJob.update({
    where: { id: jobId },
    data: { status: 'running', startedAt: new Date() },
  });
  return jobId;
}
|
|
|
|
/**
 * Insert a new `reverse-geocode-enrichment` background job that is already
 * marked as running, with `startedAt` set to now.
 *
 * @param config Run options stored on the job row as-is.
 * @returns The id of the created job.
 */
async function createNewJob(config: Record<string, unknown>): Promise<string> {
  const job = await prisma.backgroundJob.create({
    data: {
      type: 'reverse-geocode-enrichment',
      status: 'running',
      startedAt: new Date(),
      config,
    },
  });
  return job.id;
}
|
|
|
|
/**
 * Persist the current counters of a run on its background job row.
 *
 * `stats.enriched` is written to both `succeeded` and `itemsFound`;
 * `stats.errors` is written to `failed`.
 *
 * @param jobId Id of the job row to update.
 * @param stats Counters of the current run.
 * @param totalItems Value stored in the job's `totalItems` column.
 */
async function updateJobProgress(jobId: string, stats: EnrichmentStats, totalItems: number): Promise<void> {
  await prisma.backgroundJob.update({
    where: { id: jobId },
    data: {
      processed: stats.processed,
      succeeded: stats.enriched,
      failed: stats.errors,
      itemsFound: stats.enriched,
      totalItems,
    },
  });
}
|
|
|
|
/**
 * Report whether the job's status has been set to `stopping`.
 *
 * @param jobId Id of the job row to inspect.
 * @returns `true` only when the job exists and its status is `stopping`.
 */
async function checkJobStopping(jobId: string): Promise<boolean> {
  // Only the status column is needed, so do not load the whole row.
  const job = await prisma.backgroundJob.findUnique({
    where: { id: jobId },
    select: { status: true },
  });
  return job?.status === 'stopping';
}
|
|
|
|
/**
 * Close a background job and stamp `completedAt` with the current time.
 *
 * @param jobId Id of the job row to close.
 * @param error When given (non-empty), the job is marked `failed` and the text
 *   is stored; otherwise the job is marked `completed`.
 */
async function completeJob(jobId: string, error?: string): Promise<void> {
  await prisma.backgroundJob.update({
    where: { id: jobId },
    data: {
      status: error ? 'failed' : 'completed',
      error,
      completedAt: new Date(),
    },
  });
}
|
|
|
|
// --- Types ---
|
|
|
|
/** Church columns loaded by the batch query and used during enrichment. */
interface ChurchRecord {
  id: string;
  name: string;
  /** Street address; filled from Nominatim only when currently empty. */
  address: string | null;
  city: string | null;
  state: string | null;
  zip: string | null;
  /** Country code; the value 'XX' is treated as "unknown" by the enrichment. */
  country: string;
  latitude: number;
  longitude: number;
}
|
|
|
|
/**
 * Subset of the `address` object in a Nominatim reverse-geocoding response
 * that this script reads. Every field is optional.
 */
interface NominatimAddress {
  house_number?: string;
  road?: string;
  // Locality candidates
  city?: string;
  town?: string;
  village?: string;
  municipality?: string;
  hamlet?: string;
  suburb?: string;
  neighbourhood?: string;
  // Region candidates
  state?: string;
  province?: string;
  postcode?: string;
  /** Upper-cased before being stored as the church's country. */
  country_code?: string;
}
|
|
|
|
/** Fields of a Nominatim reverse-geocoding response that this script reads. */
interface NominatimResponse {
  display_name?: string;
  /** Structured address parts; a response without it is treated as "no address data". */
  address?: NominatimAddress;
  /** When set, the response is treated as "no address data". */
  error?: string;
}
|
|
|
|
/** Running counters for one invocation of the script. */
interface EnrichmentStats {
  /** Churches handed to the per-church processing step. */
  processed: number;
  /** Churches for which a city was found. */
  enriched: number;
  /** Responses without address data or without a usable city. */
  noCity: number;
  /** Churches whose processing raised an error. */
  errors: number;
  skippedExisting: number;
  /** Cycles started in continuous mode. */
  cycles: number;
  /** `Date.now()` timestamp taken when the run started. */
  startTime: number;
}
|
|
|
|
// --- Circuit Breaker ---
|
|
|
|
/**
 * Pauses processing after repeated request failures.
 *
 * The breaker opens once `threshold` failures have been recorded without a
 * `reset()` in between. While open, `checkAndWait()` sleeps for the current
 * backoff, probes Nominatim with a fixed coordinate, and either closes the
 * breaker or doubles the backoff (capped at `maxBackoffMs`).
 */
class CircuitBreaker {
  private failures = 0;
  private isOpen = false;
  private backoffMs = 60000; // Start at 60s for Nominatim
  private readonly maxBackoffMs = 300000; // 5 minutes
  private readonly threshold = 5;

  /**
   * Wait out the backoff and probe the service when the breaker is open.
   *
   * @returns `true` when the caller may proceed (breaker closed, or the probe
   *   succeeded and closed it); `false` when the probe failed.
   */
  async checkAndWait(): Promise<boolean> {
    if (!this.isOpen) return true;

    log(`Circuit breaker open. Waiting ${Math.round(this.backoffMs / 1000)}s before retry...`);
    await sleep(this.backoffMs);

    // Try a test request
    try {
      const resp = await axios.get(NOMINATIM_URL, {
        params: { lat: 48.8566, lon: 2.3522, format: 'json' },
        headers: { 'User-Agent': 'NearestMass/1.0 (privacy@nearestmass.com)' },
        timeout: 10000,
      });
      if (resp.status === 200) {
        this.reset();
        log('Circuit breaker closed: Nominatim is back');
        return true;
      }
    } catch {
      // Still down
    }

    // Probe failed: wait longer next time, up to the cap.
    this.backoffMs = Math.min(this.backoffMs * 2, this.maxBackoffMs);
    return false;
  }

  /** Count one failure and open the breaker when the threshold is reached. */
  recordFailure(): void {
    this.failures++;
    if (this.failures >= this.threshold && !this.isOpen) {
      this.isOpen = true;
      this.backoffMs = 60000;
      log(`Circuit breaker OPEN after ${this.failures} consecutive failures`);
    }
  }

  /** Clear the failure count, close the breaker and restore the initial backoff. */
  reset(): void {
    if (this.failures > 0 || this.isOpen) {
      this.failures = 0;
      this.isOpen = false;
      this.backoffMs = 60000;
    }
  }

  /** Whether the breaker is currently open. */
  get opened(): boolean {
    return this.isOpen;
  }
}
|
|
|
|
// --- Helpers ---
|
|
|
|
// Set to true by the SIGTERM/SIGINT handlers or when a job stop is requested;
// the processing loops and sleep() read it to wind down early.
let shuttingDown = false;
|
|
|
|
/** Write `msg` to stdout, prefixed with the current ISO-8601 timestamp in brackets. */
function log(msg: string): void {
  console.log(`[${new Date().toISOString()}] ${msg}`);
}
|
|
|
|
/** Write `msg` to stderr, prefixed with the current ISO-8601 timestamp in brackets. */
function logError(msg: string): void {
  console.error(`[${new Date().toISOString()}] ${msg}`);
}
|
|
|
|
/**
 * Resolve after `ms` milliseconds, or earlier once `shuttingDown` is set.
 *
 * Returns immediately when shutdown has already been requested; otherwise the
 * flag is polled once per second. Both timers are cleared on every exit path
 * so a finished sleep never keeps the event loop alive.
 *
 * @param ms Maximum time to wait, in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    if (shuttingDown) {
      resolve();
      return;
    }

    const timer = setTimeout(finish, ms);
    const poll = setInterval(() => {
      if (shuttingDown) finish();
    }, 1000);

    // Only ever invoked from the timer callbacks above, i.e. after both
    // handles have been initialised.
    function finish(): void {
      clearTimeout(timer);
      clearInterval(poll);
      resolve();
    }
  });
}
|
|
|
|
// --- Nominatim API ---
|
|
|
|
/**
 * Query Nominatim's reverse-geocoding endpoint for one coordinate.
 *
 * Requests JSON with address details at zoom 16 and English labels, with a
 * 15s timeout. Request failures are not caught here; they propagate to the
 * caller.
 *
 * @param lat Latitude of the point.
 * @param lng Longitude of the point (sent as `lon`).
 * @returns The response body.
 */
async function reverseGeocode(lat: number, lng: number): Promise<NominatimResponse> {
  const response = await axios.get<NominatimResponse>(NOMINATIM_URL, {
    params: {
      lat,
      lon: lng,
      format: 'json',
      zoom: 16,
      addressdetails: 1,
    },
    headers: {
      'User-Agent': 'NearestMass/1.0 (privacy@nearestmass.com)',
      'Accept-Language': 'en',
    },
    timeout: 15000,
  });
  return response.data;
}
|
|
|
|
/**
 * Pick the locality name from a Nominatim address.
 *
 * The first non-empty value wins, in this order: city, town, village,
 * municipality, hamlet.
 *
 * @returns The locality name, or `null` when none of those fields is set.
 */
function extractCity(address: NominatimAddress): string | null {
  // `||` (not `??`) on purpose: an empty string counts as missing.
  return (
    address.city ||
    address.town ||
    address.village ||
    address.municipality ||
    address.hamlet ||
    null
  );
}
|
|
|
|
/**
 * Pick the region name from a Nominatim address: `state` if set, otherwise
 * `province`.
 *
 * @returns The region name, or `null` when neither field is set.
 */
function extractState(address: NominatimAddress): string | null {
  return address.state || address.province || null;
}
|
|
|
|
/**
 * Build a street address as "<house_number> <road>" from whichever of the two
 * parts is present.
 *
 * @returns The joined address, or `null` when both parts are missing.
 */
function extractAddress(address: NominatimAddress): string | null {
  const parts: string[] = [];
  if (address.house_number) parts.push(address.house_number);
  if (address.road) parts.push(address.road);
  return parts.length > 0 ? parts.join(' ') : null;
}
|
|
|
|
// --- Database Queries ---
|
|
|
|
/**
 * Load the next churches that still need reverse geocoding: no city yet and
 * no `reverseGeocodedAt` stamp, optionally restricted to one country.
 *
 * Rows are ordered by country, then creation time. Rows are never skipped by
 * offset: a church leaves the result set only once it has been stamped.
 *
 * @param batchSize Maximum number of rows to return.
 * @param countryCode Optional country filter.
 */
async function getNextBatch(
  batchSize: number,
  countryCode?: string,
): Promise<ChurchRecord[]> {
  return prisma.church.findMany({
    where: {
      city: null,
      // NOTE(review): Prisma ignores `undefined` filter values, so these two
      // conditions presumably do not constrain the query — confirm intent.
      latitude: { not: undefined },
      longitude: { not: undefined },
      reverseGeocodedAt: null,
      ...(countryCode ? { country: countryCode } : {}),
    },
    select: {
      id: true,
      name: true,
      address: true,
      city: true,
      state: true,
      zip: true,
      country: true,
      latitude: true,
      longitude: true,
    },
    take: batchSize,
    orderBy: [
      { country: 'asc' },
      { createdAt: 'asc' },
    ],
  });
}
|
|
|
|
/**
 * Count the churches that still need reverse geocoding: no city yet and no
 * `reverseGeocodedAt` stamp, optionally restricted to one country.
 *
 * @param countryCode Optional country filter.
 */
async function getTotalRemaining(countryCode?: string): Promise<number> {
  return prisma.church.count({
    where: {
      city: null,
      // NOTE(review): Prisma ignores `undefined` filter values, so these two
      // conditions presumably do not constrain the query — confirm intent.
      latitude: { not: undefined },
      longitude: { not: undefined },
      reverseGeocodedAt: null,
      ...(countryCode ? { country: countryCode } : {}),
    },
  });
}
|
|
|
|
// --- Main Processing ---
|
|
|
|
/**
 * Reverse-geocode one church and store whatever address parts were found.
 *
 * On a usable response the church is stamped with `reverseGeocodedAt`, and
 * address/city/state/zip are filled only where currently empty. The country
 * is replaced only when it is the placeholder 'XX'. Nothing is written in
 * dry-run mode.
 *
 * Error handling:
 * - HTTP 429, HTTP 5xx, and request failures without any HTTP response
 *   (network error, timeout) are rethrown WITHOUT stamping the church, so it
 *   is picked up again later and the caller can trip its circuit breaker.
 * - Any other error is logged and the church is stamped as attempted.
 *
 * @param church Church to enrich.
 * @param stats Counters updated in place (`enriched`, `noCity`, `errors`).
 * @param dryRun When true, log results but do not write to the database.
 * @throws The original error for the retriable failures listed above.
 */
async function processChurch(
  church: ChurchRecord,
  stats: EnrichmentStats,
  dryRun: boolean,
): Promise<void> {
  const label = `${church.name} (${church.country})`;

  try {
    const result = await reverseGeocode(church.latitude, church.longitude);

    if (result.error || !result.address) {
      log(`  - [${stats.processed}] ${label} => no address data`);
      stats.noCity++;
      if (!dryRun) {
        await prisma.church.update({
          where: { id: church.id },
          data: { reverseGeocodedAt: new Date() },
        });
      }
      return;
    }

    const address = extractAddress(result.address);
    const city = extractCity(result.address);
    const state = extractState(result.address);
    const zip = result.address.postcode || null;

    if (city) {
      const addrStr = address ? `${address}, ` : '';
      log(`  + [${stats.processed}] ${label} => ${addrStr}${city}, ${state || '?'}`);
      stats.enriched++;
    } else {
      log(`  - [${stats.processed}] ${label} => no city in response`);
      stats.noCity++;
    }

    if (!dryRun) {
      const updateData: Record<string, unknown> = {
        reverseGeocodedAt: new Date(),
      };
      // Only update fields that are currently null
      if (address && !church.address) updateData.address = address;
      if (city && !church.city) updateData.city = city;
      if (state && !church.state) updateData.state = state;
      if (zip && !church.zip) updateData.zip = zip;
      // Update country if currently unknown (XX) and Nominatim returned one
      const countryCodeResult = result.address.country_code?.toUpperCase();
      if (church.country === 'XX' && countryCodeResult && countryCodeResult !== 'XX') {
        updateData.country = countryCodeResult;
      }

      await prisma.church.update({
        where: { id: church.id },
        data: updateData,
      });
    }
  } catch (error: unknown) {
    stats.errors++;

    const isHttpClientError = axios.isAxiosError(error);
    const status = isHttpClientError ? error.response?.status : undefined;
    const message = error instanceof Error ? error.message : String(error);

    // Handle rate limiting (429)
    if (status === 429) {
      logError(`  ! [${stats.processed}] ${label} => rate limited (429), backing off...`);
      await sleep(5000); // Extra 5s backoff
      throw error;
    }

    // Handle server errors (5xx)
    if (status !== undefined && status >= 500) {
      logError(`  ! [${stats.processed}] ${label} => server error (${status})`);
      throw error;
    }

    // No HTTP response at all: the service or the network is unreachable.
    // Stamping the church here would permanently exclude it from future runs
    // because of an outage, so treat it as retriable instead.
    if (isHttpClientError && error.response === undefined) {
      logError(`  ! [${stats.processed}] ${label} => network error (${message})`);
      throw error;
    }

    logError(`  ! [${stats.processed}] ${label} => ${message}`);
    // Don't throw for non-retriable errors (just mark as attempted)
    if (!dryRun) {
      await prisma.church.update({
        where: { id: church.id },
        data: { reverseGeocodedAt: new Date() },
      });
    }
  }
}
|
|
|
|
/**
 * Process pending churches once, batch by batch, until none are left, the
 * limit is reached, or shutdown is requested.
 *
 * Dry-run note: nothing is written in dry-run mode, so the batch query would
 * return the same rows on every iteration. A dry run therefore fetches a
 * single batch (`limit` rows, or `BATCH_SIZE` when no limit is given) and
 * stops after it.
 *
 * @param stats Counters updated in place.
 * @param countryCode Optional country filter.
 * @param limit Maximum number of churches to process (falsy = no limit).
 * @param dryRun When true, log results but do not write church updates.
 * @param jobId Background job to report progress to and to poll for a stop
 *   request.
 */
async function runSinglePass(
  stats: EnrichmentStats,
  countryCode?: string,
  limit?: number,
  dryRun: boolean = false,
  jobId?: string | null,
): Promise<void> {
  let totalProcessed = 0;
  const circuitBreaker = new CircuitBreaker();

  while (!shuttingDown) {
    if (limit && totalProcessed >= limit) break;

    // Circuit breaker check
    if (circuitBreaker.opened) {
      const ok = await circuitBreaker.checkAndWait();
      if (!ok) continue;
    }

    let batchLimit: number;
    if (dryRun) {
      // One-shot preview: see the dry-run note in the function comment.
      batchLimit = limit || BATCH_SIZE;
    } else if (limit) {
      batchLimit = Math.min(BATCH_SIZE, limit - totalProcessed);
    } else {
      batchLimit = BATCH_SIZE;
    }

    const churches = await getNextBatch(batchLimit, countryCode);
    if (churches.length === 0) break;

    for (const church of churches) {
      if (shuttingDown) break;
      if (limit && totalProcessed >= limit) break;

      stats.processed++;
      totalProcessed++;

      try {
        await processChurch(church, stats, dryRun);
        circuitBreaker.reset();
      } catch {
        // Already logged in processChurch
        circuitBreaker.recordFailure();
      }

      // Rate limit: 1 request per second
      if (!shuttingDown) {
        await sleep(RATE_LIMIT_MS);
      }

      // Job tracking: update progress every PROGRESS_INTERVAL items
      if (jobId && stats.processed % PROGRESS_INTERVAL === 0) {
        await updateJobProgress(jobId, stats, 0);
        const stopping = await checkJobStopping(jobId);
        if (stopping) {
          log('Job stop requested via admin dashboard.');
          shuttingDown = true;
          break;
        }
      }

      // Progress logging
      if (stats.processed % 100 === 0) {
        const elapsed = (Date.now() - stats.startTime) / 1000;
        const rate = Math.round((stats.processed / elapsed) * 3600);
        const enrichRate = stats.processed > 0
          ? ((stats.enriched / stats.processed) * 100).toFixed(1)
          : '0.0';
        log(`Progress: ${stats.processed} processed, ${stats.enriched} enriched, ${stats.noCity} no-city, ${stats.errors} errors`);
        log(`  Enrich rate: ${enrichRate}%, Rate: ~${rate}/hour`);
      }
    }

    // Without writes the next query would return these same rows again.
    if (dryRun) break;
  }
}
|
|
|
|
/**
 * Process pending churches in repeated cycles until shutdown is requested.
 *
 * Each cycle drains the pending churches batch by batch. After a cycle that
 * processed nothing, the loop waits an hour (in 10s steps so shutdown is
 * noticed); after a productive cycle it pauses 10s. Churches are always
 * processed with writes enabled — there is no dry-run in this mode.
 *
 * @param stats Counters updated in place (including `cycles`).
 * @param countryCode Optional country filter.
 * @param jobId Background job to report progress to and to poll for a stop
 *   request.
 */
async function runContinuous(
  stats: EnrichmentStats,
  countryCode?: string,
  jobId?: string | null,
): Promise<void> {
  log('Running in continuous mode. Press Ctrl+C to stop.');
  const circuitBreaker = new CircuitBreaker();

  while (!shuttingDown) {
    stats.cycles++;
    log(`--- Cycle ${stats.cycles} ---`);
    let processedInCycle = 0;

    while (!shuttingDown) {
      // Circuit breaker check
      if (circuitBreaker.opened) {
        const ok = await circuitBreaker.checkAndWait();
        if (!ok) continue;
      }

      const churches = await getNextBatch(BATCH_SIZE, countryCode);
      if (churches.length === 0) break;

      for (const church of churches) {
        if (shuttingDown) break;

        stats.processed++;
        processedInCycle++;

        try {
          await processChurch(church, stats, false);
          circuitBreaker.reset();
        } catch {
          // Already logged in processChurch
          circuitBreaker.recordFailure();
        }

        // Rate limit
        if (!shuttingDown) {
          await sleep(RATE_LIMIT_MS);
        }

        // Job tracking
        if (jobId && stats.processed % PROGRESS_INTERVAL === 0) {
          await updateJobProgress(jobId, stats, 0);
          const stopping = await checkJobStopping(jobId);
          if (stopping) {
            log('Job stop requested via admin dashboard.');
            shuttingDown = true;
            break;
          }
        }

        // Progress logging
        if (stats.processed % 100 === 0) {
          const elapsed = (Date.now() - stats.startTime) / 1000;
          const rate = Math.round((stats.processed / elapsed) * 3600);
          log(`Progress: ${stats.processed} processed, ${stats.enriched} enriched, ${stats.noCity} no-city, ${stats.errors} errors (~${rate}/hour)`);
        }
      }
    }

    if (shuttingDown) break;

    if (processedInCycle === 0) {
      log('No churches needing reverse geocoding. Waiting 1 hour...');
      // 360 x 10s = 1 hour, re-checking the shutdown flag between steps.
      for (let i = 0; i < 360 && !shuttingDown; i++) {
        await sleep(10000);
      }
    } else {
      log(`Cycle ${stats.cycles} complete. ${processedInCycle} churches processed. Brief pause...`);
      await sleep(10000);
    }
  }
}
|
|
|
|
// --- Main ---
|
|
|
|
/**
 * Script entry point: parse CLI flags, run the enrichment, track it as a
 * background job and print a summary.
 *
 * Flags: `--country <code>`, `--limit <n>`, `--dry-run`, `--continuous`,
 * `--job-id <id>`.
 *
 * If the run throws, the job is marked `failed` with the error message before
 * the error is rethrown. Database connections are closed on every path.
 *
 * @throws Error for an invalid `--limit`, for `--dry-run` combined with
 *   `--continuous`, and for any failure during the run.
 */
async function main() {
  const args = process.argv.slice(2);
  const countryIndex = args.indexOf('--country');
  const limitIndex = args.indexOf('--limit');
  const dryRun = args.includes('--dry-run');
  const continuous = args.includes('--continuous');

  const countryCode = countryIndex !== -1 ? args[countryIndex + 1] : undefined;

  // A non-numeric or zero limit used to be silently treated as "no limit".
  let limit: number | undefined;
  if (limitIndex !== -1) {
    limit = Number.parseInt(args[limitIndex + 1] ?? '', 10);
    if (!Number.isInteger(limit) || limit <= 0) {
      throw new Error('--limit requires a positive integer');
    }
  }

  // Continuous mode always writes to the database, so a dry run cannot be
  // honoured there; refuse instead of claiming "Dry run: Yes" and writing.
  if (continuous && dryRun) {
    throw new Error('--dry-run is not supported together with --continuous');
  }

  // Graceful shutdown
  process.on('SIGTERM', () => {
    log('Received SIGTERM, finishing current request...');
    shuttingDown = true;
  });
  process.on('SIGINT', () => {
    log('Received SIGINT, finishing current request...');
    shuttingDown = true;
  });

  log('============================================================');
  log('Nominatim Reverse Geocode Enrichment');
  log('============================================================');
  log(`Mode: ${continuous ? 'Continuous' : 'Single pass'}`);
  log(`Country: ${countryCode || 'All'}`);
  log(`Limit: ${limit || 'No limit'}`);
  log(`Dry run: ${dryRun ? 'Yes' : 'No'}`);
  log(`Rate limit: ${RATE_LIMIT_MS}ms between requests`);
  log('============================================================');

  try {
    // Count remaining
    const remaining = await getTotalRemaining(countryCode);
    log(`Churches needing reverse geocoding: ${remaining}`);
    const estimatedHours = (remaining * RATE_LIMIT_MS / 1000 / 3600).toFixed(1);
    log(`Estimated time: ~${estimatedHours} hours @ 1 req/sec`);

    if (remaining === 0) {
      log('Nothing to do!');
      return;
    }

    // Job tracking
    let jobId = await createOrResumeJob(args);
    if (!jobId) {
      jobId = await createNewJob({ countryCode, limit, continuous, dryRun });
    }
    log(`Job ID: ${jobId}`);

    const stats: EnrichmentStats = {
      processed: 0,
      enriched: 0,
      noCity: 0,
      errors: 0,
      skippedExisting: 0,
      cycles: 0,
      startTime: Date.now(),
    };

    try {
      if (continuous) {
        await runContinuous(stats, countryCode, jobId);
      } else {
        await runSinglePass(stats, countryCode, limit, dryRun, jobId);
      }

      // Complete job
      await updateJobProgress(jobId, stats, 0);
      await completeJob(jobId);
    } catch (error: unknown) {
      // Do not leave the job in 'running' forever when the run blows up.
      const message = error instanceof Error ? error.message : String(error);
      try {
        await completeJob(jobId, message);
      } catch (markError: unknown) {
        const markMessage = markError instanceof Error ? markError.message : String(markError);
        logError(`Could not mark job ${jobId} as failed: ${markMessage}`);
      }
      throw error;
    }

    // Print summary
    const elapsed = ((Date.now() - stats.startTime) / 1000).toFixed(1);
    const enrichRate = stats.processed > 0
      ? ((stats.enriched / stats.processed) * 100).toFixed(1)
      : '0.0';

    log('');
    log('============================================================');
    log('Reverse Geocode Enrichment Summary');
    log('============================================================');
    log(`Churches processed: ${stats.processed}`);
    log(`Cities found: ${stats.enriched}`);
    log(`No city in response: ${stats.noCity}`);
    log(`Errors: ${stats.errors}`);
    log(`Enrich rate: ${enrichRate}%`);
    log(`Elapsed: ${elapsed}s`);
    if (stats.cycles > 0) {
      log(`Cycles completed: ${stats.cycles}`);
    }
    log('============================================================');
  } finally {
    await prisma.$disconnect();
    await pool.end();
  }
}
|
|
|
|
// Last-resort handler: report the failure and exit with a non-zero status.
main().catch((error: unknown) => {
  // A rejection is not guaranteed to be an Error instance.
  const message = error instanceof Error ? error.message : String(error);
  logError(`Fatal error: ${message}`);
  process.exit(1);
});
|