Files
ScraperControl/scripts/scheduler.ts
2026-03-28 08:50:19 -04:00

783 lines
30 KiB
TypeScript

import cron from 'node-cron';
import { spawn, ChildProcess } from 'child_process';
import fs from 'fs';
import path from 'path';
import dotenv from 'dotenv';
dotenv.config({ path: path.resolve(process.cwd(), '.env') });
import { Pool } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '@prisma/client';
// Fresh DB connection for scheduler: a dedicated pg pool that Prisma uses through
// its pg driver adapter.
// NOTE: built at module load, before validateEnvironment() checks DATABASE_URL
// (presumably fine because pg connects lazily — confirm if startup errors look odd).
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});
const adapter = new PrismaPg(pool);
const prisma = new PrismaClient({ adapter: adapter });
// ─── Configuration ───────────────────────────────────────────────────────────
/** Directory for per-job log files and the scheduler heartbeat file. */
const LOGS_DIR = '/app/logs';
/** cleanOldLogs() deletes `.log` files whose mtime is older than this many days. */
const LOG_RETENTION_DAYS = 30;
/** Interval between pollAndAdvancePipeline() ticks (30 s). */
const JOB_POLL_INTERVAL_MS = 30 * 1000;
/** Pause between the end of one pipeline cycle and the start of the next (6 h). */
const CYCLE_COOLDOWN_MS = 6 * 3_600_000;
/** Wall-clock limit for one child job before its process group is killed (24 h). */
const JOB_TIMEOUT_MS = 24 * 3_600_000;
/** Non-pipeline jobs still 'running' after this long are failed by cleanStaleJobs() (24 h). */
const STALE_JOB_TIMEOUT_MS = 24 * 3_600_000;
// ─── Pipeline Phases ────────────────────────────────────────────────────────
/** One step of the pipeline: a job type plus the options it is launched with. */
interface PipelinePhase {
  /** Label used in logs (e.g. 'scraper-english'). */
  name: string;
  /** Job type understood by getJobCommand(). */
  type: string;
  /** Optional language, stored on the BackgroundJob row and part of the running-job key. */
  language?: string;
  /** Options that getJobCommand() translates into CLI flags. */
  config: Record<string, unknown>;
}

/** A named batch of phases, run either one at a time or all together. */
interface PipelineGroup {
  name: string;
  /** 'sequential': one phase after another; 'parallel': every phase launched at once. */
  mode: 'sequential' | 'parallel';
  phases: PipelinePhase[];
}
/**
 * The continuous pipeline, in execution order. Each cycle runs the groups one after
 * another; a group's `mode` decides whether its phases run one-by-one or all at once.
 */
const PIPELINE_GROUPS: PipelineGroup[] = ((): PipelineGroup[] => {
  /** Importer phase whose name equals its job type; `config` defaults to a fresh empty object. */
  const importer = (type: string, config: Record<string, unknown> = {}): PipelinePhase => ({
    name: type,
    type,
    config,
  });

  /** Scraper phase for a single language (maxFailures: 10). */
  const scraper = (language: string): PipelinePhase => ({
    name: `scraper-${language}`,
    type: 'scraper',
    language,
    config: { allMode: true, maxFailures: 10, language },
  });

  /** Parallel group running one scraper per listed language. */
  const scraperBatch = (name: string, languages: string[]): PipelineGroup => ({
    name,
    mode: 'parallel',
    phases: languages.map((lang) => scraper(lang)),
  });

  return [
    {
      name: 'imports',
      mode: 'sequential',
      phases: [
        // Priority-1 OSM import: the only phase whose name differs from its job type.
        { name: 'osm-import-p1', type: 'osm-import', config: { priority: 1 } },
        importer('gcatholic-import', { delay: 2000 }),
        ...[
          'orarimesse-import',
          'mass-schedules-ph-import',
          'philmass-import',
          'horariosmisas-import',
          'msze-info-import',
          'weekdaymasses-import',
          'messesinfo-import',
          'bohosluzby-import',
          'miserend-import',
          'kerknet-import',
          'gottesdienstzeiten-import',
          'masstimes-api-import',
        ].map((type) => importer(type)),
      ],
    },
    scraperBatch('scrapers-batch-1', ['english', 'french', 'german']),
    scraperBatch('scrapers-batch-2', ['polish', 'spanish', 'italian', 'portuguese', 'czech']),
    scraperBatch('scrapers-batch-3', ['dutch', 'hungarian', 'generic']),
  ];
})();
// ─── Cycle State ────────────────────────────────────────────────────────────
/** In-memory progress of the continuous pipeline (rebuilt from the DB by recoverFromCrash()). */
interface CycleState {
  /** Index into PIPELINE_GROUPS of the group being run; reaches its length when a cycle ends. */
  currentGroupIndex: number;
  /** Next phase to run inside a sequential group; unused for parallel groups. */
  currentSequentialPhaseIndex: number;
  /** Number of cycles completed since this process started. */
  cycleNumber: number;
  /** Timestamp recorded when pollAndAdvancePipeline() starts a cycle; null until then. */
  cycleStartedAt: Date | null;
  /** When the last cycle finished; the cooldown is measured from this moment. */
  lastCycleCompletedAt: Date | null;
  /** True while waiting out CYCLE_COOLDOWN_MS between cycles. */
  waitingForCooldown: boolean;
  /** Pipeline jobs of the current group that were launched and have not finished. */
  activeGroupJobs: number;
}

