/**
 * Bulk Processing Service for Redirect Intelligence v2
 *
 * Manages CSV upload, parsing, and bulk redirect analysis jobs
 */
import fs from 'fs/promises';
import { createReadStream } from 'fs';
import path from 'path';
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import csvParser from 'csv-parser';
import { createObjectCsvWriter } from 'csv-writer';
import { z } from 'zod';
import { logger } from '../lib/logger';
import { prisma } from '../lib/prisma';

// Job types and data structures
export interface BulkTrackingJob {
  id: string;
  userId: string;
  organizationId?: string;
  projectId?: string;
  urls: Array<{
    url: string;
    label?: string;
    metadata?: Record<string, any>;
  }>;
  options: {
    method: 'GET' | 'POST' | 'HEAD';
    userAgent?: string;
    maxHops: number;
    timeout: number;
    enableSSLAnalysis: boolean;
    enableSEOAnalysis: boolean;
    enableSecurityAnalysis: boolean;
    headers?: Record<string, string>;
  };
  status: 'PENDING' | 'QUEUED' | 'RUNNING' | 'COMPLETED' | 'FAILED' | 'CANCELLED' | 'ERROR';
  progress: {
    total: number;
    processed: number;
    successful: number;
    failed: number;
  };
  results?: Array<{
    url: string;
    label?: string;
    checkId?: string;
    status: 'success' | 'failed';
    error?: string;
    timing: {
      startedAt: Date;
      finishedAt?: Date;
      durationMs?: number;
    };
  }>;
  createdAt: Date;
  startedAt?: Date;
  finishedAt?: Date;
  estimatedCompletionAt?: Date;
}

// Validation schemas
const BulkJobCreateSchema = z.object({
  projectId: z.string().optional(),
  urls: z
    .array(
      z.object({
        url: z.string().url('Invalid URL format'),
        label: z.string().optional(),
        metadata: z.record(z.any()).optional(),
      })
    )
    .min(1, 'At least one URL is required')
    .max(1000, 'Maximum 1000 URLs per job'),
  options: z
    .object({
      method: z.enum(['GET', 'POST', 'HEAD']).default('GET'),
      userAgent: z.string().optional(),
      maxHops: z.number().min(1).max(20).default(10),
      timeout: z.number().min(1000).max(30000).default(15000),
      enableSSLAnalysis: z.boolean().default(true),
      enableSEOAnalysis: z.boolean().default(true),
      enableSecurityAnalysis: z.boolean().default(true),
      headers: z.record(z.string()).optional(),
    })
    .default({}),
});

const CsvRowSchema = z.object({
  url: z.string().min(1, 'URL is required'),
  label: z.string().optional(),
  method: z.enum(['GET', 'POST', 'HEAD']).optional(),
  user_agent: z.string().optional(),
  max_hops: z.string().optional(),
  timeout: z.string().optional(),
  enable_ssl: z.string().optional(),
  enable_seo: z.string().optional(),
  enable_security: z.string().optional(),
});

export type BulkJobCreateRequest = z.infer<typeof BulkJobCreateSchema>;
export type CsvRow = z.infer<typeof CsvRowSchema>;

export class BulkProcessorService {
  // Redis and the BullMQ queue stay undefined while queueing is disabled
  // (see the constructor); every queue access below is guarded accordingly.
  private redis?: IORedis;
  private trackingQueue?: Queue;
  private readonly uploadsDir: string;

  constructor() {
    // TEMPORARY: Disable Redis for bulk processing to avoid hangs
    // this.redis = new IORedis({
    //   host: process.env.REDIS_HOST || 'localhost',
    //   port: parseInt(process.env.REDIS_PORT || '6379'),
    //   enableReadyCheck: false,
    //   maxRetriesPerRequest: null,
    // });
    // this.trackingQueue = new Queue('bulk-tracking', {
    //   connection: this.redis,
    //   defaultJobOptions: {
    //     removeOnComplete: 100, // Keep last 100 completed jobs
    //     removeOnFail: 50, // Keep last 50 failed jobs
    //     attempts: 3,
    //     backoff: {
    //       type: 'exponential',
    //       delay: 2000,
    //     },
    //   },
    // });

    this.uploadsDir = path.join(process.cwd(), 'uploads');
    this.ensureUploadsDirectory();
  }

  /**
   * Ensure uploads directory exists
   */
  private async ensureUploadsDirectory(): Promise<void> {
    try {
      await fs.mkdir(this.uploadsDir, { recursive: true });
    } catch (error) {
      logger.error('Failed to create uploads directory:', error);
    }
  }
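  /*
   * Illustrative CSV layout accepted by parseCsvFile() below. Only the
   * `url` column is required; the other headers mirror CsvRowSchema, and
   * the sample rows are hypothetical:
   *
   *   url,label,method,max_hops,timeout,enable_ssl
   *   example.com/old-page,Homepage,GET,10,15000,true
   *   https://example.com/blog,Blog,HEAD,5,10000,no
   *
   * Bare hosts are normalized to https://, and the enable_* flags accept
   * true/false, 1/0, or yes/no (see parseBoolean()).
   */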
  /**
   * Parse CSV file and extract URL data
   */
  async parseCsvFile(
    filePath: string
  ): Promise<Array<{ url: string; label?: string; metadata?: Record<string, any> }>> {
    const results: Array<{ url: string; label?: string; metadata?: Record<string, any> }> = [];

    return new Promise((resolve, reject) => {
      createReadStream(filePath)
        .pipe(csvParser())
        .on('data', (row: any) => {
          try {
            // Validate and parse each row
            const validatedRow = CsvRowSchema.parse(row);

            // Normalize URL
            let url = validatedRow.url.trim();
            if (!url.startsWith('http://') && !url.startsWith('https://')) {
              url = `https://${url}`;
            }

            const parsedRow = {
              url,
              label: validatedRow.label?.trim() || undefined,
              metadata: {
                // Store additional CSV columns as metadata
                method: validatedRow.method || 'GET',
                userAgent: validatedRow.user_agent?.trim(),
                maxHops: validatedRow.max_hops ? parseInt(validatedRow.max_hops) : undefined,
                timeout: validatedRow.timeout ? parseInt(validatedRow.timeout) : undefined,
                enableSSL: this.parseBoolean(validatedRow.enable_ssl),
                enableSEO: this.parseBoolean(validatedRow.enable_seo),
                enableSecurity: this.parseBoolean(validatedRow.enable_security),
              },
            };

            results.push(parsedRow);
          } catch (error) {
            logger.warn('Invalid CSV row skipped:', {
              row,
              error: error instanceof Error ? error.message : 'Unknown error',
            });
          }
        })
        .on('end', () => {
          logger.info(`CSV parsing completed: ${results.length} valid URLs found`);
          resolve(results);
        })
        .on('error', (error: Error) => {
          logger.error('CSV parsing failed:', error);
          reject(error);
        });
    });
  }

  /**
   * Parse boolean values from CSV (accepts true/false, 1/0, yes/no)
   */
  private parseBoolean(value?: string): boolean | undefined {
    if (!value) return undefined;
    const normalized = value.toLowerCase().trim();
    if (normalized === 'true' || normalized === '1' || normalized === 'yes') return true;
    if (normalized === 'false' || normalized === '0' || normalized === 'no') return false;
    return undefined;
  }

  /**
   * Create a new bulk tracking job
   */
  async createBulkJob(
    userId: string,
    organizationId: string | undefined,
    jobData: BulkJobCreateRequest,
    filePath?: string
  ): Promise<BulkTrackingJob> {
    try {
      // Validate input
      const validatedData = BulkJobCreateSchema.parse(jobData);
      const jobId = `bulk_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;

      // Create job record in database
      const bulkJob = await prisma.bulkJob.create({
        data: {
          id: jobId,
          userId,
          organizationId: organizationId || null,
          projectId: validatedData.projectId || 'default-project',
          uploadPath: filePath || 'api',
          status: 'PENDING' as any,
          totalUrls: validatedData.urls.length,
          processedUrls: 0,
          successfulUrls: 0,
          failedUrls: 0,
          configJson: JSON.stringify(validatedData.options),
          urlsJson: JSON.stringify(validatedData.urls),
        } as any,
      });

      // Queue the job for processing (no-op while the queue is disabled)
      if (this.trackingQueue) {
        await this.trackingQueue.add(
          'process-bulk-tracking',
          {
            jobId,
            userId,
            organizationId,
            urls: validatedData.urls,
            options: validatedData.options,
          },
          {
            jobId,
            delay: 0, // Start immediately
          }
        );
      }

      const job: BulkTrackingJob = {
        id: jobId,
        userId,
        organizationId,
        projectId: validatedData.projectId,
        urls: validatedData.urls as any,
        options: validatedData.options as any,
        status: 'PENDING',
        progress: {
          total: validatedData.urls.length,
          processed: 0,
          successful: 0,
          failed: 0,
        },
        createdAt: bulkJob.createdAt,
      };

      logger.info(`Bulk tracking job created: ${jobId}`, {
        userId,
        urlCount: validatedData.urls.length,
        organizationId,
      });

      return job;
    } catch (error) {
      logger.error('Failed to create bulk job:', error);
      throw new Error(
        `Failed to create bulk job: ${error instanceof Error ? error.message : 'Unknown error'}`
      );
    }
  }
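  /*
   * Hypothetical call sketch for createBulkJob(); the user ID and URL are
   * placeholders, and the options mirror the schema defaults:
   *
   *   const job = await service.createBulkJob('user_123', undefined, {
   *     urls: [{ url: 'https://example.com/old-page', label: 'Homepage' }],
   *     options: {
   *       method: 'GET', maxHops: 10, timeout: 15000,
   *       enableSSLAnalysis: true, enableSEOAnalysis: true,
   *       enableSecurityAnalysis: true,
   *     },
   *   });
   */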
  /**
   * Create bulk job from CSV file
   */
  async createBulkJobFromCsv(
    userId: string,
    organizationId: string | undefined,
    filePath: string,
    projectId: string,
    options: Partial<BulkJobCreateRequest['options']> = {}
  ): Promise<BulkTrackingJob> {
    try {
      // Parse CSV file
      const urls = await this.parseCsvFile(filePath);
      if (urls.length === 0) {
        throw new Error('No valid URLs found in CSV file');
      }

      // Create job with parsed URLs
      const jobData: BulkJobCreateRequest = {
        urls,
        options: {
          method: 'GET',
          maxHops: 10,
          timeout: 15000,
          enableSSLAnalysis: true,
          enableSEOAnalysis: true,
          enableSecurityAnalysis: true,
          ...options,
        },
        projectId,
      };

      const job = await this.createBulkJob(userId, organizationId, jobData, filePath);

      // Clean up uploaded file
      await fs.unlink(filePath).catch(() => {});

      return job;
    } catch (error) {
      // Clean up uploaded file on error
      await fs.unlink(filePath).catch(() => {});
      throw error;
    }
  }

  /**
   * Get bulk job status and progress
   */
  async getBulkJob(jobId: string, userId: string): Promise<BulkTrackingJob | null> {
    try {
      const bulkJob = await prisma.bulkJob.findFirst({
        where: {
          id: jobId,
          userId,
        },
      });

      if (!bulkJob) {
        return null;
      }

      // Progress is read from the database row; the queue is not consulted
      // while queueing is disabled.
      const job: BulkTrackingJob = {
        id: bulkJob.id,
        userId: bulkJob.userId,
        ...(bulkJob.organizationId ? { organizationId: bulkJob.organizationId } : {}),
        ...(bulkJob.projectId ? { projectId: bulkJob.projectId } : {}),
        urls: JSON.parse(bulkJob.urlsJson as string),
        options: JSON.parse(bulkJob.configJson as string),
        status: bulkJob.status as BulkTrackingJob['status'],
        progress: {
          total: bulkJob.totalUrls,
          processed: bulkJob.processedUrls,
          successful: bulkJob.successfulUrls,
          failed: bulkJob.failedUrls,
        },
        results: bulkJob.resultsJson ? JSON.parse(bulkJob.resultsJson as string) : undefined,
        createdAt: bulkJob.createdAt,
        startedAt: bulkJob.startedAt || undefined,
        finishedAt: bulkJob.finishedAt || undefined,
        estimatedCompletionAt: this.calculateEstimatedCompletion(bulkJob),
      };

      return job;
    } catch (error) {
      logger.error('Failed to get bulk job:', error);
      return null;
    }
  }
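  /*
   * Hypothetical polling sketch built on getBulkJob(); the job/user IDs and
   * the 5-second interval are placeholders:
   *
   *   const timer = setInterval(async () => {
   *     const job = await service.getBulkJob(jobId, userId);
   *     if (!job || ['COMPLETED', 'FAILED', 'CANCELLED'].includes(job.status)) {
   *       clearInterval(timer);
   *     }
   *   }, 5000);
   */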
  /**
   * Calculate estimated completion time
   */
  private calculateEstimatedCompletion(bulkJob: any): Date | undefined {
    if (!bulkJob.startedAt || bulkJob.status === 'COMPLETED' || bulkJob.status === 'FAILED') {
      return undefined;
    }

    const elapsed = Date.now() - bulkJob.startedAt.getTime();
    const processed = bulkJob.processedUrls;
    const remaining = bulkJob.totalUrls - processed;

    if (processed === 0) {
      return undefined;
    }

    // Linear extrapolation: e.g. 40 of 100 URLs in 80s gives 2s/URL,
    // so roughly 120s remain.
    const avgTimePerUrl = elapsed / processed;
    const estimatedRemainingTime = avgTimePerUrl * remaining;

    return new Date(Date.now() + estimatedRemainingTime);
  }

  /**
   * Cancel a bulk job
   */
  async cancelBulkJob(jobId: string, userId: string): Promise<boolean> {
    try {
      // Update database status
      await prisma.bulkJob.updateMany({
        where: {
          id: jobId,
          userId,
        },
        data: {
          status: 'CANCELLED',
          finishedAt: new Date(),
        },
      });

      // Remove job from queue (no-op while the queue is disabled)
      if (this.trackingQueue) {
        const queueJob = await this.trackingQueue.getJob(jobId);
        if (queueJob) {
          await queueJob.remove();
        }
      }

      logger.info(`Bulk job cancelled: ${jobId}`, { userId });
      return true;
    } catch (error) {
      logger.error('Failed to cancel bulk job:', error);
      return false;
    }
  }

  /**
   * Get user's bulk jobs
   */
  async getUserBulkJobs(userId: string, limit = 20, offset = 0): Promise<BulkTrackingJob[]> {
    try {
      const bulkJobs = await prisma.bulkJob.findMany({
        where: { userId },
        orderBy: { createdAt: 'desc' },
        take: limit,
        skip: offset,
      });

      return Promise.all(
        bulkJobs.map(async (bulkJob) => {
          const job: BulkTrackingJob = {
            id: bulkJob.id,
            userId: bulkJob.userId,
            ...(bulkJob.organizationId ? { organizationId: bulkJob.organizationId } : {}),
            ...(bulkJob.projectId ? { projectId: bulkJob.projectId } : {}),
            urls: JSON.parse(bulkJob.urlsJson as string),
            options: JSON.parse(bulkJob.configJson as string),
            status: bulkJob.status as BulkTrackingJob['status'],
            progress: {
              total: bulkJob.totalUrls,
              processed: bulkJob.processedUrls,
              successful: bulkJob.successfulUrls,
              failed: bulkJob.failedUrls,
            },
            results: bulkJob.resultsJson ? JSON.parse(bulkJob.resultsJson as string) : undefined,
            createdAt: bulkJob.createdAt,
            startedAt: bulkJob.startedAt || undefined,
            finishedAt: bulkJob.finishedAt || undefined,
            estimatedCompletionAt: this.calculateEstimatedCompletion(bulkJob),
          };
          return job;
        })
      );
    } catch (error) {
      logger.error('Failed to get user bulk jobs:', error);
      return [];
    }
  }
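  /*
   * Hypothetical pagination sketch for getUserBulkJobs(); the page size is
   * a placeholder:
   *
   *   const pageSize = 20;
   *   const firstPage = await service.getUserBulkJobs(userId, pageSize, 0);
   *   const secondPage = await service.getUserBulkJobs(userId, pageSize, pageSize);
   */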
  /**
   * Export bulk job results to CSV
   */
  async exportResultsToCsv(jobId: string, userId: string): Promise<string> {
    try {
      const job = await this.getBulkJob(jobId, userId);
      if (!job || !job.results) {
        throw new Error('Job not found or no results available');
      }

      const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
      const fileName = `bulk-results-${jobId}-${timestamp}.csv`;
      const filePath = path.join(this.uploadsDir, fileName);

      const csvWriter = createObjectCsvWriter({
        path: filePath,
        header: [
          { id: 'url', title: 'URL' },
          { id: 'label', title: 'Label' },
          { id: 'status', title: 'Status' },
          { id: 'checkId', title: 'Check ID' },
          { id: 'error', title: 'Error' },
          { id: 'startedAt', title: 'Started At' },
          { id: 'finishedAt', title: 'Finished At' },
          { id: 'durationMs', title: 'Duration (ms)' },
        ],
      });

      // Timing fields come back from the JSON round-trip as strings, so
      // re-wrap them in Date before formatting
      const records = job.results.map((result) => ({
        url: result.url,
        label: result.label || '',
        status: result.status,
        checkId: result.checkId || '',
        error: result.error || '',
        startedAt: new Date(result.timing.startedAt).toISOString(),
        finishedAt: result.timing.finishedAt
          ? new Date(result.timing.finishedAt).toISOString()
          : '',
        durationMs: result.timing.durationMs || '',
      }));

      await csvWriter.writeRecords(records);

      logger.info(`Results exported to CSV: ${filePath}`, { jobId, userId });
      return filePath;
    } catch (error) {
      logger.error('Failed to export results to CSV:', error);
      throw new Error(
        `Failed to export results: ${error instanceof Error ? error.message : 'Unknown error'}`
      );
    }
  }

  /**
   * Clean up old bulk jobs and files
   */
  async cleanupOldJobs(maxAgeHours = 72): Promise<void> {
    try {
      const cutoff = new Date(Date.now() - maxAgeHours * 60 * 60 * 1000);

      // Delete old jobs from database
      const result = await prisma.bulkJob.deleteMany({
        where: {
          createdAt: {
            lt: cutoff,
          },
          status: {
            in: ['COMPLETED', 'FAILED', 'CANCELLED'],
          },
        },
      });

      // Clean up old files
      try {
        const files = await fs.readdir(this.uploadsDir);
        for (const file of files) {
          const filePath = path.join(this.uploadsDir, file);
          const stats = await fs.stat(filePath);
          if (stats.mtime < cutoff) {
            await fs.unlink(filePath);
            logger.info(`Cleaned up old file: ${file}`);
          }
        }
      } catch (error) {
        logger.warn('Failed to cleanup old files:', error);
      }

      logger.info(`Cleaned up ${result.count} old bulk jobs`);
    } catch (error) {
      logger.error('Failed to cleanup old jobs:', error);
    }
  }

  /**
   * Get queue statistics
   */
  async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    // Report zeros while the queue is disabled
    if (!this.trackingQueue) {
      return { waiting: 0, active: 0, completed: 0, failed: 0, delayed: 0 };
    }

    try {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        this.trackingQueue.getWaiting(),
        this.trackingQueue.getActive(),
        this.trackingQueue.getCompleted(),
        this.trackingQueue.getFailed(),
        this.trackingQueue.getDelayed(),
      ]);

      return {
        waiting: waiting.length,
        active: active.length,
        completed: completed.length,
        failed: failed.length,
        delayed: delayed.length,
      };
    } catch (error) {
      logger.error('Failed to get queue stats:', error);
      return {
        waiting: 0,
        active: 0,
        completed: 0,
        failed: 0,
        delayed: 0,
      };
    }
  }
}
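
/*
 * End-to-end usage sketch (all identifiers hypothetical): create a job from
 * an uploaded CSV, poll until it finishes, then export the results.
 *
 *   const service = new BulkProcessorService();
 *   const job = await service.createBulkJobFromCsv(
 *     'user_123',
 *     undefined,
 *     '/tmp/upload.csv',
 *     'default-project'
 *   );
 *   // ...poll service.getBulkJob(job.id, 'user_123') until COMPLETED...
 *   const csvPath = await service.exportResultsToCsv(job.id, 'user_123');
 */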