import cron from 'node-cron';
import { spawn, ChildProcess } from 'child_process';
import fs from 'fs';
import path from 'path';
import dotenv from 'dotenv';

dotenv.config({ path: path.resolve(process.cwd(), '.env') });

import { Pool } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '@prisma/client';

// Fresh DB connection for scheduler
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const adapter = new PrismaPg(pool);
const prisma = new PrismaClient({ adapter });

// ─── Configuration ───────────────────────────────────────────────────────────

const LOGS_DIR = '/app/logs';
const LOG_RETENTION_DAYS = 30;
const JOB_POLL_INTERVAL_MS = 30_000; // Check pipeline state every 30 seconds
const CYCLE_COOLDOWN_MS = 6 * 60 * 60 * 1000; // 6 hours between cycles
const JOB_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours max per job
const STALE_JOB_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24 hours — mark non-pipeline running jobs as stale

// ─── Pipeline Phases ────────────────────────────────────────────────────────

/** Free-form job options: stored on the BackgroundJob row and turned into CLI flags by getJobCommand. */
type JobConfig = Record<string, unknown>;

/** One job the pipeline launches: `type`/`language` select the script, `config` its options. */
interface PipelinePhase {
  name: string;
  type: string;
  language?: string;
  config: JobConfig;
}

/** A set of phases run either one after another or all at once. */
interface PipelineGroup {
  name: string;
  phases: PipelinePhase[];
  mode: 'sequential' | 'parallel';
}

/** Build an import phase whose display name is its job type. */
function importPhase(type: string, config: JobConfig = {}): PipelinePhase {
  return { name: type, type, config };
}

/** Build a scraper phase for one language with the shared scraper options. */
function scraperPhase(language: string): PipelinePhase {
  return {
    name: `scraper-${language}`,
    type: 'scraper',
    language,
    config: { allMode: true, maxFailures: 10, language },
  };
}

