Compare commits

..

5 Commits

Author SHA1 Message Date
Albert
027ca59a01 feat: capture pastor name and phone from OrariMesse.it detail endpoint
Populate church.pastorName from detail.parroco and church.phone from
detail.telefono during Pass 2 schedule import. Each field is updated only
when its value in the detail response is present and non-empty.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 22:43:04 -04:00
Albert
9d0af3289a feat: throttle Neon transfer with smaller batches + 1s delay
Reduce BATCH_SIZE from 200 to 100 and add a 1-second pause between
batches to avoid overwhelming the Neon production database during
large incremental syncs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 11:09:02 -04:00
Albert
6d1c7eb3c5 feat: add forward-geocode to scheduler pipeline
- Wire enrich-with-forward-geocode.ts as scheduler job type
- Add geocode-enrichment pipeline group (500/cycle, post-imports)
- Harden transfer script: skip churches at (0,0) coordinates
- Rewrite dedup-mass-schedules.ts with raw SQL to avoid Prisma 7 stack overflow

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 10:37:08 -04:00
206b64b9b8 chore: sync schemas for deploy 2026-04-28 18:04:02 -04:00
Albert
4609fd97db chore: sync schemas for deploy 2026-04-12 23:49:08 -04:00
5 changed files with 74 additions and 41 deletions

View File

@@ -46,6 +46,7 @@ model Church {
gottesdienstzeitenId String? @unique @map("gottesdienstzeiten_id") gottesdienstzeitenId String? @unique @map("gottesdienstzeiten_id")
kerknetId String? @unique @map("kerknet_id") kerknetId String? @unique @map("kerknet_id")
buscarmisasNetworkId String? @unique @map("buscarmisas_network_id") buscarmisasNetworkId String? @unique @map("buscarmisas_network_id")
gcatholicId String? @unique @map("gcatholic_id")
claimed Boolean @default(false) claimed Boolean @default(false)
claimedAt DateTime? @map("claimed_at") claimedAt DateTime? @map("claimed_at")
lastScrapedAt DateTime? @map("last_scraped_at") lastScrapedAt DateTime? @map("last_scraped_at")
@@ -59,6 +60,7 @@ model Church {
googleSearchedAt DateTime? @map("google_searched_at") // When Google Places enrichment was attempted googleSearchedAt DateTime? @map("google_searched_at") // When Google Places enrichment was attempted
createdAt DateTime @default(now()) @map("created_at") createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at") updatedAt DateTime @updatedAt @map("updated_at")
parochiaSlug String? @map("parochia_slug")
dioceseId String? @map("diocese_id") dioceseId String? @map("diocese_id")
@@ -99,6 +101,7 @@ model Church {
@@index([gottesdienstzeitenId]) @@index([gottesdienstzeitenId])
@@index([kerknetId]) @@index([kerknetId])
@@index([buscarmisasNetworkId]) @@index([buscarmisasNetworkId])
@@index([gcatholicId])
@@index([dioceseId]) @@index([dioceseId])
@@index([claimedByUserId]) @@index([claimedByUserId])
@@map("churches") @@map("churches")

View File

@@ -3,12 +3,13 @@ import dotenv from 'dotenv';
import path from 'path'; import path from 'path';
dotenv.config({ path: path.resolve(process.cwd(), '.env.local') }); dotenv.config({ path: path.resolve(process.cwd(), '.env.local') });
import { Pool } from 'pg'; import { Pool } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '@prisma/client';
const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const adapter = new PrismaPg(pool);
const prisma = new PrismaClient({ adapter }); interface CountResult {
churches_with_dups: string;
duplicate_rows: string;
}
async function main() { async function main() {
const dryRun = !process.argv.includes('--execute'); const dryRun = !process.argv.includes('--execute');
@@ -17,44 +18,52 @@ async function main() {
console.log('DRY RUN - pass --execute to actually delete duplicates\n'); console.log('DRY RUN - pass --execute to actually delete duplicates\n');
} }
const churches = await prisma.church.findMany({ const client = await pool.connect();
where: { massSchedules: { some: { isActive: true } } },
include: { massSchedules: { where: { isActive: true }, orderBy: { createdAt: 'asc' } } },
});
let churchesFixed = 0; try {
let rowsDeleted = 0; const countResult = await client.query<CountResult>(`
WITH ranked AS (
SELECT church_id,
ROW_NUMBER() OVER (
PARTITION BY church_id, day_of_week, time, language
ORDER BY created_at ASC
) AS rn
FROM mass_schedules
WHERE is_active = true
)
SELECT COUNT(DISTINCT church_id) AS churches_with_dups,
COUNT(*) AS duplicate_rows
FROM ranked
WHERE rn > 1;
`);
for (const church of churches) { const { churches_with_dups, duplicate_rows } = countResult.rows[0];
const seen = new Map<string, string>(); console.log(`Churches with duplicate schedules: ${churches_with_dups}`);
const toDelete: string[] = []; console.log(`Duplicate rows to ${dryRun ? 'delete' : 'delete'}: ${duplicate_rows}\n`);
for (const m of church.massSchedules) { if (!dryRun && Number(duplicate_rows) > 0) {
const key = `${m.dayOfWeek}:${m.time}:${m.language}`; console.log('Deleting duplicates (keeping oldest by created_at)...');
if (seen.has(key)) {
toDelete.push(m.id); const deleteResult = await client.query(`
} else { WITH ranked AS (
seen.set(key, m.id); SELECT id,
ROW_NUMBER() OVER (
PARTITION BY church_id, day_of_week, time, language
ORDER BY created_at ASC
) AS rn
FROM mass_schedules
WHERE is_active = true
)
DELETE FROM mass_schedules
WHERE id IN (SELECT id FROM ranked WHERE rn > 1);
`);
console.log(`Deleted ${deleteResult.rowCount} duplicate mass schedule rows.`);
} }
} } finally {
client.release();
if (toDelete.length > 0) {
churchesFixed++;
rowsDeleted += toDelete.length;
if (!dryRun) {
await prisma.massSchedule.deleteMany({
where: { id: { in: toDelete } },
});
}
}
}
console.log(`Churches with duplicates: ${churchesFixed}`);
console.log(`Duplicate rows ${dryRun ? 'found' : 'deleted'}: ${rowsDeleted}`);
await prisma.$disconnect();
await pool.end(); await pool.end();
}
} }
main().catch((err) => { main().catch((err) => {

View File

@@ -491,10 +491,14 @@ async function processSchedulesForDiocese(
})), })),
}); });
// Mark church as scraped // Update church metadata from detail (pastor, phone) if available
const churchUpdateData: Record<string, unknown> = { lastScrapedAt: new Date() };
if (detail.parroco) churchUpdateData.pastorName = detail.parroco;
if (detail.telefono) churchUpdateData.phone = detail.telefono;
await tx.church.update({ await tx.church.update({
where: { id: dbId }, where: { id: dbId },
data: { lastScrapedAt: new Date() }, data: churchUpdateData,
}); });
}); });

