feat(phase-6): Bulk CSV processing and background worker implementation
- Add BulkJob model to Prisma schema with relations
- Implement BulkProcessorService for CSV parsing and job management
- Create BulkTrackingWorker for background processing with BullMQ
- Add comprehensive bulk API routes (upload, jobs, progress, export)
- Integrate multer for CSV file uploads with validation
- Add job progress tracking and estimation
- Implement CSV export functionality for results
- Add queue statistics and cleanup endpoints
- Create shared types for bulk processing
- Add comprehensive test suite for all bulk functionality
- Implement graceful worker shutdown and error handling
- Add rate limiting and authentication for all bulk endpoints

Backward compatibility: Maintained for /api/track and /api/v1/track
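
For reviewers, a minimal sketch of how a client could call the new bulk endpoint. The base URL, token handling, and payload values below are illustrative assumptions, not part of this commit; the payload shape follows CreateBulkJobSchema in bulk.routes.ts.

// Hypothetical client call to POST /api/v2/bulk/jobs (Node 18+ fetch).
async function createBulkJobExample(): Promise<void> {
  const response = await fetch('https://api.example.com/api/v2/bulk/jobs', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${process.env.API_TOKEN}`, // every bulk route is behind requireAuth
    },
    body: JSON.stringify({
      urls: [
        { url: 'https://example.com/old-page', label: 'Homepage redirect' },
        { url: 'https://example.com/promo', label: 'Campaign link' },
      ],
      options: { method: 'GET', maxHops: 10, timeout: 15000 },
    }),
  });
  const { data } = await response.json(); // { jobId, status, progress, estimatedCompletionAt, urls }
  console.log('Created bulk job', data.jobId);
}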
@@ -36,7 +36,10 @@
     "mermaid": "^10.6.1",
     "markdown-it": "^14.0.0",
     "file-type": "^19.0.0",
-    "mime-types": "^2.1.35"
+    "mime-types": "^2.1.35",
+    "csv-parser": "^3.0.0",
+    "csv-writer": "^1.6.0",
+    "multer": "^1.4.5-lts.1"
   },
   "devDependencies": {
     "@types/express": "^4.17.21",
@@ -55,6 +58,8 @@
     "@types/supertest": "^2.0.16",
     "@types/puppeteer": "^7.0.4",
     "@types/markdown-it": "^13.0.7",
-    "@types/mime-types": "^2.1.4"
+    "@types/mime-types": "^2.1.4",
+    "@types/multer": "^1.4.11",
+    "@types/csv-parser": "^3.0.0"
   }
 }
@@ -19,6 +19,7 @@ import authRoutes from './routes/auth.routes';
 import trackingRoutes from './routes/tracking.routes';
 import analysisRoutes from './routes/analysis.routes';
 import exportRoutes from './routes/export.routes';
+import bulkRoutes from './routes/bulk.routes';
 
 const app = express();
 const PORT = process.env.PORT || 3333;
@@ -77,6 +78,7 @@ app.use('/api/v2/analyze', analysisRoutes);
 
 // Export routes (v2)
 app.use('/api/v2/export', exportRoutes);
+app.use('/api/v2/bulk', bulkRoutes);
 
 // Health check endpoint
 app.get('/health', (req, res) => {

apps/api/src/routes/bulk.routes.ts (new file, 438 lines)
@@ -0,0 +1,438 @@
/**
 * Bulk Processing Routes for Redirect Intelligence v2
 *
 * Handles CSV upload, bulk job creation, and progress tracking
 */

import express from 'express';
import multer from 'multer';
import path from 'path';
import fs from 'fs/promises';
import { z } from 'zod';
import { requireAuth } from '../middleware/auth.middleware';
import { BulkProcessorService } from '../services/bulk-processor.service';
import { logger } from '../lib/logger';

const router = express.Router();
const bulkProcessor = new BulkProcessorService();

// Configure multer for file uploads
const upload = multer({
  dest: 'uploads/',
  limits: {
    fileSize: 5 * 1024 * 1024, // 5MB max file size
    files: 1,
  },
  fileFilter: (req, file, cb) => {
    // Only allow CSV files
    if (file.mimetype === 'text/csv' || file.originalname.toLowerCase().endsWith('.csv')) {
      cb(null, true);
    } else {
      cb(new Error('Only CSV files are allowed'));
    }
  },
});

// Validation schemas
const CreateBulkJobSchema = z.object({
  projectId: z.string().optional(),
  urls: z.array(z.object({
    url: z.string().url(),
    label: z.string().optional(),
    metadata: z.record(z.any()).optional(),
  })).min(1).max(1000),
  options: z.object({
    method: z.enum(['GET', 'POST', 'HEAD']).default('GET'),
    userAgent: z.string().optional(),
    maxHops: z.number().min(1).max(20).default(10),
    timeout: z.number().min(1000).max(30000).default(15000),
    enableSSLAnalysis: z.boolean().default(true),
    enableSEOAnalysis: z.boolean().default(true),
    enableSecurityAnalysis: z.boolean().default(true),
    headers: z.record(z.string()).optional(),
  }).default({}),
});

const BulkJobParamsSchema = z.object({
  jobId: z.string().min(1),
});

const GetJobsQuerySchema = z.object({
  limit: z.string().transform(val => parseInt(val) || 20).refine(val => val > 0 && val <= 100),
  offset: z.string().transform(val => parseInt(val) || 0).refine(val => val >= 0),
}).partial();

/**
 * POST /api/v2/bulk/upload
 * Upload CSV file and create bulk tracking job
 */
router.post('/upload', requireAuth, upload.single('file'), async (req, res) => {
  try {
    if (!req.file) {
      return res.status(400).json({
        success: false,
        error: 'No file uploaded',
      });
    }

    const userId = req.user!.id;
    const organizationId = req.user!.memberships?.[0]?.organizationId;

    // Parse options from request body
    const options = req.body.options ? JSON.parse(req.body.options) : {};

    logger.info(`Processing CSV upload for user: ${userId}`, {
      filename: req.file.originalname,
      size: req.file.size,
    });

    // Create bulk job from CSV
    const job = await bulkProcessor.createBulkJobFromCsv(
      userId,
      organizationId,
      req.file.path,
      options
    );

    res.json({
      success: true,
      data: {
        jobId: job.id,
        status: job.status,
        progress: job.progress,
        estimatedCompletionAt: job.estimatedCompletionAt,
      },
    });

  } catch (error) {
    logger.error('CSV upload failed:', error);

    // Clean up uploaded file on error
    if (req.file) {
      await fs.unlink(req.file.path).catch(() => {});
    }

    res.status(500).json({
      success: false,
      error: error instanceof Error ? error.message : 'Failed to process CSV upload',
    });
  }
});

/**
 * POST /api/v2/bulk/jobs
 * Create bulk tracking job with URL array
 */
router.post('/jobs', requireAuth, async (req, res) => {
  try {
    const userId = req.user!.id;
    const organizationId = req.user!.memberships?.[0]?.organizationId;

    // Validate request body
    const validatedData = CreateBulkJobSchema.parse(req.body);

    logger.info(`Creating bulk job for user: ${userId}`, {
      urlCount: validatedData.urls.length,
      projectId: validatedData.projectId,
    });

    // Create bulk job
    const job = await bulkProcessor.createBulkJob(userId, organizationId, validatedData);

    res.status(201).json({
      success: true,
      data: {
        jobId: job.id,
        status: job.status,
        progress: job.progress,
        estimatedCompletionAt: job.estimatedCompletionAt,
        urls: job.urls.length, // Don't return the full URL list for privacy
      },
    });

  } catch (error) {
    logger.error('Bulk job creation failed:', error);

    if (error instanceof z.ZodError) {
      return res.status(400).json({
        success: false,
        error: 'Validation failed',
        details: error.errors,
      });
    }

    res.status(500).json({
      success: false,
      error: error instanceof Error ? error.message : 'Failed to create bulk job',
    });
  }
});

/**
 * GET /api/v2/bulk/jobs
 * Get user's bulk jobs with pagination
 */
router.get('/jobs', requireAuth, async (req, res) => {
  try {
    const userId = req.user!.id;
    const query = GetJobsQuerySchema.parse(req.query);

    const jobs = await bulkProcessor.getUserBulkJobs(
      userId,
      query.limit || 20,
      query.offset || 0
    );

    // Remove sensitive data from response
    const sanitizedJobs = jobs.map(job => ({
      id: job.id,
      status: job.status,
      progress: job.progress,
      createdAt: job.createdAt,
      startedAt: job.startedAt,
      finishedAt: job.finishedAt,
      estimatedCompletionAt: job.estimatedCompletionAt,
      projectId: job.projectId,
      urlCount: job.urls.length,
      options: job.options,
    }));

    res.json({
      success: true,
      data: sanitizedJobs,
      meta: {
        limit: query.limit || 20,
        offset: query.offset || 0,
        total: sanitizedJobs.length,
      },
    });

  } catch (error) {
    logger.error('Failed to get bulk jobs:', error);

    if (error instanceof z.ZodError) {
      return res.status(400).json({
        success: false,
        error: 'Invalid query parameters',
        details: error.errors,
      });
    }

    res.status(500).json({
      success: false,
      error: 'Failed to retrieve bulk jobs',
    });
  }
});

/**
 * GET /api/v2/bulk/jobs/:jobId
 * Get specific bulk job details and progress
 */
router.get('/jobs/:jobId', requireAuth, async (req, res) => {
  try {
    const userId = req.user!.id;
    const { jobId } = BulkJobParamsSchema.parse(req.params);

    const job = await bulkProcessor.getBulkJob(jobId, userId);

    if (!job) {
      return res.status(404).json({
        success: false,
        error: 'Bulk job not found',
      });
    }

    // Include results only if job is completed
    const responseData: any = {
      id: job.id,
      status: job.status,
      progress: job.progress,
      createdAt: job.createdAt,
      startedAt: job.startedAt,
      finishedAt: job.finishedAt,
      estimatedCompletionAt: job.estimatedCompletionAt,
      projectId: job.projectId,
      urlCount: job.urls.length,
      options: job.options,
    };

    // Include results if job is completed
    if (job.status === 'completed' && job.results) {
      responseData.results = job.results;
    }

    res.json({
      success: true,
      data: responseData,
    });

  } catch (error) {
    logger.error('Failed to get bulk job:', error);

    if (error instanceof z.ZodError) {
      return res.status(400).json({
        success: false,
        error: 'Invalid job ID',
        details: error.errors,
      });
    }

    res.status(500).json({
      success: false,
      error: 'Failed to retrieve bulk job',
    });
  }
});

/**
 * DELETE /api/v2/bulk/jobs/:jobId
 * Cancel a bulk job
 */
router.delete('/jobs/:jobId', requireAuth, async (req, res) => {
  try {
    const userId = req.user!.id;
    const { jobId } = BulkJobParamsSchema.parse(req.params);

    const success = await bulkProcessor.cancelBulkJob(jobId, userId);

    if (!success) {
      return res.status(404).json({
        success: false,
        error: 'Bulk job not found or cannot be cancelled',
      });
    }

    res.json({
      success: true,
      message: 'Bulk job cancelled successfully',
    });

  } catch (error) {
    logger.error('Failed to cancel bulk job:', error);

    if (error instanceof z.ZodError) {
      return res.status(400).json({
        success: false,
        error: 'Invalid job ID',
        details: error.errors,
      });
    }

    res.status(500).json({
      success: false,
      error: 'Failed to cancel bulk job',
    });
  }
});

/**
 * GET /api/v2/bulk/jobs/:jobId/export/csv
 * Export bulk job results as CSV
 */
router.get('/jobs/:jobId/export/csv', requireAuth, async (req, res) => {
  try {
    const userId = req.user!.id;
    const { jobId } = BulkJobParamsSchema.parse(req.params);

    const filePath = await bulkProcessor.exportResultsToCsv(jobId, userId);

    // Set headers for file download
    res.setHeader('Content-Type', 'text/csv');
    res.setHeader('Content-Disposition', `attachment; filename="bulk-results-${jobId}.csv"`);

    // Stream file and clean up after
    const fileStream = require('fs').createReadStream(filePath);
    fileStream.pipe(res);

    fileStream.on('end', async () => {
      // Clean up file after download
      await fs.unlink(filePath).catch(() => {});
    });

    fileStream.on('error', (error: Error) => {
      logger.error('File streaming error:', error);
      res.status(500).json({
        success: false,
        error: 'Failed to stream results file',
      });
    });

  } catch (error) {
    logger.error('Failed to export bulk job results:', error);

    if (error instanceof z.ZodError) {
      return res.status(400).json({
        success: false,
        error: 'Invalid job ID',
        details: error.errors,
      });
    }

    res.status(500).json({
      success: false,
      error: error instanceof Error ? error.message : 'Failed to export results',
    });
  }
});

/**
 * GET /api/v2/bulk/stats
 * Get queue statistics
 */
router.get('/stats', requireAuth, async (req, res) => {
  try {
    const stats = await bulkProcessor.getQueueStats();

    res.json({
      success: true,
      data: {
        queue: stats,
        timestamp: new Date().toISOString(),
      },
    });

  } catch (error) {
    logger.error('Failed to get queue stats:', error);
    res.status(500).json({
      success: false,
      error: 'Failed to retrieve queue statistics',
    });
  }
});

/**
 * DELETE /api/v2/bulk/cleanup
 * Clean up old bulk jobs and files (admin only)
 */
router.delete('/cleanup', requireAuth, async (req, res) => {
  try {
    // Only allow admin users to run cleanup
    const user = req.user!;
    const isAdmin = user.memberships?.some(m => m.role === 'ADMIN' || m.role === 'OWNER');

    if (!isAdmin) {
      return res.status(403).json({
        success: false,
        error: 'Admin privileges required',
      });
    }

    const maxAgeHours = parseInt(req.query.maxAge as string) || 72; // Default 3 days
    await bulkProcessor.cleanupOldJobs(maxAgeHours);

    res.json({
      success: true,
      message: `Cleanup completed for jobs older than ${maxAgeHours} hours`,
    });

  } catch (error) {
    logger.error('Failed to cleanup old jobs:', error);
    res.status(500).json({
      success: false,
      error: 'Failed to cleanup old jobs',
    });
  }
});

export default router;
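
A usage note on the upload route above (not part of the diff): the CSV must be sent as multipart form data under the field name "file", and job options may accompany it as a JSON string in an "options" field. A rough client sketch, with host and token as placeholder assumptions:

// Hypothetical upload to POST /api/v2/bulk/upload from a Node 18+ client.
async function uploadCsvExample(csvText: string): Promise<void> {
  const form = new FormData();
  // Field name must be "file" to match upload.single('file').
  form.append('file', new Blob([csvText], { type: 'text/csv' }), 'redirects.csv');
  // Options are read from req.body.options and JSON.parsed by the route.
  form.append('options', JSON.stringify({ maxHops: 5, enableSEOAnalysis: false }));

  const response = await fetch('https://api.example.com/api/v2/bulk/upload', {
    method: 'POST',
    headers: { Authorization: `Bearer ${process.env.API_TOKEN}` },
    body: form,
  });
  const { data } = await response.json(); // { jobId, status, progress, estimatedCompletionAt }
  console.log('Upload created job', data.jobId);
}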

apps/api/src/services/bulk-processor.service.ts (new file, 603 lines)
@@ -0,0 +1,603 @@
/**
 * Bulk Processing Service for Redirect Intelligence v2
 *
 * Manages CSV upload, parsing, and bulk redirect analysis jobs
 */

import fs from 'fs/promises';
import path from 'path';
import { Queue, Job } from 'bullmq';
import IORedis from 'ioredis';
import csvParser from 'csv-parser';
import { createObjectCsvWriter } from 'csv-writer';
import { z } from 'zod';
import { logger } from '../lib/logger';
import { prisma } from '../lib/prisma';

// Job types and data structures
export interface BulkTrackingJob {
  id: string;
  userId: string;
  organizationId?: string;
  projectId?: string;
  urls: Array<{
    url: string;
    label?: string;
    metadata?: Record<string, any>;
  }>;
  options: {
    method: 'GET' | 'POST' | 'HEAD';
    userAgent?: string;
    maxHops: number;
    timeout: number;
    enableSSLAnalysis: boolean;
    enableSEOAnalysis: boolean;
    enableSecurityAnalysis: boolean;
    headers?: Record<string, string>;
  };
  status: 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled';
  progress: {
    total: number;
    processed: number;
    successful: number;
    failed: number;
  };
  results?: Array<{
    url: string;
    label?: string;
    checkId?: string;
    status: 'success' | 'failed';
    error?: string;
    timing: {
      startedAt: Date;
      finishedAt?: Date;
      durationMs?: number;
    };
  }>;
  createdAt: Date;
  startedAt?: Date;
  finishedAt?: Date;
  estimatedCompletionAt?: Date;
}

// Validation schemas
const BulkJobCreateSchema = z.object({
  projectId: z.string().optional(),
  urls: z.array(z.object({
    url: z.string().url('Invalid URL format'),
    label: z.string().optional(),
    metadata: z.record(z.any()).optional(),
  })).min(1, 'At least one URL is required').max(1000, 'Maximum 1000 URLs per job'),
  options: z.object({
    method: z.enum(['GET', 'POST', 'HEAD']).default('GET'),
    userAgent: z.string().optional(),
    maxHops: z.number().min(1).max(20).default(10),
    timeout: z.number().min(1000).max(30000).default(15000),
    enableSSLAnalysis: z.boolean().default(true),
    enableSEOAnalysis: z.boolean().default(true),
    enableSecurityAnalysis: z.boolean().default(true),
    headers: z.record(z.string()).optional(),
  }).default({}),
});

const CsvRowSchema = z.object({
  url: z.string().min(1, 'URL is required'),
  label: z.string().optional(),
  method: z.enum(['GET', 'POST', 'HEAD']).optional(),
  user_agent: z.string().optional(),
  max_hops: z.string().optional(),
  timeout: z.string().optional(),
  enable_ssl: z.string().optional(),
  enable_seo: z.string().optional(),
  enable_security: z.string().optional(),
});

export type BulkJobCreateRequest = z.infer<typeof BulkJobCreateSchema>;
export type CsvRow = z.infer<typeof CsvRowSchema>;
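
// Illustrative CSV layout accepted by CsvRowSchema (example data, not part of this commit):
//
//   url,label,method,user_agent,max_hops,timeout,enable_ssl,enable_seo,enable_security
//   https://example.com/old-page,Homepage redirect,GET,,10,15000,true,true,true
//   example.org/promo,Promo link,HEAD,,5,10000,false,true,false
//
// Only "url" is required; rows that fail validation are skipped with a warning in
// parseCsvFile below, and URLs without a scheme are normalized to https://.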

export class BulkProcessorService {
  private redis: IORedis;
  private trackingQueue: Queue;
  private readonly uploadsDir: string;

  constructor() {
    this.redis = new IORedis({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
      retryDelayOnFailover: 100,
      enableReadyCheck: false,
      maxRetriesPerRequest: null,
    });

    this.trackingQueue = new Queue('bulk-tracking', {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 100, // Keep last 100 completed jobs
        removeOnFail: 50, // Keep last 50 failed jobs
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000,
        },
      },
    });

    this.uploadsDir = path.join(process.cwd(), 'uploads');
    this.ensureUploadsDirectory();
  }

  /**
   * Ensure uploads directory exists
   */
  private async ensureUploadsDirectory(): Promise<void> {
    try {
      await fs.mkdir(this.uploadsDir, { recursive: true });
    } catch (error) {
      logger.error('Failed to create uploads directory:', error);
    }
  }

  /**
   * Parse CSV file and extract URL data
   */
  async parseCsvFile(filePath: string): Promise<Array<{
    url: string;
    label?: string;
    metadata?: Record<string, any>;
  }>> {
    const results: Array<{ url: string; label?: string; metadata?: Record<string, any> }> = [];

    return new Promise((resolve, reject) => {
      const stream = require('fs').createReadStream(filePath)
        .pipe(csvParser())
        .on('data', (row: any) => {
          try {
            // Validate and parse each row
            const validatedRow = CsvRowSchema.parse(row);

            // Normalize URL
            let url = validatedRow.url.trim();
            if (!url.startsWith('http://') && !url.startsWith('https://')) {
              url = `https://${url}`;
            }

            const parsedRow = {
              url,
              label: validatedRow.label?.trim() || undefined,
              metadata: {
                // Store additional CSV columns as metadata
                method: validatedRow.method || 'GET',
                userAgent: validatedRow.user_agent?.trim(),
                maxHops: validatedRow.max_hops ? parseInt(validatedRow.max_hops) : undefined,
                timeout: validatedRow.timeout ? parseInt(validatedRow.timeout) : undefined,
                enableSSL: this.parseBoolean(validatedRow.enable_ssl),
                enableSEO: this.parseBoolean(validatedRow.enable_seo),
                enableSecurity: this.parseBoolean(validatedRow.enable_security),
              },
            };

            results.push(parsedRow);
          } catch (error) {
            logger.warn('Invalid CSV row skipped:', { row, error: error instanceof Error ? error.message : 'Unknown error' });
          }
        })
        .on('end', () => {
          logger.info(`CSV parsing completed: ${results.length} valid URLs found`);
          resolve(results);
        })
        .on('error', (error: Error) => {
          logger.error('CSV parsing failed:', error);
          reject(error);
        });
    });
  }

  /**
   * Parse boolean values from CSV
   */
  private parseBoolean(value?: string): boolean | undefined {
    if (!value) return undefined;
    const normalized = value.toLowerCase().trim();
    if (normalized === 'true' || normalized === '1' || normalized === 'yes') return true;
    if (normalized === 'false' || normalized === '0' || normalized === 'no') return false;
    return undefined;
  }

  /**
   * Create a new bulk tracking job
   */
  async createBulkJob(
    userId: string,
    organizationId: string | undefined,
    jobData: BulkJobCreateRequest
  ): Promise<BulkTrackingJob> {
    try {
      // Validate input
      const validatedData = BulkJobCreateSchema.parse(jobData);

      const jobId = `bulk_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;

      // Create job record in database
      const bulkJob = await prisma.bulkJob.create({
        data: {
          id: jobId,
          userId,
          organizationId,
          projectId: validatedData.projectId,
          status: 'pending',
          totalUrls: validatedData.urls.length,
          processedUrls: 0,
          successfulUrls: 0,
          failedUrls: 0,
          configJson: JSON.stringify(validatedData.options),
          urlsJson: JSON.stringify(validatedData.urls),
        },
      });

      // Queue the job for processing
      await this.trackingQueue.add(
        'process-bulk-tracking',
        {
          jobId,
          userId,
          organizationId,
          urls: validatedData.urls,
          options: validatedData.options,
        },
        {
          jobId,
          delay: 0, // Start immediately
        }
      );

      const job: BulkTrackingJob = {
        id: jobId,
        userId,
        organizationId,
        projectId: validatedData.projectId,
        urls: validatedData.urls,
        options: validatedData.options,
        status: 'pending',
        progress: {
          total: validatedData.urls.length,
          processed: 0,
          successful: 0,
          failed: 0,
        },
        createdAt: bulkJob.createdAt,
      };

      logger.info(`Bulk tracking job created: ${jobId}`, {
        userId,
        urlCount: validatedData.urls.length,
        organizationId,
      });

      return job;
    } catch (error) {
      logger.error('Failed to create bulk job:', error);
      throw new Error(`Failed to create bulk job: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }

  /**
   * Create bulk job from CSV file
   */
  async createBulkJobFromCsv(
    userId: string,
    organizationId: string | undefined,
    filePath: string,
    options: Partial<BulkJobCreateRequest['options']> = {}
  ): Promise<BulkTrackingJob> {
    try {
      // Parse CSV file
      const urls = await this.parseCsvFile(filePath);

      if (urls.length === 0) {
        throw new Error('No valid URLs found in CSV file');
      }

      // Create job with parsed URLs
      const jobData: BulkJobCreateRequest = {
        urls,
        options: {
          method: 'GET',
          maxHops: 10,
          timeout: 15000,
          enableSSLAnalysis: true,
          enableSEOAnalysis: true,
          enableSecurityAnalysis: true,
          ...options,
        },
      };

      const job = await this.createBulkJob(userId, organizationId, jobData);

      // Clean up uploaded file
      await fs.unlink(filePath).catch(() => {});

      return job;
    } catch (error) {
      // Clean up uploaded file on error
      await fs.unlink(filePath).catch(() => {});
      throw error;
    }
  }

  /**
   * Get bulk job status and progress
   */
  async getBulkJob(jobId: string, userId: string): Promise<BulkTrackingJob | null> {
    try {
      const bulkJob = await prisma.bulkJob.findFirst({
        where: {
          id: jobId,
          userId,
        },
      });

      if (!bulkJob) {
        return null;
      }

      // Get job progress from queue
      const queueJob = await this.trackingQueue.getJob(jobId);
      const progress = queueJob?.progress || 0;

      const job: BulkTrackingJob = {
        id: bulkJob.id,
        userId: bulkJob.userId,
        organizationId: bulkJob.organizationId || undefined,
        projectId: bulkJob.projectId || undefined,
        urls: JSON.parse(bulkJob.urlsJson as string),
        options: JSON.parse(bulkJob.configJson as string),
        status: bulkJob.status as BulkTrackingJob['status'],
        progress: {
          total: bulkJob.totalUrls,
          processed: bulkJob.processedUrls,
          successful: bulkJob.successfulUrls,
          failed: bulkJob.failedUrls,
        },
        results: bulkJob.resultsJson ? JSON.parse(bulkJob.resultsJson as string) : undefined,
        createdAt: bulkJob.createdAt,
        startedAt: bulkJob.startedAt || undefined,
        finishedAt: bulkJob.finishedAt || undefined,
        estimatedCompletionAt: this.calculateEstimatedCompletion(bulkJob),
      };

      return job;
    } catch (error) {
      logger.error('Failed to get bulk job:', error);
      return null;
    }
  }

  /**
   * Calculate estimated completion time
   */
  private calculateEstimatedCompletion(bulkJob: any): Date | undefined {
    if (!bulkJob.startedAt || bulkJob.status === 'completed' || bulkJob.status === 'failed') {
      return undefined;
    }

    const elapsed = Date.now() - bulkJob.startedAt.getTime();
    const processed = bulkJob.processedUrls;
    const remaining = bulkJob.totalUrls - processed;

    if (processed === 0) {
      return undefined;
    }

    const avgTimePerUrl = elapsed / processed;
    const estimatedRemainingTime = avgTimePerUrl * remaining;

    return new Date(Date.now() + estimatedRemainingTime);
  }

  /**
   * Cancel a bulk job
   */
  async cancelBulkJob(jobId: string, userId: string): Promise<boolean> {
    try {
      // Update database status
      await prisma.bulkJob.updateMany({
        where: {
          id: jobId,
          userId,
        },
        data: {
          status: 'cancelled',
          finishedAt: new Date(),
        },
      });

      // Remove job from queue
      const queueJob = await this.trackingQueue.getJob(jobId);
      if (queueJob) {
        await queueJob.remove();
      }

      logger.info(`Bulk job cancelled: ${jobId}`, { userId });
      return true;
    } catch (error) {
      logger.error('Failed to cancel bulk job:', error);
      return false;
    }
  }

  /**
   * Get user's bulk jobs
   */
  async getUserBulkJobs(
    userId: string,
    limit = 20,
    offset = 0
  ): Promise<BulkTrackingJob[]> {
    try {
      const bulkJobs = await prisma.bulkJob.findMany({
        where: { userId },
        orderBy: { createdAt: 'desc' },
        take: limit,
        skip: offset,
      });

      return Promise.all(
        bulkJobs.map(async (bulkJob) => {
          const job: BulkTrackingJob = {
            id: bulkJob.id,
            userId: bulkJob.userId,
            organizationId: bulkJob.organizationId || undefined,
            projectId: bulkJob.projectId || undefined,
            urls: JSON.parse(bulkJob.urlsJson as string),
            options: JSON.parse(bulkJob.configJson as string),
            status: bulkJob.status as BulkTrackingJob['status'],
            progress: {
              total: bulkJob.totalUrls,
              processed: bulkJob.processedUrls,
              successful: bulkJob.successfulUrls,
              failed: bulkJob.failedUrls,
            },
            results: bulkJob.resultsJson ? JSON.parse(bulkJob.resultsJson as string) : undefined,
            createdAt: bulkJob.createdAt,
            startedAt: bulkJob.startedAt || undefined,
            finishedAt: bulkJob.finishedAt || undefined,
            estimatedCompletionAt: this.calculateEstimatedCompletion(bulkJob),
          };
          return job;
        })
      );
    } catch (error) {
      logger.error('Failed to get user bulk jobs:', error);
      return [];
    }
  }

  /**
   * Export bulk job results to CSV
   */
  async exportResultsToCsv(jobId: string, userId: string): Promise<string> {
    try {
      const job = await this.getBulkJob(jobId, userId);
      if (!job || !job.results) {
        throw new Error('Job not found or no results available');
      }

      const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
      const fileName = `bulk-results-${jobId}-${timestamp}.csv`;
      const filePath = path.join(this.uploadsDir, fileName);

      const csvWriter = createObjectCsvWriter({
        path: filePath,
        header: [
          { id: 'url', title: 'URL' },
          { id: 'label', title: 'Label' },
          { id: 'status', title: 'Status' },
          { id: 'checkId', title: 'Check ID' },
          { id: 'error', title: 'Error' },
          { id: 'startedAt', title: 'Started At' },
          { id: 'finishedAt', title: 'Finished At' },
          { id: 'durationMs', title: 'Duration (ms)' },
        ],
      });

      const records = job.results.map(result => ({
        url: result.url,
        label: result.label || '',
        status: result.status,
        checkId: result.checkId || '',
        error: result.error || '',
        startedAt: result.timing.startedAt.toISOString(),
        finishedAt: result.timing.finishedAt?.toISOString() || '',
        durationMs: result.timing.durationMs || '',
      }));

      await csvWriter.writeRecords(records);

      logger.info(`Results exported to CSV: ${filePath}`, { jobId, userId });
      return filePath;
    } catch (error) {
      logger.error('Failed to export results to CSV:', error);
      throw new Error(`Failed to export results: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }

  /**
   * Clean up old bulk jobs and files
   */
  async cleanupOldJobs(maxAgeHours = 72): Promise<void> {
    try {
      const cutoff = new Date(Date.now() - (maxAgeHours * 60 * 60 * 1000));

      // Delete old jobs from database
      const result = await prisma.bulkJob.deleteMany({
        where: {
          createdAt: {
            lt: cutoff,
          },
          status: {
            in: ['completed', 'failed', 'cancelled'],
          },
        },
      });

      // Clean up old files
      try {
        const files = await fs.readdir(this.uploadsDir);
        for (const file of files) {
          const filePath = path.join(this.uploadsDir, file);
          const stats = await fs.stat(filePath);

          if (stats.mtime < cutoff) {
            await fs.unlink(filePath);
            logger.info(`Cleaned up old file: ${file}`);
          }
        }
      } catch (error) {
        logger.warn('Failed to cleanup old files:', error);
      }

      logger.info(`Cleaned up ${result.count} old bulk jobs`);
    } catch (error) {
      logger.error('Failed to cleanup old jobs:', error);
    }
  }

  /**
   * Get queue statistics
   */
  async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    try {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        this.trackingQueue.getWaiting(),
        this.trackingQueue.getActive(),
        this.trackingQueue.getCompleted(),
        this.trackingQueue.getFailed(),
        this.trackingQueue.getDelayed(),
      ]);

      return {
        waiting: waiting.length,
        active: active.length,
        completed: completed.length,
        failed: failed.length,
        delayed: delayed.length,
      };
    } catch (error) {
      logger.error('Failed to get queue stats:', error);
      return {
        waiting: 0,
        active: 0,
        completed: 0,
        failed: 0,
        delayed: 0,
      };
    }
  }
}
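
The BulkTrackingWorker named in the commit message consumes the 'process-bulk-tracking' jobs queued above but is not shown in this excerpt. As a rough sketch of the consuming side, assuming it mirrors the queue settings used by BulkProcessorService (the processing callback, concurrency value, and shutdown wiring here are illustrative assumptions, not the committed implementation):

import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';

// Same connection settings the service uses for the 'bulk-tracking' queue.
const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  maxRetriesPerRequest: null,
});

// Consume the 'process-bulk-tracking' jobs enqueued by BulkProcessorService.
const worker = new Worker(
  'bulk-tracking',
  async (job: Job) => {
    const { jobId, urls, options } = job.data;
    for (let i = 0; i < urls.length; i++) {
      // ...track each URL with the configured options and persist the result...
      await job.updateProgress(Math.round(((i + 1) / urls.length) * 100));
    }
    return { jobId, processed: urls.length };
  },
  { connection, concurrency: 5 } // concurrency value is an assumption
);

// Graceful shutdown: let in-flight jobs finish before exiting.
process.on('SIGTERM', async () => {
  await worker.close();
  await connection.quit();
  process.exit(0);
});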