const cycleState: CycleState = {
  currentGroupIndex: 0,
  currentSequentialPhaseIndex: 0,
  activeGroupJobs: 0,
  cycleNumber: 0,
  waitingForCooldown: false,
  cycleStartedAt: null,
  lastCycleCompletedAt: null,
};
/**
 * Translate a job type (+ optional language and config) into the command line that runs it.
 *
 * Every job runs as `npx tsx <script> ...flags`. Config keys become CLI flags only when
 * their value is truthy.
 *
 * @param type Job type; see the cases below.
 * @param language Passed to the scraper as `--language` (ignored by other job types).
 * @param config Optional per-job options.
 * @returns The command and a freshly allocated argument array (callers may push onto it).
 * @throws Error `Unknown job type: <type>` for unsupported types.
 */
function getJobCommand(type: string, language?: string | null, config?: Record<string, unknown> | null): { command: string; args: string[] } {
  const args: string[] = ['tsx'];

  /** Append `flag <value>` when config[key] is truthy. */
  const withValue = (key: string, flag: string): void => {
    const value = config?.[key];
    if (value) args.push(flag, String(value));
  };
  /** Append the bare `flag` when config[key] is truthy. */
  const withSwitch = (key: string, flag: string): void => {
    if (config?.[key]) args.push(flag);
  };

  // Importers whose only option is `--resume-from` on top of `--all`.
  // A Map (not an object literal) so names like 'constructor' are not found via the prototype.
  const resumeOnlyImporters = new Map<string, string>([
    ['msze-info-import', 'scripts/import-msze-info.ts'],
    ['weekdaymasses-import', 'scripts/import-weekdaymasses.ts'],
    ['messesinfo-import', 'scripts/import-messesinfo.ts'],
    ['bohosluzby-import', 'scripts/import-bohosluzby.ts'],
    ['miserend-import', 'scripts/import-miserend.ts'],
    ['kerknet-import', 'scripts/import-kerknet.ts'],
    ['gottesdienstzeiten-import', 'scripts/import-gottesdienstzeiten.ts'],
  ]);
  const resumeOnlyScript = resumeOnlyImporters.get(type);
  if (resumeOnlyScript !== undefined) {
    args.push(resumeOnlyScript, '--all');
    withValue('resumeFrom', '--resume-from');
    return { command: 'npx', args };
  }

  switch (type) {
    case 'scraper':
      args.push('scripts/scrape-churches.ts', '--all');
      if (language) args.push('--language', language);
      withValue('maxFailures', '--max-failures');
      break;
    case 'freesearch-enrichment':
      args.push('scripts/enrich-with-freesearch.ts');
      withSwitch('continuous', '--continuous');
      withSwitch('reSearch', '--re-search');
      withValue('limit', '--limit');
      withValue('country', '--country');
      break;
    case 'reverse-geocode-enrichment':
      args.push('scripts/enrich-with-reverse-geocode.ts');
      withSwitch('continuous', '--continuous');
      withValue('limit', '--limit');
      withValue('country', '--country');
      break;
    case 'match-search-results':
      args.push('scripts/match-search-results.ts');
      withValue('limit', '--limit');
      withValue('country', '--country');
      withValue('threshold', '--threshold');
      break;
    case 'diocese-directory':
      args.push('scripts/scrape-diocese-directory.ts', '--all');
      break;
    case 'wikidata-enrichment':
      args.push('scripts/enrich-with-wikidata.ts');
      break;
    case 'osm-import':
      args.push('scripts/import-osm-churches.ts');
      // A priority limits the import; without one, everything is imported.
      if (config?.priority) {
        args.push('--priority', String(config.priority));
      } else {
        args.push('--all');
      }
      withValue('resumeFrom', '--resume-from');
      break;
    case 'baidu-import':
      args.push('scripts/import-baidu-churches.ts');
      withValue('resumeFromCell', '--resume-from-cell');
      break;
    case 'gcatholic-import':
      args.push('scripts/import-gcatholic.ts', '--all');
      withValue('country', '--country');
      withValue('resumeFrom', '--resume-from');
      withValue('delay', '--delay');
      break;
    case 'orarimesse-import':
      args.push('scripts/import-orarimesse.ts', '--all');
      withValue('diocese', '--diocese');
      withValue('resumeFrom', '--resume-from');
      break;
    case 'mass-schedules-ph-import':
      args.push('scripts/import-mass-schedules-ph.ts', '--all');
      withValue('churchId', '--church-id');
      withValue('resumeFrom', '--resume-from');
      break;
    case 'philmass-import':
      args.push('scripts/import-philmass.ts', '--all');
      withValue('province', '--province');
      withValue('resumeFrom', '--resume-from');
      break;
    case 'horariosmisas-import':
      args.push('scripts/import-horariosmisas.ts', '--all', '--geocode');
      withValue('province', '--province');
      withValue('resumeFrom', '--resume-from');
      break;
    case 'masstimes-api-import':
      args.push('scripts/import-masstimes-api.ts', '--all', '--skip-us');
      withValue('resumeFrom', '--resume-from');
      // A region replaces `--all` (index 2, right after 'tsx' and the script path).
      if (config?.region) args.splice(2, 1, '--region', String(config.region));
      break;
    default:
      throw new Error(`Unknown job type: ${type}`);
  }
  return { command: 'npx', args };
}
// ─── State ───────────────────────────────────────────────────────────────────
/**
 * Child processes spawned by this scheduler that have not exited yet, keyed by
 * `${type}:${language || 'all'}` (the key startJobProcess() uses).
 */
