# Parallel Scrapers Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Run language scrapers in parallel groups of 3, add missing country mappings, and deprioritize the generic scraper.
**Architecture:** Replace sequential pipeline phases with grouped phases. A parallel group launches all of its jobs concurrently (max 3), then waits for every job to complete before the pipeline advances. Import phases stay sequential. The scheduler tracks an `activeGroupJobs` counter for the current group and advances only when it reaches zero, instead of advancing on every job completion.
**Tech Stack:** TypeScript, node child_process spawn, Prisma, Docker Compose
---
### Task 1: Add Missing Country Mappings
**Files:**
- Modify: `src/lib/scraper-service.ts:29-45`
**Step 1: Update COUNTRY_SCRAPER_MAP**
Add these entries to the existing `COUNTRY_SCRAPER_MAP` object at `src/lib/scraper-service.ts:29`:
```typescript
const COUNTRY_SCRAPER_MAP: Record<string, string> = {
  US: 'english', CA: 'english', GB: 'english',
  AU: 'english', NZ: 'english', IE: 'english', PH: 'english',
  IN: 'english', SG: 'english', MY: 'english', KE: 'english',
  JM: 'english', TT: 'english', GH: 'english', NG: 'english',
  ZA: 'english', TZ: 'english', UG: 'english',
  FR: 'french', BE: 'french', LU: 'french',
  ES: 'spanish', MX: 'spanish', AR: 'spanish', CO: 'spanish',
  CL: 'spanish', PE: 'spanish', EC: 'spanish', VE: 'spanish',
  CR: 'spanish', PA: 'spanish', GT: 'spanish', CU: 'spanish',
  HN: 'spanish', SV: 'spanish', NI: 'spanish', BO: 'spanish',
  PY: 'spanish', UY: 'spanish', DO: 'spanish',
  IT: 'italian', SM: 'italian', VA: 'italian',
  HR: 'italian', RO: 'italian',
  DE: 'german', AT: 'german', LI: 'german',
  CH: 'german', SI: 'german',
  PL: 'polish',
  PT: 'portuguese', BR: 'portuguese',
  NL: 'dutch',
  CZ: 'czech', SK: 'czech',
  HU: 'hungarian',
};
```
Also update `buildLanguageFilter` at `src/lib/scraper-service.ts:346-463` to include the new countries in each language filter's country list:
- `english` filter (line 356): add `'IN', 'SG', 'MY', 'KE', 'JM', 'TT', 'GH', 'NG', 'ZA', 'TZ', 'UG'`
- `french` filter (line 366): add `'BE', 'LU'` → `{ in: ['FR', 'BE', 'LU'] }`
- `spanish` filter: already has all needed countries
- `italian` filter (line 387): add `'HR', 'RO'` → `{ in: ['IT', 'SM', 'VA', 'HR', 'RO'] }`
- `german` filter (line 397): add `'CH', 'SI'` → `{ in: ['DE', 'AT', 'LI', 'CH', 'SI'] }`
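The map and the filter lists above hold the same country-to-language information in two places, so they can drift apart. One way to avoid that is to derive each filter's country list from the map. The following is a minimal sketch under assumptions: `countriesForLanguage` is a hypothetical helper that does not exist in `scraper-service.ts`, and the map is an abbreviated copy used only for illustration.

```typescript
// Abbreviated copy of COUNTRY_SCRAPER_MAP, for illustration only.
const COUNTRY_SCRAPER_MAP: Record<string, string> = {
  FR: 'french', BE: 'french', LU: 'french',
  DE: 'german', AT: 'german', LI: 'german', CH: 'german', SI: 'german',
  IT: 'italian', SM: 'italian', VA: 'italian', HR: 'italian', RO: 'italian',
};

// Hypothetical helper: collect every country code routed to one language.
function countriesForLanguage(language: string): string[] {
  return Object.entries(COUNTRY_SCRAPER_MAP)
    .filter(([, lang]) => lang === language)
    .map(([country]) => country)
    .sort();
}

// Same shape as the `{ in: [...] }` country condition in the bullets above.
const frenchCountryFilter = { in: countriesForLanguage('french') };
```

With a helper like this, a new map entry would reach the matching filter without a second manual edit. Whether it fits depends on how `buildLanguageFilter` composes its other conditions, which this plan does not show.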
**Step 2: Verify build**
Run: `npm run build`
Expected: Build succeeds with no errors
**Step 3: Commit**
```bash
git add src/lib/scraper-service.ts
git commit -m "feat: add missing country mappings to language scrapers

Add BE/LU→french, CH/SI→german, HR/RO→italian, IN/SG/MY/KE/JM/TT/GH/NG/ZA/TZ/UG→english.
~1,400 previously unmapped churches now routed to proper language scrapers."
```
---
### Task 2: Rewrite Scheduler for Parallel Groups
**Files:**
- Modify: `scripts/scheduler.ts`
**Step 1: Replace pipeline data structure**
Replace the `PipelinePhase` interface and the `PIPELINE_PHASES` array (lines 27-49) with the grouped structure below. The `CycleState` interface (lines 53-69) is replaced in Step 2.
```typescript
interface PipelinePhase {
  name: string;
  type: string;
  language?: string;
  config: Record<string, unknown>;
}

interface PipelineGroup {
  name: string;
  phases: PipelinePhase[];
  mode: 'sequential' | 'parallel';
}

const PIPELINE_GROUPS: PipelineGroup[] = [
  {
    name: 'imports',
    mode: 'sequential',
    phases: [
      { name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
      { name: 'gcatholic-import', type: 'gcatholic-import', config: { delay: 2000 } },
    ],
  },
  {
    name: 'scrapers-batch-1',
    mode: 'parallel',
    phases: [
      { name: 'scraper-english', type: 'scraper', language: 'english', config: { allMode: true, maxFailures: 10, language: 'english' } },
      { name: 'scraper-french', type: 'scraper', language: 'french', config: { allMode: true, maxFailures: 10, language: 'french' } },
      { name: 'scraper-german', type: 'scraper', language: 'german', config: { allMode: true, maxFailures: 10, language: 'german' } },
    ],
  },
  {
    name: 'scrapers-batch-2',
    mode: 'parallel',
    phases: [
      { name: 'scraper-polish', type: 'scraper', language: 'polish', config: { allMode: true, maxFailures: 10, language: 'polish' } },
      { name: 'scraper-spanish', type: 'scraper', language: 'spanish', config: { allMode: true, maxFailures: 10, language: 'spanish' } },
      { name: 'scraper-italian', type: 'scraper', language: 'italian', config: { allMode: true, maxFailures: 10, language: 'italian' } },
    ],
  },
  {
    name: 'scrapers-batch-3',
    mode: 'parallel',
    phases: [
      { name: 'scraper-portuguese', type: 'scraper', language: 'portuguese', config: { allMode: true, maxFailures: 10, language: 'portuguese' } },
      { name: 'scraper-czech', type: 'scraper', language: 'czech', config: { allMode: true, maxFailures: 10, language: 'czech' } },
      { name: 'scraper-dutch', type: 'scraper', language: 'dutch', config: { allMode: true, maxFailures: 10, language: 'dutch' } },
    ],
  },
  {
    name: 'scrapers-batch-4',
    mode: 'parallel',
    phases: [
      { name: 'scraper-hungarian', type: 'scraper', language: 'hungarian', config: { allMode: true, maxFailures: 10, language: 'hungarian' } },
      { name: 'scraper-generic', type: 'scraper', language: 'generic', config: { allMode: true, maxFailures: 10, language: 'generic' } },
    ],
  },
];
```
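Because the group table is hand-written, a typo such as a fourth phase in a parallel group or a mismatched `language` would only surface at runtime. A startup check could catch these earlier. The sketch below is an assumption-laden illustration: `validatePipelineGroups` and `MAX_PARALLEL_JOBS` are hypothetical names that are not part of `scheduler.ts`, and the cap of 3 is taken from the architecture note.

```typescript
interface PipelinePhase {
  name: string;
  type: string;
  language?: string;
  config: Record<string, unknown>;
}

interface PipelineGroup {
  name: string;
  phases: PipelinePhase[];
  mode: 'sequential' | 'parallel';
}

// Assumed cap, taken from the "max 3" in the architecture note.
const MAX_PARALLEL_JOBS = 3;

// Hypothetical startup check: returns a list of problems, empty when the table is sound.
function validatePipelineGroups(groups: PipelineGroup[]): string[] {
  const problems: string[] = [];
  const seen = new Set<string>();
  for (const group of groups) {
    if (group.phases.length === 0) {
      problems.push(`${group.name}: no phases`);
    }
    if (group.mode === 'parallel' && group.phases.length > MAX_PARALLEL_JOBS) {
      problems.push(`${group.name}: ${group.phases.length} parallel phases exceeds ${MAX_PARALLEL_JOBS}`);
    }
    for (const phase of group.phases) {
      if (seen.has(phase.name)) {
        problems.push(`${phase.name}: duplicate phase name`);
      }
      seen.add(phase.name);
      // Scraper phases repeat the language in `language` and `config.language`; flag a mismatch.
      if (phase.language !== undefined && phase.config.language !== phase.language) {
        problems.push(`${phase.name}: config.language does not match language`);
      }
    }
  }
  return problems;
}
```

Calling `validatePipelineGroups(PIPELINE_GROUPS)` once in `main()` and refusing to start on a non-empty result would be one way to use it.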
**Step 2: Replace CycleState**
```typescript
interface CycleState {
  currentGroupIndex: number;
  currentSequentialPhaseIndex: number; // for sequential groups, tracks which phase within the group
  cycleNumber: number;
  cycleStartedAt: Date | null;
  lastCycleCompletedAt: Date | null;
  waitingForCooldown: boolean;
  activeGroupJobs: number; // how many jobs still running in the current group
}

const cycleState: CycleState = {
  currentGroupIndex: 0,
  currentSequentialPhaseIndex: 0,
  cycleNumber: 0,
  cycleStartedAt: null,
  lastCycleCompletedAt: null,
  waitingForCooldown: false,
  activeGroupJobs: 0,
};
```
**Step 3: Rewrite pollAndAdvancePipeline**
Replace the entire `pollAndAdvancePipeline` function (lines 306-385) and `advancePipelinePhase` function (lines 387-390) with:
```typescript
async function pollAndAdvancePipeline(): Promise<void> {
  try {
    // 1. Check for manual pending jobs from admin API (priority over pipeline)
    if (runningJobs.size === 0) {
      const manualJob = await prisma.backgroundJob.findFirst({
        where: {
          status: 'pending',
          NOT: { config: { path: ['pipelineManaged'], equals: true } },
        },
        orderBy: { createdAt: 'asc' },
      });
      if (manualJob) {
        log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
        await startJobProcess(
          manualJob.id,
          manualJob.type,
          manualJob.language,
          manualJob.config as Record<string, unknown> | null
        );
        return;
      }
    }

    // 2. If jobs are still running for the current group, wait
    if (cycleState.activeGroupJobs > 0) {
      return;
    }

    // 3. If in cooldown, check if expired
    if (cycleState.waitingForCooldown) {
      if (cycleState.lastCycleCompletedAt) {
        const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
        if (elapsed < CYCLE_COOLDOWN_MS) {
          const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
          if (remaining % 30 === 0 || remaining <= 5) {
            log(`Cooldown: ${remaining} minutes remaining before next cycle`);
          }
          return;
        }
      }
      cycleState.waitingForCooldown = false;
      cycleState.currentGroupIndex = 0;
      cycleState.currentSequentialPhaseIndex = 0;
      // Clear the previous start time so step 5 stamps and logs the new cycle
      cycleState.cycleStartedAt = null;
      log('Cooldown expired, starting new cycle');
    }

    // 4. If past the last group, complete the cycle
    if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
      cycleState.cycleNumber++;
      cycleState.lastCycleCompletedAt = new Date();
      cycleState.waitingForCooldown = true;
      const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
      log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
      return;
    }

    // 5. Start the current group
    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
    if (cycleState.currentGroupIndex === 0 && cycleState.currentSequentialPhaseIndex === 0 && !cycleState.cycleStartedAt) {
      cycleState.cycleStartedAt = new Date();
      log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
    }

    if (group.mode === 'parallel') {
      // Launch all phases in the group concurrently
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
      // Set the counter before spawning so an early completion cannot release the group
      cycleState.activeGroupJobs = group.phases.length;
      for (const phase of group.phases) {
        const jobId = await createPendingJob(
          phase.type,
          phase.language,
          { ...phase.config, pipelineManaged: true }
        );
        await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
      }
    } else {
      // Sequential: run one phase at a time within the group
      const phaseIndex = cycleState.currentSequentialPhaseIndex;
      if (phaseIndex >= group.phases.length) {
        // All phases in this sequential group are done
        cycleState.currentGroupIndex++;
        cycleState.currentSequentialPhaseIndex = 0;
        return; // Will pick up next group on next poll
      }
      const phase = group.phases[phaseIndex];
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
      cycleState.activeGroupJobs = 1;
      const jobId = await createPendingJob(
        phase.type,
        phase.language,
        { ...phase.config, pipelineManaged: true }
      );
      await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
    }
  } catch (err) {
    logError(`Error in pipeline: ${err}`);
  }
}
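
// Call only for pipeline-managed jobs: a manual job finishing must not move the group counter.
function onJobCompleted(): void {
  // Ignore stray completions (no group job outstanding) so the pipeline cannot skip a group
  if (cycleState.activeGroupJobs <= 0) {
    return;
  }
  cycleState.activeGroupJobs--;
  if (cycleState.activeGroupJobs > 0) {
    return; // Other jobs in this group are still running
  }
  const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
  if (group?.mode === 'sequential') {
    cycleState.currentSequentialPhaseIndex++;
    // Check if there are more phases in this sequential group
    if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
      return; // Don't advance group yet
    }
  }
  // Advance to next group
  cycleState.currentGroupIndex++;
  cycleState.currentSequentialPhaseIndex = 0;
  const next = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
    ? `advancing to group ${cycleState.currentGroupIndex + 1}`
    : 'all groups finished';
  log(`Group "${group?.name}" complete, ${next}`);
}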
```
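The counter logic is easier to check in isolation from Prisma and process spawning. The sketch below re-expresses the completion step as a pure function and walks it through one sequential group and one parallel group. The names `GroupShape`, `Progress`, and `afterJobCompleted` are hypothetical and exist only for this illustration; the sketch models the intended barrier behavior rather than reproducing the scheduler code.

```typescript
type Mode = 'sequential' | 'parallel';
interface GroupShape { mode: Mode; size: number }
interface Progress { group: number; phase: number; active: number }

// Hypothetical pure version of the completion step: returns the next state.
function afterJobCompleted(p: Progress, groups: GroupShape[]): Progress {
  const active = Math.max(0, p.active - 1);
  if (active > 0) {
    return { ...p, active }; // other jobs in the group are still running
  }
  const g = groups[p.group];
  if (g !== undefined && g.mode === 'sequential' && p.phase + 1 < g.size) {
    return { group: p.group, phase: p.phase + 1, active: 0 }; // next phase, same group
  }
  return { group: p.group + 1, phase: 0, active: 0 }; // barrier reached, next group
}

const shapes: GroupShape[] = [
  { mode: 'sequential', size: 2 }, // imports
  { mode: 'parallel', size: 3 },   // scrapers-batch-1
];

// Imports: the poll sets active = 1 before each phase, so two completions finish the group.
let state: Progress = { group: 0, phase: 0, active: 1 };
state = afterJobCompleted(state, shapes);                   // group 0, phase 1
state = afterJobCompleted({ ...state, active: 1 }, shapes); // group 1, phase 0

// Batch 1: three jobs launched together; only the third completion advances the group.
state = { ...state, active: 3 };
state = afterJobCompleted(state, shapes); // active 2, still group 1
state = afterJobCompleted(state, shapes); // active 1, still group 1
state = afterJobCompleted(state, shapes); // group 2
```

The walk-through shows the property the plan relies on: in a parallel group the index moves only when the last outstanding job reports completion.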
**Step 4: Update startJobProcess callbacks**
In the `child.on('close')` callback (line 442) and `child.on('error')` callback (line 472), replace `advancePipelinePhase()` with `onJobCompleted()`.
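Both callbacks now decrement a shared counter, so a single child that triggers both of them would be counted twice and could release a parallel group early. Node's `child_process` documentation notes that `'exit'` may or may not fire after `'error'` and recommends guarding handlers against multiple invocation; the same caution seems prudent for `'close'`. A minimal sketch of such a guard follows. `callOnce` is a hypothetical helper, and the counter here is a stand-in for `cycleState.activeGroupJobs`.

```typescript
// Hypothetical helper: wraps a callback so only its first invocation has an effect.
function callOnce(fn: () => void): () => void {
  let called = false;
  return () => {
    if (called) return;
    called = true;
    fn();
  };
}

// Stand-in for the scheduler's counter, for illustration only.
let activeGroupJobs = 3;
const onJobCompleted = (): void => {
  activeGroupJobs--;
};

// Intended wiring, one guard per spawned job, shared by both handlers:
//   const done = callOnce(onJobCompleted);
//   child.on('close', () => { /* existing cleanup */ done(); });
//   child.on('error', () => { /* existing cleanup */ done(); });
const done = callOnce(onJobCompleted);
done(); // e.g. 'error' fires first
done(); // a later 'close' for the same child is ignored
```

After both calls the counter has dropped by one, not two, which is the behavior the group barrier needs.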
**Step 5: Update crash recovery**
In `recoverFromCrash` (lines 259-268), replace the `PIPELINE_PHASES.findIndex` logic with a search through `PIPELINE_GROUPS`:
```typescript
if (lastRunningPipelineJob) {
  for (let gi = 0; gi < PIPELINE_GROUPS.length; gi++) {
    const group = PIPELINE_GROUPS[gi];
    const phaseIdx = group.phases.findIndex(
      p => p.type === lastRunningPipelineJob.type &&
        (p.language || null) === (lastRunningPipelineJob.language || null)
    );
    if (phaseIdx >= 0) {
      cycleState.currentGroupIndex = gi;
      // Parallel groups restart from the beginning; sequential groups resume at the interrupted phase
      cycleState.currentSequentialPhaseIndex = group.mode === 'sequential' ? phaseIdx : 0;
      log(`Resuming pipeline from group ${gi + 1}: ${group.name}`);
      break;
    }
  }
}
```
**Step 6: Update heartbeat log in main()**
Replace the heartbeat cron (lines 551-562) and the startup log (lines 574-580) to reference groups instead of phases:
```typescript
cron.schedule('0 * * * *', () => {
  const currentGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
    ? PIPELINE_GROUPS[cycleState.currentGroupIndex].name
    : 'none';
  const jobs = runningJobs.size > 0
    ? `Running: ${[...runningJobs.keys()].join(', ')}`
    : 'No jobs running';
  const state = cycleState.waitingForCooldown
    ? 'cooldown'
    : `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
  log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
}, { timezone: 'UTC' });
```
For the startup log:
```typescript
log('=== Scheduler running (parallel grouped pipeline) ===');
log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
for (let i = 0; i < PIPELINE_GROUPS.length; i++) {
  const g = PIPELINE_GROUPS[i];
  const phaseNames = g.phases.map(p => p.name).join(', ');
  log(`  ${i + 1}. ${g.name} [${g.mode}]: ${phaseNames}`);
}
```
**Step 7: Remove dead Google Places env log**
Delete lines 167-169 (the `GOOGLE_PLACES_API_KEY` log in `validateEnvironment`).
**Step 8: Verify build**
Run: `npm run build`
Expected: Build succeeds
**Step 9: Commit**
```bash
git add scripts/scheduler.ts
git commit -m "feat: parallel grouped pipeline scheduler

Replace sequential pipeline with grouped phases. Import phases run
sequentially, scraper phases run in parallel groups of up to 3. This
reduces cycle time from days to hours. Generic scraper moved to last group."
```
---
### Task 3: Increase Scheduler Memory Limit
**Files:**
- Modify: `docker-compose.yml:217-220`
**Step 1: Increase memory limit**
Change the scheduler service's `deploy.resources.limits.memory` from `4G` to `10G`:
```yaml
deploy:
  resources:
    limits:
      memory: 10G
```
**Step 2: Commit**
```bash
git add docker-compose.yml
git commit -m "chore: increase scheduler memory to 10G for parallel scrapers"
```
---
### Task 4: Deploy and Verify
**Step 1: Deploy to NAS**
```bash
rsync -avz --exclude 'node_modules' --exclude '.next' --exclude '.git' --exclude '.env.local' --exclude '*.log' \
/Users/albert/Documents/Projects/Church/ScraperControl/ albert@192.168.0.145:/volume1/docker/scraper-control/
```
**Step 2: Rebuild and restart scheduler**
```bash
ssh albert@192.168.0.145 'cd /volume1/docker/scraper-control && /usr/local/bin/docker compose build scheduler && /usr/local/bin/docker compose up -d scheduler'
```
**Step 3: Verify logs show parallel groups**
```bash
ssh albert@192.168.0.145 '/usr/local/bin/docker logs --tail 30 scraper-control-scheduler-1'
```
Expected: Logs show "parallel grouped pipeline", group listings with `[parallel]` and `[sequential]` tags, and eventually multiple concurrent `Running:` entries in heartbeat.
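The startup part of that expectation can be checked mechanically instead of by eye. The sketch below assumes the log lines keep the shape produced by the startup log in Task 2, Step 6, possibly with a timestamp prefix added by `log()`. `checkStartupLog` and `StartupCheck` are hypothetical names, and the sample lines are illustrative input, not captured output.

```typescript
interface StartupCheck {
  bannerSeen: boolean;
  parallelGroups: number;
  sequentialGroups: number;
}

// Hypothetical checker: scans log lines for the banner and the tagged group listing.
function checkStartupLog(lines: string[]): StartupCheck {
  const result: StartupCheck = { bannerSeen: false, parallelGroups: 0, sequentialGroups: 0 };
  for (const line of lines) {
    if (line.includes('parallel grouped pipeline')) {
      result.bannerSeen = true;
    }
    // Group listing lines look like "  2. scrapers-batch-1 [parallel]: scraper-english, ..."
    const match = /\d+\.\s+\S+\s+\[(parallel|sequential)\]:/.exec(line);
    if (match === null) continue;
    if (match[1] === 'parallel') {
      result.parallelGroups++;
    } else {
      result.sequentialGroups++;
    }
  }
  return result;
}

// Illustrative input only; real lines would come from the `docker logs` command above.
const sample = [
  '=== Scheduler running (parallel grouped pipeline) ===',
  'Pipeline groups (5):',
  '  1. imports [sequential]: osm-import-p1, gcatholic-import',
  '  2. scrapers-batch-1 [parallel]: scraper-english, scraper-french, scraper-german',
];
const check = checkStartupLog(sample);
```

For the table in Task 2, a full startup log would be expected to yield one sequential group and four parallel groups.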