// Source: ScraperControl/scripts/populate-chromadb.ts (TypeScript, 198 lines, 6.2 KiB)

/**
* Bulk-populate ChromaDB collections from the database.
*
* Usage:
* npx tsx scripts/populate-chromadb.ts --collection church_identity
* npx tsx scripts/populate-chromadb.ts --collection page_classification
* npx tsx scripts/populate-chromadb.ts --all
* npx tsx scripts/populate-chromadb.ts --all --batch-size 50 --limit 1000
*/
import { Pool } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '@prisma/client';
import { getCollection, COLLECTION_NAMES, CollectionName } from '../src/chromadb/collections';
import { embed } from '../src/chromadb/embeddings';
// PostgreSQL connection pool; the connection string is taken from the
// DATABASE_URL environment variable.
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

// Prisma client wired to the pool above through the `pg` driver adapter.
const adapter = new PrismaPg(pool);
const prisma = new PrismaClient({
  adapter,
});
/**
 * Returns the token that follows `flag` in `argv`.
 *
 * @param argv - Command-line tokens to search.
 * @param flag - Option name including its leading dashes, e.g. `--collection`.
 * @returns The following token, or `undefined` when the flag is absent, is the
 *   last token, or is immediately followed by another `--option`.
 */
function readFlagValue(argv: readonly string[], flag: string): string | undefined {
  const flagIndex = argv.indexOf(flag);
  if (flagIndex === -1) return undefined;
  const value = argv[flagIndex + 1];
  return value === undefined || value.startsWith('--') ? undefined : value;
}

/**
 * Reads a whole-number option such as `--batch-size 50`.
 *
 * @param argv - Command-line tokens to search.
 * @param flag - Option name including its leading dashes.
 * @param fallback - Value returned when the flag is not present at all.
 * @param min - Smallest accepted value.
 * @returns The parsed integer, or `fallback` when the flag is absent.
 * @throws Error when the flag is present but its value is missing, is not made
 *   up solely of decimal digits, or is smaller than `min`.
 */
function readIntFlag(
  argv: readonly string[],
  flag: string,
  fallback: number,
  min: number
): number {
  if (!argv.includes(flag)) return fallback;
  const raw = readFlagValue(argv, flag);
  // Digits only: rejects "abc", "-5", "1.5" and partially numeric input such
  // as "10x", which a bare parseInt would silently accept as 10.
  if (raw === undefined || !/^\d+$/.test(raw)) {
    throw new Error(`${flag} expects a whole number, got '${raw ?? ''}'`);
  }
  const parsed = Number.parseInt(raw, 10);
  if (!Number.isSafeInteger(parsed) || parsed < min) {
    throw new Error(`${flag} must be an integer >= ${min}, got '${raw}'`);
  }
  return parsed;
}

const args = process.argv.slice(2);
// Name of the single collection to populate, or null when not supplied.
const collectionArg = readFlagValue(args, '--collection') ?? null;
const populateAll = args.includes('--all');
// Rows fetched and embedded per round trip; must be at least 1 so every
// iteration of the populate loops makes progress.
const batchSize = readIntFlag(args, '--batch-size', 100, 1);
// Maximum number of rows to process; 0 means "no limit".
const limit = readIntFlag(args, '--limit', 0, 0);
/**
 * Indexes churches into the `church_identity` ChromaDB collection.
 *
 * Rows are read through Prisma in ascending `id` order using cursor
 * pagination, at most `batchSize` per query, stopping once `limit` rows have
 * been handled when `limit` is positive. For every row the text
 * "name address city country" is embedded and upserted under the id
 * `church-<id>`, together with a small metadata record.
 */
async function populateChurchIdentity() {
  console.log('\n=== Populating church_identity ===');
  const collection = await getCollection(COLLECTION_NAMES.CHURCH_IDENTITY);

  const available = await prisma.church.count();
  // A non-positive limit means "process everything".
  const target = limit > 0 ? Math.min(limit, available) : available;
  console.log(`Total churches: ${available}, processing: ${target}`);

  let handled = 0;
  let lastSeenId: string | undefined = undefined;

  while (handled < target) {
    const remaining = target - handled;
    const page = await prisma.church.findMany({
      take: Math.min(batchSize, remaining),
      // After the first page, resume just past the last id already handled.
      ...(lastSeenId ? { skip: 1, cursor: { id: lastSeenId } } : {}),
      orderBy: { id: 'asc' },
      select: {
        id: true,
        name: true,
        address: true,
        city: true,
        country: true,
        source: true,
        latitude: true,
        longitude: true,
      },
    });

    // The table ran out before the target was reached.
    if (page.length === 0) {
      break;
    }

    const ids = page.map((church) => `church-${church.id}`);
    const documents = page.map((church) =>
      `${church.name} ${church.address || ''} ${church.city || ''} ${church.country}`.trim()
    );
    const embeddings = await embed(documents);
    const metadatas = page.map((church) => ({
      churchId: church.id,
      country: church.country,
      source: church.source,
      lat: church.latitude,
      lng: church.longitude,
    }));

    await collection.upsert({ ids, embeddings, documents, metadatas });

    handled += page.length;
    lastSeenId = page[page.length - 1].id;
    console.log(` Processed ${handled}/${target}`);
  }

  console.log(` Done: ${handled} churches indexed`);
}
/**
 * Indexes stored page HTML into the `page_classification` ChromaDB collection.
 *
 * Only churches that have a `lastScrapedAt` value and at least one active mass
 * schedule are considered. They are read in ascending `id` order using cursor
 * pagination, at most `batchSize` per query. When `limit` is positive it caps
 * the number of churches scanned, not the number of pages indexed: churches
 * without stored raw HTML still count towards it but are not upserted.
 *
 * For each church with raw HTML, the first 2000 characters are embedded and
 * upserted under the id `page-<id>`.
 */