/** The full cycle, in order. Groups run one at a time; see PipelineGroup.mode for within-group order. */
const PIPELINE_GROUPS: PipelineGroup[] = [
  {
    name: 'imports',
    mode: 'sequential',
    phases: [
      { name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
      importPhase('gcatholic-import', { delay: 2000 }),
      importPhase('orarimesse-import'),
      importPhase('mass-schedules-ph-import'),
      importPhase('philmass-import'),
      importPhase('horariosmisas-import'),
      importPhase('msze-info-import'),
      importPhase('weekdaymasses-import'),
      importPhase('messesinfo-import'),
      importPhase('bohosluzby-import'),
      importPhase('miserend-import'),
      importPhase('kerknet-import'),
      importPhase('gottesdienstzeiten-import'),
      importPhase('masstimes-api-import'),
    ],
  },
  {
    name: 'scrapers-batch-1',
    mode: 'parallel',
    phases: ['english', 'french', 'german'].map(scraperPhase),
  },
  {
    name: 'scrapers-batch-2',
    mode: 'parallel',
    phases: ['polish', 'spanish', 'italian', 'portuguese', 'czech'].map(scraperPhase),
  },
  {
    name: 'scrapers-batch-3',
    mode: 'parallel',
    phases: ['dutch', 'hungarian', 'generic'].map(scraperPhase),
  },
];

// ─── Cycle State ────────────────────────────────────────────────────────────

/** In-memory pipeline progress. Not persisted; recoverFromCrash rebuilds the position on startup. */
interface CycleState {
  currentGroupIndex: number;
  currentSequentialPhaseIndex: number;
  cycleNumber: number; // number of completed cycles
  cycleStartedAt: Date | null; // set when group 0 starts; reset when the cycle completes
  lastCycleCompletedAt: Date | null;
  waitingForCooldown: boolean;
  activeGroupJobs: number; // pipeline jobs of the current group not yet finished
}

const cycleState: CycleState = {
  currentGroupIndex: 0,
  currentSequentialPhaseIndex: 0,
  cycleNumber: 0,
  cycleStartedAt: null,
  lastCycleCompletedAt: null,
  waitingForCooldown: false,
  activeGroupJobs: 0,
};

/** Append `flag value` when `value` is truthy. */
function pushOption(args: string[], flag: string, value: unknown): void {
  if (value) args.push(flag, String(value));
}

/** Append a bare `flag` when `enabled` is truthy. */
function pushSwitch(args: string[], flag: string, enabled: unknown): void {
  if (enabled) args.push(flag);
}

/**
 * Map a job type (+ language/config) to the command line of its script.
 * @throws Error for an unknown job type.
 */
function getJobCommand(
  type: string,
  language?: string | null,
  config?: JobConfig | null
): { command: string; args: string[] } {
  switch (type) {
    case 'scraper': {
      const args = ['tsx', 'scripts/scrape-churches.ts', '--all'];
      pushOption(args, '--language', language);
      pushOption(args, '--max-failures', config?.maxFailures);
      return { command: 'npx', args };
    }
    case 'freesearch-enrichment': {
      const args = ['tsx', 'scripts/enrich-with-freesearch.ts'];
      pushSwitch(args, '--continuous', config?.continuous);
      pushSwitch(args, '--re-search', config?.reSearch);
      pushOption(args, '--limit', config?.limit);
      pushOption(args, '--country', config?.country);
      return { command: 'npx', args };
    }
    case 'reverse-geocode-enrichment': {
      const args = ['tsx', 'scripts/enrich-with-reverse-geocode.ts'];
      pushSwitch(args, '--continuous', config?.continuous);
      pushOption(args, '--limit', config?.limit);
      pushOption(args, '--country', config?.country);
      return { command: 'npx', args };
    }
    case 'match-search-results': {
      const args = ['tsx', 'scripts/match-search-results.ts'];
      pushOption(args, '--limit', config?.limit);
      pushOption(args, '--country', config?.country);
      pushOption(args, '--threshold', config?.threshold);
      return { command: 'npx', args };
    }
    case 'diocese-directory':
      return { command: 'npx', args: ['tsx', 'scripts/scrape-diocese-directory.ts', '--all'] };
    case 'wikidata-enrichment':
      return { command: 'npx', args: ['tsx', 'scripts/enrich-with-wikidata.ts'] };
    case 'osm-import': {
      const args = ['tsx', 'scripts/import-osm-churches.ts'];
      // A priority tier replaces --all.
      if (config?.priority) args.push('--priority', String(config.priority));
      else args.push('--all');
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'baidu-import': {
      const args = ['tsx', 'scripts/import-baidu-churches.ts'];
      pushOption(args, '--resume-from-cell', config?.resumeFromCell);
      return { command: 'npx', args };
    }
    case 'gcatholic-import': {
      const args = ['tsx', 'scripts/import-gcatholic.ts', '--all'];
      pushOption(args, '--country', config?.country);
      pushOption(args, '--resume-from', config?.resumeFrom);
      pushOption(args, '--delay', config?.delay);
      return { command: 'npx', args };
    }
    case 'orarimesse-import': {
      const args = ['tsx', 'scripts/import-orarimesse.ts', '--all'];
      pushOption(args, '--diocese', config?.diocese);
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'mass-schedules-ph-import': {
      const args = ['tsx', 'scripts/import-mass-schedules-ph.ts', '--all'];
      pushOption(args, '--church-id', config?.churchId);
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'philmass-import': {
      const args = ['tsx', 'scripts/import-philmass.ts', '--all'];
      pushOption(args, '--province', config?.province);
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'horariosmisas-import': {
      const args = ['tsx', 'scripts/import-horariosmisas.ts', '--all', '--geocode'];
      pushOption(args, '--province', config?.province);
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'msze-info-import':
    case 'weekdaymasses-import':
    case 'messesinfo-import':
    case 'bohosluzby-import':
    case 'miserend-import':
    case 'kerknet-import':
    case 'gottesdienstzeiten-import': {
      // These importers only take --all/--resume-from; `<name>-import` → scripts/import-<name>.ts
      const script = `scripts/import-${type.slice(0, -'-import'.length)}.ts`;
      const args = ['tsx', script, '--all'];
      pushOption(args, '--resume-from', config?.resumeFrom);
      return { command: 'npx', args };
    }
    case 'masstimes-api-import': {
      const args = ['tsx', 'scripts/import-masstimes-api.ts', '--all', '--skip-us'];
      pushOption(args, '--resume-from', config?.resumeFrom);
      if (config?.region) args.splice(2, 1, '--region', String(config.region)); // replace --all with --region
      return { command: 'npx', args };
    }
    default:
      throw new Error(`Unknown job type: ${type}`);
  }
}

// ─── State ───────────────────────────────────────────────────────────────────

interface RunningJob {
  process: ChildProcess;
  typeKey: string; // `${type}:${language || 'all'}`, used for display
}

/** Child processes started by this scheduler, keyed by BackgroundJob id. */
const runningJobs = new Map<string, RunningJob>();

/** Set once shutdown begins: stops new launches and leaves job status to shutdown(). */
let shuttingDown = false;

/** True while a pipeline poll is in flight (setInterval does not await it). */
let pollInProgress = false;

/** Kill entire process group (child + all descendants like Chromium).
 *  Falls back to child.kill() if process group kill fails. */
function killProcessGroup(child: ChildProcess, signal: NodeJS.Signals = 'SIGTERM'): void {
  try {
    if (child.pid) {
      process.kill(-child.pid, signal); // negative PID = kill process group
    }
  } catch (err: unknown) {
    const code = (err as NodeJS.ErrnoException).code;
    if (code !== 'ESRCH') {
      logError(`killProcessGroup(-${child.pid}, ${signal}) failed: ${err}`);
    }
    // Group signal failed (ESRCH = group already gone); signal the direct child as a fallback
    try {
      child.kill(signal);
    } catch {
      /* already dead */
    }
  }
}

// ─── Logging ─────────────────────────────────────────────────────────────────

function log(message: string): void {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] ${message}`);
}

function logError(message: string): void {
  const timestamp = new Date().toISOString();
  console.error(`[${timestamp}] ERROR: ${message}`);
}

// ─── Environment Validation ──────────────────────────────────────────────────

/** Describe a connection URL for logs without its user name, password or query string. */
function describeDatabaseUrl(url: string): string {
  try {
    const parsed = new URL(url);
    return `${parsed.protocol}//${parsed.host}${parsed.pathname} (credentials redacted)`;
  } catch {
    return '<unparseable, not logged>';
  }
}

/** Exit the process if DATABASE_URL is missing. */
function validateEnvironment(): void {
  const databaseUrl = process.env.DATABASE_URL;
  if (!databaseUrl) {
    logError('Missing required environment variable: DATABASE_URL');
    process.exit(1);
  }
  log('Environment variables validated');
  // Never log a prefix of the raw URL: it normally contains `user:password@`.
  log(`  DATABASE_URL: ${describeDatabaseUrl(databaseUrl)}`);
}

// ─── Log File Management ─────────────────────────────────────────────────────

function ensureLogsDir(): void {
  if (!fs.existsSync(LOGS_DIR)) {
    fs.mkdirSync(LOGS_DIR, { recursive: true });
    log(`Created logs directory: ${LOGS_DIR}`);
  }
}

/** Per-run log file path: `<LOGS_DIR>/<jobType>-<ISO timestamp with ':' and '.' replaced>.log`. */
function getLogFilePath(jobType: string): string {
  const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
  return path.join(LOGS_DIR, `${jobType}-${timestamp}.log`);
}

/** Delete `.log` files in LOGS_DIR whose mtime is older than LOG_RETENTION_DAYS. Never throws. */
function cleanOldLogs(): void {
  log('Cleaning old log files...');
  const cutoff = Date.now() - LOG_RETENTION_DAYS * 24 * 60 * 60 * 1000;
  let cleaned = 0;
  try {
    const files = fs.readdirSync(LOGS_DIR);
    for (const file of files) {
      if (!file.endsWith('.log')) continue;
      const filePath = path.join(LOGS_DIR, file);
      const stat = fs.statSync(filePath);
      if (stat.mtimeMs < cutoff) {
        fs.unlinkSync(filePath);
        cleaned++;
      }
    }
    log(`Cleaned ${cleaned} log files older than ${LOG_RETENTION_DAYS} days`);
  } catch (err) {
    logError(`Failed to clean logs: ${err}`);
  }
}

// ─── Crash Recovery ─────────────────────────────────────────────────────────

/**
 * Reconcile DB job rows left by a previous scheduler process:
 * - pipeline jobs left 'running' are re-queued as 'pending';
 * - non-pipeline ('manual') jobs left 'running' or 'pending' are marked failed;
 * - the pipeline position is set to the group (and sequential phase) of the newest pending pipeline job.
 *
 * NOTE(review): `NOT: { config: { path, equals: true } }` — in SQL, NOT(<json path> = true) is NULL
 * (not TRUE) when config is NULL or lacks the key; confirm Prisma's query still matches such rows.
 */
async function recoverFromCrash(): Promise<void> {
  log('Running crash recovery...');

  // Re-queue orphaned 'running' pipeline jobs as 'pending' so they get retried
  const requeued = await prisma.backgroundJob.updateMany({
    where: {
      status: 'running',
      config: { path: ['pipelineManaged'], equals: true },
    },
    data: { status: 'pending', error: null, completedAt: null },
  });
  if (requeued.count > 0) {
    log(`Re-queued ${requeued.count} pipeline jobs interrupted by restart`);
  }

  // Mark non-pipeline running jobs as failed (manual jobs can't be safely resumed)
  const orphanedRunning = await prisma.backgroundJob.updateMany({
    where: {
      status: 'running',
      NOT: { config: { path: ['pipelineManaged'], equals: true } },
    },
    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
  });
  if (orphanedRunning.count > 0) {
    log(`Marked ${orphanedRunning.count} orphaned manual jobs as failed`);
  }

  // Keep stale 'pending' pipeline jobs — they'll be picked up by the pipeline
  // Only cancel non-pipeline stale pending jobs
  const stalePending = await prisma.backgroundJob.updateMany({
    where: {
      status: 'pending',
      NOT: { config: { path: ['pipelineManaged'], equals: true } },
    },
    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
  });
  if (stalePending.count > 0) {
    log(`Cancelled ${stalePending.count} stale pending manual jobs`);
  }

  // Resume pipeline from the last incomplete phase
  const lastPendingPipelineJob = await prisma.backgroundJob.findFirst({
    where: {
      status: 'pending',
      config: { path: ['pipelineManaged'], equals: true },
    },
    orderBy: { createdAt: 'desc' },
  });

  if (lastPendingPipelineJob) {
    for (let gi = 0; gi < PIPELINE_GROUPS.length; gi++) {
      const group = PIPELINE_GROUPS[gi];
      const phaseIdx = group.phases.findIndex(
        (p) =>
          p.type === lastPendingPipelineJob.type &&
          (p.language || null) === (lastPendingPipelineJob.language || null)
      );
      if (phaseIdx >= 0) {
        cycleState.currentGroupIndex = gi;
        cycleState.currentSequentialPhaseIndex = group.mode === 'sequential' ? phaseIdx : 0;
        log(`Resuming pipeline from group ${gi + 1}: ${group.name}`);
        break;
      }
    }
  }

  log('Crash recovery complete');
}

/**
 * Fail non-pipeline jobs that have been 'running' longer than STALE_JOB_TIMEOUT_MS.
 * Errors are logged, not thrown: this runs from cron, where a rejection would be unhandled.
 */
async function cleanStaleJobs(): Promise<void> {
  try {
    const cutoff = new Date(Date.now() - STALE_JOB_TIMEOUT_MS);
    const stale = await prisma.backgroundJob.updateMany({
      where: {
        status: 'running',
        startedAt: { lt: cutoff },
        NOT: { config: { path: ['pipelineManaged'], equals: true } },
      },
      data: {
        status: 'failed',
        error: 'Stale: exceeded 24h runtime without completion',
        completedAt: new Date(),
      },
    });
    if (stale.count > 0) {
      log(`Cleaned ${stale.count} stale non-pipeline job(s) running >24h`);
    }
  } catch (err) {
    logError(`Failed to clean stale jobs: ${err}`);
  }
}

// ─── Database-Driven Job Queue ───────────────────────────────────────────────

/**
 * Create a 'pending' BackgroundJob row, or return the id of an existing 'pending'/'running'
 * row with the same type and language (which may belong to a manual job).
 */
async function createPendingJob(type: string, language?: string, config?: JobConfig): Promise<string> {
  // Don't create if a job of this type is already pending or running
  const existing = await prisma.backgroundJob.findFirst({
    where: {
      type,
      language: language || null,
      status: { in: ['pending', 'running'] },
    },
  });

  if (existing) {
    log(`Skipping ${type}${language ? `:${language}` : ''} — already ${existing.status} (${existing.id})`);
    return existing.id;
  }

  const job = await prisma.backgroundJob.create({
    data: {
      type,
      language: language || null,
      status: 'pending',
      config: config || null,
    },
  });

  log(`Created pending job: ${type}${language ? `:${language}` : ''} (${job.id})`);
  return job.id;
}

/** Mark a job failed with `error`; DB errors are logged, never thrown. */
async function markJobFailed(jobId: string, error: string): Promise<void> {
  try {
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: { status: 'failed', error, completedAt: new Date() },
    });
  } catch (err) {
    logError(`Failed to update job status: ${err}`);
  }
}

/**
 * After a non-zero exit, mark the job failed unless the script already recorded a final state.
 * 'pending' is included: a script that dies before marking itself running (this scheduler never
 * sets 'running') would otherwise leave a manual job that is re-picked on every poll.
 */
async function markExitedJobFailed(jobId: string, code: number | null, stderrLines: string[]): Promise<void> {
  try {
    const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
    if (job && (job.status === 'running' || job.status === 'pending')) {
      const stderrTail = stderrLines.join('\n').slice(-500);
      const errorMsg = stderrTail ? `Exit code: ${code}\n${stderrTail}` : `Exit code: ${code}`;
      await prisma.backgroundJob.update({
        where: { id: jobId },
        data: { status: 'failed', error: errorMsg, completedAt: new Date() },
      });
    }
  } catch (err) {
    logError(`Failed to update job status: ${err}`);
  }
}

// ─── Continuous Pipeline Logic ──────────────────────────────────────────────

/**
 * Launch one pipeline phase. The caller has already counted it in `cycleState.activeGroupJobs`.
 * If the phase cannot be started (DB error, unknown type, already running), it is counted as
 * finished here so the group cannot stall waiting for a job that will never report back.
 */
async function launchPipelinePhase(phase: PipelinePhase): Promise<void> {
  let started = false;
  try {
    const jobId = await createPendingJob(phase.type, phase.language, { ...phase.config, pipelineManaged: true });
    started = await startJobProcess(jobId, phase.type, phase.language || null, phase.config, true);
  } catch (err) {
    logError(`Failed to launch pipeline phase ${phase.name}: ${err}`);
  }
  if (!started) onJobCompleted();
}

/**
 * One scheduler tick: start a queued manual job when idle, otherwise advance the pipeline
 * (wait for the current group, honour the cooldown, finish the cycle, or launch the next group/phase).
 * Never throws.
 */
async function pollAndAdvancePipeline(): Promise<void> {
  // setInterval does not await this: skip overlapping polls (e.g. a slow DB) so a group can't be
  // launched twice, and never launch anything once shutdown has begun.
  if (pollInProgress || shuttingDown) return;
  pollInProgress = true;
  try {
    // 1. Check for manual pending jobs from admin API (priority over pipeline)
    if (runningJobs.size === 0) {
      // NOTE(review): see recoverFromCrash — confirm the NOT JSON-path filter matches config = null rows.
      const manualJob = await prisma.backgroundJob.findFirst({
        where: {
          status: 'pending',
          NOT: { config: { path: ['pipelineManaged'], equals: true } },
        },
        orderBy: { createdAt: 'asc' },
      });

      if (manualJob) {
        log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
        await startJobProcess(manualJob.id, manualJob.type, manualJob.language, manualJob.config as JobConfig | null);
        return;
      }
    }

    // 2. If jobs are still running for the current group, wait
    if (cycleState.activeGroupJobs > 0) return;

    // 3. If in cooldown, check if expired
    if (cycleState.waitingForCooldown) {
      if (cycleState.lastCycleCompletedAt) {
        const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
        if (elapsed < CYCLE_COOLDOWN_MS) {
          const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
          if (remaining % 30 === 0 || remaining <= 5) {
            log(`Cooldown: ${remaining} minutes remaining before next cycle`);
          }
          return;
        }
      }
      cycleState.waitingForCooldown = false;
      cycleState.currentGroupIndex = 0;
      cycleState.currentSequentialPhaseIndex = 0;
      log('Cooldown expired, starting new cycle');
    }

    // 4. If past the last group, complete the cycle
    if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
      cycleState.cycleNumber++;
      cycleState.lastCycleCompletedAt = new Date();
      cycleState.waitingForCooldown = true;
      // Reset so step 5 announces the next cycle (it only does so while this is unset).
      cycleState.cycleStartedAt = null;
      const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
      log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
      return;
    }

    // 5. Start the current group
    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];

    if (
      cycleState.currentGroupIndex === 0 &&
      cycleState.currentSequentialPhaseIndex === 0 &&
      !cycleState.cycleStartedAt
    ) {
      cycleState.cycleStartedAt = new Date();
      log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
    }

    if (group.mode === 'parallel') {
      // Launch all phases in the group concurrently
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
      // Count every phase up front so an early finisher can't drain the counter (and advance
      // the group) while its siblings are still being launched.
      cycleState.activeGroupJobs = group.phases.length;
      for (const phase of group.phases) {
        await launchPipelinePhase(phase);
      }
      return;
    }

    // Sequential: run one phase at a time within the group
    const phaseIndex = cycleState.currentSequentialPhaseIndex;
    if (phaseIndex >= group.phases.length) {
      // All phases in this sequential group are done
      cycleState.currentGroupIndex++;
      cycleState.currentSequentialPhaseIndex = 0;
      return; // Will pick up next group on next poll
    }

    const phase = group.phases[phaseIndex];
    log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
    cycleState.activeGroupJobs = 1;
    await launchPipelinePhase(phase);
  } catch (err) {
    logError(`Error in pipeline: ${err}`);
  } finally {
    pollInProgress = false;
  }
}

