async-job-state
v0.2.1
Published
A small library to handle long-running async tasks.
Downloads
5
Readme
async-job-state
github, npm
A small helper library for creating asynchronous tasks with progress reporting and remaining-time estimation.
TODO: better readme
Usage
Simple usage:
import {
// job execution
collectAsyncJob,
startAsyncJob,
asyncJobStreamTransform,
// job creation helpers:
indefiniteAsyncJob,
arrayMapAsyncJob,
rangeAsyncJob,
sleep,
// useful types:
type AsyncJobState,
AsyncJobPhase,
} from 'async-job-state';
const customTestJob = async function*(state: AsyncJobState) {
yield state.itemIndex * 2;
if(state.itemIndex >= 5) return;
};
const state = await startAsyncJob(customTestJob, {
// optional delay between iterations
delay: () => new Promise(r => setTimeout(r, 100)),
onStart(state) {
console.log('Job started at index:', state.itemIndex);
return { myAdditionalStateProperty: 'test job!' };
},
onProgress(state, item) {
console.log('Progress:', state.itemIndex, item);
// do something with the item
},
onEnd(state) {
console.log('Job ended at index:', state.itemIndex);
}
});
console.log(state.myAdditionalStateProperty);
// cancel the job by setting state.cancelled = true
await state.donePromise;
// as an alternative to startAsyncJob (which returns the state), you can use collectAsyncJob to get the results directly:
const results = await collectAsyncJob(customTestJob, { /* options */ });
Common job helpers:
console.log(
await collectAsyncJob(indefiniteAsyncJob((stop, state) => {
// calling stop() will end the task after the current iteration
if (state.itemIndex === 3) stop(); // you can `return stop()` instead to end the task immediately
return state.itemIndex;
}))
); // [0, 1, 2, 3]
console.log(
await arrayMapAsyncJob(['a', 'b', 'c'], async (item, state) => {
// callbacks can be async
return await Promise.resolve(item.toUpperCase());
})
); // ['A', 'B', 'C']
console.log(
await collectAsyncJob(rangeAsyncJob(3, (item) => item * 10))
); // [0, 10, 20]
// stream status reporting
const { state, transform } = asyncJobStreamTransform({
timingAverageWindow: 5,
startImmediately: true,
// if startImmediately is not set to true, the job will only start automatically
// when the first stream item is processed, and the first time measurement will then be inaccurate
});
// optionally set state.itemsTotal to enable remaining-time estimates.
const jobStream = stream.Readable.from(customTestJob()).pipe(transform);
// use your stream as usual, state will be updated on each processed item
Types:
enum AsyncJobPhase {
NotStarted = 0,
Started = 1,
Ended = 2
}
interface AsyncJobState {
itemIndex: number;
itemsTotal: number | undefined;
cancelled: boolean;
readonly phase: AsyncJobPhase;
readonly startDate: Date;
readonly endDate: Date | null;
readonly clock: DurationIntervalClock | null;
readonly estimatedRemainingSeconds: number | undefined;
readonly error: any;
readonly donePromise: Promise<void>;
}
interface DurationIntervalClock {
durationsSamples: number[];
intervalSamples: number[];
checkHasGoodAverage(q?: number): boolean;
lastDuration: number | undefined;
lastInterval: number | undefined;
averageDuration: number | undefined;
averageInterval: number | undefined;
}
interface AsyncJobOptions<ResultValue, StateEx = void> {
delay?: () => Promise<any>;
timingAverageWindow?: number;
onStart?: (state: AsyncJobState)
=> MaybePromise<StateEx | AsyncJobStateReplacement<StateEx>>;
onEnd?: (state: ExtendedAsyncJobState<StateEx>)
=> MaybePromise<void | Partial<StateEx & AsyncJobState> | AsyncJobStateReplacement<StateEx>>;
onProgress?: (state: ExtendedAsyncJobState<StateEx>, item: ResultValue)
=> MaybePromise<void | Partial<StateEx & AsyncJobState> | AsyncJobStateReplacement<StateEx>>;
}
type AsyncJob<R> = AsyncIterable<Awaited<R>, void, void>;
type AsyncJobFactory<R> = (state: AsyncJobState) => AsyncJob<R>;
// If a callback returns an object wrapped with this function, the state is completely replaced with that object.
// If a raw object is returned from a callback, Object.assign is used to update the state.
function asyncJobStateReplacement<T extends AsyncJobState>(state: T): AsyncJobStateReplacement<T>;
async function startAsyncJob<R, StateEx>(factory: AsyncJob<R> | AsyncJobFactory<R>, options: AsyncJobOptions<R, StateEx>): Promise<ExtendedAsyncJobState<StateEx>>;
async function collectAsyncJob<R, StateEx>(factory: AsyncJob<R> | AsyncJobFactory<R>, options?: AsyncJobOptions<R, StateEx>): Promise<R[]>;