const runningJobs: Map<string, { process: ChildProcess; jobId: string }> = new Map();
/**
 * Send `signal` to the child's whole process group (the child plus descendants such as
 * Chromium), falling back to signalling only the child when the group kill fails.
 * Does nothing when the child has no pid. ESRCH (group already gone) is not logged.
 */
function killProcessGroup(child: ChildProcess, signal: NodeJS.Signals = 'SIGTERM'): void {
  const pid = child.pid;
  if (!pid) return;
  try {
    // A negative PID targets the process group led by the (detached) child.
    process.kill(-pid, signal);
    return;
  } catch (err: unknown) {
    if ((err as NodeJS.ErrnoException).code !== 'ESRCH') {
      logError(`killProcessGroup(-${child.pid}, ${signal}) failed: ${err}`);
    }
  }
  // Group kill failed: try the direct child as a last resort.
  try {
    child.kill(signal);
  } catch {
    /* already dead */
  }
}
// ─── Logging ─────────────────────────────────────────────────────────────────
/** Print an informational line to stdout as `[<ISO-8601 time>] <message>`. */
function log(message: string): void {
  const stamp = new Date().toISOString();
  console.log('[' + stamp + '] ' + message);
}

/** Print an error line to stderr as `[<ISO-8601 time>] ERROR: <message>`. */
function logError(message: string): void {
  console.error(`[${new Date().toISOString()}] ERROR: ${message}`);
}
// ─── Environment Validation ──────────────────────────────────────────────────
/**
 * Exit with code 1 when DATABASE_URL is missing; otherwise log that the environment is valid.
 *
 * Only a redacted form of DATABASE_URL is logged. A raw prefix of a connection string
 * such as `postgresql://user:password@host/db` exposes the credentials in the logs.
 */
function validateEnvironment(): void {
  const databaseUrl = process.env.DATABASE_URL;
  if (!databaseUrl) {
    logError('Missing required environment variable: DATABASE_URL');
    process.exit(1);
  }
  log('Environment variables validated');
  log(` DATABASE_URL: ${redactConnectionString(databaseUrl)}`);
}

/**
 * Return `url` with any password replaced by `***` and the query string dropped
 * (query parameters may also carry secrets). Returns a placeholder when `url` cannot
 * be parsed as a URL, rather than echoing it.
 */
function redactConnectionString(url: string): string {
  try {
    const parsed = new URL(url);
    let auth = '';
    if (parsed.username || parsed.password) {
      auth = `${parsed.username}${parsed.password ? ':***' : ''}@`;
    }
    return `${parsed.protocol}//${auth}${parsed.host}${parsed.pathname}`;
  } catch {
    return '<unparseable connection string>';
  }
}
// ─── Log File Management ─────────────────────────────────────────────────────
/** Create LOGS_DIR (and any missing parents), logging only when it had to be created. */
function ensureLogsDir(): void {
  if (fs.existsSync(LOGS_DIR)) return;
  fs.mkdirSync(LOGS_DIR, { recursive: true });
  log(`Created logs directory: ${LOGS_DIR}`);
}
/**
 * Path of a new log file for `jobType`: `<LOGS_DIR>/<jobType>-<timestamp>.log`.
 * The ISO timestamp has ':' and '.' replaced by '-' so the name is filesystem-safe.
 */
function getLogFilePath(jobType: string): string {
  const safeStamp = new Date().toISOString().replace(/[:.]/g, '-');
  const fileName = [jobType, safeStamp].join('-') + '.log';
  return path.join(LOGS_DIR, fileName);
}
/**
 * Delete `.log` files in LOGS_DIR whose mtime is older than LOG_RETENTION_DAYS.
 *
 * Best-effort: if the directory cannot be read the sweep is skipped with an error log.
 * A file that cannot be stat'd or deleted (e.g. a dangling symlink or a permission
 * problem) is logged and skipped, so it no longer stops the rest of the sweep.
 */