/**
 * Record that one pipeline job of the current group has finished (or could not start).
 * When the group drains: a sequential group moves to its next phase, otherwise the
 * pipeline moves to the next group.
 */
function onJobCompleted(): void {
  cycleState.activeGroupJobs--;
  if (cycleState.activeGroupJobs <= 0) {
    cycleState.activeGroupJobs = 0;
    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
    if (group?.mode === 'sequential') {
      cycleState.currentSequentialPhaseIndex++;
      if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
        return; // Don't advance group yet
      }
    }
    cycleState.currentGroupIndex++;
    cycleState.currentSequentialPhaseIndex = 0;
    log(`Group "${group?.name}" complete, advancing to group ${cycleState.currentGroupIndex + 1}`);
  }
}

/**
 * Spawn the script for a job in its own process group, tee its output to a per-run log file,
 * enforce JOB_TIMEOUT_MS, and on exit update the job row and (for pipeline jobs) call onJobCompleted.
 *
 * @returns true if a process was spawned — its exit will be reported exactly once. false if nothing
 *          was started (shutting down, job id already running, unknown type, spawn threw); in that
 *          case onJobCompleted is NOT called and pipeline callers must account for the job.
 */
async function startJobProcess(
  jobId: string,
  type: string,
  language: string | null,
  config: JobConfig | null,
  isPipelineJob = false
): Promise<boolean> {
  const typeKey = `${type}:${language || 'all'}`;

  if (shuttingDown) {
    log(`Not starting ${typeKey} (${jobId}): scheduler is shutting down`);
    return false;
  }
  // createPendingJob can hand back a row that is already running here (e.g. a manual run of
  // the same type+language); a second process on the same job id would corrupt it.
  if (runningJobs.has(jobId)) {
    log(`Job ${typeKey} (${jobId}) is already running, not starting a second process`);
    return false;
  }

  let command: string;
  let args: string[];
  try {
    ({ command, args } = getJobCommand(type, language, config));
  } catch (err) {
    // Fail the row, otherwise it stays 'pending' and a manual job is re-picked on every poll.
    const message = err instanceof Error ? err.message : String(err);
    logError(`Job ${typeKey} (${jobId}) failed to start: ${message}`);
    await markJobFailed(jobId, message);
    return false;
  }

  // Add --job-id to args so the script tracks this job
  args.push('--job-id', jobId);

  const logFile = getLogFilePath(type);
  const logStream = fs.createWriteStream(logFile, { flags: 'a' });
  // Without a listener, a stream error (EACCES, disk full) would crash the whole scheduler.
  logStream.on('error', (err: Error) => logError(`Log file ${logFile} for ${typeKey} (${jobId}) failed: ${err.message}`));
  const stderrBuffer: string[] = []; // Keep last 20 lines of stderr for error reporting

  log(`Starting job: ${typeKey} (${jobId})`);
  log(`  Command: ${command} ${args.join(' ')}`);
  log(`  Log file: ${logFile}`);
  logStream.write(`[${new Date().toISOString()}] Starting ${typeKey}: ${command} ${args.join(' ')}\n`);

  let child: ChildProcess;
  try {
    child = spawn(command, args, {
      cwd: '/app',
      env: process.env,
      stdio: ['ignore', 'pipe', 'pipe'],
      detached: true, // own process group so we can kill Chromium grandchildren
    });
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    logStream.write(`\n[${new Date().toISOString()}] ERROR: ${message}\n`);
    logStream.end();
    logError(`Job ${typeKey} (${jobId}) failed to start: ${message}`);
    await markJobFailed(jobId, message);
    return false;
  }

  runningJobs.set(jobId, { process: child, typeKey });

  child.stdout?.on('data', (data: Buffer) => {
    logStream.write(data);
  });

  child.stderr?.on('data', (data: Buffer) => {
    logStream.write(data);
    const lines = data.toString().split('\n').filter((l) => l.trim());
    stderrBuffer.push(...lines);
    if (stderrBuffer.length > 20) {
      stderrBuffer.splice(0, stderrBuffer.length - 20);
    }
  });

  let finished = false;

  // Timeout — kill entire process group (including Chromium grandchildren)
  const timeout = setTimeout(() => {
    logError(`Job ${typeKey} (${jobId}) timed out after ${JOB_TIMEOUT_MS / 3600000}h, killing process group...`);
    logStream.write(`\n[${new Date().toISOString()}] TIMEOUT: killed after ${JOB_TIMEOUT_MS / 3600000}h\n`);
    killProcessGroup(child, 'SIGTERM');
    setTimeout(() => {
      if (!finished) killProcessGroup(child, 'SIGKILL');
    }, 10_000);
  }, JOB_TIMEOUT_MS);

  /** Settle the job exactly once, whichever of 'error'/'close' arrives first. */
  const finalize = async (code: number | null, spawnError?: Error): Promise<void> => {
    // A failed spawn can emit both 'error' and 'close'; settling twice would decrement the
    // group counter twice and advance the pipeline while sibling jobs are still running.
    if (finished) return;
    finished = true;
    clearTimeout(timeout);
    runningJobs.delete(jobId);

    if (spawnError) {
      logStream.write(`\n[${new Date().toISOString()}] ERROR: ${spawnError.message}\n`);
      logStream.end();
      logError(`Job ${typeKey} (${jobId}) failed to start: ${spawnError.message}`);
      // During shutdown the row has been re-queued; don't overwrite that.
      if (!shuttingDown) await markJobFailed(jobId, spawnError.message);
    } else {
      logStream.write(`\n[${new Date().toISOString()}] Finished ${typeKey}: exit code ${code}\n`);
      logStream.end();
      if (code === 0) {
        log(`Job ${typeKey} (${jobId}) completed successfully`);
      } else {
        logError(`Job ${typeKey} (${jobId}) failed with exit code ${code}`);
        // Mark as failed if the script didn't do it itself (skipped during shutdown, which re-queues)
        if (!shuttingDown) await markExitedJobFailed(jobId, code, stderrBuffer);
      }
    }

    // Advance pipeline regardless of success/failure
    // Failed languages are logged but pipeline continues
    if (isPipelineJob) onJobCompleted();
  };

  child.on('error', (err: Error) => {
    // 'error' also fires when a signal can't be delivered to a live process. Only settle here
    // if the process never spawned (pid undefined); otherwise 'close' will settle it.
    if (child.pid === undefined) {
      void finalize(null, err);
    } else {
      logError(`Job ${typeKey} (${jobId}) process error: ${err.message}`);
    }
  });

  child.on('close', (code: number | null) => {
    void finalize(code);
  });

  return true;
}

