Merge pull request #2404 from automatisch/add-status-to-executions
feat: Add status column to executions
This commit is contained in:
@@ -0,0 +1,57 @@
|
||||
export const up = async (knex) => {
|
||||
await knex.schema.alterTable('executions', (table) => {
|
||||
table.string('status');
|
||||
});
|
||||
|
||||
const batchSize = 5000;
|
||||
let lastProcessedId = null;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const executionsBatch = await knex('executions')
|
||||
.select('id')
|
||||
.whereNull('status')
|
||||
.modify((query) => {
|
||||
if (lastProcessedId) {
|
||||
query.where('id', '>', lastProcessedId); // Adjusted for lexicographic order
|
||||
}
|
||||
})
|
||||
.orderBy('id') // Ensure consistent ordering
|
||||
.limit(batchSize);
|
||||
|
||||
if (executionsBatch.length === 0) break;
|
||||
|
||||
const executionIds = executionsBatch.map((row) => row.id);
|
||||
|
||||
await knex.raw(
|
||||
`
|
||||
WITH execution_statuses AS (
|
||||
SELECT
|
||||
execution_id,
|
||||
CASE
|
||||
WHEN bool_or(status = 'failure') THEN 'failure'
|
||||
ELSE 'success'
|
||||
END as derived_status
|
||||
FROM execution_steps
|
||||
WHERE execution_id = ANY(?)
|
||||
GROUP BY execution_id
|
||||
)
|
||||
UPDATE executions
|
||||
SET status = execution_statuses.derived_status
|
||||
FROM execution_statuses
|
||||
WHERE executions.id = execution_statuses.execution_id
|
||||
`,
|
||||
[executionIds]
|
||||
);
|
||||
|
||||
lastProcessedId = executionsBatch[executionsBatch.length - 1].id;
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
}
|
||||
};
|
||||
|
||||
export const down = async (knex) => {
|
||||
await knex.schema.alterTable('executions', (table) => {
|
||||
table.dropColumn('status');
|
||||
});
|
||||
};
|
||||
@@ -68,10 +68,19 @@ class ExecutionStep extends Base {
|
||||
}
|
||||
}
|
||||
|
||||
async updateExecutionStatus() {
|
||||
const execution = await this.$relatedQuery('execution');
|
||||
|
||||
await execution.$query().patch({
|
||||
status: this.status === 'failure' ? 'failure' : 'success',
|
||||
});
|
||||
}
|
||||
|
||||
async $afterInsert(queryContext) {
|
||||
await super.$afterInsert(queryContext);
|
||||
Telemetry.executionStepCreated(this);
|
||||
await this.increaseUsageCount();
|
||||
await this.updateExecutionStatus();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -149,4 +149,32 @@ describe('ExecutionStep model', () => {
|
||||
expect(increaseUsageCountSpy).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateExecutionStatus', () => {
|
||||
it('should update execution status to failure when step status is failure', async () => {
|
||||
const execution = await createExecution();
|
||||
const executionStep = await createExecutionStep({
|
||||
executionId: execution.id,
|
||||
status: 'failure',
|
||||
});
|
||||
|
||||
await executionStep.updateExecutionStatus();
|
||||
|
||||
const updatedExecution = await execution.$query();
|
||||
expect(updatedExecution.status).toBe('failure');
|
||||
});
|
||||
|
||||
it('should update execution status to success when step status is success', async () => {
|
||||
const execution = await createExecution();
|
||||
const executionStep = await createExecutionStep({
|
||||
executionId: execution.id,
|
||||
status: 'success',
|
||||
});
|
||||
|
||||
await executionStep.updateExecutionStatus();
|
||||
|
||||
const updatedExecution = await execution.$query();
|
||||
expect(updatedExecution.status).toBe('success');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user