"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.FlowProducer = void 0; const events_1 = require("events"); const lodash_1 = require("lodash"); const uuid_1 = require("uuid"); const utils_1 = require("../utils"); const job_1 = require("./job"); const queue_keys_1 = require("./queue-keys"); const redis_connection_1 = require("./redis-connection"); /** * This class allows to add jobs with dependencies between them in such * a way that it is possible to build complex flows. * Note: A flow is a tree-like structure of jobs that depend on each other. * Whenever the children of a given parent are completed, the parent * will be processed, being able to access the children's result data. * All Jobs can be in different queues, either children or parents, */ class FlowProducer extends events_1.EventEmitter { constructor(opts = {}, Connection = redis_connection_1.RedisConnection) { super(); this.opts = opts; this.opts = Object.assign({ prefix: 'bull' }, opts); if (!opts.connection) { console.warn([ 'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer', 'without providing explicitly a connection or connection options is deprecated. This behaviour will', 'be removed in the next major release', ].join(' ')); } this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts === null || opts === void 0 ? void 0 : opts.connection), false, opts.skipVersionCheck); this.connection.on('error', (error) => this.emit('error', error)); this.connection.on('close', () => { if (!this.closing) { this.emit('ioredis:close'); } }); this.queueKeys = new queue_keys_1.QueueKeys(opts.prefix); } emit(event, ...args) { return super.emit(event, ...args); } off(eventName, listener) { super.off(eventName, listener); return this; } on(event, listener) { super.on(event, listener); return this; } once(event, listener) { super.once(event, listener); return this; } /** * Returns a promise that resolves to a redis client. Normally used only by subclasses. */ get client() { return this.connection.client; } /** * Helper to easily extend Job class calls. */ get Job() { return job_1.Job; } waitUntilReady() { return this.client; } /** * Adds a flow. * * This call would be atomic, either it fails and no jobs will * be added to the queues, or it succeeds and all jobs will be added. * * @param flow - an object with a tree-like structure where children jobs * will be processed before their parents. * @param opts - options that will be applied to the flow object. */ async add(flow, opts) { var _a; if (this.closing) { return; } const client = await this.connection.client; const multi = client.multi(); const parentOpts = (_a = flow === null || flow === void 0 ? void 0 : flow.opts) === null || _a === void 0 ? void 0 : _a.parent; const parentKey = (0, utils_1.getParentKey)(parentOpts); const parentDependenciesKey = parentKey ? `${parentKey}:dependencies` : undefined; const jobsTree = this.addNode({ multi, node: flow, queuesOpts: opts === null || opts === void 0 ? void 0 : opts.queuesOptions, parent: { parentOpts, parentDependenciesKey, }, }); await multi.exec(); return jobsTree; } /** * Get a flow. * * @param opts - an object with options for getting a JobNode. */ async getFlow(opts) { if (this.closing) { return; } const client = await this.connection.client; const updatedOpts = Object.assign({ depth: 10, maxChildren: 20, }, opts); const jobsTree = this.getNode(client, updatedOpts); return jobsTree; } /** * Adds multiple flows. * * A flow is a tree-like structure of jobs that depend on each other. * Whenever the children of a given parent are completed, the parent * will be processed, being able to access the children's result data. * * All Jobs can be in different queues, either children or parents, * however this call would be atomic, either it fails and no jobs will * be added to the queues, or it succeeds and all jobs will be added. * * @param flows - an array of objects with a tree-like structure where children jobs * will be processed before their parents. */ async addBulk(flows) { if (this.closing) { return; } const client = await this.connection.client; const multi = client.multi(); const jobsTrees = this.addNodes(multi, flows); await multi.exec(); return jobsTrees; } /** * Add a node (job) of a flow to the queue. This method will recursively * add all its children as well. Note that a given job can potentially be * a parent and a child job at the same time depending on where it is located * in the tree hierarchy. * * @param multi - ioredis ChainableCommander * @param node - the node representing a job to be added to some queue * @param parent - parent data sent to children to create the "links" to their parent * @returns */ addNode({ multi, node, parent, queuesOpts }) { var _a; const prefix = node.prefix || this.opts.prefix; const queue = this.queueFromNode(node, new queue_keys_1.QueueKeys(prefix), prefix); const queueOpts = queuesOpts && queuesOpts[node.queueName]; const jobsOpts = (0, lodash_1.get)(queueOpts, 'defaultJobOptions'); const jobId = ((_a = node.opts) === null || _a === void 0 ? void 0 : _a.jobId) || (0, uuid_1.v4)(); const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, (jobsOpts ? jobsOpts : {})), node.opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts }), jobId); const parentKey = (0, utils_1.getParentKey)(parent === null || parent === void 0 ? void 0 : parent.parentOpts); if (node.children && node.children.length > 0) { // Create parent job, will be a job in status "waiting-children". const parentId = jobId; const queueKeysParent = new queue_keys_1.QueueKeys(node.prefix || this.opts.prefix); const waitChildrenKey = queueKeysParent.toKey(node.queueName, 'waiting-children'); job.addJob(multi, { parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey, waitChildrenKey, parentKey, }); const parentDependenciesKey = `${queueKeysParent.toKey(node.queueName, parentId)}:dependencies`; const children = this.addChildren({ multi, nodes: node.children, parent: { parentOpts: { id: parentId, queue: queueKeysParent.getQueueQualifiedName(node.queueName), }, parentDependenciesKey, }, queuesOpts, }); return { job, children }; } else { job.addJob(multi, { parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey, parentKey, }); return { job }; } } /** * Adds nodes (jobs) of multiple flows to the queue. This method will recursively * add all its children as well. Note that a given job can potentially be * a parent and a child job at the same time depending on where it is located * in the tree hierarchy. * * @param multi - ioredis ChainableCommander * @param nodes - the nodes representing jobs to be added to some queue * @returns */ addNodes(multi, nodes) { return nodes.map(node => { var _a; const parentOpts = (_a = node === null || node === void 0 ? void 0 : node.opts) === null || _a === void 0 ? void 0 : _a.parent; const parentKey = (0, utils_1.getParentKey)(parentOpts); const parentDependenciesKey = parentKey ? `${parentKey}:dependencies` : undefined; return this.addNode({ multi, node, parent: { parentOpts, parentDependenciesKey, }, }); }); } async getNode(client, node) { const queue = this.queueFromNode(node, new queue_keys_1.QueueKeys(node.prefix), node.prefix); const job = await this.Job.fromId(queue, node.id); if (job) { const { processed = {}, unprocessed = [] } = await job.getDependencies({ processed: { count: node.maxChildren, }, unprocessed: { count: node.maxChildren, }, }); const processedKeys = Object.keys(processed); const childrenCount = processedKeys.length + unprocessed.length; const newDepth = node.depth - 1; if (childrenCount > 0 && newDepth) { const children = await this.getChildren(client, [...processedKeys, ...unprocessed], newDepth, node.maxChildren); return { job, children }; } else { return { job }; } } } addChildren({ multi, nodes, parent, queuesOpts }) { return nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })); } getChildren(client, childrenKeys, depth, maxChildren) { const getChild = (key) => { const [prefix, queueName, id] = key.split(':'); return this.getNode(client, { id, queueName, prefix, depth, maxChildren, }); }; return Promise.all([...childrenKeys.map(getChild)]); } /** * Helper factory method that creates a queue-like object * required to create jobs in any queue. * * @param node - * @param queueKeys - * @returns */ queueFromNode(node, queueKeys, prefix) { return { client: this.connection.client, name: node.queueName, keys: queueKeys.getKeys(node.queueName), toKey: (type) => queueKeys.toKey(node.queueName, type), opts: { prefix }, qualifiedName: queueKeys.getQueueQualifiedName(node.queueName), closing: this.closing, waitUntilReady: async () => this.connection.client, removeListener: this.removeListener.bind(this), emit: this.emit.bind(this), on: this.on.bind(this), redisVersion: this.connection.redisVersion, }; } /** * * Closes the connection and returns a promise that resolves when the connection is closed. */ async close() { if (!this.closing) { this.closing = this.connection.close(); } await this.closing; } /** * * Force disconnects a connection. */ disconnect() { return this.connection.disconnect(); } } exports.FlowProducer = FlowProducer; //# sourceMappingURL=flow-producer.js.map