// ─── Graceful Shutdown ───────────────────────────────────────────────────────

/**
 * Stop all jobs, re-queue them as 'pending' for the next startup, close DB connections and exit.
 * Jobs still alive after 10 s get SIGKILL. Repeated signals are ignored.
 */
async function shutdown(signal: string): Promise<void> {
  if (shuttingDown) {
    log(`Received ${signal}, shutdown already in progress`);
    return;
  }
  shuttingDown = true;
  log(`Received ${signal}, shutting down gracefully...`);

  // Snapshot first: close handlers delete entries while we await the DB below, and a live
  // Map iterator would then skip those jobs without re-queueing them.
  const jobsToStop = [...runningJobs];
  for (const [jobId, { process: child, typeKey }] of jobsToStop) {
    log(`Stopping running job: ${typeKey} (${jobId})`);
    killProcessGroup(child, 'SIGTERM');

    // Re-queue the job for the next startup
    try {
      await prisma.backgroundJob.update({
        where: { id: jobId },
        data: { status: 'pending', error: null },
      });
      log(`Re-queued job ${jobId} for restart`);
    } catch (err) {
      logError(`Failed to re-queue job ${jobId}: ${err}`);
    }
  }

  const closeAndExit = (): void => {
    prisma
      .$disconnect()
      .then(() => pool.end())
      .catch((err: unknown) => logError(`Error while closing database connections: ${err}`))
      .finally(() => process.exit(0));
  };

  // If no jobs running, exit immediately
  if (runningJobs.size === 0) {
    closeAndExit();
    return;
  }

  // Give jobs 10 seconds to finish, then force exit
  setTimeout(() => {
    if (runningJobs.size > 0) {
      logError(`Force killing ${runningJobs.size} remaining jobs`);
      for (const { process: child } of runningJobs.values()) {
        killProcessGroup(child, 'SIGKILL');
      }
    }
    closeAndExit();
  }, 10_000);
}

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));

