diff --git a/packages/backend/src/jobs/delete-user.ee.js b/packages/backend/src/jobs/delete-user.ee.js new file mode 100644 index 00000000..a6d58f33 --- /dev/null +++ b/packages/backend/src/jobs/delete-user.ee.js @@ -0,0 +1,37 @@ +import appConfig from '../config/app.js'; +import User from '../models/user.js'; +import ExecutionStep from '../models/execution-step.js'; + +export const deleteUserJob = async (job) => { + const { id } = job.data; + + const user = await User.query() + .withSoftDeleted() + .findById(id) + .throwIfNotFound(); + + const executionIds = ( + await user + .$relatedQuery('executions') + .withSoftDeleted() + .select('executions.id') + ).map((execution) => execution.id); + + await ExecutionStep.query() + .withSoftDeleted() + .whereIn('execution_id', executionIds) + .hardDelete(); + await user.$relatedQuery('executions').withSoftDeleted().hardDelete(); + await user.$relatedQuery('steps').withSoftDeleted().hardDelete(); + await user.$relatedQuery('flows').withSoftDeleted().hardDelete(); + await user.$relatedQuery('connections').withSoftDeleted().hardDelete(); + await user.$relatedQuery('identities').withSoftDeleted().hardDelete(); + + if (appConfig.isCloud) { + await user.$relatedQuery('subscriptions').withSoftDeleted().hardDelete(); + await user.$relatedQuery('usageData').withSoftDeleted().hardDelete(); + } + + await user.$relatedQuery('accessTokens').withSoftDeleted().hardDelete(); + await user.$query().withSoftDeleted().hardDelete(); +}; diff --git a/packages/backend/src/jobs/execute-action.js b/packages/backend/src/jobs/execute-action.js new file mode 100644 index 00000000..2d283c11 --- /dev/null +++ b/packages/backend/src/jobs/execute-action.js @@ -0,0 +1,46 @@ +import Step from '../models/step.js'; +import actionQueue from '../queues/action.js'; +import { processAction } from '../services/action.js'; +import { + REMOVE_AFTER_30_DAYS_OR_150_JOBS, + REMOVE_AFTER_7_DAYS_OR_50_JOBS, +} from '../helpers/remove-job-configuration.js'; +import delayAsMilliseconds from '../helpers/delay-as-milliseconds.js'; + +const DEFAULT_DELAY_DURATION = 0; + +export const executeActionJob = async (job) => { + const { stepId, flowId, executionId, computedParameters, executionStep } = + await processAction(job.data); + + if (executionStep.isFailed) return; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + + if (!nextStep) return; + + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + delay: DEFAULT_DELAY_DURATION, + }; + + if (step.appKey === 'delay') { + jobOptions.delay = delayAsMilliseconds(step.key, computedParameters); + } + + if (step.appKey === 'filter' && !executionStep.dataOut) { + return; + } + + await actionQueue.add(jobName, jobPayload, jobOptions); +}; diff --git a/packages/backend/src/jobs/execute-flow.js b/packages/backend/src/jobs/execute-flow.js new file mode 100644 index 00000000..ac6e0634 --- /dev/null +++ b/packages/backend/src/jobs/execute-flow.js @@ -0,0 +1,54 @@ +import triggerQueue from '../queues/trigger.js'; +import { processFlow } from '../services/flow.js'; +import Flow from '../models/flow.js'; +import { + REMOVE_AFTER_30_DAYS_OR_150_JOBS, + REMOVE_AFTER_7_DAYS_OR_50_JOBS, +} from '../helpers/remove-job-configuration.js'; + +export const executeFlowJob = async (job) => { + const { flowId } = job.data; + + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const user = await flow.$relatedQuery('user'); + const allowedToRunFlows = await user.isAllowedToRunFlows(); + + if (!allowedToRunFlows) { + return; + } + + const triggerStep = await flow.getTriggerStep(); + + const { data, error } = await processFlow({ flowId }); + + const reversedData = data.reverse(); + + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + }; + + for (const triggerItem of reversedData) { + const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + triggerItem, + }; + + await triggerQueue.add(jobName, jobPayload, jobOptions); + } + + if (error) { + const jobName = `${triggerStep.id}-error`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + error, + }; + + await triggerQueue.add(jobName, jobPayload, jobOptions); + } +}; diff --git a/packages/backend/src/jobs/execute-trigger.js b/packages/backend/src/jobs/execute-trigger.js new file mode 100644 index 00000000..b81d6ff7 --- /dev/null +++ b/packages/backend/src/jobs/execute-trigger.js @@ -0,0 +1,32 @@ +import actionQueue from '../queues/action.js'; +import Step from '../models/step.js'; +import { processTrigger } from '../services/trigger.js'; +import { + REMOVE_AFTER_30_DAYS_OR_150_JOBS, + REMOVE_AFTER_7_DAYS_OR_50_JOBS, +} from '../helpers/remove-job-configuration.js'; + +export const executeTriggerJob = async (job) => { + const { flowId, executionId, stepId, executionStep } = await processTrigger( + job.data + ); + + if (executionStep.isFailed) return; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + }; + + await actionQueue.add(jobName, jobPayload, jobOptions); +}; diff --git a/packages/backend/src/jobs/remove-cancelled-subscriptions.ee.js b/packages/backend/src/jobs/remove-cancelled-subscriptions.ee.js new file mode 100644 index 00000000..b8d33619 --- /dev/null +++ b/packages/backend/src/jobs/remove-cancelled-subscriptions.ee.js @@ -0,0 +1,15 @@ +import { DateTime } from 'luxon'; +import Subscription from '../models/subscription.ee.js'; + +export const removeCancelledSubscriptionsJob = async () => { + await Subscription.query() + .delete() + .where({ + status: 'deleted', + }) + .andWhere( + 'cancellation_effective_date', + '<=', + DateTime.now().startOf('day').toISODate() + ); +}; diff --git a/packages/backend/src/jobs/send-email.js b/packages/backend/src/jobs/send-email.js new file mode 100644 index 00000000..ed818493 --- /dev/null +++ b/packages/backend/src/jobs/send-email.js @@ -0,0 +1,31 @@ +import logger from '../helpers/logger.js'; +import mailer from '../helpers/mailer.ee.js'; +import compileEmail from '../helpers/compile-email.ee.js'; +import appConfig from '../config/app.js'; + +export const sendEmailJob = async (job) => { + const { email, subject, template, params } = job.data; + + if (isCloudSandbox() && !isAutomatischEmail(email)) { + logger.info( + 'Only Automatisch emails are allowed for non-production environments!' + ); + + return; + } + + await mailer.sendMail({ + to: email, + from: appConfig.fromEmail, + subject: subject, + html: compileEmail(template, params), + }); +}; + +const isCloudSandbox = () => { + return appConfig.isCloud && !appConfig.isProd; +}; + +const isAutomatischEmail = (email) => { + return email.endsWith('@automatisch.io'); +};