Files
Andrei 58f8093689 Rebrand from 'Redirect Intelligence v2' to 'URL Tracker Tool V2' throughout UI
- Updated all component headers and documentation
- Changed navbar and footer branding
- Updated homepage hero badge
- Modified page title in index.html
- Simplified footer text to 'Built with ❤️'
- Consistent V2 capitalization across all references
2025-08-19 19:12:23 +00:00

791 lines
28 KiB
JavaScript

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Job = exports.PRIORITY_LIMIT = void 0;
const tslib_1 = require("tslib");
const lodash_1 = require("lodash");
const util_1 = require("util");
const utils_1 = require("../utils");
const backoffs_1 = require("./backoffs");
const scripts_1 = require("./scripts");
const unrecoverable_error_1 = require("./errors/unrecoverable-error");
const logger = (0, util_1.debuglog)('bull');
const optsDecodeMap = {
fpof: 'failParentOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
};
const optsEncodeMap = (0, lodash_1.invert)(optsDecodeMap);
exports.PRIORITY_LIMIT = 2 ** 21;
/**
* Job
*
* This class represents a Job in the queue. Normally job are implicitly created when
* you add a job to the queue with methods such as Queue.addJob( ... )
*
* A Job instance is also passed to the Worker's process function.
*
* @class Job
*/
class Job {
constructor(queue,
/**
* The name of the Job
*/
name,
/**
* The payload for this job.
*/
data,
/**
* The options object for this job.
*/
opts = {}, id) {
this.queue = queue;
this.name = name;
this.data = data;
this.opts = opts;
this.id = id;
/**
* The progress a job has performed so far.
* @defaultValue 0
*/
this.progress = 0;
/**
* The value returned by the processor when processing this job.
* @defaultValue null
*/
this.returnvalue = null;
/**
* Stacktrace for the error (for failed jobs).
* @defaultValue null
*/
this.stacktrace = null;
/**
* Number of attempts after the job has failed.
* @defaultValue 0
*/
this.attemptsMade = 0;
const _a = this.opts, { repeatJobKey } = _a, restOpts = tslib_1.__rest(_a, ["repeatJobKey"]);
this.opts = Object.assign({
attempts: 0,
delay: 0,
}, restOpts);
this.delay = this.opts.delay;
this.repeatJobKey = repeatJobKey;
this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
this.opts.backoff = backoffs_1.Backoffs.normalize(opts.backoff);
this.parentKey = (0, utils_1.getParentKey)(opts.parent);
this.parent = opts.parent
? { id: opts.parent.id, queueKey: opts.parent.queue }
: undefined;
this.toKey = queue.toKey.bind(queue);
this.scripts = new scripts_1.Scripts(queue);
this.queueQualifiedName = queue.qualifiedName;
}
/**
* Creates a new job and adds it to the queue.
*
* @param queue - the queue where to add the job.
* @param name - the name of the job.
* @param data - the payload of the job.
* @param opts - the options bag for this job.
* @returns
*/
static async create(queue, name, data, opts) {
const client = await queue.client;
const job = new this(queue, name, data, opts, opts && opts.jobId);
job.id = await job.addJob(client, {
parentKey: job.parentKey,
parentDependenciesKey: job.parentKey
? `${job.parentKey}:dependencies`
: '',
});
return job;
}
/**
* Creates a bulk of jobs and adds them atomically to the given queue.
*
* @param queue -the queue were to add the jobs.
* @param jobs - an array of jobs to be added to the queue.
* @returns
*/
static async createBulk(queue, jobs) {
const client = await queue.client;
const jobInstances = jobs.map(job => { var _a; return new this(queue, job.name, job.data, job.opts, (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId); });
const multi = client.multi();
for (const job of jobInstances) {
job.addJob(multi, {
parentKey: job.parentKey,
parentDependenciesKey: job.parentKey
? `${job.parentKey}:dependencies`
: '',
});
}
const results = (await multi.exec());
for (let index = 0; index < results.length; ++index) {
const [err, id] = results[index];
if (err) {
throw err;
}
jobInstances[index].id = id;
}
return jobInstances;
}
/**
* Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)
*
* @param queue - the queue where the job belongs to.
* @param json - the plain object containing the job.
* @param jobId - an optional job id (overrides the id coming from the JSON object)
* @returns
*/
static fromJSON(queue, json, jobId) {
const data = JSON.parse(json.data || '{}');
const opts = Job.optsFromJSON(json.opts);
const job = new this(queue, json.name, data, opts, json.id || jobId);
job.progress = JSON.parse(json.progress || '0');
job.delay = parseInt(json.delay);
job.timestamp = parseInt(json.timestamp);
if (json.finishedOn) {
job.finishedOn = parseInt(json.finishedOn);
}
if (json.processedOn) {
job.processedOn = parseInt(json.processedOn);
}
if (json.rjk) {
job.repeatJobKey = json.rjk;
}
job.failedReason = json.failedReason;
job.attemptsMade = parseInt(json.attemptsMade || '0');
job.stacktrace = getTraces(json.stacktrace);
if (typeof json.returnvalue === 'string') {
job.returnvalue = getReturnValue(json.returnvalue);
}
if (json.parentKey) {
job.parentKey = json.parentKey;
}
if (json.parent) {
job.parent = JSON.parse(json.parent);
}
return job;
}
static optsFromJSON(rawOpts) {
const opts = JSON.parse(rawOpts || '{}');
const optionEntries = Object.entries(opts);
const options = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if (optsDecodeMap[attributeName]) {
options[optsDecodeMap[attributeName]] =
value;
}
else {
options[attributeName] = value;
}
}
return options;
}
/**
* Fetches a Job from the queue given the passed job id.
*
* @param queue - the queue where the job belongs to.
* @param jobId - the job id.
* @returns
*/
static async fromId(queue, jobId) {
// jobId can be undefined if moveJob returns undefined
if (jobId) {
const client = await queue.client;
const jobData = await client.hgetall(queue.toKey(jobId));
return (0, utils_1.isEmpty)(jobData)
? undefined
: this.fromJSON(queue, jobData, jobId);
}
}
/**
* addJobLog
*
* @param queue Queue instance
* @param jobId Job id
* @param logRow Log row
* @param keepLogs optional maximum number of logs to keep
*
* @returns The total number of log entries for this job so far.
*/
static async addJobLog(queue, jobId, logRow, keepLogs) {
const client = await queue.client;
const logsKey = queue.toKey(jobId) + ':logs';
const multi = client.multi();
multi.rpush(logsKey, logRow);
if (keepLogs) {
multi.ltrim(logsKey, -keepLogs, -1);
}
const result = (await multi.exec());
return keepLogs ? Math.min(keepLogs, result[0][1]) : result[0][1];
}
toJSON() {
const _a = this, { queue, scripts } = _a, withoutQueueAndScripts = tslib_1.__rest(_a, ["queue", "scripts"]);
return withoutQueueAndScripts;
}
/**
* Prepares a job to be serialized for storage in Redis.
* @returns
*/
asJSON() {
return {
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: this.optsAsJSON(this.opts),
parent: this.parent ? Object.assign({}, this.parent) : undefined,
parentKey: this.parentKey,
progress: this.progress,
attemptsMade: this.attemptsMade,
finishedOn: this.finishedOn,
processedOn: this.processedOn,
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
};
}
optsAsJSON(opts = {}) {
const optionEntries = Object.entries(opts);
const options = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if (optsEncodeMap[attributeName]) {
options[optsEncodeMap[attributeName]] =
value;
}
else {
options[attributeName] = value;
}
}
return options;
}
/**
* Prepares a job to be passed to Sandbox.
* @returns
*/
asJSONSandbox() {
return Object.assign(Object.assign({}, this.asJSON()), { queueName: this.queueName, prefix: this.prefix });
}
/**
* Updates a job's data
*
* @param data - the data that will replace the current jobs data.
*/
updateData(data) {
this.data = data;
return this.scripts.updateData(this, data);
}
/**
* Updates a job's progress
*
* @param progress - number or object to be saved as progress.
*/
async updateProgress(progress) {
this.progress = progress;
await this.scripts.updateProgress(this.id, progress);
this.queue.emit('progress', this, progress);
}
/**
* Logs one row of log data.
*
* @param logRow - string with log data to be logged.
* @returns The total number of log entries for this job so far.
*/
async log(logRow) {
return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
}
/**
* Clears job's logs
*
* @param keepLogs - the amount of log entries to preserve
*/
async clearLogs(keepLogs) {
const client = await this.queue.client;
const logsKey = this.toKey(this.id) + ':logs';
if (keepLogs) {
await client.ltrim(logsKey, -keepLogs, -1);
}
else {
await client.del(logsKey);
}
}
/**
* Completely remove the job from the queue.
* Note, this call will throw an exception if the job
* is being processed when the call is performed.
*
* @param opts - Options to remove a job
*/
async remove({ removeChildren = true } = {}) {
await this.queue.waitUntilReady();
const queue = this.queue;
const job = this;
const removed = await this.scripts.remove(job.id, removeChildren);
if (removed) {
queue.emit('removed', job);
}
else {
throw new Error(`Job ${this.id} could not be removed because it is locked by another worker`);
}
}
/**
* Extend the lock for this job.
*
* @param token - unique token for the lock
* @param duration - lock duration in milliseconds
*/
extendLock(token, duration) {
return this.scripts.extendLock(this.id, token, duration);
}
/**
* Moves a job to the completed queue.
* Returned job to be used with Queue.prototype.nextJobFromJobData.
*
* @param returnValue - The jobs success message.
* @param token - Worker token used to acquire completed job.
* @param fetchNext - True when wanting to fetch the next job.
* @returns Returns the jobData of the next job in the waiting queue.
*/
async moveToCompleted(returnValue, token, fetchNext = true) {
await this.queue.waitUntilReady();
this.returnvalue = returnValue || void 0;
const stringifiedReturnValue = (0, utils_1.tryCatch)(JSON.stringify, JSON, [
returnValue,
]);
if (stringifiedReturnValue === utils_1.errorObject) {
throw utils_1.errorObject.value;
}
const args = this.scripts.moveToCompletedArgs(this, stringifiedReturnValue, this.opts.removeOnComplete, token, fetchNext);
const result = await this.scripts.moveToFinished(this.id, args);
this.finishedOn = args[14];
return result;
}
/**
* Moves a job to the failed queue.
*
* @param err - the jobs error message.
* @param token - token to check job is locked by current worker
* @param fetchNext - true when wanting to fetch the next job
* @returns void
*/
async moveToFailed(err, token, fetchNext = false) {
const client = await this.queue.client;
const message = err === null || err === void 0 ? void 0 : err.message;
const queue = this.queue;
this.failedReason = message;
let command;
const multi = client.multi();
this.saveStacktrace(multi, err);
//
// Check if an automatic retry should be performed
//
let moveToFailed = false;
let finishedOn, delay;
if (this.attemptsMade < this.opts.attempts &&
!this.discarded &&
!(err instanceof unrecoverable_error_1.UnrecoverableError || err.name == 'UnrecoverableError')) {
const opts = queue.opts;
// Check if backoff is needed
delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade, err, this, opts.settings && opts.settings.backoffStrategy);
if (delay === -1) {
moveToFailed = true;
}
else if (delay) {
const args = this.scripts.moveToDelayedArgs(this.id, Date.now() + delay, token, delay);
this.scripts.execCommand(multi, 'moveToDelayed', args);
command = 'delayed';
}
else {
// Retry immediately
this.scripts.execCommand(multi, 'retryJob', this.scripts.retryJobArgs(this.id, this.opts.lifo, token));
command = 'retryJob';
}
}
else {
// If not, move to failed
moveToFailed = true;
}
if (moveToFailed) {
const args = this.scripts.moveToFailedArgs(this, message, this.opts.removeOnFail, token, fetchNext);
this.scripts.execCommand(multi, 'moveToFinished', args);
finishedOn = args[14];
command = 'failed';
}
const results = await multi.exec();
const anyError = results.find(result => result[0]);
if (anyError) {
throw new Error(`Error "moveToFailed" with command ${command}: ${anyError}`);
}
const code = results[results.length - 1][1];
if (code < 0) {
throw this.scripts.finishedErrors(code, this.id, command, 'active');
}
if (finishedOn && typeof finishedOn === 'number') {
this.finishedOn = finishedOn;
}
if (delay && typeof delay === 'number') {
this.delay = delay;
}
}
/**
* @returns true if the job has completed.
*/
isCompleted() {
return this.isInZSet('completed');
}
/**
* @returns true if the job has failed.
*/
isFailed() {
return this.isInZSet('failed');
}
/**
* @returns true if the job is delayed.
*/
isDelayed() {
return this.isInZSet('delayed');
}
/**
* @returns true if the job is waiting for children.
*/
isWaitingChildren() {
return this.isInZSet('waiting-children');
}
/**
* @returns true of the job is active.
*/
isActive() {
return this.isInList('active');
}
/**
* @returns true if the job is waiting.
*/
async isWaiting() {
return (await this.isInList('wait')) || (await this.isInList('paused'));
}
/**
* @returns the queue name this job belongs to.
*/
get queueName() {
return this.queue.name;
}
/**
* @returns the prefix that is used.
*/
get prefix() {
return this.queue.opts.prefix;
}
/**
* Get current state.
*
* @returns Returns one of these values:
* 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
*/
getState() {
return this.scripts.getState(this.id);
}
/**
* Change delay of a delayed job.
*
* @param delay - milliseconds to be added to current time.
* @returns void
*/
async changeDelay(delay) {
await this.scripts.changeDelay(this.id, delay);
this.delay = delay;
}
/**
* Change job priority.
*
* @returns void
*/
async changePriority(opts) {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
}
/**
* Get this jobs children result values if any.
*
* @returns Object mapping children job keys with their values.
*/
async getChildrenValues() {
const client = await this.queue.client;
const result = (await client.hgetall(this.toKey(`${this.id}:processed`)));
if (result) {
return (0, utils_1.parseObjectValues)(result);
}
}
/**
* Get children job keys if this job is a parent and has children.
* @remarks
* Count options before Redis v7.2 works as expected with any quantity of entries
* on processed/unprocessed dependencies, since v7.2 you must consider that count
* won't have any effect until processed/unprocessed dependencies have a length
* greater than 127
* @see https://redis.io/docs/management/optimization/memory-optimization/#redis--72
* @returns dependencies separated by processed and unprocessed.
*/
async getDependencies(opts = {}) {
const client = await this.queue.client;
const multi = client.multi();
if (!opts.processed && !opts.unprocessed) {
multi.hgetall(this.toKey(`${this.id}:processed`));
multi.smembers(this.toKey(`${this.id}:dependencies`));
const [[err1, processed], [err2, unprocessed]] = (await multi.exec());
const transformedProcessed = (0, utils_1.parseObjectValues)(processed);
return { processed: transformedProcessed, unprocessed };
}
else {
const defaultOpts = {
cursor: 0,
count: 20,
};
if (opts.processed) {
const processedOpts = Object.assign(Object.assign({}, defaultOpts), opts.processed);
multi.hscan(this.toKey(`${this.id}:processed`), processedOpts.cursor, 'COUNT', processedOpts.count);
}
if (opts.unprocessed) {
const unprocessedOpts = Object.assign(Object.assign({}, defaultOpts), opts.unprocessed);
multi.sscan(this.toKey(`${this.id}:dependencies`), unprocessedOpts.cursor, 'COUNT', unprocessedOpts.count);
}
const [result1, result2] = (await multi.exec());
const [processedCursor, processed = []] = opts.processed
? result1[1]
: [];
const [unprocessedCursor, unprocessed = []] = opts.unprocessed
? opts.processed
? result2[1]
: result1[1]
: [];
const transformedProcessed = {};
for (let index = 0; index < processed.length; ++index) {
if (index % 2) {
transformedProcessed[processed[index - 1]] = JSON.parse(processed[index]);
}
}
return Object.assign(Object.assign({}, (processedCursor
? {
processed: transformedProcessed,
nextProcessedCursor: Number(processedCursor),
}
: {})), (unprocessedCursor
? { unprocessed, nextUnprocessedCursor: Number(unprocessedCursor) }
: {}));
}
}
/**
* Get children job counts if this job is a parent and has children.
*
* @returns dependencies count separated by processed and unprocessed.
*/
async getDependenciesCount(opts = {}) {
const client = await this.queue.client;
const multi = client.multi();
const updatedOpts = !opts.processed && !opts.unprocessed
? { processed: true, unprocessed: true }
: opts;
if (updatedOpts.processed) {
multi.hlen(this.toKey(`${this.id}:processed`));
}
if (updatedOpts.unprocessed) {
multi.scard(this.toKey(`${this.id}:dependencies`));
}
const [[err1, result1] = [], [err2, result2] = []] = (await multi.exec());
const processed = updatedOpts.processed ? result1 : undefined;
const unprocessed = updatedOpts.unprocessed
? updatedOpts.processed
? result2
: result1
: undefined;
return Object.assign(Object.assign({}, (updatedOpts.processed
? {
processed,
}
: {})), (updatedOpts.unprocessed ? { unprocessed } : {}));
}
/**
* Returns a promise the resolves when the job has completed (containing the return value of the job),
* or rejects when the job has failed (containing the failedReason).
*
* @param queueEvents - Instance of QueueEvents.
* @param ttl - Time in milliseconds to wait for job to finish before timing out.
*/
async waitUntilFinished(queueEvents, ttl) {
await this.queue.waitUntilReady();
const jobId = this.id;
return new Promise(async (resolve, reject) => {
let timeout;
if (ttl) {
timeout = setTimeout(() => onFailed(
/* eslint-disable max-len */
`Job wait ${this.name} timed out before finishing, no finish notification arrived after ${ttl}ms (id=${jobId})`), ttl);
}
function onCompleted(args) {
removeListeners();
resolve(args.returnvalue);
}
function onFailed(args) {
removeListeners();
reject(new Error(args.failedReason || args));
}
const completedEvent = `completed:${jobId}`;
const failedEvent = `failed:${jobId}`;
queueEvents.on(completedEvent, onCompleted);
queueEvents.on(failedEvent, onFailed);
this.queue.on('closing', onFailed);
const removeListeners = () => {
clearInterval(timeout);
queueEvents.removeListener(completedEvent, onCompleted);
queueEvents.removeListener(failedEvent, onFailed);
this.queue.removeListener('closing', onFailed);
};
// Poll once right now to see if the job has already finished. The job may have been completed before we were able
// to register the event handlers on the QueueEvents, so we check here to make sure we're not waiting for an event
// that has already happened. We block checking the job until the queue events object is actually listening to
// Redis so there's no chance that it will miss events.
await queueEvents.waitUntilReady();
const [status, result] = (await this.scripts.isFinished(jobId, true));
const finished = status != 0;
if (finished) {
if (status == -1 || status == 2) {
onFailed({ failedReason: result });
}
else {
onCompleted({ returnvalue: getReturnValue(result) });
}
}
});
}
/**
* Moves the job to the delay set.
*
* @param timestamp - timestamp where the job should be moved back to "wait"
* @param token - token to check job is locked by current worker
* @returns
*/
moveToDelayed(timestamp, token) {
const delay = timestamp - Date.now();
return this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token);
}
/**
* Moves the job to the waiting-children set.
*
* @param token - Token to check job is locked by current worker
* @param opts - The options bag for moving a job to waiting-children.
* @returns true if the job was moved
*/
moveToWaitingChildren(token, opts = {}) {
return this.scripts.moveToWaitingChildren(this.id, token, opts);
}
/**
* Promotes a delayed job so that it starts to be processed as soon as possible.
*/
async promote() {
const jobId = this.id;
await this.scripts.promote(jobId);
this.delay = 0;
}
/**
* Attempts to retry the job. Only a job that has failed or completed can be retried.
*
* @param state - completed / failed
* @returns If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
retry(state = 'failed') {
this.failedReason = null;
this.finishedOn = null;
this.processedOn = null;
this.returnvalue = null;
return this.scripts.reprocessJob(this, state);
}
/**
* Marks a job to not be retried if it fails (even if attempts has been configured)
*/
discard() {
this.discarded = true;
}
async isInZSet(set) {
const client = await this.queue.client;
const score = await client.zscore(this.queue.toKey(set), this.id);
return score !== null;
}
async isInList(list) {
return this.scripts.isJobInList(this.queue.toKey(list), this.id);
}
/**
* Adds the job to Redis.
*
* @param client -
* @param parentOpts -
* @returns
*/
addJob(client, parentOpts) {
const jobData = this.asJSON();
this.validateOptions(jobData);
return this.scripts.addJob(client, jobData, jobData.opts, this.id, parentOpts);
}
validateOptions(jobData) {
var _a;
const exceedLimit = this.opts.sizeLimit &&
(0, utils_1.lengthInUtf8Bytes)(jobData.data) > this.opts.sizeLimit;
if (exceedLimit) {
throw new Error(`The size of job ${this.name} exceeds the limit ${this.opts.sizeLimit} bytes`);
}
if (this.opts.delay && this.opts.repeat && !((_a = this.opts.repeat) === null || _a === void 0 ? void 0 : _a.count)) {
throw new Error(`Delay and repeat options could not be used together`);
}
if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) {
throw new Error(`RemoveDependencyOnFailure and failParentOnFailure options can not be used together`);
}
if (`${parseInt(this.id, 10)}` === this.id) {
//TODO: throw an error in next breaking change
console.warn('Custom Ids should not be integers: https://github.com/taskforcesh/bullmq/pull/1569');
}
if (this.opts.priority) {
if (Math.trunc(this.opts.priority) !== this.opts.priority) {
throw new Error(`Priority should not be float`);
}
if (this.opts.priority > exports.PRIORITY_LIMIT) {
throw new Error(`Priority should be between 0 and ${exports.PRIORITY_LIMIT}`);
}
}
}
saveStacktrace(multi, err) {
this.stacktrace = this.stacktrace || [];
if (err === null || err === void 0 ? void 0 : err.stack) {
this.stacktrace.push(err.stack);
if (this.opts.stackTraceLimit) {
this.stacktrace = this.stacktrace.slice(0, this.opts.stackTraceLimit);
}
}
const args = this.scripts.saveStacktraceArgs(this.id, JSON.stringify(this.stacktrace), err === null || err === void 0 ? void 0 : err.message);
this.scripts.execCommand(multi, 'saveStacktrace', args);
}
}
exports.Job = Job;
function getTraces(stacktrace) {
const traces = (0, utils_1.tryCatch)(JSON.parse, JSON, [stacktrace]);
if (traces === utils_1.errorObject || !(traces instanceof Array)) {
return [];
}
else {
return traces;
}
}
function getReturnValue(_value) {
const value = (0, utils_1.tryCatch)(JSON.parse, JSON, [_value]);
if (value !== utils_1.errorObject) {
return value;
}
else {
logger('corrupted returnvalue: ' + _value, value);
}
}
//# sourceMappingURL=job.js.map