Reset local main to gitea/master (new source of truth) and restored local-only files: web scrapers, admin dashboard, ChromaDB integration, debug scripts, and utility libraries that aren't tracked in Gitea. Gitea master adds: discovermass, buscarmisas-network, hk-parishes, bohosluzby, kerknet, gottesdienstzeiten, miserend importers, ClaimRequest model, forward geocoding, heartbeat healthcheck. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
14 KiB
Parallel Scrapers Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Run language scrapers in parallel groups of 3, add missing country mappings, and deprioritize the generic scraper.
Architecture: Replace sequential pipeline phases with grouped phases. Groups run their jobs concurrently (max 3), then wait for all to complete before advancing. Import phases stay sequential. The scheduler tracks a groupJobsRemaining counter per group instead of advancing on every job completion.
Tech Stack: TypeScript, node child_process spawn, Prisma, Docker Compose
Task 1: Add Missing Country Mappings
Files:
- Modify:
src/lib/scraper-service.ts:29-45
Step 1: Update COUNTRY_SCRAPER_MAP
Add these entries to the existing COUNTRY_SCRAPER_MAP object at src/lib/scraper-service.ts:29:
const COUNTRY_SCRAPER_MAP: Record<string, string> = {
US: 'english', CA: 'english', GB: 'english',
AU: 'english', NZ: 'english', IE: 'english', PH: 'english',
IN: 'english', SG: 'english', MY: 'english', KE: 'english',
JM: 'english', TT: 'english', GH: 'english', NG: 'english',
ZA: 'english', TZ: 'english', UG: 'english',
FR: 'french', BE: 'french', LU: 'french',
ES: 'spanish', MX: 'spanish', AR: 'spanish', CO: 'spanish',
CL: 'spanish', PE: 'spanish', EC: 'spanish', VE: 'spanish',
CR: 'spanish', PA: 'spanish', GT: 'spanish', CU: 'spanish',
HN: 'spanish', SV: 'spanish', NI: 'spanish', BO: 'spanish',
PY: 'spanish', UY: 'spanish', DO: 'spanish',
IT: 'italian', SM: 'italian', VA: 'italian',
HR: 'italian', RO: 'italian',
DE: 'german', AT: 'german', LI: 'german',
CH: 'german', SI: 'german',
PL: 'polish',
PT: 'portuguese', BR: 'portuguese',
NL: 'dutch',
CZ: 'czech', SK: 'czech',
HU: 'hungarian',
};
Also update buildLanguageFilter at src/lib/scraper-service.ts:346-463 to include the new countries in each language filter's country list:
englishfilter (line 356): add'IN', 'SG', 'MY', 'KE', 'JM', 'TT', 'GH', 'NG', 'ZA', 'TZ', 'UG'frenchfilter (line 366): add'BE', 'LU'→{ in: ['FR', 'BE', 'LU'] }spanishfilter: already has all needed countriesitalianfilter (line 387): add'HR', 'RO'→{ in: ['IT', 'SM', 'VA', 'HR', 'RO'] }germanfilter (line 397): add'CH', 'SI'→{ in: ['DE', 'AT', 'LI', 'CH', 'SI'] }
Step 2: Verify build
Run: npm run build
Expected: Build succeeds with no errors
Step 3: Commit
git add src/lib/scraper-service.ts
git commit -m "feat: add missing country mappings to language scrapers
Add BE/LU→french, CH/SI→german, HR/RO→italian, IN/SG/MY/KE/JM/TT/GH/NG/ZA/TZ/UG→english.
~1,400 previously unmapped churches now routed to proper language scrapers."
Task 2: Rewrite Scheduler for Parallel Groups
Files:
- Modify:
scripts/scheduler.ts
Step 1: Replace pipeline data structure
Replace the PipelinePhase interface, PIPELINE_PHASES array (lines 27-49), and CycleState interface (lines 53-69) with:
interface PipelinePhase {
name: string;
type: string;
language?: string;
config: Record<string, unknown>;
}
interface PipelineGroup {
name: string;
phases: PipelinePhase[];
mode: 'sequential' | 'parallel';
}
const PIPELINE_GROUPS: PipelineGroup[] = [
{
name: 'imports',
mode: 'sequential',
phases: [
{ name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
{ name: 'gcatholic-import', type: 'gcatholic-import', config: { delay: 2000 } },
],
},
{
name: 'scrapers-batch-1',
mode: 'parallel',
phases: [
{ name: 'scraper-english', type: 'scraper', language: 'english', config: { allMode: true, maxFailures: 10, language: 'english' } },
{ name: 'scraper-french', type: 'scraper', language: 'french', config: { allMode: true, maxFailures: 10, language: 'french' } },
{ name: 'scraper-german', type: 'scraper', language: 'german', config: { allMode: true, maxFailures: 10, language: 'german' } },
],
},
{
name: 'scrapers-batch-2',
mode: 'parallel',
phases: [
{ name: 'scraper-polish', type: 'scraper', language: 'polish', config: { allMode: true, maxFailures: 10, language: 'polish' } },
{ name: 'scraper-spanish', type: 'scraper', language: 'spanish', config: { allMode: true, maxFailures: 10, language: 'spanish' } },
{ name: 'scraper-italian', type: 'scraper', language: 'italian', config: { allMode: true, maxFailures: 10, language: 'italian' } },
],
},
{
name: 'scrapers-batch-3',
mode: 'parallel',
phases: [
{ name: 'scraper-portuguese', type: 'scraper', language: 'portuguese', config: { allMode: true, maxFailures: 10, language: 'portuguese' } },
{ name: 'scraper-czech', type: 'scraper', language: 'czech', config: { allMode: true, maxFailures: 10, language: 'czech' } },
{ name: 'scraper-dutch', type: 'scraper', language: 'dutch', config: { allMode: true, maxFailures: 10, language: 'dutch' } },
],
},
{
name: 'scrapers-batch-4',
mode: 'parallel',
phases: [
{ name: 'scraper-hungarian', type: 'scraper', language: 'hungarian', config: { allMode: true, maxFailures: 10, language: 'hungarian' } },
{ name: 'scraper-generic', type: 'scraper', language: 'generic', config: { allMode: true, maxFailures: 10, language: 'generic' } },
],
},
];
Step 2: Replace CycleState
interface CycleState {
currentGroupIndex: number;
currentSequentialPhaseIndex: number; // for sequential groups, tracks which phase within the group
cycleNumber: number;
cycleStartedAt: Date | null;
lastCycleCompletedAt: Date | null;
waitingForCooldown: boolean;
activeGroupJobs: number; // how many jobs still running in the current group
}
const cycleState: CycleState = {
currentGroupIndex: 0,
currentSequentialPhaseIndex: 0,
cycleNumber: 0,
cycleStartedAt: null,
lastCycleCompletedAt: null,
waitingForCooldown: false,
activeGroupJobs: 0,
};
Step 3: Rewrite pollAndAdvancePipeline
Replace the entire pollAndAdvancePipeline function (lines 306-385) and advancePipelinePhase function (lines 387-390) with:
async function pollAndAdvancePipeline(): Promise<void> {
try {
// 1. Check for manual pending jobs from admin API (priority over pipeline)
if (runningJobs.size === 0) {
const manualJob = await prisma.backgroundJob.findFirst({
where: {
status: 'pending',
NOT: { config: { path: ['pipelineManaged'], equals: true } },
},
orderBy: { createdAt: 'asc' },
});
if (manualJob) {
log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
await startJobProcess(
manualJob.id,
manualJob.type,
manualJob.language,
manualJob.config as Record<string, unknown> | null
);
return;
}
}
// 2. If jobs are still running for the current group, wait
if (cycleState.activeGroupJobs > 0) {
return;
}
// 3. If in cooldown, check if expired
if (cycleState.waitingForCooldown) {
if (cycleState.lastCycleCompletedAt) {
const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
if (elapsed < CYCLE_COOLDOWN_MS) {
const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
if (remaining % 30 === 0 || remaining <= 5) {
log(`Cooldown: ${remaining} minutes remaining before next cycle`);
}
return;
}
}
cycleState.waitingForCooldown = false;
cycleState.currentGroupIndex = 0;
cycleState.currentSequentialPhaseIndex = 0;
log('Cooldown expired, starting new cycle');
}
// 4. If past the last group, complete the cycle
if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
cycleState.cycleNumber++;
cycleState.lastCycleCompletedAt = new Date();
cycleState.waitingForCooldown = true;
const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
return;
}
// 5. Start the current group
const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
if (cycleState.currentGroupIndex === 0 && cycleState.currentSequentialPhaseIndex === 0 && !cycleState.cycleStartedAt) {
cycleState.cycleStartedAt = new Date();
log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
}
if (group.mode === 'parallel') {
// Launch all phases in the group concurrently
log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
cycleState.activeGroupJobs = group.phases.length;
for (const phase of group.phases) {
const jobId = await createPendingJob(
phase.type,
phase.language,
{ ...phase.config, pipelineManaged: true }
);
await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
}
} else {
// Sequential: run one phase at a time within the group
const phaseIndex = cycleState.currentSequentialPhaseIndex;
if (phaseIndex >= group.phases.length) {
// All phases in this sequential group are done
cycleState.currentGroupIndex++;
cycleState.currentSequentialPhaseIndex = 0;
return; // Will pick up next group on next poll
}
const phase = group.phases[phaseIndex];
log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
cycleState.activeGroupJobs = 1;
const jobId = await createPendingJob(
phase.type,
phase.language,
{ ...phase.config, pipelineManaged: true }
);
await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
}
} catch (err) {
logError(`Error in pipeline: ${err}`);
}
}
function onJobCompleted(): void {
cycleState.activeGroupJobs--;
if (cycleState.activeGroupJobs <= 0) {
cycleState.activeGroupJobs = 0;
const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
if (group?.mode === 'sequential') {
cycleState.currentSequentialPhaseIndex++;
// Check if there are more phases in this sequential group
if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
return; // Don't advance group yet
}
}
// Advance to next group
cycleState.currentGroupIndex++;
cycleState.currentSequentialPhaseIndex = 0;
log(`Group "${group?.name}" complete, advancing to group ${cycleState.currentGroupIndex + 1}`);
}
}
Step 4: Update startJobProcess callbacks
In the child.on('close') callback (line 442) and child.on('error') callback (line 472), replace advancePipelinePhase() with onJobCompleted().
Step 5: Update crash recovery
In recoverFromCrash (lines 259-268), replace the PIPELINE_PHASES.findIndex logic with a search through PIPELINE_GROUPS:
if (lastRunningPipelineJob) {
for (let gi = 0; gi < PIPELINE_GROUPS.length; gi++) {
const group = PIPELINE_GROUPS[gi];
const phaseIdx = group.phases.findIndex(
p => p.type === lastRunningPipelineJob.type &&
(p.language || null) === (lastRunningPipelineJob.language || null)
);
if (phaseIdx >= 0) {
cycleState.currentGroupIndex = gi;
cycleState.currentSequentialPhaseIndex = group.mode === 'sequential' ? phaseIdx : 0;
log(`Resuming pipeline from group ${gi + 1}: ${group.name}`);
break;
}
}
}
Step 6: Update heartbeat log in main()
Replace the heartbeat cron (lines 551-562) and the startup log (lines 574-580) to reference groups instead of phases:
cron.schedule('0 * * * *', () => {
const currentGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
? PIPELINE_GROUPS[cycleState.currentGroupIndex].name
: 'none';
const jobs = runningJobs.size > 0
? `Running: ${[...runningJobs.keys()].join(', ')}`
: 'No jobs running';
const state = cycleState.waitingForCooldown
? 'cooldown'
: `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
}, { timezone: 'UTC' });
For the startup log:
log('=== Scheduler running (parallel grouped pipeline) ===');
log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
for (let i = 0; i < PIPELINE_GROUPS.length; i++) {
const g = PIPELINE_GROUPS[i];
const phaseNames = g.phases.map(p => p.name).join(', ');
log(` ${i + 1}. ${g.name} [${g.mode}]: ${phaseNames}`);
}
Step 7: Remove dead Google Places env log
Delete lines 167-169 (the GOOGLE_PLACES_API_KEY log in validateEnvironment).
Step 8: Verify build
Run: npm run build
Expected: Build succeeds
Step 9: Commit
git add scripts/scheduler.ts
git commit -m "feat: parallel grouped pipeline scheduler
Replace sequential pipeline with grouped phases. Import phases run
sequentially, scraper phases run in parallel groups of 3. This reduces
cycle time from days to hours. Generic scraper moved to last group."
Task 3: Increase Scheduler Memory Limit
Files:
- Modify:
docker-compose.yml:217-220
Step 1: Increase memory limit
Change the scheduler service's deploy.resources.limits.memory from 4G to 10G:
deploy:
resources:
limits:
memory: 10G
Step 2: Commit
git add docker-compose.yml
git commit -m "chore: increase scheduler memory to 10G for parallel scrapers"
Task 4: Deploy and Verify
Step 1: Deploy to NAS
rsync -avz --exclude 'node_modules' --exclude '.next' --exclude '.git' --exclude '.env.local' --exclude '*.log' \
/Users/albert/Documents/Projects/Church/ScraperControl/ albert@192.168.0.145:/volume1/docker/scraper-control/
Step 2: Rebuild and restart scheduler
ssh albert@192.168.0.145 'cd /volume1/docker/scraper-control && /usr/local/bin/docker compose build scheduler && /usr/local/bin/docker compose up -d scheduler'
Step 3: Verify logs show parallel groups
ssh albert@192.168.0.145 '/usr/local/bin/docker logs --tail 30 scraper-control-scheduler-1'
Expected: Logs show "parallel grouped pipeline", group listings with [parallel] and [sequential] tags, and eventually multiple concurrent Running: entries in heartbeat.