function cleanOldLogs(): void {
  log('Cleaning old log files...');
  const cutoff = Date.now() - LOG_RETENTION_DAYS * 24 * 60 * 60 * 1000;

  let files: string[];
  try {
    files = fs.readdirSync(LOGS_DIR);
  } catch (err) {
    logError(`Failed to clean logs: ${err}`);
    return;
  }

  let cleaned = 0;
  for (const file of files) {
    if (!file.endsWith('.log')) continue;
    const filePath = path.join(LOGS_DIR, file);
    try {
      if (fs.statSync(filePath).mtimeMs < cutoff) {
        fs.unlinkSync(filePath);
        cleaned++;
      }
    } catch (err) {
      // Keep going: one bad entry must not block cleanup of every other file.
      logError(`Failed to clean log file ${filePath}: ${err}`);
    }
  }
  log(`Cleaned ${cleaned} log files older than ${LOG_RETENTION_DAYS} days`);
}
// ─── Crash Recovery ─────────────────────────────────────────────────────────
/**
 * Reconcile BackgroundJob rows left behind by a previous scheduler process, then point
 * cycleState at the pipeline phase that still has pending work.
 *
 * - Pipeline-managed 'running' jobs go back to 'pending' so they are retried.
 * - Manual (non-pipeline) jobs that were 'running' or 'pending' are marked 'failed'.
 * - The newest pending pipeline job decides which group (and sequential phase) resumes.
 */
async function recoverFromCrash(): Promise<void> {
  log('Running crash recovery...');

  /** Fresh JSON-path filter matching rows whose config.pipelineManaged is true. */
  const pipelineManaged = () => ({ path: ['pipelineManaged'], equals: true });

  // Interrupted pipeline work is safe to run again.
  const requeued = await prisma.backgroundJob.updateMany({
    where: { status: 'running', config: pipelineManaged() },
    data: { status: 'pending', error: null, completedAt: null },
  });
  if (requeued.count > 0) {
    log(`Re-queued ${requeued.count} pipeline jobs interrupted by restart`);
  }

  // Manual jobs cannot be resumed safely, whether they were running or still queued.
  const failedRunning = await prisma.backgroundJob.updateMany({
    where: { status: 'running', NOT: { config: pipelineManaged() } },
    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
  });
  if (failedRunning.count > 0) {
    log(`Marked ${failedRunning.count} orphaned manual jobs as failed`);
  }

  const failedPending = await prisma.backgroundJob.updateMany({
    where: { status: 'pending', NOT: { config: pipelineManaged() } },
    data: { status: 'failed', error: 'Scheduler restarted', completedAt: new Date() },
  });
  if (failedPending.count > 0) {
    log(`Cancelled ${failedPending.count} stale pending manual jobs`);
  }

  // Resume from the phase that owns the most recently created pending pipeline job.
  const resumeJob = await prisma.backgroundJob.findFirst({
    where: { status: 'pending', config: pipelineManaged() },
    orderBy: { createdAt: 'desc' },
  });
  if (resumeJob) {
    const targetLanguage = resumeJob.language || null;
    const isTarget = (phase: PipelinePhase): boolean =>
      phase.type === resumeJob.type && (phase.language || null) === targetLanguage;

    const groupIndex = PIPELINE_GROUPS.findIndex((g) => g.phases.some(isTarget));
    if (groupIndex !== -1) {
      const group = PIPELINE_GROUPS[groupIndex];
      cycleState.currentGroupIndex = groupIndex;
      cycleState.currentSequentialPhaseIndex =
        group.mode === 'sequential' ? group.phases.findIndex(isTarget) : 0;
      log(`Resuming pipeline from group ${groupIndex + 1}: ${group.name}`);
    }
  }

  log('Crash recovery complete');
}
/**
 * Mark non-pipeline jobs that have been 'running' longer than STALE_JOB_TIMEOUT_MS
 * (measured from startedAt) as 'failed'. Pipeline-managed jobs are left alone.
 *
 * This runs from a cron callback, so database errors are caught and logged here.
 * Otherwise they would surface only as a rejected promise that nothing in this file observes.
 */
async function cleanStaleJobs(): Promise<void> {
  const cutoff = new Date(Date.now() - STALE_JOB_TIMEOUT_MS);
  // Derive the human-readable limit from the constant so the text cannot drift from it.
  const staleHours = STALE_JOB_TIMEOUT_MS / (60 * 60 * 1000);
  try {
    const stale = await prisma.backgroundJob.updateMany({
      where: {
        status: 'running',
        startedAt: { lt: cutoff },
        NOT: { config: { path: ['pipelineManaged'], equals: true } },
      },
      data: {
        status: 'failed',
        error: `Stale: exceeded ${staleHours}h runtime without completion`,
        completedAt: new Date(),
      },
    });
    if (stale.count > 0) {
      log(`Cleaned ${stale.count} stale non-pipeline job(s) running >${staleHours}h`);
    }
  } catch (err) {
    logError(`Failed to clean stale jobs: ${err}`);
  }
}
// ─── Database-Driven Job Queue ───────────────────────────────────────────────
/**
 * Ensure a queued BackgroundJob exists for `type` + `language` and return its id.
 *
 * If a job with the same type and language is already 'pending' or 'running', that id is
 * returned instead of creating a duplicate. The lookup ignores `config.pipelineManaged`,
 * so the returned row may be a manual job.
 * NOTE(review): callers spawn a process for the returned id. If that job is already
 * running, this starts a second process for the same row. Confirm this is intended.
 *
 * @param type Job type (see getJobCommand()).
 * @param language Optional language; stored as null when absent.
 * @param config Config stored on a newly created row; omitted when not given.
 * @returns Id of the existing or newly created job.
 */