async function populatePageClassification() {
  console.log('\n=== Populating page_classification ===');
  const collection = await getCollection(COLLECTION_NAMES.PAGE_CLASSIFICATION);

  // Number of leading raw-HTML characters used as the document text.
  const maxDocumentChars = 2000;

  // Index churches that have been successfully scraped (have mass schedules).
  // Shared by the count and the page query so the two cannot drift apart.
  const scrapedWithSchedules = {
    lastScrapedAt: { not: null },
    massSchedules: { some: { isActive: true } },
  };

  const totalCount = await prisma.church.count({ where: scrapedWithSchedules });
  const maxItems = limit > 0 ? Math.min(limit, totalCount) : totalCount;
  console.log(`Scraped churches with schedules: ${totalCount}, processing: ${maxItems}`);

  let processed = 0; // churches scanned so far
  let indexed = 0; // churches actually upserted (those that had raw HTML)
  let cursor: string | undefined = undefined;

  while (processed < maxItems) {
    const currentBatch = Math.min(batchSize, maxItems - processed);
    const churches = await prisma.church.findMany({
      take: currentBatch,
      // After the first page, resume just past the last id already scanned.
      ...(cursor ? { skip: 1, cursor: { id: cursor } } : {}),
      where: scrapedWithSchedules,
      orderBy: { id: 'asc' },
      select: {
        id: true,
        massScheduleUrl: true,
        website: true,
        websiteLanguage: true,
        scraperConfig: { select: { rawHtml: true } },
      },
    });
    if (churches.length === 0) break;

    // Use stored raw HTML (truncated) as the document; skip churches without it.
    const validChurches = churches.filter((c) => c.scraperConfig?.rawHtml);
    if (validChurches.length > 0) {
      const documents = validChurches.map((c) =>
        (c.scraperConfig?.rawHtml || '').slice(0, maxDocumentChars)
      );
      const embeddings = await embed(documents);
      await collection.upsert({
        ids: validChurches.map((c) => `page-${c.id}`),
        embeddings,
        documents,
        metadatas: validChurches.map((c) => ({
          url: c.massScheduleUrl || c.website || '',
          isMassSchedulePage: true,
          language: c.websiteLanguage || 'unknown',
        })),
      });
      indexed += validChurches.length;
    }

    processed += churches.length;
    cursor = churches[churches.length - 1].id;
    console.log(` Processed ${processed}/${maxItems} (${validChurches.length} had raw HTML)`);
  }

  // Report what was actually written rather than what was merely scanned.
  console.log(` Done: ${indexed} pages classified (${processed} churches scanned)`);
}
/**
 * Script entry point.
 *
 * Prints usage when neither `--all` nor `--collection <name>` was given;
 * otherwise runs the populate function for each requested collection in turn.
 * Any error is logged and turns the process exit code into 1. The Prisma
 * client and the pg pool are always shut down before the function returns.
 */
async function main(): Promise<void> {
  try {
    if (!populateAll && !collectionArg) {
      console.log('Usage:');
      console.log(' npx tsx scripts/populate-chromadb.ts --collection church_identity');
      console.log(' npx tsx scripts/populate-chromadb.ts --collection page_classification');
      console.log(' npx tsx scripts/populate-chromadb.ts --all');
      console.log(' npx tsx scripts/populate-chromadb.ts --all --batch-size 50 --limit 1000');
      // Return instead of process.exit(0) so the finally block still runs.
      return;
    }

    const collectionsToPopulate: CollectionName[] = populateAll
      ? [COLLECTION_NAMES.CHURCH_IDENTITY, COLLECTION_NAMES.PAGE_CLASSIFICATION]
      : [collectionArg as CollectionName];

    for (const name of collectionsToPopulate) {
      switch (name) {
        case COLLECTION_NAMES.CHURCH_IDENTITY:
          await populateChurchIdentity();
          break;
        case COLLECTION_NAMES.PAGE_CLASSIFICATION:
          await populatePageClassification();
          break;
        default:
          // Reached when --collection names something without a populate function.
          console.log(`Collection '${name}' does not have a populate function yet.`);
          console.log('Available: church_identity, page_classification');
      }
    }

    console.log('\nPopulation complete!');
  } catch (error) {
    console.error('Error:', error);
    // process.exit(1) here would terminate before the finally block runs;
    // setting exitCode lets cleanup finish and still reports failure.
    process.exitCode = 1;
  } finally {
    await prisma.$disconnect();
    await pool.end();
  }
}

// main() handles its own errors, but cleanup in its finally block can still
// reject; surface that instead of leaving an unhandled rejection.
main().catch((error: unknown) => {
  console.error('Error:', error);
  process.exitCode = 1;
});