"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Queue = void 0;
const lodash_1 = require("lodash");
const uuid_1 = require("uuid");
const job_1 = require("./job");
const queue_getters_1 = require("./queue-getters");
const repeat_1 = require("./repeat");
const version_1 = require("../version");
/**
 * Queue
 *
 * This class provides methods to add jobs to a queue and some other
 * high-level administration such as pausing or deleting queues.
 */
class Queue extends queue_getters_1.QueueGetters {
    /**
     * @param name - Name of the queue.
     * @param opts - Queue options. `defaultJobOptions` is used as the base
     * options of every job added through this instance; `skipMetasUpdate`
     * disables writing the queue metadata on startup.
     * @param Connection - Forwarded untouched to the base class constructor.
     */
    constructor(name, opts, Connection) {
        super(name, Object.assign({ blockingConnection: false }, opts), Connection);
        this.token = (0, uuid_1.v4)();
        this.libName = 'bullmq';
        const defaultJobOptions = (0, lodash_1.get)(opts, 'defaultJobOptions');
        this.jobsOpts = defaultJobOptions != null ? defaultJobOptions : {};
        this.waitUntilReady()
            .then(client => {
                const skipMetasUpdate = opts != null && opts.skipMetasUpdate;
                if (!this.closing && !skipMetasUpdate) {
                    return client.hmset(this.keys.meta, this.metaValues);
                }
            })
            .catch(() => {
                // We ignore this error to avoid warnings. The error can still
                // be received by listening to event 'error'
            });
    }
    emit(event, ...args) {
        return super.emit(event, ...args);
    }
    off(eventName, listener) {
        super.off(eventName, listener);
        return this;
    }
    on(event, listener) {
        super.on(event, listener);
        return this;
    }
    once(event, listener) {
        super.once(event, listener);
        return this;
    }
    /**
     * Returns a shallow copy of this instance's current default job options.
     */
    get defaultJobOptions() {
        return Object.assign({}, this.jobsOpts);
    }
    /**
     * Values written to the queue's meta hash when the queue becomes ready:
     * the maximum length of the events stream (10000 unless
     * `opts.streams.events.maxLen` is set) and the library name and version.
     */
    get metaValues() {
        const streams = this.opts == null ? undefined : this.opts.streams;
        const events = streams == null ? undefined : streams.events;
        const maxLen = events == null ? undefined : events.maxLen;
        return {
            'opts.maxLenEvents': maxLen != null ? maxLen : 10000,
            version: `${this.libName}:${version_1.version}`,
        };
    }
    /**
     * Get library version.
     *
     * @returns the content of the `version` field of the queue's meta hash.
     */
    async getVersion() {
        const client = await this.client;
        return await client.hget(this.keys.meta, 'version');
    }
    /**
     * Lazily creates (once) and resolves to the `Repeat` helper of this queue.
     *
     * The returned promise rejects if the connection cannot be obtained, and
     * errors emitted by the helper are re-emitted on this queue as 'error'.
     */
    get repeat() {
        const resolveRepeat = async () => {
            if (!this._repeat) {
                const connection = await this.client;
                // Another caller may have created the helper while we were
                // waiting for the connection; never create a second one, as
                // the first would be overwritten and never closed.
                if (!this._repeat) {
                    this._repeat = new repeat_1.Repeat(this.name, Object.assign(Object.assign({}, this.opts), { connection }));
                    this._repeat.on('error', err => this.emit('error', err));
                }
            }
            return this._repeat;
        };
        return resolveRepeat();
    }
    /**
     * Adds a new job to the queue.
     *
     * @param name - Name of the job to be added to the queue.
     * @param data - Arbitrary data to append to the job.
     * @param opts - Job options that affects how the job is going to be processed.
     * @returns The created job. When `opts.repeat` is set, the result of
     * scheduling the next iteration of the repeatable job instead.
     * @throws Error if `opts.jobId` is '0' or starts with '0:'.
     */
    async add(name, data, opts) {
        if (opts && opts.repeat) {
            const repeat = await this.repeat;
            return repeat.addNextRepeatableJob(name, data, Object.assign(Object.assign({}, this.jobsOpts), opts), true);
        }
        const jobId = opts == null ? undefined : opts.jobId;
        // Loose equality is kept on purpose: it also rejects a numeric 0.
        if (jobId == '0' || (jobId != null && jobId.startsWith('0:'))) {
            throw new Error("JobId cannot be '0' or start with 0:");
        }
        const job = await this.Job.create(this, name, data, Object.assign(Object.assign(Object.assign({}, this.jobsOpts), opts), { jobId }));
        this.emit('waiting', job);
        return job;
    }
    /**
     * Adds an array of jobs to the queue. This method may be faster than adding
     * one job at a time in a sequence.
     *
     * @param jobs - The array of jobs to add to the queue. Each job is defined by 3
     * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
     */
    addBulk(jobs) {
        const jobsWithDefaults = jobs.map(job => ({
            name: job.name,
            data: job.data,
            opts: Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), {
                jobId: job.opts == null ? undefined : job.opts.jobId,
            }),
        }));
        return this.Job.createBulk(this, jobsWithDefaults);
    }
    /**
     * Pauses the processing of this queue globally.
     *
     * We use an atomic RENAME operation on the wait queue. Since
     * we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
     * is renamed to 'paused', no new jobs will be processed (the current ones
     * will run until finalized).
     *
     * Adding jobs requires a LUA script to check first if the paused list exist
     * and in that case it will add it there instead of the wait list.
     */
    async pause() {
        await this.scripts.pause(true);
        this.emit('paused');
    }
    /**
     * Close the queue instance, closing the repeat helper first if one was
     * created and the queue is not already closing.
     */
    async close() {
        if (!this.closing && this._repeat) {
            await this._repeat.close();
        }
        return super.close();
    }
    /**
     * Resumes the processing of this queue globally.
     *
     * The method reverses the pause operation by resuming the processing of the
     * queue.
     */
    async resume() {
        await this.scripts.pause(false);
        this.emit('resumed');
    }
    /**
     * Returns true if the queue is currently paused, i.e. the meta hash has a
     * 'paused' field.
     */
    async isPaused() {
        const client = await this.client;
        const pausedKeyExists = await client.hexists(this.keys.meta, 'paused');
        return pausedKeyExists === 1;
    }
    /**
     * Get all repeatable meta jobs.
     *
     * @param start - Offset of first job to return.
     * @param end - Offset of last job to return.
     * @param asc - Determine the order in which jobs are returned based on their
     * next execution time.
     */
    async getRepeatableJobs(start, end, asc) {
        const repeat = await this.repeat;
        return repeat.getRepeatableJobs(start, end, asc);
    }
    /**
     * Removes a repeatable job.
     *
     * Note: you need to use the exact same repeatOpts when deleting a repeatable job
     * than when adding it.
     *
     * @see removeRepeatableByKey
     *
     * @param name - job name
     * @param repeatOpts - repeat options the job was added with
     * @param jobId - id the job was added with, if any
     * @returns the logical negation of the value returned by the repeat
     * helper (presumably 0 on successful removal - verify against Repeat).
     */
    async removeRepeatable(name, repeatOpts, jobId) {
        const repeat = await this.repeat;
        const removed = await repeat.removeRepeatable(name, repeatOpts, jobId);
        return !removed;
    }
    /**
     * Removes a repeatable job by its key. Note that the key is the one used
     * to store the repeatable job metadata and not one of the job iterations
     * themselves. You can use "getRepeatableJobs" in order to get the keys.
     *
     * @see getRepeatableJobs
     *
     * @param key - key of the repeatable job.
     * @returns the logical negation of the value returned by the repeat
     * helper (presumably 0 on successful removal - verify against Repeat).
     */
    async removeRepeatableByKey(key) {
        const repeat = await this.repeat;
        const removed = await repeat.removeRepeatableByKey(key);
        return !removed;
    }
    /**
     * Removes the given job from the queue as well as all its
     * dependencies.
     *
     * @param jobId - The id of the job to remove
     * @param opts - Options to remove a job
     * @returns 1 if it managed to remove the job or 0 if the job or
     * any of its dependencies were locked.
     */
    remove(jobId, { removeChildren = true } = {}) {
        return this.scripts.remove(jobId, removeChildren);
    }
    /**
     * Updates the given job's progress.
     *
     * @param jobId - The id of the job to update
     * @param progress - number or object to be saved as progress.
     */
    async updateJobProgress(jobId, progress) {
        return this.scripts.updateProgress(jobId, progress);
    }
    /**
     * Logs one row of job's log data.
     *
     * @param jobId - The job id to log against.
     * @param logRow - string with log data to be logged.
     * @param keepLogs - max number of log entries to keep (0 for unlimited).
     *
     * @returns The total number of log entries for this job so far.
     */
    async addJobLog(jobId, logRow, keepLogs) {
        return job_1.Job.addJobLog(this, jobId, logRow, keepLogs);
    }
    /**
     * Drains the queue, i.e., removes all jobs that are waiting
     * or delayed, but not active, completed or failed.
     *
     * @param delayed - Pass true if it should also clean the
     * delayed jobs.
     */
    drain(delayed = false) {
        return this.scripts.drain(delayed);
    }
    /**
     * Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
     * grace period.
     *
     * @param grace - The grace period in milliseconds
     * @param limit - Max number of jobs to clean (a falsy value means no limit)
     * @param type - The type of job to clean
     * Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
     * @returns Ids of the deleted jobs
     */
    async clean(grace, limit, type = 'completed') {
        const maxCount = limit || Infinity;
        const maxCountPerCall = Math.min(10000, maxCount);
        const timestamp = Date.now() - grace;
        let deletedCount = 0;
        const deletedJobsIds = [];
        while (deletedCount < maxCount) {
            // Never request more than what is left of the budget, otherwise
            // the last batch could push the total past `limit`.
            const batchSize = Math.min(maxCountPerCall, maxCount - deletedCount);
            const jobsIds = await this.scripts.cleanJobsInSet(type, timestamp, batchSize);
            this.emit('cleaned', jobsIds, type);
            deletedCount += jobsIds.length;
            deletedJobsIds.push(...jobsIds);
            // A short batch means there is nothing left to clean.
            if (jobsIds.length < batchSize) {
                break;
            }
        }
        return deletedJobsIds;
    }
    /**
     * Completely destroys the queue and all of its contents irreversibly.
     * This method will *pause* the queue and requires that there are no
     * active jobs. It is possible to bypass this requirement, i.e. not
     * having active jobs using the "force" option.
     *
     * Note: This operation requires to iterate on all the jobs stored in the queue
     * and can be slow for very large queues.
     *
     * @param opts - Obliterate options (defaults: force false, count 1000).
     */
    async obliterate(opts) {
        await this.pause();
        let cursor = 0;
        do {
            cursor = await this.scripts.obliterate(Object.assign({ force: false, count: 1000 }, opts));
        } while (cursor);
    }
    /**
     * Retry all the failed jobs.
     *
     * @param opts: { count: number; state: FinishedStatus; timestamp: number}
     *   - count number to limit how many jobs will be moved to wait status per iteration,
     *   - state failed by default or completed.
     *   - timestamp from which timestamp to start moving jobs to wait status, default Date.now().
     *
     * @returns
     */
    async retryJobs(opts = {}) {
        let cursor = 0;
        do {
            cursor = await this.scripts.retryJobs(opts.state, opts.count, opts.timestamp);
        } while (cursor);
    }
    /**
     * Promote all the delayed jobs.
     *
     * @param opts: { count: number }
     *   - count number to limit how many jobs will be moved to wait status per iteration
     *
     * @returns
     */
    async promoteJobs(opts = {}) {
        let cursor = 0;
        do {
            cursor = await this.scripts.promoteJobs(opts.count);
        } while (cursor);
    }
    /**
     * Trim the event stream to an approximately maxLength.
     *
     * @param maxLength - approximate number of entries to keep in the stream
     */
    async trimEvents(maxLength) {
        const client = await this.client;
        return client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength);
    }
    /**
     * Delete old priority helper key.
     */
    async removeDeprecatedPriorityKey() {
        const client = await this.client;
        return client.del(this.toKey('priority'));
    }
}
exports.Queue = Queue;
//# sourceMappingURL=queue.js.map