async function createPendingJob(type: string, language?: string, config?: Record<string, unknown>): Promise<string> {
  const languageOrNull = language || null;
  const label = `${type}${language ? `:${language}` : ''}`;

  // Don't create if a job of this type is already pending or running
  const existing = await prisma.backgroundJob.findFirst({
    where: {
      type,
      language: languageOrNull,
      status: { in: ['pending', 'running'] },
    },
  });
  if (existing) {
    log(`Skipping ${label} — already ${existing.status} (${existing.id})`);
    return existing.id;
  }

  const job = await prisma.backgroundJob.create({
    data: {
      type,
      language: languageOrNull,
      status: 'pending',
      // Prisma rejects a bare `null` for Json fields (it requires Prisma.JsonNull /
      // Prisma.DbNull). `undefined` omits the field so the column default applies.
      config: config ?? undefined,
    },
  });
  log(`Created pending job: ${label} (${job.id})`);
  return job.id;
}
// ─── Continuous Pipeline Logic ──────────────────────────────────────────────
/**
 * One scheduler tick (run every JOB_POLL_INTERVAL_MS and once at startup). In order:
 *  1. When nothing is running, start the oldest pending manual (non-pipeline) job and stop.
 *  2. While the current group still has live jobs, do nothing.
 *  3. During cooldown, wait until CYCLE_COOLDOWN_MS has elapsed, then restart at group 0.
 *  4. Past the last group, close out the cycle and enter cooldown.
 *  5. Otherwise launch the current group: every phase if parallel, the next phase if sequential.
 *
 * Errors are logged, never thrown, so the polling interval keeps running. If a launch fails
 * part-way, the job slots that were never started are released. Otherwise
 * `activeGroupJobs` would never return to 0 and the pipeline would stall permanently.
 */
