diff --git a/packages/backend/src/workers/action.js b/packages/backend/src/workers/action.js index 3159a7d6..b0728059 100644 --- a/packages/backend/src/workers/action.js +++ b/packages/backend/src/workers/action.js @@ -1,76 +1,6 @@ -import { Worker } from 'bullmq'; +import { generateWorker } from './worker.js'; +import { executeActionJob } from '../jobs/execute-action.js'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import Step from '../models/step.js'; -import actionQueue from '../queues/action.js'; -import { processAction } from '../services/action.js'; -import { - REMOVE_AFTER_30_DAYS_OR_150_JOBS, - REMOVE_AFTER_7_DAYS_OR_50_JOBS, -} from '../helpers/remove-job-configuration.js'; -import delayAsMilliseconds from '../helpers/delay-as-milliseconds.js'; - -const DEFAULT_DELAY_DURATION = 0; - -const actionWorker = new Worker( - 'action', - async (job) => { - const { stepId, flowId, executionId, computedParameters, executionStep } = - await processAction(job.data); - - if (executionStep.isFailed) return; - - const step = await Step.query().findById(stepId).throwIfNotFound(); - const nextStep = await step.getNextStep(); - - if (!nextStep) return; - - const jobName = `${executionId}-${nextStep.id}`; - - const jobPayload = { - flowId, - executionId, - stepId: nextStep.id, - }; - - const jobOptions = { - removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, - removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, - delay: DEFAULT_DELAY_DURATION, - }; - - if (step.appKey === 'delay') { - jobOptions.delay = delayAsMilliseconds(step.key, computedParameters); - } - - if (step.appKey === 'filter' && !executionStep.dataOut) { - return; - } - - await actionQueue.add(jobName, jobPayload, jobOptions); - }, - { connection: redisConfig } -); - -actionWorker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); -}); - -actionWorker.on('failed', (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} - \n ${err.stack} - `; - - logger.error(errorMessage); - - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); +const actionWorker = generateWorker('action', executeActionJob); export default actionWorker; diff --git a/packages/backend/src/workers/delete-user.ee.js b/packages/backend/src/workers/delete-user.ee.js index 9081df20..c47093b9 100644 --- a/packages/backend/src/workers/delete-user.ee.js +++ b/packages/backend/src/workers/delete-user.ee.js @@ -1,69 +1,6 @@ -import { Worker } from 'bullmq'; +import { generateWorker } from './worker.js'; +import { deleteUserJob } from '../jobs/delete-user.ee.js'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import appConfig from '../config/app.js'; -import User from '../models/user.js'; -import ExecutionStep from '../models/execution-step.js'; - -const deleteUserWorker = new Worker( - 'delete-user', - async (job) => { - const { id } = job.data; - - const user = await User.query() - .withSoftDeleted() - .findById(id) - .throwIfNotFound(); - - const executionIds = ( - await user - .$relatedQuery('executions') - .withSoftDeleted() - .select('executions.id') - ).map((execution) => execution.id); - - await ExecutionStep.query() - .withSoftDeleted() - .whereIn('execution_id', executionIds) - .hardDelete(); - await user.$relatedQuery('executions').withSoftDeleted().hardDelete(); - await user.$relatedQuery('steps').withSoftDeleted().hardDelete(); - await user.$relatedQuery('flows').withSoftDeleted().hardDelete(); - await user.$relatedQuery('connections').withSoftDeleted().hardDelete(); - await user.$relatedQuery('identities').withSoftDeleted().hardDelete(); - - if (appConfig.isCloud) { - await user.$relatedQuery('subscriptions').withSoftDeleted().hardDelete(); - await user.$relatedQuery('usageData').withSoftDeleted().hardDelete(); - } - - await user.$relatedQuery('accessTokens').withSoftDeleted().hardDelete(); - await user.$query().withSoftDeleted().hardDelete(); - }, - { connection: redisConfig } -); - -deleteUserWorker.on('completed', (job) => { - logger.info( - `JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!` - ); -}); - -deleteUserWorker.on('failed', (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message} - \n ${err.stack} - `; - - logger.error(errorMessage); - - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); +const deleteUserWorker = generateWorker('delete-user', deleteUserJob); export default deleteUserWorker; diff --git a/packages/backend/src/workers/email.js b/packages/backend/src/workers/email.js index 92bf0367..4cd2c1bb 100644 --- a/packages/backend/src/workers/email.js +++ b/packages/backend/src/workers/email.js @@ -1,62 +1,6 @@ -import { Worker } from 'bullmq'; +import { generateWorker } from './worker.js'; +import { sendEmailJob } from '../jobs/send-email.js'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import mailer from '../helpers/mailer.ee.js'; -import compileEmail from '../helpers/compile-email.ee.js'; -import appConfig from '../config/app.js'; - -const isCloudSandbox = () => { - return appConfig.isCloud && !appConfig.isProd; -}; - -const isAutomatischEmail = (email) => { - return email.endsWith('@automatisch.io'); -}; - -const emailWorker = new Worker( - 'email', - async (job) => { - const { email, subject, template, params } = job.data; - - if (isCloudSandbox() && !isAutomatischEmail(email)) { - logger.info( - 'Only Automatisch emails are allowed for non-production environments!' - ); - - return; - } - - await mailer.sendMail({ - to: email, - from: appConfig.fromEmail, - subject: subject, - html: compileEmail(template, params), - }); - }, - { connection: redisConfig } -); - -emailWorker.on('completed', (job) => { - logger.info( - `JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!` - ); -}); - -emailWorker.on('failed', (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message} - \n ${err.stack} - `; - - logger.error(errorMessage); - - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); +const emailWorker = generateWorker('email', sendEmailJob); export default emailWorker; diff --git a/packages/backend/src/workers/flow.js b/packages/backend/src/workers/flow.js index 8c08d5e1..7b04bef2 100644 --- a/packages/backend/src/workers/flow.js +++ b/packages/backend/src/workers/flow.js @@ -1,97 +1,6 @@ -import { Worker } from 'bullmq'; +import { generateWorker } from './worker.js'; +import { executeFlowJob } from '../jobs/execute-flow.js'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import flowQueue from '../queues/flow.js'; -import triggerQueue from '../queues/trigger.js'; -import { processFlow } from '../services/flow.js'; -import Flow from '../models/flow.js'; -import { - REMOVE_AFTER_30_DAYS_OR_150_JOBS, - REMOVE_AFTER_7_DAYS_OR_50_JOBS, -} from '../helpers/remove-job-configuration.js'; - -const flowWorker = new Worker( - 'flow', - async (job) => { - const { flowId } = job.data; - - const flow = await Flow.query().findById(flowId).throwIfNotFound(); - const user = await flow.$relatedQuery('user'); - const allowedToRunFlows = await user.isAllowedToRunFlows(); - - if (!allowedToRunFlows) { - return; - } - - const triggerStep = await flow.getTriggerStep(); - - const { data, error } = await processFlow({ flowId }); - - const reversedData = data.reverse(); - - const jobOptions = { - removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, - removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, - }; - - for (const triggerItem of reversedData) { - const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; - - const jobPayload = { - flowId, - stepId: triggerStep.id, - triggerItem, - }; - - await triggerQueue.add(jobName, jobPayload, jobOptions); - } - - if (error) { - const jobName = `${triggerStep.id}-error`; - - const jobPayload = { - flowId, - stepId: triggerStep.id, - error, - }; - - await triggerQueue.add(jobName, jobPayload, jobOptions); - } - }, - { connection: redisConfig } -); - -flowWorker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); -}); - -flowWorker.on('failed', async (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} - \n ${err.stack} - `; - - logger.error(errorMessage); - - const flow = await Flow.query().findById(job.data.flowId); - - if (!flow) { - await flowQueue.removeRepeatableByKey(job.repeatJobKey); - - const flowNotFoundErrorMessage = ` - JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found! - `; - - logger.error(flowNotFoundErrorMessage); - } - - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); +const flowWorker = generateWorker('flow', executeFlowJob); export default flowWorker; diff --git a/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js b/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js index 6ee0ae17..83df0865 100644 --- a/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js +++ b/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js @@ -1,44 +1,9 @@ -import { Worker } from 'bullmq'; -import { DateTime } from 'luxon'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import Subscription from '../models/subscription.ee.js'; +import { generateWorker } from './worker.js'; +import { removeCancelledSubscriptionsJob } from '../jobs/remove-cancelled-subscriptions.ee.js'; -const removeCancelledSubscriptionsWorker = new Worker( +const removeCancelledSubscriptionsWorker = generateWorker( 'remove-cancelled-subscriptions', - async () => { - await Subscription.query() - .delete() - .where({ - status: 'deleted', - }) - .andWhere( - 'cancellation_effective_date', - '<=', - DateTime.now().startOf('day').toISODate() - ); - }, - { connection: redisConfig } + removeCancelledSubscriptionsJob ); -removeCancelledSubscriptionsWorker.on('completed', (job) => { - logger.info( - `JOB ID: ${job.id} - The cancelled subscriptions have been removed!` - ); -}); - -removeCancelledSubscriptionsWorker.on('failed', (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message} - \n ${err.stack} - `; - logger.error(errorMessage); - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); - export default removeCancelledSubscriptionsWorker; diff --git a/packages/backend/src/workers/trigger.js b/packages/backend/src/workers/trigger.js index 64056dd9..e25915fc 100644 --- a/packages/backend/src/workers/trigger.js +++ b/packages/backend/src/workers/trigger.js @@ -1,62 +1,6 @@ -import { Worker } from 'bullmq'; +import { generateWorker } from './worker.js'; +import { executeTriggerJob } from '../jobs/execute-trigger.js'; -import * as Sentry from '../helpers/sentry.ee.js'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; -import actionQueue from '../queues/action.js'; -import Step from '../models/step.js'; -import { processTrigger } from '../services/trigger.js'; -import { - REMOVE_AFTER_30_DAYS_OR_150_JOBS, - REMOVE_AFTER_7_DAYS_OR_50_JOBS, -} from '../helpers/remove-job-configuration.js'; - -const triggerWorker = new Worker( - 'trigger', - async (job) => { - const { flowId, executionId, stepId, executionStep } = await processTrigger( - job.data - ); - - if (executionStep.isFailed) return; - - const step = await Step.query().findById(stepId).throwIfNotFound(); - const nextStep = await step.getNextStep(); - const jobName = `${executionId}-${nextStep.id}`; - - const jobPayload = { - flowId, - executionId, - stepId: nextStep.id, - }; - - const jobOptions = { - removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, - removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, - }; - - await actionQueue.add(jobName, jobPayload, jobOptions); - }, - { connection: redisConfig } -); - -triggerWorker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); -}); - -triggerWorker.on('failed', (job, err) => { - const errorMessage = ` - JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} - \n ${err.stack} - `; - - logger.error(errorMessage); - - Sentry.captureException(err, { - extra: { - jobId: job.id, - }, - }); -}); +const triggerWorker = generateWorker('flow', executeTriggerJob); export default triggerWorker; diff --git a/packages/backend/src/workers/worker.js b/packages/backend/src/workers/worker.js new file mode 100644 index 00000000..5528a24a --- /dev/null +++ b/packages/backend/src/workers/worker.js @@ -0,0 +1,28 @@ +import { Worker } from 'bullmq'; + +import * as Sentry from '../helpers/sentry.ee.js'; +import redisConfig from '../config/redis.js'; +import logger from '../helpers/logger.js'; + +export const generateWorker = (workerName, job) => { + const worker = new Worker(workerName, job, { connection: redisConfig }); + + worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - has been successfully completed!`); + }); + + worker.on('failed', (job, err) => { + logger.error(` + JOB ID: ${job.id} - has failed to be completed! ${err.message} + \n ${err.stack} + `); + + Sentry.captureException(err, { + extra: { + jobId: job.id, + }, + }); + }); + + return worker; +};