// ─── Main ────────────────────────────────────────────────────────────────────

/** Start-up: validate env, recover DB state, register cron tasks and start pipeline polling. */
async function main(): Promise<void> {
  log('=== NearestMass Scheduler Starting (Continuous Pipeline) ===');

  validateEnvironment();
  ensureLogsDir();

  // Crash recovery: re-queue interrupted pipeline jobs, fail orphaned manual jobs,
  // and choose the pipeline group to resume from
  await recoverFromCrash();

  // Log cleanup at 03:00 UTC
  cron.schedule('0 3 * * *', () => cleanOldLogs(), { timezone: 'UTC' });
  log('Registered cron job: log-cleanup (0 3 * * * UTC)');

  // Stale job cleanup every 6 hours (cleanStaleJobs logs its own errors)
  cron.schedule('0 */6 * * *', () => {
    void cleanStaleJobs();
  }, { timezone: 'UTC' });
  log('Registered cron job: stale-job-cleanup (every 6h)');

  // Heartbeat every hour — logs cycle state and writes heartbeat file for Docker healthcheck
  cron.schedule('0 * * * *', () => {
    const currentGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
      ? PIPELINE_GROUPS[cycleState.currentGroupIndex].name
      : 'none';
    const jobs = runningJobs.size > 0
      ? `Running: ${[...runningJobs.values()].map((job) => job.typeKey).join(', ')}`
      : 'No jobs running';
    const state = cycleState.waitingForCooldown
      ? 'cooldown'
      : `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
    log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
    try {
      fs.writeFileSync(path.join(LOGS_DIR, 'scheduler.heartbeat'), new Date().toISOString());
    } catch (err) {
      logError(`Failed to write heartbeat file: ${err}`);
    }
  }, { timezone: 'UTC' });
  log('Registered cron job: heartbeat (hourly)');

  // Poll and advance pipeline every 30 seconds
  setInterval(() => {
    void pollAndAdvancePipeline();
  }, JOB_POLL_INTERVAL_MS);
  log(`Pipeline polling every ${JOB_POLL_INTERVAL_MS / 1000}s`);

  // Start pipeline immediately
  await pollAndAdvancePipeline();

  log('=== Scheduler running (parallel grouped pipeline) ===');
  log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
  PIPELINE_GROUPS.forEach((g, i) => {
    const phaseNames = g.phases.map((p) => p.name).join(', ');
    log(`  ${i + 1}. ${g.name} [${g.mode}]: ${phaseNames}`);
  });
  log(`Cooldown between cycles: ${CYCLE_COOLDOWN_MS / (60 * 60 * 1000)}h`);
  log(`Job timeout: ${JOB_TIMEOUT_MS / (60 * 60 * 1000)}h`);
}

main().catch((err: unknown) => {
  logError(`Scheduler failed to start: ${err}`);
  process.exit(1);
});