async function pollAndAdvancePipeline(): Promise<void> {
  try {
    // 1. Check for manual pending jobs from admin API (priority over pipeline)
    if (runningJobs.size === 0) {
      const manualJob = await prisma.backgroundJob.findFirst({
        where: {
          status: 'pending',
          NOT: { config: { path: ['pipelineManaged'], equals: true } },
        },
        orderBy: { createdAt: 'asc' },
      });
      if (manualJob) {
        log(`Found manual job: ${manualJob.type}${manualJob.language ? `:${manualJob.language}` : ''} (${manualJob.id})`);
        await startJobProcess(
          manualJob.id,
          manualJob.type,
          manualJob.language,
          manualJob.config as Record<string, unknown> | null
        );
        return;
      }
    }

    // 2. If jobs are still running for the current group, wait
    if (cycleState.activeGroupJobs > 0) {
      return;
    }

    // 3. If in cooldown, check if expired
    if (cycleState.waitingForCooldown) {
      if (cycleState.lastCycleCompletedAt) {
        const elapsed = Date.now() - cycleState.lastCycleCompletedAt.getTime();
        if (elapsed < CYCLE_COOLDOWN_MS) {
          const remaining = Math.round((CYCLE_COOLDOWN_MS - elapsed) / 60_000);
          if (remaining % 30 === 0 || remaining <= 5) {
            log(`Cooldown: ${remaining} minutes remaining before next cycle`);
          }
          return;
        }
      }
      cycleState.waitingForCooldown = false;
      cycleState.currentGroupIndex = 0;
      cycleState.currentSequentialPhaseIndex = 0;
      log('Cooldown expired, starting new cycle');
    }

    // 4. If past the last group, complete the cycle
    if (cycleState.currentGroupIndex >= PIPELINE_GROUPS.length) {
      cycleState.cycleNumber++;
      cycleState.lastCycleCompletedAt = new Date();
      cycleState.waitingForCooldown = true;
      // Clear the start marker so the next cycle records (and logs) its own start.
      // Without this, only the very first cycle ever logged "Starting cycle".
      cycleState.cycleStartedAt = null;
      const cooldownHours = CYCLE_COOLDOWN_MS / (60 * 60 * 1000);
      log(`=== Cycle ${cycleState.cycleNumber} complete! Entering ${cooldownHours}h cooldown ===`);
      return;
    }

    // 5. Start the current group
    const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
    if (cycleState.currentGroupIndex === 0 && cycleState.currentSequentialPhaseIndex === 0 && !cycleState.cycleStartedAt) {
      cycleState.cycleStartedAt = new Date();
      log(`=== Starting cycle ${cycleState.cycleNumber + 1} ===`);
    }

    if (group.mode === 'parallel') {
      // Launch all phases in the group concurrently
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (parallel, ${group.phases.length} jobs)`);
      // Reserve every slot up front so a fast-finishing job cannot drive the count to 0
      // (and advance the group) while later phases are still being launched.
      cycleState.activeGroupJobs = group.phases.length;
      let launched = 0;
      try {
        for (const phase of group.phases) {
          const jobId = await createPendingJob(
            phase.type,
            phase.language,
            { ...phase.config, pipelineManaged: true }
          );
          await startJobProcess(jobId, phase.type, phase.language || null, phase.config, true);
          launched++;
        }
      } catch (err) {
        if (launched === 0) {
          // Nothing started: free the group so it is retried on the next poll.
          cycleState.activeGroupJobs = 0;
        } else {
          // Release the slots of phases that never started. The group then completes
          // once the already-running phases finish.
          for (let i = launched; i < group.phases.length; i++) onJobCompleted();
        }
        throw err;
      }
    } else {
      // Sequential: run one phase at a time within the group
      const phaseIndex = cycleState.currentSequentialPhaseIndex;
      if (phaseIndex >= group.phases.length) {
        // All phases in this sequential group are done
        cycleState.currentGroupIndex++;
        cycleState.currentSequentialPhaseIndex = 0;
        return; // Will pick up next group on next poll
      }
      const phase = group.phases[phaseIndex];
      log(`Pipeline group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length}: ${group.name} (sequential ${phaseIndex + 1}/${group.phases.length}: ${phase.name})`);
      cycleState.activeGroupJobs = 1;
      try {
        const jobId = await createPendingJob(
          phase.type,
          phase.language,
          { ...phase.config, pipelineManaged: true }
        );
        await startJobProcess(jobId, phase.type, phase.language || null, phase.config, true);
      } catch (err) {
        // The phase never started: free the slot so the same phase is retried next poll.
        cycleState.activeGroupJobs = 0;
        throw err;
      }
    }
  } catch (err) {
    logError(`Error in pipeline: ${err}`);
  }
}
/**
 * Record that one pipeline job of the current group has finished (successfully or not).
 *
 * When the group has no live jobs left:
 * - a sequential group moves on to its next phase;
 * - a group whose phases are all done moves cycleState on to the next group.
 *
 * A call made while no pipeline job is active is logged and ignored. This stops a
 * duplicate completion signal from advancing the pipeline a second time, which would
 * skip a phase or a group.
 */
function onJobCompleted(): void {
  if (cycleState.activeGroupJobs <= 0) {
    logError('onJobCompleted() called with no active pipeline jobs — ignoring');
    return;
  }

  cycleState.activeGroupJobs--;
  if (cycleState.activeGroupJobs > 0) return; // other jobs in this group are still running

  const group = PIPELINE_GROUPS[cycleState.currentGroupIndex];
  if (group?.mode === 'sequential') {
    cycleState.currentSequentialPhaseIndex++;
    if (cycleState.currentSequentialPhaseIndex < group.phases.length) {
      return; // Don't advance group yet
    }
  }

  cycleState.currentGroupIndex++;
  cycleState.currentSequentialPhaseIndex = 0;
  const next = cycleState.currentGroupIndex < PIPELINE_GROUPS.length
    ? `advancing to group ${cycleState.currentGroupIndex + 1}`
    : 'all groups done for this cycle';
  log(`Group "${group?.name}" complete, ${next}`);
}
/**
 * Spawn the child process for BackgroundJob `jobId` and manage its lifecycle.
 *
 * - The command comes from getJobCommand(); `--job-id <jobId>` is appended so the
 *   script can update its own job row.
 * - stdout/stderr are appended to a file from getLogFilePath(). The last 20 stderr
 *   lines are kept for the failure message.
 * - The child runs detached, in its own process group, so the timeout can kill its
 *   descendants (e.g. Chromium).
 * - Completion is handled exactly once, whether the child reports 'error' (failed to
 *   spawn) or 'close'. Node may emit both for the same child.
 * - For pipeline jobs, onJobCompleted() is called exactly once, whatever the outcome.
 * - If getJobCommand() throws (e.g. unknown job type), the job is marked 'failed' and
 *   the function returns. The error does not propagate, and a manual job is not left
 *   'pending' to be picked up again on every poll.
 *
 * @param jobId BackgroundJob id, passed to the script via --job-id.
 * @param type Job type understood by getJobCommand().
 * @param language Optional language (also part of the runningJobs key).
 * @param config Job config translated into CLI flags.
 * @param isPipelineJob Whether completion should advance the pipeline.
 */
async function startJobProcess(
  jobId: string,
  type: string,
  language: string | null,
  config: Record<string, unknown> | null,
  isPipelineJob = false
): Promise<void> {
  const typeKey = `${type}:${language || 'all'}`;

  let built: { command: string; args: string[] };
  try {
    built = getJobCommand(type, language, config);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    logError(`Cannot start job ${typeKey} (${jobId}): ${reason}`);
    await markJobFailed(jobId, reason);
    if (isPipelineJob) onJobCompleted();
    return;
  }
  const { command, args } = built;
  // Add --job-id to args so the script tracks this job
  args.push('--job-id', jobId);

  const logFile = getLogFilePath(type);
  const logStream = fs.createWriteStream(logFile, { flags: 'a' });
  // Without a listener, a stream error (e.g. unwritable logs dir) would crash the scheduler.
  logStream.on('error', (err: Error) => {
    logError(`Log file ${logFile} for ${typeKey} (${jobId}) failed: ${err.message}`);
  });
  const stderrBuffer: string[] = []; // Keep last 20 lines of stderr for error reporting

  log(`Starting job: ${typeKey} (${jobId})`);
  log(` Command: ${command} ${args.join(' ')}`);
  log(` Log file: ${logFile}`);
  logStream.write(`[${new Date().toISOString()}] Starting ${typeKey}: ${command} ${args.join(' ')}\n`);

  const child = spawn(command, args, {
    cwd: '/app',
    env: process.env as NodeJS.ProcessEnv,
    stdio: ['ignore', 'pipe', 'pipe'],
    detached: true, // own process group so we can kill Chromium grandchildren
  });
  runningJobs.set(typeKey, { process: child, jobId });

  child.stdout?.on('data', (data: Buffer) => {
    logStream.write(data);
  });
  child.stderr?.on('data', (data: Buffer) => {
    logStream.write(data);
    const lines = data.toString().split('\n').filter(l => l.trim());
    stderrBuffer.push(...lines);
    if (stderrBuffer.length > 20) {
      stderrBuffer.splice(0, stderrBuffer.length - 20);
    }
  });

  let finished = false;

  // Timeout — kill entire process group (including Chromium grandchildren)
  const timeout = setTimeout(() => {
    logError(`Job ${typeKey} (${jobId}) timed out after ${JOB_TIMEOUT_MS / 3600000}h, killing process group...`);
    logStream.write(`\n[${new Date().toISOString()}] TIMEOUT: killed after ${JOB_TIMEOUT_MS / 3600000}h\n`);
    killProcessGroup(child, 'SIGTERM');
    setTimeout(() => {
      // Escalate only if this child has not exited in the meantime.
      if (!finished) {
        killProcessGroup(child, 'SIGKILL');
      }
    }, 10_000);
  }, JOB_TIMEOUT_MS);

  /** Claim the one-time completion for this child; returns false if already claimed. */
  const claimCompletion = (): boolean => {
    if (finished) return false;
    finished = true;
    clearTimeout(timeout);
    // Only remove our own entry; never a later process registered under the same key.
    if (runningJobs.get(typeKey)?.process === child) {
      runningJobs.delete(typeKey);
    }
    return true;
  };

  child.on('close', async (code: number | null) => {
    if (!claimCompletion()) return;
    logStream.write(`\n[${new Date().toISOString()}] Finished ${typeKey}: exit code ${code}\n`);
    logStream.end();

    if (code === 0) {
      log(`Job ${typeKey} (${jobId}) completed successfully`);
    } else {
      logError(`Job ${typeKey} (${jobId}) failed with exit code ${code}`);
      // Mark as failed if the script didn't do it itself
      try {
        const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
        // 'running': the script died without recording its own failure.
        // 'pending' with no startedAt: it died before it ever marked the job as started.
        // Left pending, a manual job would be relaunched on every poll.
        // A 'pending' job WITH startedAt was re-queued by shutdown()/crash recovery and stays pending.
        // NOTE(review): assumes scripts set startedAt when they mark a job running — confirm.
        const unfinished = job && (job.status === 'running' || (job.status === 'pending' && !job.startedAt));
        if (unfinished) {
          const stderrTail = stderrBuffer.join('\n').slice(-500);
          const errorMsg = stderrTail
            ? `Exit code: ${code}\n${stderrTail}`
            : `Exit code: ${code}`;
          await markJobFailed(jobId, errorMsg);
        }
      } catch (err) {
        logError(`Failed to update job status: ${err}`);
      }
    }

    // Advance pipeline regardless of success/failure
    // Failed languages are logged but pipeline continues
    if (isPipelineJob) onJobCompleted();
  });

  child.on('error', async (err: Error) => {
    if (child.pid !== undefined) {
      // The process did spawn (e.g. this is a failed kill()); 'close' will report its exit.
      logError(`Job ${typeKey} (${jobId}) process error: ${err.message}`);
      return;
    }
    if (!claimCompletion()) return;
    logStream.write(`\n[${new Date().toISOString()}] ERROR: ${err.message}\n`);
    logStream.end();
    logError(`Job ${typeKey} (${jobId}) failed to start: ${err.message}`);
    await markJobFailed(jobId, err.message);
    // Advance pipeline even on error
    if (isPipelineJob) onJobCompleted();
  });
}

/** Best-effort: set job `jobId` to 'failed' with `error`; DB errors are logged, not thrown. */
async function markJobFailed(jobId: string, error: string): Promise<void> {
  try {
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: { status: 'failed', error, completedAt: new Date() },
    });
  } catch (updateErr) {
    logError(`Failed to update job status: ${updateErr}`);
  }
}
// ─── Graceful Shutdown ───────────────────────────────────────────────────────
/**
 * Signal handler: stop every child in runningJobs, re-queue their jobs as 'pending',
 * then disconnect Prisma, end the pg pool and exit with code 0.
 *
 * Each child's process group gets SIGTERM right away. 10 seconds later, any child still
 * listed in runningJobs gets SIGKILL and the process exits. If runningJobs is already
 * empty after the re-queue loop, it disconnects and exits immediately.
 *
 * NOTE(review): each child's 'close' handler in startJobProcess may update the same job
 * row concurrently (it marks the job 'failed' when it still reads 'running'). The final
 * status therefore depends on which write lands last. Consider re-queueing before killing.
 * NOTE(review): the polling interval registered in main() is not stopped here, so a poll
 * during the 10 s grace period may still launch new jobs.
 * NOTE(review): nothing prevents this from running twice (e.g. SIGTERM followed by SIGINT).
 *
 * @param signal Name of the received signal; used only for logging.
 */
async function shutdown(signal: string): Promise<void> {
  log(`Received ${signal}, shutting down gracefully...`);
  // Mark running jobs as pending so they can be resumed on restart
  for (const [name, { process: child, jobId }] of runningJobs) {
    log(`Stopping running job: ${name} (${jobId})`);
    // Signal the whole process group so grandchildren are stopped too
    killProcessGroup(child, 'SIGTERM');
    // Re-queue the job for the next startup
    // NOTE(review): for non-pipeline jobs this is undone on restart — recoverFromCrash()
    // marks pending non-pipeline jobs as failed.
    try {
      await prisma.backgroundJob.update({
        where: { id: jobId },
        data: { status: 'pending', error: null },
      });
      log(`Re-queued job ${jobId} for restart`);
    } catch (err) {
      logError(`Failed to re-queue job ${jobId}: ${err}`);
    }
  }
  // Give jobs 10 seconds to finish, then force exit
  setTimeout(() => {
    if (runningJobs.size > 0) {
      logError(`Force killing ${runningJobs.size} remaining jobs`);
      for (const [, { process: child }] of runningJobs) {
        killProcessGroup(child, 'SIGKILL');
      }
    }
    prisma.$disconnect().then(() => pool.end()).then(() => process.exit(0));
  }, 10_000);
  // If no jobs running, exit immediately
  // (checked after the loop: 'close' handlers may have emptied runningJobs meanwhile)
  if (runningJobs.size === 0) {
    prisma.$disconnect().then(() => pool.end()).then(() => process.exit(0));
  }
}
// Run the graceful shutdown on container stop (SIGTERM) and Ctrl-C (SIGINT).
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, () => shutdown(signal));
}
// ─── Main ────────────────────────────────────────────────────────────────────
/**
 * Scheduler start-up, in order:
 *  1. validate the environment and prepare the logs directory;
 *  2. run crash recovery;
 *  3. register the cron jobs (log cleanup, stale-job cleanup, heartbeat);
 *  4. start polling the pipeline, running the first tick right away;
 *  5. print a summary of the configured groups.
 */
async function main(): Promise<void> {
  log('=== NearestMass Scheduler Starting (Continuous Pipeline) ===');
  validateEnvironment();
  ensureLogsDir();

  // Re-queue interrupted pipeline jobs, fail orphaned manual ones, restore the cursor.
  await recoverFromCrash();

  const hours = (ms: number): number => ms / (60 * 60 * 1000);

  /** Hourly status line plus the heartbeat file used by the Docker healthcheck. */
  const heartbeat = (): void => {
    const onGroup = cycleState.currentGroupIndex < PIPELINE_GROUPS.length;
    const currentGroup = onGroup ? PIPELINE_GROUPS[cycleState.currentGroupIndex].name : 'none';
    let jobs = 'No jobs running';
    if (runningJobs.size > 0) {
      jobs = `Running: ${Array.from(runningJobs.keys()).join(', ')}`;
    }
    let state = 'cooldown';
    if (!cycleState.waitingForCooldown) {
      state = `group ${cycleState.currentGroupIndex + 1}/${PIPELINE_GROUPS.length} (${currentGroup})`;
    }
    log(`Heartbeat: Cycle ${cycleState.cycleNumber + 1}, ${state}. ${jobs}`);
    fs.writeFileSync(path.join(LOGS_DIR, 'scheduler.heartbeat'), new Date().toISOString());
  };

  cron.schedule('0 3 * * *', () => cleanOldLogs(), { timezone: 'UTC' });
  log('Registered cron job: log-cleanup (0 3 * * * UTC)');

  cron.schedule('0 */6 * * *', () => cleanStaleJobs(), { timezone: 'UTC' });
  log('Registered cron job: stale-job-cleanup (every 6h)');

  cron.schedule('0 * * * *', heartbeat, { timezone: 'UTC' });
  log('Registered cron job: heartbeat (hourly)');

  setInterval(() => {
    pollAndAdvancePipeline();
  }, JOB_POLL_INTERVAL_MS);
  log(`Pipeline polling every ${JOB_POLL_INTERVAL_MS / 1000}s`);

  // Run the first tick now instead of waiting a full interval.
  await pollAndAdvancePipeline();

  log('=== Scheduler running (parallel grouped pipeline) ===');
  log(`Pipeline groups (${PIPELINE_GROUPS.length}):`);
  PIPELINE_GROUPS.forEach((g, i) => {
    log(` ${i + 1}. ${g.name} [${g.mode}]: ${g.phases.map(p => p.name).join(', ')}`);
  });
  log(`Cooldown between cycles: ${hours(CYCLE_COOLDOWN_MS)}h`);
  log(`Job timeout: ${hours(JOB_TIMEOUT_MS)}h`);
}
// Entry point. A start-up failure (e.g. the database is unreachable during crash recovery)
// is logged with a timestamp and exits non-zero instead of surfacing as an unhandled
// promise rejection.
main().catch((err: unknown) => {
  logError(`Scheduler failed to start: ${err instanceof Error ? err.stack ?? err.message : String(err)}`);
  process.exit(1);
});