diff --git a/package.json b/package.json
new file mode 100644
index 0000000..cbf09a9
--- /dev/null
+++ b/package.json
@@ -0,0 +1,66 @@
+{
+  "name": "scraper-control",
+  "version": "1.0.0",
+  "private": true,
+  "scripts": {
+    "dev": "next dev -p 3001",
+    "build": "next build",
+    "start": "next start -p 3001",
+    "lint": "next lint",
+    "scrape:churches": "tsx scripts/scrape-churches.ts",
+    "scrape:masstimes": "tsx scripts/scrape-masstimes.ts",
+    "import:osm": "tsx scripts/import-osm-churches.ts",
+    "enrich:places": "tsx scripts/enrich-with-google-places.ts",
+    "enrich:freesearch": "tsx scripts/enrich-with-freesearch.ts",
+    "dedup:masses": "tsx scripts/dedup-mass-schedules.ts",
+    "scheduler": "tsx scripts/scheduler.ts",
+    "transfer:neon": "tsx scripts/transfer-enriched-to-neon.ts",
+    "test:scraper": "tsx scripts/test-scraper.ts",
+    "test:discover": "tsx scripts/test-url-discovery.ts",
+    "normalize:countries": "tsx scripts/normalize-country-codes.ts",
+    "enrich:wikidata": "tsx scripts/enrich-with-wikidata.ts",
+    "scrape:diocese": "tsx scripts/scrape-diocese-directory.ts",
+    "setup:diocese": "tsx scripts/setup-diocese.ts",
+    "import:gcatholic": "tsx scripts/import-gcatholic.ts",
+    "import:orarimesse": "tsx scripts/import-orarimesse.ts",
+    "import:mass-schedules-ph": "tsx scripts/import-mass-schedules-ph.ts",
+    "import:philmass": "tsx scripts/import-philmass.ts",
+    "import:horariosmisas": "tsx scripts/import-horariosmisas.ts",
+    "import:msze-info": "tsx scripts/import-msze-info.ts",
+    "import:weekdaymasses": "tsx scripts/import-weekdaymasses.ts",
+    "import:masstimes-api": "tsx scripts/import-masstimes-api.ts",
+    "import:discovermass": "tsx scripts/import-discovermass.ts",
+    "postinstall": "prisma generate"
+  },
+  "dependencies": {
+    "@prisma/adapter-pg": "^7.3.0",
+    "@prisma/client": "^7.3.0",
+    "axios": "^1.13.3",
+    "chromadb": "^1.9.2",
+    "coordtransform": "^2.1.2",
+    "dotenv": "^16.4.5",
+    "next": "^16.0.0",
+    "node-cron": "^3.0.3",
+    "open-location-code": "^1.0.3",
+    "openai": "^4.77.0",
+    "pg": "^8.17.2",
+    "react": "^19.0.0",
+    "react-dom": "^19.0.0"
+  },
+  "devDependencies": {
+    "@tailwindcss/postcss": "^4.0.0",
+    "@types/node": "^22.0.0",
+    "@types/node-cron": "^3.0.11",
+    "@types/open-location-code": "^1.0.1",
+    "@types/pg": "^8.11.0",
+    "@types/react": "^19.0.0",
+    "@types/react-dom": "^19.0.0",
+    "eslint": "^9.0.0",
+    "eslint-config-next": "^16.0.0",
+    "playwright": "^1.58.0",
+    "prisma": "^7.3.0",
+    "tailwindcss": "^4.0.0",
+    "tsx": "^4.21.0",
+    "typescript": "^5.7.0"
+  }
+}
diff --git a/scripts/scheduler.ts b/scripts/scheduler.ts
new file mode 100644
index 0000000..c750b65
--- /dev/null
+++ b/scripts/scheduler.ts
@@ -0,0 +1,944 @@
+import cron from 'node-cron';
+import { spawn, ChildProcess } from 'child_process';
+import fs from 'fs';
+import path from 'path';
+import dotenv from 'dotenv';
+dotenv.config({ path: path.resolve(process.cwd(), '.env') });
+
+import { Pool } from 'pg';
+import { PrismaPg } from '@prisma/adapter-pg';
+import { PrismaClient } from '@prisma/client';
+
+// Fresh DB connection for scheduler
+const pool = new Pool({ connectionString: process.env.DATABASE_URL });
+const adapter = new PrismaPg(pool);
+const prisma = new PrismaClient({ adapter });
+
+// ─── Configuration ───────────────────────────────────────────────────────────
+
+const LOGS_DIR = '/app/logs';
+const LOG_RETENTION_DAYS = 30;
+const JOB_POLL_INTERVAL_MS = 30_000; // Check pipeline state every 30 seconds
+const CYCLE_COOLDOWN_MS = 6 * 60 * 60 * 1000; // 6 hours between cycles
+const JOB_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours max per job
+const STALE_JOB_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours — mark non-pipeline running jobs as stale
+const KILL_ESCALATION_MS = 10_000; // Grace period between SIGTERM and SIGKILL
+const SHUTDOWN_GRACE_MS = 10_000; // Max wait for child processes during shutdown
+
+/** Free-form job configuration, stored in the `config` field of a background job. */
+type JobConfig = Record<string, unknown>;
+
+// ─── Pipeline Phases ────────────────────────────────────────────────────────
+
+/** One job the pipeline launches: a job type plus the options passed to its script. */
+interface PipelinePhase {
+  name: string;
+  type: string;
+  language?: string;
+  config: JobConfig;
+}
+
+/** A named set of phases that run one at a time ('sequential') or all at once ('parallel'). */
+interface PipelineGroup {
+  name: string;
+  phases: PipelinePhase[];
+  mode: 'sequential' | 'parallel';
+}
+
+const PIPELINE_GROUPS: PipelineGroup[] = [
+  {
+    name: 'imports',
+    mode: 'sequential',
+    phases: [
+      { name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
+      { name: 'gcatholic-import', type: 'gcatholic-import', config: { delay: 2000 } },
+      { name: 'orarimesse-import', type: 'orarimesse-import', config: {} },
+      { name: 'mass-schedules-ph-import', type: 'mass-schedules-ph-import', config: {} },
+      { name: 'philmass-import', type: 'philmass-import', config: {} },
+      { name: 'horariosmisas-import', type: 'horariosmisas-import', config: {} },
+      { name: 'msze-info-import', type: 'msze-info-import', config: {} },
+      { name: 'weekdaymasses-import', type: 'weekdaymasses-import', config: {} },
+      { name: 'messesinfo-import', type: 'messesinfo-import', config: {} },
+      { name: 'bohosluzby-import', type: 'bohosluzby-import', config: {} },
+      { name: 'miserend-import', type: 'miserend-import', config: {} },
+      { name: 'kerknet-import', type: 'kerknet-import', config: {} },
+      { name: 'gottesdienstzeiten-import', type: 'gottesdienstzeiten-import', config: {} },
+      { name: 'masstimes-api-import', type: 'masstimes-api-import', config: {} },
+      { name: 'discovermass-import', type: 'discovermass-import', config: {} },
+    ],
+  },
+  {
+    name: 'scrapers-batch-1',
+    mode: 'parallel',
+    phases: [
+      { name: 'scraper-english', type: 'scraper', language: 'english', config: { allMode: true, maxFailures: 10, language: 'english' } },
+      { name: 'scraper-french', type: 'scraper', language: 'french', config: { allMode: true, maxFailures: 10, language: 'french' } },
+      { name: 'scraper-german', type: 'scraper', language: 'german', config: { allMode: true, maxFailures: 10, language: 'german' } },
+    ],
+  },
+  {
+    name: 'scrapers-batch-2',
+    mode: 'parallel',
+    phases: [
+      { name: 'scraper-polish', type: 'scraper', language: 'polish', config: { allMode: true, maxFailures: 10, language: 'polish' } },
+      { name: 'scraper-spanish', type: 'scraper', language: 'spanish', config: { allMode: true, maxFailures: 10, language: 'spanish' } },
+      { name: 'scraper-italian', type: 'scraper', language: 'italian', config: { allMode: true, maxFailures: 10, language: 'italian' } },
+      { name: 'scraper-portuguese', type: 'scraper', language: 'portuguese', config: { allMode: true, maxFailures: 10, language: 'portuguese' } },
+      { name: 'scraper-czech', type: 'scraper', language: 'czech', config: { allMode: true, maxFailures: 10, language: 'czech' } },
+    ],
+  },
+  {
+    name: 'scrapers-batch-3',
+    mode: 'parallel',
+    phases: [
+      { name: 'scraper-dutch', type: 'scraper', language: 'dutch', config: { allMode: true, maxFailures: 10, language: 'dutch' } },
+      { name: 'scraper-hungarian', type: 'scraper', language: 'hungarian', config: { allMode: true, maxFailures: 10, language: 'hungarian' } },
+      { name: 'scraper-generic', type: 'scraper', language: 'generic', config: { allMode: true, maxFailures: 10, language: 'generic' } },
+    ],
+  },
+];
+
+// ─── Cycle State ────────────────────────────────────────────────────────────
+
+/** In-memory position of the pipeline; not persisted, adjusted by recoverFromCrash(). */
+interface CycleState {
+  currentGroupIndex: number;
+  currentSequentialPhaseIndex: number;
+  cycleNumber: number;
+  cycleStartedAt: Date | null;
+  lastCycleCompletedAt: Date | null;
+  waitingForCooldown: boolean;
+  activeGroupJobs: number;
+}
+
+const cycleState: CycleState = {
+  currentGroupIndex: 0,
+  currentSequentialPhaseIndex: 0,
+  cycleNumber: 0,
+  cycleStartedAt: null,
+  lastCycleCompletedAt: null,
+  waitingForCooldown: false,
+  activeGroupJobs: 0,
+};
+
+/**
+ * Map a job type (plus optional language and config) to the command that runs it.
+ *
+ * @throws Error when `type` is not one of the known job types.
+ */
+function getJobCommand(type: string, language?: string | null, config?: JobConfig | null): { command: string; args: string[] } {
+  const limit = (config?.limit as number) || undefined;
+
+  switch (type) {
+    case 'scraper': {
+      const args = ['tsx', 'scripts/scrape-churches.ts', '--all'];
+      if (language) args.push('--language', language);
+      if (config?.maxFailures) args.push('--max-failures', String(config.maxFailures));
+      return { command: 'npx', args };
+    }
+    case 'freesearch-enrichment': {
+      const args = ['tsx', 'scripts/enrich-with-freesearch.ts'];
+      if (config?.continuous) args.push('--continuous');
+      if (config?.reSearch) args.push('--re-search');
+      if (limit) args.push('--limit', String(limit));
+      if (config?.country) args.push('--country', String(config.country));
+      return { command: 'npx', args };
+    }
+    case 'reverse-geocode-enrichment': {
+      const args = ['tsx', 'scripts/enrich-with-reverse-geocode.ts'];
+      if (config?.continuous) args.push('--continuous');
+      if (limit) args.push('--limit', String(limit));
+      if (config?.country) args.push('--country', String(config.country));
+      return { command: 'npx', args };
+    }
+    case 'match-search-results': {
+      const args = ['tsx', 'scripts/match-search-results.ts'];
+      if (limit) args.push('--limit', String(limit));
+      if (config?.country) args.push('--country', String(config.country));
+      if (config?.threshold) args.push('--threshold', String(config.threshold));
+      return { command: 'npx', args };
+    }
+    case 'diocese-directory': {
+      const args = ['tsx', 'scripts/scrape-diocese-directory.ts', '--all'];
+      return { command: 'npx', args };
+    }
+    case 'wikidata-enrichment': {
+      return { command: 'npx', args: ['tsx', 'scripts/enrich-with-wikidata.ts'] };
+    }
+    case 'osm-import': {
+      const args = ['tsx', 'scripts/import-osm-churches.ts'];
+      if (config?.priority) args.push('--priority', String(config.priority));
+      else args.push('--all');
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'baidu-import': {
+      const args = ['tsx', 'scripts/import-baidu-churches.ts'];
+      if (config?.resumeFromCell) args.push('--resume-from-cell', String(config.resumeFromCell));
+      return { command: 'npx', args };
+    }
+    case 'gcatholic-import': {
+      const args = ['tsx', 'scripts/import-gcatholic.ts', '--all'];
+      if (config?.country) args.push('--country', String(config.country));
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      if (config?.delay) args.push('--delay', String(config.delay));
+      return { command: 'npx', args };
+    }
+    case 'orarimesse-import': {
+      const args = ['tsx', 'scripts/import-orarimesse.ts', '--all'];
+      if (config?.diocese) args.push('--diocese', String(config.diocese));
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'mass-schedules-ph-import': {
+      const args = ['tsx', 'scripts/import-mass-schedules-ph.ts', '--all'];
+      if (config?.churchId) args.push('--church-id', String(config.churchId));
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'philmass-import': {
+      const args = ['tsx', 'scripts/import-philmass.ts', '--all'];
+      if (config?.province) args.push('--province', String(config.province));
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'horariosmisas-import': {
+      const args = ['tsx', 'scripts/import-horariosmisas.ts', '--all', '--geocode'];
+      if (config?.province) args.push('--province', String(config.province));
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'msze-info-import': {
+      const args = ['tsx', 'scripts/import-msze-info.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'weekdaymasses-import': {
+      const args = ['tsx', 'scripts/import-weekdaymasses.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'messesinfo-import': {
+      const args = ['tsx', 'scripts/import-messesinfo.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'bohosluzby-import': {
+      const args = ['tsx', 'scripts/import-bohosluzby.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'miserend-import': {
+      const args = ['tsx', 'scripts/import-miserend.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'kerknet-import': {
+      const args = ['tsx', 'scripts/import-kerknet.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'gottesdienstzeiten-import': {
+      const args = ['tsx', 'scripts/import-gottesdienstzeiten.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    case 'masstimes-api-import': {
+      const args = ['tsx', 'scripts/import-masstimes-api.ts', '--all', '--skip-us'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      if (config?.region) args.splice(2, 1, '--region', String(config.region)); // replace --all with --region
+      return { command: 'npx', args };
+    }
+    case 'discovermass-import': {
+      const args = ['tsx', 'scripts/import-discovermass.ts', '--all'];
+      if (config?.resumeFrom) args.push('--resume-from', String(config.resumeFrom));
+      return { command: 'npx', args };
+    }
+    default:
+      throw new Error(`Unknown job type: ${type}`);
+  }
+}
+
+// ─── State ───────────────────────────────────────────────────────────────────
+
+/** Child processes currently running, keyed by `type:language` (see jobKey). */
+const runningJobs = new Map<string, { process: ChildProcess; jobId: string }>();
+
+/** Set once shutdown begins; stops new launches and failure bookkeeping. */
+let shuttingDown = false;
+
+/** True while a poll is in flight, so overlapping interval ticks are skipped. */
+let pollInProgress = false;
+
+/** Key under which a job is tracked in runningJobs. */
+function jobKey(type: string, language?: string | null): string {
+  return `${type}:${language || 'all'}`;
+}
+
+/** Kill entire process group (child + all descendants like Chromium).
+ *  Falls back to child.kill() if process group kill fails. */
+function killProcessGroup(child: ChildProcess, signal: NodeJS.Signals = 'SIGTERM'): void {
+  try {
+    if (child.pid) {
+      process.kill(-child.pid, signal); // negative PID = kill process group
+    }
+  } catch (err: unknown) {
+    const code = (err as NodeJS.ErrnoException).code;
+    if (code !== 'ESRCH') {
+      logError(`killProcessGroup(-${child.pid}, ${signal}) failed: ${err}`);
+    }
+    // Process group already dead, try direct kill as fallback
+    try { child.kill(signal); } catch { /* already dead */ }
+  }
+}
+
+// ─── Logging ─────────────────────────────────────────────────────────────────
+
+function log(message: string): void {
+  const timestamp = new Date().toISOString();
+  console.log(`[${timestamp}] ${message}`);
+}
+
+function logError(message: string): void {
+  const timestamp = new Date().toISOString();
+  console.error(`[${timestamp}] ERROR: ${message}`);
+}
+
+// ─── Environment Validation ──────────────────────────────────────────────────
+
+/** Render a connection string for logs with the password and query string removed. */
+function describeDatabaseUrl(raw: string): string {
+  try {
+    const url = new URL(raw);
+    const user = url.username ? `${url.username}:***@` : '';
+    return `${url.protocol}//${user}${url.host}${url.pathname}`;
+  } catch {
+    return '(unparseable connection string)';
+  }
+}
+
+function validateEnvironment(): void {
+  const databaseUrl = process.env.DATABASE_URL;
+  if (!databaseUrl) {
+    logError('Missing required environment variable: DATABASE_URL');
+    process.exit(1);
+  }
+
+  log('Environment variables validated');
+  // Never log the raw URL: its first characters usually contain the password.
+  log(` DATABASE_URL: ${describeDatabaseUrl(databaseUrl)}`);
+}
+
+// ─── Log File Management ─────────────────────────────────────────────────────
+
+function ensureLogsDir(): void {
+  if (!fs.existsSync(LOGS_DIR)) {
+    fs.mkdirSync(LOGS_DIR, { recursive: true });
+    log(`Created logs directory: ${LOGS_DIR}`);
+  }
+}
+
+function getLogFilePath(jobType: string): string {
+  const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
+  return path.join(LOGS_DIR, `${jobType}-${timestamp}.log`);
+}
+
+function cleanOldLogs(): void {
+  log('Cleaning old log files...');
+  const cutoff = Date.now() - LOG_RETENTION_DAYS * 24 * 60 * 60 * 1000;
+  let cleaned = 0;
+
+  try {
+    const files = fs.readdirSync(LOGS_DIR);
+    for (const file of files) {
+      if (!file.endsWith('.log')) continue;
+      const filePath = path.join(LOGS_DIR, file);
+      const stat = fs.statSync(filePath);
+      if (stat.mtimeMs < cutoff) {
+        fs.unlinkSync(filePath);
+        cleaned++;
+      }
+    }
+    log(`Cleaned ${cleaned} log files older than ${LOG_RETENTION_DAYS} days`);
+  } catch (err) {
+    logError(`Failed to clean logs: ${err}`);
+  }
+}
+
+// ─── Crash Recovery ─────────────────────────────────────────────────────────
+
+/**
+ * Reconcile job rows left behind by a previous scheduler process and position
+ * the pipeline at the phase that was interrupted.
+ */
+async function recoverFromCrash(): Promise<void> {
+  log('Running crash recovery...');
+
+  // Re-queue orphaned 'running' pipeline jobs as 'pending' so they get retried
+  const requeued = await prisma.backgroundJob.updateMany({
+    where: {
+      status: 'running',
+      config: { path: ['pipelineManaged'], equals: true },
+    },
+    data: { status: 'pending', error: null, completedAt: null },
+  });
+  if (requeued.count > 0) {
+    log(`Re-queued ${requeued.count} pipeline jobs interrupted by restart`);
+  }
+
+  // Mark non-pipeline running jobs as failed (manual jobs can't be safely resumed)
+  const orphanedRunning = await prisma.backgroundJob.updateMany({
+    where: {
+      status: 'running',
+      NOT: { config: { path: ['pipelineManaged'], equals: true } },
+    },
+    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
+  });
+  if (orphanedRunning.count > 0) {
+    log(`Marked ${orphanedRunning.count} orphaned manual jobs as failed`);
+  }
+
+  // Keep stale 'pending' pipeline jobs — they'll be picked up by the pipeline
+  // Only cancel non-pipeline stale pending jobs
+  const stalePending = await prisma.backgroundJob.updateMany({
+    where: {
+      status: 'pending',
+      NOT: { config: { path: ['pipelineManaged'], equals: true } },
+    },
+    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
+  });
+  if (stalePending.count > 0) {
+    log(`Cancelled ${stalePending.count} stale pending manual jobs`);
+  }
+
+  // Resume pipeline from the last incomplete phase
+  const lastRunningPipelineJob = await prisma.backgroundJob.findFirst({
+    where: {
+      status: 'pending',
+      config: { path: ['pipelineManaged'], equals: true },
+    },
+    orderBy: { createdAt: 'desc' },
+  });
+
+  if (lastRunningPipelineJob) {
+    for (let gi = 0; gi < PIPELINE_GROUPS.length; gi++) {
+      const group = PIPELINE_GROUPS[gi];
+      const phaseIdx = group.phases.findIndex(
+        p => p.type === lastRunningPipelineJob.type &&
+          (p.language || null) === (lastRunningPipelineJob.language || null)
+      );
+      if (phaseIdx >= 0) {
+        cycleState.currentGroupIndex = gi;
+        cycleState.currentSequentialPhaseIndex = group.mode === 'sequential' ? phaseIdx : 0;
+        log(`Resuming pipeline from group ${gi + 1}: ${group.name}`);
+        break;
+      }
+    }
+  }
+
+  log('Crash recovery complete');
+}
+
+/** Fail non-pipeline jobs that have been 'running' longer than STALE_JOB_TIMEOUT_MS. */
+async function cleanStaleJobs(): Promise<void> {
+  const cutoff = new Date(Date.now() - STALE_JOB_TIMEOUT_MS);
+
+  const stale = await prisma.backgroundJob.updateMany({
+    where: {
+      status: 'running',
+      startedAt: { lt: cutoff },
+      NOT: { config: { path: ['pipelineManaged'], equals: true } },
+    },
+    data: {
+      status: 'failed',
+      error: 'Stale: exceeded 24h runtime without completion',
+      completedAt: new Date(),
+    },
+  });
+
+  if (stale.count > 0) {
+    log(`Cleaned ${stale.count} stale non-pipeline job(s) running >24h`);
+  }
+}
+
+// ─── Database-Driven Job Queue ───────────────────────────────────────────────
+
+/**
+ * Ensure a pending job row exists for `type`/`language` and return its id.
+ * If a job of that type is already pending or running, its id is returned instead.
+ */
+async function createPendingJob(type: string, language?: string, config?: JobConfig): Promise<string> {
+  // Don't create if a job of this type is already pending or running
+  const existing = await prisma.backgroundJob.findFirst({
+    where: {
+      type,
+      language: language || null,
+      status: { in: ['pending', 'running'] },
+    },
+  });
+
+  if (existing) {
+    log(`Skipping ${type}${language ? `:${language}` : ''} — already ${existing.status} (${existing.id})`);
+    return existing.id;
+  }
+
+  const job = await prisma.backgroundJob.create({
+    data: {
+      type,
+      language: language || null,
+      status: 'pending',
+      config: config || null,
+    },
+  });
+
+  log(`Created pending job: ${type}${language ? `:${language}` : ''} (${job.id})`);
+  return job.id;
+}
+
+/** Best-effort: record a job as failed; a database error is logged, never thrown. */
+async function markJobFailed(jobId: string, message: string): Promise<void> {
+  try {
+    await prisma.backgroundJob.update({
+      where: { id: jobId },
+      data: { status: 'failed', error: message, completedAt: new Date() },
+    });
+  } catch (err) {
+    logError(`Failed to update job status: ${err}`);
+  }
+}
+
+// ─── Continuous Pipeline Logic ──────────────────────────────────────────────
+
+/**
+ * Create and start the job for one pipeline phase.
+ *
+ * cycleState.activeGroupJobs is set by the caller before launching, so a launch
+ * that fails must still be counted as completed — otherwise the counter never
+ * reaches zero and the pipeline stalls until the scheduler is restarted.
+ */
+async function launchPipelinePhase(phase: PipelinePhase): Promise<void> {
+  const key = jobKey(phase.type, phase.language);
+  if (runningJobs.has(key)) {
+    // Same type/language is already running (e.g. a manual job); don't spawn a duplicate.
+    log(`Skipping pipeline phase ${phase.name} — ${key} is already running`);
+    onJobCompleted();
+    return;
+  }
+
+  let jobId: string | undefined;
+  try {
+    jobId = await createPendingJob(phase.type, phase.language, { ...phase.config, pipelineManaged: true });
+    await startJobProcess(jobId, phase.type, phase.language || null, phase.config, true);
+  } catch (err) {
+    logError(`Failed to launch pipeline phase ${phase.name}: ${err}`);
+    if (jobId) await markJobFailed(jobId, `Failed to launch: ${err}`);
+    onJobCompleted();
+  }
+}
+
+/**
+ * One scheduler tick: start a waiting manual job if nothing is running, otherwise
+ * advance the pipeline (wait for active jobs, honour the cooldown, launch the next group).
+ * Never rejects; errors are logged.
+ */
+async function pollAndAdvancePipeline(): Promise<void> {
+  // Skip during shutdown, and never let two polls interleave (a slow database
+  // call could otherwise let the next interval tick launch the same job twice).
+  if (shuttingDown || pollInProgress) return;
+  pollInProgress = true;
+
+  try {
+    // 1. Check for manual pending jobs from admin API (priority over pipeline)
+    if (runningJobs.size === 0) {
+      // NOTE(review): negated Prisma filters commonly skip rows where the field is
+      // NULL — confirm manual jobs are always created with a config object.
+      const manualJob = await prisma.backgroundJob.findFirst({
+        where: {
+          status: 'pending',
+          NOT: { config: { path: ['pipelineManaged'], equals: true } },
+        },
+        orderBy: { createdAt: 'asc' },
+      });
+
+      if (manualJob) {
+        log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
+        try {
+          await startJobProcess(
+            manualJob.id,
+            manualJob.type,
+            manualJob.language,
+            manualJob.config as JobConfig | null
+          );
+        } catch (err) {
+          // An unstartable job (e.g. unknown type) must leave the queue, or it would
+          // be retried on every poll and block the pipeline indefinitely.
+          logError(`Failed to start manual job ${manualJob.id}: ${err}`);
+          await markJobFailed(manualJob.id, `Failed to start: ${err}`);
+        }
+        return;
+      }
+    }
+
+    // 2. If jobs are still running for the current group, wait
+    if (cycleState.activeGroupJobs > 0) {
+      return;
+    }
+
+    // 3. If in cooldown, check if expired
+    if (cycleState.waitingForCooldown) {
+      if (cycleState.lastCycleCompletedAt) {
+        const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
+        if (elapsed < CYCLE_COOLDOWN_MS) {
+          const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
+          if (remaining % 30 === 0 || remaining <= 5) {
+            log(`Cooldown: ${remaining} minutes remaining before next cycle`);
+          }
+          return;
+        }
+      }
+      cycleState.waitingForCooldown = false;
+      cycleState.currentGroupIndex = 0;
+      cycleState.currentSequentialPhaseIndex = 0;
+      log('Cooldown expired, starting new cycle');
+    }
+
+    // 4. If past the last group, complete the cycle
+    if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
+      cycleState.cycleNumber++;
+      cycleState.lastCycleCompletedAt = new Date();
+      cycleState.cycleStartedAt = null; // so the next cycle logs its start again
+      cycleState.waitingForCooldown = true;
+      const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
+      log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
+      return;
+    }
+
+    // 5. Start the current group
+    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
+
+    if (cycleState.currentGroupIndex === 0 && cycleState.currentSequentialPhaseIndex === 0 && !cycleState.cycleStartedAt) {
+      cycleState.cycleStartedAt = new Date();
+      log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
+    }
+
+    if (group.mode === 'parallel') {
+      // Launch all phases in the group concurrently
+      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
+      cycleState.activeGroupJobs = group.phases.length;
+
+      for (const phase of group.phases) {
+        await launchPipelinePhase(phase);
+      }
+    } else {
+      // Sequential: run one phase at a time within the group
+      const phaseIndex = cycleState.currentSequentialPhaseIndex;
+      if (phaseIndex >= group.phases.length) {
+        // All phases in this sequential group are done
+        cycleState.currentGroupIndex++;
+        cycleState.currentSequentialPhaseIndex = 0;
+        return; // Will pick up next group on next poll
+      }
+
+      const phase = group.phases[phaseIndex];
+      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
+      cycleState.activeGroupJobs = 1;
+
+      await launchPipelinePhase(phase);
+    }
+  } catch (err) {
+    logError(`Error in pipeline: ${err}`);
+  } finally {
+    pollInProgress = false;
+  }
+}
+
+/**
+ * Record that one job of the active group has finished (or could not be
+ * launched) and advance the phase/group cursor once the group has drained.
+ */
+function onJobCompleted(): void {
+  cycleState.activeGroupJobs--;
+
+  if (cycleState.activeGroupJobs <= 0) {
+    cycleState.activeGroupJobs = 0;
+    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
+
+    if (group?.mode === 'sequential') {
+      cycleState.currentSequentialPhaseIndex++;
+      if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
+        return; // Don't advance group yet
+      }
+    }
+
+    cycleState.currentGroupIndex++;
+    cycleState.currentSequentialPhaseIndex = 0;
+    log(`Group "${group?.name}" complete, advancing to group ${cycleState.currentGroupIndex + 1}`);
+  }
+}
+
+/**
+ * Spawn the script for a job in its own process group, stream its output to a
+ * log file, enforce JOB_TIMEOUT_MS, and settle the job exactly once when the
+ * child closes or fails to spawn.
+ *
+ * @param isPipelineJob when true, onJobCompleted() is called once the job settles.
+ * @throws Error when the command cannot be built, e.g. for an unknown job type.
+ */
+async function startJobProcess(
+  jobId: string,
+  type: string,
+  language: string | null,
+  config: JobConfig | null,
+  isPipelineJob = false
+): Promise<void> {
+  const { command, args } = getJobCommand(type, language, config);
+
+  // Add --job-id to args so the script tracks this job
+  args.push('--job-id', jobId);
+
+  const logFile = getLogFilePath(type);
+  const logStream = fs.createWriteStream(logFile, { flags: 'a' });
+  // Without a listener, a log-file error (missing directory, full disk) would be
+  // thrown as an uncaught exception and take the whole scheduler down.
+  logStream.on('error', (err: Error) => {
+    logError(`Log stream error for ${logFile}: ${err.message}`);
+  });
+  const typeKey = jobKey(type, language);
+  const stderrBuffer: string[] = []; // Keep last 20 lines of stderr for error reporting
+
+  log(`Starting job: ${typeKey} (${jobId})`);
+  log(` Command: ${command} ${args.join(' ')}`);
+  log(` Log file: ${logFile}`);
+
+  const header = `[${new Date().toISOString()}] Starting ${typeKey}: ${command} ${args.join(' ')}\n`;
+  logStream.write(header);
+
+  const child = spawn(command, args, {
+    cwd: '/app',
+    env: process.env as NodeJS.ProcessEnv,
+    stdio: ['ignore', 'pipe', 'pipe'],
+    detached: true, // own process group so we can kill Chromium grandchildren
+  });
+
+  runningJobs.set(typeKey, { process: child, jobId });
+
+  child.stdout?.on('data', (data: Buffer) => {
+    logStream.write(data);
+  });
+
+  child.stderr?.on('data', (data: Buffer) => {
+    logStream.write(data);
+    const lines = data.toString().split('\n').filter(l => l.trim());
+    stderrBuffer.push(...lines);
+    if (stderrBuffer.length > 20) {
+      stderrBuffer.splice(0, stderrBuffer.length - 20);
+    }
+  });
+
+  let killTimer: NodeJS.Timeout | undefined;
+  let settled = false;
+
+  // Timeout — kill entire process group (including Chromium grandchildren)
+  const timeout = setTimeout(() => {
+    logError(`Job ${typeKey} (${jobId}) timed out after ${JOB_TIMEOUT_MS / 3600000}h, killing process group...`);
+    logStream.write(`\n[${new Date().toISOString()}] TIMEOUT: killed after ${JOB_TIMEOUT_MS / 3600000}h\n`);
+    killProcessGroup(child, 'SIGTERM');
+    killTimer = setTimeout(() => {
+      if (!settled) {
+        killProcessGroup(child, 'SIGKILL');
+      }
+    }, KILL_ESCALATION_MS);
+  }, JOB_TIMEOUT_MS);
+
+  // A failed spawn emits 'error' and then 'close'; settle() lets only the first
+  // of them run the bookkeeping, so the pipeline counter is decremented once
+  // and nothing is written to the log stream after it has been ended.
+  const settle = (footer: string): boolean => {
+    if (settled) return false;
+    settled = true;
+    clearTimeout(timeout);
+    if (killTimer) clearTimeout(killTimer);
+    // Only drop the entry if it still belongs to this child.
+    if (runningJobs.get(typeKey)?.process === child) runningJobs.delete(typeKey);
+    logStream.write(footer);
+    logStream.end();
+    return true;
+  };
+
+  child.on('close', async (code: number | null) => {
+    if (!settle(`\n[${new Date().toISOString()}] Finished ${typeKey}: exit code ${code}\n`)) return;
+
+    if (code === 0) {
+      log(`Job ${typeKey} (${jobId}) completed successfully`);
+    } else {
+      logError(`Job ${typeKey} (${jobId}) failed with exit code ${code}`);
+      // Mark as failed if the script didn't do it itself. A job that is still
+      // 'pending' (presumably the script died before claiming it — confirm) is
+      // failed too, or the next poll would pick it up and re-run it forever.
+      // During shutdown the job was re-queued on purpose, so leave it alone.
+      if (!shuttingDown) {
+        try {
+          const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
+          if (job && (job.status === 'running' || job.status === 'pending')) {
+            const stderrTail = stderrBuffer.join('\n').slice(-500);
+            const errorMsg = stderrTail
+              ? `Exit code: ${code}\n${stderrTail}`
+              : `Exit code: ${code}`;
+            await prisma.backgroundJob.update({
+              where: { id: jobId },
+              data: { status: 'failed', error: errorMsg, completedAt: new Date() },
+            });
+          }
+        } catch (err) {
+          logError(`Failed to update job status: ${err}`);
+        }
+      }
+    }
+
+    // Advance pipeline regardless of success/failure
+    // Failed languages are logged but pipeline continues
+    if (isPipelineJob) onJobCompleted();
+  });
+
+  child.on('error', async (err: Error) => {
+    if (child.pid !== undefined) {
+      // The process is running (e.g. a signal could not be delivered); 'close'
+      // will settle the job when it actually exits.
+      logError(`Job ${typeKey} (${jobId}) process error: ${err.message}`);
+      return;
+    }
+    if (!settle(`\n[${new Date().toISOString()}] ERROR: ${err.message}\n`)) return;
+    logError(`Job ${typeKey} (${jobId}) failed to start: ${err.message}`);
+    await markJobFailed(jobId, err.message);
+
+    // Advance pipeline even on error
+    if (isPipelineJob) onJobCompleted();
+  });
+}
+
+// ─── Graceful Shutdown ───────────────────────────────────────────────────────
+
+/** Close database connections, then exit; exits even if closing fails. */
+async function disconnectAndExit(code: number): Promise<void> {
+  try {
+    await prisma.$disconnect();
+    await pool.end();
+  } catch (err) {
+    logError(`Error while closing database connections: ${err}`);
+  } finally {
+    process.exit(code);
+  }
+}
+
+/**
+ * Stop all running jobs, re-queue them as 'pending', wait up to
+ * SHUTDOWN_GRACE_MS for their processes to exit, then exit the scheduler.
+ * Repeated signals while a shutdown is in progress are ignored.
+ */
+async function shutdown(signal: string): Promise<void> {
+  if (shuttingDown) {
+    log(`Received ${signal} while already shutting down, ignoring`);
+    return;
+  }
+  shuttingDown = true;
+  log(`Received ${signal}, shutting down gracefully...`);
+
+  // Mark running jobs as pending so they can be resumed on restart
+  // (recoverFromCrash() only keeps pipeline-managed pending jobs; manual ones
+  // are cancelled there).
+  for (const [name, { process: child, jobId }] of [...runningJobs]) {
+    log(`Stopping running job: ${name} (${jobId})`);
+    killProcessGroup(child, 'SIGTERM');
+
+    // Re-queue the job for the next startup
+    try {
+      await prisma.backgroundJob.update({
+        where: { id: jobId },
+        data: { status: 'pending', error: null },
+      });
+      log(`Re-queued job ${jobId} for restart`);
+    } catch (err) {
+      logError(`Failed to re-queue job ${jobId}: ${err}`);
+    }
+  }
+
+  // Give jobs up to 10 seconds to finish, but leave as soon as they are gone
+  const deadline = Date.now() + SHUTDOWN_GRACE_MS;
+  while (runningJobs.size > 0 && Date.now() < deadline) {
+    await new Promise<void>(resolve => setTimeout(resolve, 250));
+  }
+
+  if (runningJobs.size > 0) {
+    logError(`Force killing ${runningJobs.size} remaining jobs`);
+    for (const [, { process: child }] of runningJobs) {
+      killProcessGroup(child, 'SIGKILL');
+    }
+  }
+
+  await disconnectAndExit(0);
+}
+
+process.on('SIGTERM', () => { void shutdown('SIGTERM'); });
+process.on('SIGINT', () => { void shutdown('SIGINT'); });
+
+// ─── Main ────────────────────────────────────────────────────────────────────
+
+async function main(): Promise<void> {
+  log('=== NearestMass Scheduler Starting (Continuous Pipeline) ===');
+
+  validateEnvironment();
+  ensureLogsDir();
+
+  // Crash recovery: re-queue interrupted pipeline jobs, fail orphaned manual jobs
+  await recoverFromCrash();
+
+  // Log cleanup at 03:00 UTC
+  cron.schedule('0 3 * * *', () => cleanOldLogs(), { timezone: 'UTC' });
+  log('Registered cron job: log-cleanup (0 3 * * * UTC)');
+
+  // Stale job cleanup every 6 hours
+  cron.schedule('0 */6 * * *', () => {
+    // A rejected promise here would otherwise be an unhandled rejection.
+    cleanStaleJobs().catch((err: unknown) => logError(`Stale job cleanup failed: ${err}`));
+  }, { timezone: 'UTC' });
+  log('Registered cron job: stale-job-cleanup (every 6h)');
+
+  // Heartbeat every hour — logs cycle state
+  cron.schedule('0 * * * *', () => {
+    const currentGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
+      ? PIPELINE_GROUPS[cycleState.currentGroupIndex].name
+      : 'none';
+    const jobs = runningJobs.size > 0
+      ? `Running: ${[...runningJobs.keys()].join(', ')}`
+      : 'No jobs running';
+    const state = cycleState.waitingForCooldown
+      ? 'cooldown'
+      : `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
+    log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
+  }, { timezone: 'UTC' });
+  log('Registered cron job: heartbeat (hourly)');
+
+  // Poll and advance pipeline every 30 seconds
+  setInterval(() => {
+    void pollAndAdvancePipeline();
+  }, JOB_POLL_INTERVAL_MS);
+  log(`Pipeline polling every ${JOB_POLL_INTERVAL_MS / 1000}s`);
+
+  // Start pipeline immediately
+  await pollAndAdvancePipeline();
+
+  log('=== Scheduler running (parallel grouped pipeline) ===');
+  log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
+  for (let i = 0; i < PIPELINE_GROUPS.length; i++) {
+    const g = PIPELINE_GROUPS[i];
+    const phaseNames = g.phases.map(p => p.name).join(', ');
+    log(` ${i + 1}. ${g.name} [${g.mode}]: ${phaseNames}`);
+  }
+  log(`Cooldown between cycles: ${CYCLE_COOLDOWN_MS / (60 * 60 * 1000)}h`);
+  log(`Job timeout: ${JOB_TIMEOUT_MS / (60 * 60 * 1000)}h`);
+}
+
+main().catch((err: unknown) => {
+  logError(`Scheduler failed to start: ${err}`);
+  process.exit(1);
+});