ScraperControl/docs/plans/2026-02-25-parallel-scrapers.md

Parallel Scrapers Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Run language scrapers in parallel groups of 3, add missing country mappings, and deprioritize the generic scraper.

Architecture: Replace sequential pipeline phases with grouped phases. A parallel group runs its jobs concurrently (max 3) and waits for all of them to complete before the pipeline advances. Import phases stay sequential. The scheduler tracks an activeGroupJobs counter for the current group instead of advancing on every job completion.

Tech Stack: TypeScript, node child_process spawn, Prisma, Docker Compose


Task 1: Add Missing Country Mappings

Files:

  • Modify: src/lib/scraper-service.ts:29-45

Step 1: Update COUNTRY_SCRAPER_MAP

Add the missing entries to the existing COUNTRY_SCRAPER_MAP object at src/lib/scraper-service.ts:29 so that it reads:

const COUNTRY_SCRAPER_MAP: Record<string, string> = {
  US: 'english', CA: 'english', GB: 'english',
  AU: 'english', NZ: 'english', IE: 'english', PH: 'english',
  IN: 'english', SG: 'english', MY: 'english', KE: 'english',
  JM: 'english', TT: 'english', GH: 'english', NG: 'english',
  ZA: 'english', TZ: 'english', UG: 'english',
  FR: 'french', BE: 'french', LU: 'french',
  ES: 'spanish', MX: 'spanish', AR: 'spanish', CO: 'spanish',
  CL: 'spanish', PE: 'spanish', EC: 'spanish', VE: 'spanish',
  CR: 'spanish', PA: 'spanish', GT: 'spanish', CU: 'spanish',
  HN: 'spanish', SV: 'spanish', NI: 'spanish', BO: 'spanish',
  PY: 'spanish', UY: 'spanish', DO: 'spanish',
  IT: 'italian', SM: 'italian', VA: 'italian',
  HR: 'italian', RO: 'italian',
  DE: 'german', AT: 'german', LI: 'german',
  CH: 'german', SI: 'german',
  PL: 'polish',
  PT: 'portuguese', BR: 'portuguese',
  NL: 'dutch',
  CZ: 'czech', SK: 'czech',
  HU: 'hungarian',
};
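A lookup against this map might be wrapped as in the sketch below. The helper name resolveScraper and the fallback to 'generic' for unmapped countries are assumptions made for illustration and are not taken from scraper-service.ts. Only a few entries of the map are repeated here.

```typescript
// Excerpt of the map above; the real object carries every entry.
const COUNTRY_SCRAPER_MAP: Record<string, string> = {
  US: 'english', IN: 'english',
  FR: 'french', BE: 'french', LU: 'french',
  DE: 'german', CH: 'german',
};

// Hypothetical helper: pick the scraper for a church's country code.
// Assumption: countries missing from the map fall back to 'generic'.
function resolveScraper(countryCode: string | null | undefined): string {
  if (!countryCode) {
    return 'generic';
  }
  const key = countryCode.trim().toUpperCase();
  const hit: string | undefined = COUNTRY_SCRAPER_MAP[key];
  return hit ?? 'generic';
}
```

With the new entries in place, a Belgian church (BE) would resolve to the french scraper in this sketch instead of falling through to the fallback.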

Also update buildLanguageFilter at src/lib/scraper-service.ts:346-463 to include the new countries in each language filter's country list:

  • english filter (line 356): add 'IN', 'SG', 'MY', 'KE', 'JM', 'TT', 'GH', 'NG', 'ZA', 'TZ', 'UG'
  • french filter (line 366): add 'BE', 'LU' → { in: ['FR', 'BE', 'LU'] }
  • spanish filter: already has all needed countries
  • italian filter (line 387): add 'HR', 'RO' → { in: ['IT', 'SM', 'VA', 'HR', 'RO'] }
  • german filter (line 397): add 'CH', 'SI' → { in: ['DE', 'AT', 'LI', 'CH', 'SI'] }
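Because the map and the filter lists have to be edited in step, one option is to derive each filter's country list from the map. The following is a minimal sketch of that idea, not a description of how buildLanguageFilter works today; countriesFor and countryFilter are hypothetical names, and only the { in: [...] } shape is taken from the bullets above.

```typescript
// Excerpt of COUNTRY_SCRAPER_MAP limited to three languages.
const COUNTRY_SCRAPER_MAP: Record<string, string> = {
  FR: 'french', BE: 'french', LU: 'french',
  IT: 'italian', SM: 'italian', VA: 'italian', HR: 'italian', RO: 'italian',
  DE: 'german', AT: 'german', LI: 'german', CH: 'german', SI: 'german',
};

// Hypothetical helper: every country code mapped to the given language,
// in the order the codes appear in the map.
function countriesFor(language: string): string[] {
  return Object.keys(COUNTRY_SCRAPER_MAP).filter(
    (code) => COUNTRY_SCRAPER_MAP[code] === language
  );
}

// Hypothetical helper: the { in: [...] } fragment used in a language filter.
function countryFilter(language: string): { in: string[] } {
  return { in: countriesFor(language) };
}
```

Deriving the lists this way would mean a future mapping change touches one place only, at the cost of coupling the filter code to the map's key order.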

Step 2: Verify build

Run: npm run build
Expected: Build succeeds with no errors

Step 3: Commit

git add src/lib/scraper-service.ts
git commit -m "feat: add missing country mappings to language scrapers

Add BE/LU→french, CH/SI→german, HR/RO→italian, IN/SG/MY/KE/JM/TT/GH/NG/ZA/TZ/UG→english.
~1,400 previously unmapped churches now routed to proper language scrapers."

Task 2: Rewrite Scheduler for Parallel Groups

Files:

  • Modify: scripts/scheduler.ts

Step 1: Replace pipeline data structure

Replace the PipelinePhase interface and the PIPELINE_PHASES array (lines 27-49) with the grouped definitions below. The CycleState interface (lines 53-69) is replaced in Step 2.

interface PipelinePhase {
  name: string;
  type: string;
  language?: string;
  config: Record<string, unknown>;
}

interface PipelineGroup {
  name: string;
  phases: PipelinePhase[];
  mode: 'sequential' | 'parallel';
}

const PIPELINE_GROUPS: PipelineGroup[] = [
  {
    name: 'imports',
    mode: 'sequential',
    phases: [
      { name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
      { name: 'gcatholic-import', type: 'gcatholic-import', config: { delay: 2000 } },
    ],
  },
  {
    name: 'scrapers-batch-1',
    mode: 'parallel',
    phases: [
      { name: 'scraper-english', type: 'scraper', language: 'english', config: { allMode: true, maxFailures: 10, language: 'english' } },
      { name: 'scraper-french', type: 'scraper', language: 'french', config: { allMode: true, maxFailures: 10, language: 'french' } },
      { name: 'scraper-german', type: 'scraper', language: 'german', config: { allMode: true, maxFailures: 10, language: 'german' } },
    ],
  },
  {
    name: 'scrapers-batch-2',
    mode: 'parallel',
    phases: [
      { name: 'scraper-polish', type: 'scraper', language: 'polish', config: { allMode: true, maxFailures: 10, language: 'polish' } },
      { name: 'scraper-spanish', type: 'scraper', language: 'spanish', config: { allMode: true, maxFailures: 10, language: 'spanish' } },
      { name: 'scraper-italian', type: 'scraper', language: 'italian', config: { allMode: true, maxFailures: 10, language: 'italian' } },
    ],
  },
  {
    name: 'scrapers-batch-3',
    mode: 'parallel',
    phases: [
      { name: 'scraper-portuguese', type: 'scraper', language: 'portuguese', config: { allMode: true, maxFailures: 10, language: 'portuguese' } },
      { name: 'scraper-czech', type: 'scraper', language: 'czech', config: { allMode: true, maxFailures: 10, language: 'czech' } },
      { name: 'scraper-dutch', type: 'scraper', language: 'dutch', config: { allMode: true, maxFailures: 10, language: 'dutch' } },
    ],
  },
  {
    name: 'scrapers-batch-4',
    mode: 'parallel',
    phases: [
      { name: 'scraper-hungarian', type: 'scraper', language: 'hungarian', config: { allMode: true, maxFailures: 10, language: 'hungarian' } },
      { name: 'scraper-generic', type: 'scraper', language: 'generic', config: { allMode: true, maxFailures: 10, language: 'generic' } },
    ],
  },
];
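Since the group table is maintained by hand, a small startup check could catch a parallel group that exceeds the limit of 3 or a phase name that appears twice. This is a sketch under assumptions: validateGroups and MAX_PARALLEL_JOBS are hypothetical names, and the plan itself does not call for such a check.

```typescript
interface PipelinePhase {
  name: string;
  type: string;
  language?: string;
  config: Record<string, unknown>;
}

interface PipelineGroup {
  name: string;
  phases: PipelinePhase[];
  mode: 'sequential' | 'parallel';
}

// Assumption: the "max 3" concurrency limit from the plan's goal, as a constant.
const MAX_PARALLEL_JOBS = 3;

// Hypothetical check: returns a list of human-readable problems (empty when valid).
function validateGroups(groups: PipelineGroup[]): string[] {
  const problems: string[] = [];
  const seen: Record<string, boolean> = {};
  for (const group of groups) {
    if (group.phases.length === 0) {
      problems.push(`${group.name}: group has no phases`);
    }
    if (group.mode === 'parallel' && group.phases.length > MAX_PARALLEL_JOBS) {
      problems.push(`${group.name}: ${group.phases.length} parallel phases exceed the limit of ${MAX_PARALLEL_JOBS}`);
    }
    for (const phase of group.phases) {
      if (seen[phase.name]) {
        problems.push(`duplicate phase name: ${phase.name}`);
      }
      seen[phase.name] = true;
    }
  }
  return problems;
}
```

Calling validateGroups(PIPELINE_GROUPS) once at startup and refusing to run when the result is non-empty would be one way to use it.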

Step 2: Replace CycleState

interface CycleState {
  currentGroupIndex: number;
  currentSequentialPhaseIndex: number; // for sequential groups, tracks which phase within the group
  cycleNumber: number;
  cycleStartedAt: Date | null;
  lastCycleCompletedAt: Date | null;
  waitingForCooldown: boolean;
  activeGroupJobs: number; // how many jobs still running in the current group
}

const cycleState: CycleState = {
  currentGroupIndex: 0,
  currentSequentialPhaseIndex: 0,
  cycleNumber: 0,
  cycleStartedAt: null,
  lastCycleCompletedAt: null,
  waitingForCooldown: false,
  activeGroupJobs: 0,
};

Step 3: Rewrite pollAndAdvancePipeline

Replace the entire pollAndAdvancePipeline function (lines 306-385) and advancePipelinePhase function (lines 387-390) with:

async function pollAndAdvancePipeline(): Promise<void> {
  try {
    // 1. Check for manual pending jobs from admin API (priority over pipeline)
    if (runningJobs.size === 0) {
      const manualJob = await prisma.backgroundJob.findFirst({
        where: {
          status: 'pending',
          NOT: { config: { path: ['pipelineManaged'], equals: true } },
        },
        orderBy: { createdAt: 'asc' },
      });

      if (manualJob) {
        log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
        await startJobProcess(
          manualJob.id,
          manualJob.type,
          manualJob.language,
          manualJob.config as Record<string, unknown> | null
        );
        return;
      }
    }

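    // 2. If any job is still running (group jobs or a manual job), wait.
    //    Checking runningJobs as well keeps a group from starting next to a manual job.
    if (cycleState.activeGroupJobs > 0 || runningJobs.size > 0) {
      return;
    }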

    // 3. If in cooldown, check if expired
    if (cycleState.waitingForCooldown) {
      if (cycleState.lastCycleCompletedAt) {
        const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
        if (elapsed < CYCLE_COOLDOWN_MS) {
          const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
          if (remaining % 30 === 0 || remaining <= 5) {
            log(`Cooldown: ${remaining} minutes remaining before next cycle`);
          }
          return;
        }
      }
      cycleState.waitingForCooldown = false;
      cycleState.currentGroupIndex = 0;
      cycleState.currentSequentialPhaseIndex = 0;
      log('Cooldown expired, starting new cycle');
    }

    // 4. If past the last group, complete the cycle
    if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
      cycleState.cycleNumber++;
      cycleState.lastCycleCompletedAt = new Date();
      cycleState.cycleStartedAt = null; // reset so the next cycle logs its start and records a fresh start time
      cycleState.waitingForCooldown = true;
      const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
      log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
      return;
    }

    // 5. Start the current group
    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];

    if (cycleState.currentGroupIndex === 0 && cycleState.currentSequentialPhaseIndex === 0 && !cycleState.cycleStartedAt) {
      cycleState.cycleStartedAt = new Date();
      log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
    }

    if (group.mode === 'parallel') {
      // Launch all phases in the group concurrently
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
      cycleState.activeGroupJobs = group.phases.length;

      for (const phase of group.phases) {
        const jobId = await createPendingJob(
          phase.type,
          phase.language,
          { ...phase.config, pipelineManaged: true }
        );
        await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
      }
    } else {
      // Sequential: run one phase at a time within the group
      const phaseIndex = cycleState.currentSequentialPhaseIndex;
      if (phaseIndex >= group.phases.length) {
        // All phases in this sequential group are done
        cycleState.currentGroupIndex++;
        cycleState.currentSequentialPhaseIndex = 0;
        return; // Will pick up next group on next poll
      }

      const phase = group.phases[phaseIndex];
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
      cycleState.activeGroupJobs = 1;

      const jobId = await createPendingJob(
        phase.type,
        phase.language,
        { ...phase.config, pipelineManaged: true }
      );
      await startJobProcess(jobId, phase.type, phase.language || null, phase.config);
    }
  } catch (err) {
    logError(`Error in pipeline: ${err}`);
  }
}

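function onJobCompleted(): void {
  // Ignore completions when no pipeline-group job is outstanding (for example
  // a manual job from the admin API); otherwise the pipeline would skip a group.
  if (cycleState.activeGroupJobs <= 0) {
    return;
  }

  cycleState.activeGroupJobs--;
  if (cycleState.activeGroupJobs > 0) {
    return; // Other jobs in this group are still running
  }

  const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];

  if (group?.mode === 'sequential') {
    cycleState.currentSequentialPhaseIndex++;
    // Check if there are more phases in this sequential group
    if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
      return; // Don't advance group yet
    }
  }

  // Advance to next group
  cycleState.currentGroupIndex++;
  cycleState.currentSequentialPhaseIndex = 0;
  const next = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
    ? `advancing to group ${cycleState.currentGroupIndex + 1}`
    : 'all groups finished';
  log(`Group "${group?.name}" complete, ${next}`);
}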
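The counter logic can be traced on a plain state object without the scheduler. The sketch below is a simplified model for reasoning and unit testing, not the scheduler code itself: completeJob, State and GroupShape are hypothetical names, and each group is reduced to its mode and phase count.

```typescript
type Mode = 'sequential' | 'parallel';

interface GroupShape {
  name: string;
  mode: Mode;
  phaseCount: number;
}

interface State {
  groupIndex: number;     // models currentGroupIndex
  seqPhaseIndex: number;  // models currentSequentialPhaseIndex
  activeJobs: number;     // models activeGroupJobs
}

// Pure model of one job completion: returns the next state, leaves the input untouched.
function completeJob(state: State, groups: GroupShape[]): State {
  const next: State = { ...state, activeJobs: Math.max(0, state.activeJobs - 1) };
  if (next.activeJobs > 0) {
    return next; // barrier: other jobs in the group are still running
  }
  const group = groups[next.groupIndex];
  if (group && group.mode === 'sequential') {
    next.seqPhaseIndex++;
    if (next.seqPhaseIndex < group.phaseCount) {
      return next; // more phases left in this sequential group
    }
  }
  next.groupIndex++;
  next.seqPhaseIndex = 0;
  return next;
}
```

In this model a parallel group of three only moves groupIndex forward on the third completion, while a sequential group moves seqPhaseIndex forward on each completion and groupIndex on the last one.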

Step 4: Update startJobProcess callbacks

In the child.on('close') callback (line 442) and child.on('error') callback (line 472), replace advancePipelinePhase() with onJobCompleted().
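One caveat when wiring both callbacks to the same counter: the Node.js child_process documentation notes that 'exit' may or may not fire after 'error' and advises guarding against handlers being invoked more than once. If both the 'error' and 'close' callbacks fire for one child, the counter would be decremented twice and the group could advance while a job is still running. A minimal guard could look like the sketch below; completeOnce is a hypothetical helper and the counter object is a stand-in for cycleState.

```typescript
// Hypothetical helper: wrap a completion handler so it runs at most once
// per child process, however many of that child's events fire.
function completeOnce(onDone: () => void): () => void {
  let called = false;
  return () => {
    if (called) {
      return;
    }
    called = true;
    onDone();
  };
}

// Stand-in for the scheduler's counter, for demonstration only.
const counter = { activeGroupJobs: 3 };
const done = completeOnce(() => {
  counter.activeGroupJobs--;
});

done(); // e.g. called from the 'error' callback
done(); // e.g. called from the 'close' callback of the same child: ignored
```

In startJobProcess, one such wrapper would be created per spawned child and passed to both callbacks, so each job is counted exactly once.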

Step 5: Update crash recovery

In recoverFromCrash (lines 259-268), replace the PIPELINE_PHASES.findIndex logic with a search through PIPELINE_GROUPS:

  if (lastRunningPipelineJob) {
    for (let gi = 0; gi < PIPELINE_GROUPS.length; gi++) {
      const group = PIPELINE_GROUPS[gi];
      const phaseIdx = group.phases.findIndex(
        p => p.type === lastRunningPipelineJob.type &&
          (p.language || null) === (lastRunningPipelineJob.language || null)
      );
      if (phaseIdx >= 0) {
        cycleState.currentGroupIndex = gi;
        cycleState.currentSequentialPhaseIndex = group.mode === 'sequential' ? phaseIdx : 0;
        log(`Resuming pipeline from group ${gi + 1}: ${group.name}`);
        break;
      }
    }
  }
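The same search can be expressed as a pure function, which makes the resume rule easy to test: a sequential group resumes at the matching phase, while a parallel group restarts from its first phase. This is a sketch; findResumePoint and the trimmed-down interfaces are hypothetical and only mirror the matching logic shown above.

```typescript
interface PhaseRef {
  type: string;
  language?: string | null;
}

interface GroupRef {
  name: string;
  mode: 'sequential' | 'parallel';
  phases: PhaseRef[];
}

interface ResumePoint {
  groupIndex: number;
  sequentialPhaseIndex: number;
}

// Hypothetical helper: locate the group (and phase, for sequential groups)
// that the last running pipeline job belonged to. Returns null if nothing matches.
function findResumePoint(groups: GroupRef[], job: PhaseRef): ResumePoint | null {
  for (let gi = 0; gi < groups.length; gi++) {
    const group = groups[gi];
    const phaseIdx = group.phases.findIndex(
      (p) => p.type === job.type && (p.language || null) === (job.language || null)
    );
    if (phaseIdx >= 0) {
      return {
        groupIndex: gi,
        sequentialPhaseIndex: group.mode === 'sequential' ? phaseIdx : 0,
      };
    }
  }
  return null;
}
```

Note that resuming a parallel group this way relaunches every job in that group, including ones that had already finished before the crash.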

Step 6: Update heartbeat log in main()

Update the heartbeat cron (lines 551-562) and the startup log (lines 574-580) to reference groups instead of phases. For the heartbeat cron:

  cron.schedule('0 * * * *', () => {
    const currentGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
      ? PIPELINE_GROUPS[cycleState.currentGroupIndex].name
      : 'none';
    const jobs = runningJobs.size > 0
      ? `Running: ${[...runningJobs.keys()].join(', ')}`
      : 'No jobs running';
    const state = cycleState.waitingForCooldown
      ? 'cooldown'
      : `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
    log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
  }, { timezone: 'UTC' });
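If the message formatting is pulled out of the cron callback, it can be checked without waiting for the top of the hour. The sketch below mirrors the string building above on plain inputs; formatHeartbeat and HeartbeatInput are hypothetical names, and the plan does not require this split.

```typescript
interface HeartbeatInput {
  cycleNumber: number;
  waitingForCooldown: boolean;
  currentGroupIndex: number;
  groupNames: string[];     // names of PIPELINE_GROUPS, in order
  runningJobIds: string[];  // keys of runningJobs
}

// Hypothetical pure formatter producing the same text as the cron callback.
function formatHeartbeat(s: HeartbeatInput): string {
  const currentGroup = s.currentGroupIndex < s.groupNames.length
    ? s.groupNames[s.currentGroupIndex]
    : 'none';
  const jobs = s.runningJobIds.length > 0
    ? `Running: ${s.runningJobIds.join(', ')}`
    : 'No jobs running';
  const state = s.waitingForCooldown
    ? 'cooldown'
    : `group ${s.currentGroupIndex + 1}/${s.groupNames.length} (${currentGroup})`;
  return `Heartbeat: Cycle ${s.cycleNumber + 1}, ${state}. ${jobs}`;
}
```

The cron callback would then reduce to a single log(formatHeartbeat(...)) call built from cycleState and runningJobs.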

For the startup log:

  log('=== Scheduler running (parallel grouped pipeline) ===');
  log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
  for (let i = 0; i < PIPELINE_GROUPS.length; i++) {
    const g = PIPELINE_GROUPS[i];
    const phaseNames = g.phases.map(p => p.name).join(', ');
    log(`  ${i + 1}. ${g.name} [${g.mode}]: ${phaseNames}`);
  }

Step 7: Remove dead Google Places env log

Delete lines 167-169 (the GOOGLE_PLACES_API_KEY log in validateEnvironment).

Step 8: Verify build

Run: npm run build
Expected: Build succeeds

Step 9: Commit

git add scripts/scheduler.ts
git commit -m "feat: parallel grouped pipeline scheduler

Replace sequential pipeline with grouped phases. Import phases run
sequentially, scraper phases run in parallel groups of up to 3. This reduces
cycle time from days to hours. Generic scraper moved to last group."

Task 3: Increase Scheduler Memory Limit

Files:

  • Modify: docker-compose.yml:217-220

Step 1: Increase memory limit

Change the scheduler service's deploy.resources.limits.memory from 4G to 10G:

    deploy:
      resources:
        limits:
          memory: 10G

Step 2: Commit

git add docker-compose.yml
git commit -m "chore: increase scheduler memory to 10G for parallel scrapers"

Task 4: Deploy and Verify

Step 1: Deploy to NAS

rsync -avz --exclude 'node_modules' --exclude '.next' --exclude '.git' --exclude '.env.local' --exclude '*.log' \
  /Users/albert/Documents/Projects/Church/ScraperControl/ albert@192.168.0.145:/volume1/docker/scraper-control/

Step 2: Rebuild and restart scheduler

ssh albert@192.168.0.145 'cd /volume1/docker/scraper-control && /usr/local/bin/docker compose build scheduler && /usr/local/bin/docker compose up -d scheduler'

Step 3: Verify logs show parallel groups

ssh albert@192.168.0.145 '/usr/local/bin/docker logs --tail 30 scraper-control-scheduler-1'

Expected: Logs show "parallel grouped pipeline", group listings with [parallel] and [sequential] tags, and eventually multiple concurrent Running: entries in heartbeat.
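The startup part of that expectation can also be checked mechanically by scanning the captured log text for the three markers. This is a small sketch, assuming the log output has been saved to a string; missingStartupMarkers is a hypothetical helper, and the marker strings are the ones emitted by the startup log in Task 2.

```typescript
// Hypothetical helper: returns the expected startup markers that do NOT
// appear in the given log text (an empty array means all were found).
function missingStartupMarkers(logText: string): string[] {
  const markers = ['parallel grouped pipeline', '[parallel]', '[sequential]'];
  return markers.filter((marker) => logText.indexOf(marker) === -1);
}
```

Substring matching is used so that any timestamp or prefix the logger adds to each line does not affect the check. The concurrent Running: entries in the heartbeat still need a manual look, since they only appear once a parallel group is active.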