View File

@@ -59,6 +59,13 @@ const PIPELINE_GROUPS: PipelineGroup[] = [
{ name: 'masstimes-api-import', type: 'masstimes-api-import', config: {} }, { name: 'masstimes-api-import', type: 'masstimes-api-import', config: {} },
], ],
}, },
{
name: 'geocode-enrichment',
mode: 'sequential',
phases: [
{ name: 'forward-geocode', type: 'forward-geocode-enrichment', config: { limit: 500 } },
],
},
{ {
name: 'scrapers-batch-1', name: 'scrapers-batch-1',
mode: 'parallel', mode: 'parallel',
@@ -138,6 +145,12 @@ function getJobCommand(type: string, language?: string | null, config?: Record<s
if (config?.country) args.push('--country', String(config.country)); if (config?.country) args.push('--country', String(config.country));
return { command: 'npx', args }; return { command: 'npx', args };
} }
case 'forward-geocode-enrichment': {
const args = ['tsx', 'scripts/enrich-with-forward-geocode.ts'];
if (limit) args.push('--limit', String(limit));
if (config?.country) args.push('--country', String(config.country));
return { command: 'npx', args };
}
case 'match-search-results': { case 'match-search-results': {
const args = ['tsx', 'scripts/match-search-results.ts']; const args = ['tsx', 'scripts/match-search-results.ts'];
if (limit) args.push('--limit', String(limit)); if (limit) args.push('--limit', String(limit));

View File

@@ -103,7 +103,8 @@ async function main() {
{ phone: { not: null } }, { phone: { not: null } },
{ googlePlaceId: { not: null } }, { googlePlaceId: { not: null } },
{ massSchedules: { some: {} } }, { massSchedules: { some: {} } },
] ],
NOT: { latitude: 0, longitude: 0 },
}; };
// Add incremental filter if applicable // Add incremental filter if applicable
@@ -112,7 +113,7 @@ async function main() {
console.log(`🔄 Incremental filter: updatedAt > ${transferSince.toISOString()}\n`); console.log(`🔄 Incremental filter: updatedAt > ${transferSince.toISOString()}\n`);
} }
const BATCH_SIZE = 200; const BATCH_SIZE = 100;
const totalCount = await nasPrisma.church.count({ where: whereClause }); const totalCount = await nasPrisma.church.count({ where: whereClause });
console.log(`Found ${totalCount} enriched churches (will process in batches of ${BATCH_SIZE})\n`); console.log(`Found ${totalCount} enriched churches (will process in batches of ${BATCH_SIZE})\n`);
@@ -269,6 +270,9 @@ async function main() {
console.error(`Error transferring ${church.name}:`, error instanceof Error ? error.message : error); console.error(`Error transferring ${church.name}:`, error instanceof Error ? error.message : error);
} }
} }
// Brief pause between batches to avoid overwhelming Neon
await new Promise(resolve => setTimeout(resolve, 1000));
} // end batch loop } // end batch loop
console.log('\n════════════════════════════════════════════════════════════'); console.log('\n════════════════════════════════════════════════════════════');