@yingyeothon/actor-system
v0.4.0
Actor system
A basic actor system only using a queue and a lock.
Usage
The simplest case
import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";
const subsys = {
queue: new InMemorySupport.InMemoryQueue(),
lock: new InMemorySupport.InMemoryLock(),
awaiter: new InMemorySupport.InMemoryAwaiter(),
};
class Adder {
private value = 0;
constructor(public readonly id: string) {}
public onMessage = (message: { delta: number }) => {
this.value += message.delta;
};
}
const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };
// `send` enqueues a message for an actor and tries to process it right away if possible.
// If another thread is already attached to this actor, that thread processes the message as well.
await Actor.send(env, { item: { delta: 1 } });
await Actor.send(env, { item: { delta: 2 } });
await Actor.send(env, { item: { delta: 3 } });
await Actor.send(env, { item: { delta: 4 } });
// The `adder` actor processes all messages sequentially in the background.
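To make the "queue and a lock" idea concrete, here is a minimal, self-contained sketch of the mechanism. `TinyActor` and its members are hypothetical names invented for illustration; this is not the library's implementation, only a simplified in-memory model of it.

```typescript
// Hypothetical, simplified in-memory model; not the library's implementation.
type Message = { delta: number };

class TinyActor {
  private readonly queue: Message[] = [];
  private locked = false;
  public value = 0;

  // Enqueue a message, then process the queue if nobody else holds the lock.
  public send(message: Message): boolean {
    this.queue.push(message);
    return this.tryToProcess();
  }

  // Returns true if this call acquired the lock and drained the queue.
  public tryToProcess(): boolean {
    if (this.locked) {
      return false; // The current lock holder will pick the message up.
    }
    this.locked = true;
    try {
      let message = this.queue.shift();
      while (message !== undefined) {
        this.value += message.delta;
        message = this.queue.shift();
      }
    } finally {
      this.locked = false;
    }
    return true;
  }
}

const actor = new TinyActor();
for (const delta of [1, 2, 3, 4]) {
  actor.send({ delta });
}
```

With a shared queue and lock, a real implementation would also need to re-check the queue after releasing the lock, because a message can arrive between the last poll and the release.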
Await policy
It supports an `awaitPolicy` that determines how long the caller waits. `Forget` is the default and means the caller does not wait at all. We can choose `Act`, which waits until the `onMessage` call finishes, or `Commit`, which waits until the `onCommit` call finishes. In those cases, `awaitTimeoutMillis` sets the timeout for the wait.
Actor.send(env, {
item: { delta: 10 },
awaitPolicy: Actor.AwaitPolicy.Act,
awaitTimeoutMillis: 100,
})
.then(/* HAPPY */) // Called after `onMessage` has run.
.catch(/* SAD */);
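The waiting behaviour can be pictured as a promise raced against a timer. The sketch below is an assumption-laden illustration: `withTimeout` is a hypothetical helper, and whether the library rejects or resolves with a flag when `awaitTimeoutMillis` expires is not stated here, so this version simply rejects.

```typescript
// Hypothetical helper: reject if `work` does not settle within `timeoutMillis`.
function withTimeout<T>(work: Promise<T>, timeoutMillis: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMillis}ms`)),
      timeoutMillis
    );
    work.then(
      (value) => {
        clearTimeout(timer); // Settled in time, so cancel the timeout.
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

// A stand-in for "the actor has called onMessage for my message".
const processed = new Promise<string>((resolve) =>
  setTimeout(() => resolve("processed"), 10)
);
const outcome = withTimeout(processed, 100);
```

A short timeout keeps a caller responsive when another worker holds the actor for a long time; the message itself stays in the queue either way.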
With prepare and commit
It is too expensive for an actor to load its context every time it processes `onMessage`. If many messages are waiting, this leads to huge latency. To overcome this, the library supports `onPrepare` and `onCommit`, which form a processing cycle: `onPrepare` -> a loop of `onMessage` until the queue is empty -> `onCommit`.
class Adder {
private value = 0;
constructor(public readonly id: string) {}
public onPrepare = async () => {
// Load context from the outer storage.
};
public onCommit = () => {
// Store context to the outer storage.
};
public onMessage = (message: { delta: number }) => {
// Modify context in memory.
this.value += message.delta;
};
}
const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.send(env, {
item: { delta: 10 },
awaitPolicy: Actor.AwaitPolicy.Commit,
awaitTimeoutMillis: 1000,
})
.then(/* HAPPY */) // Called after `onCommit` has run.
.catch(/* SAD */);
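The cycle described above can be sketched in a few lines. `runCycle`, `Handlers`, `storage`, and `queue` are hypothetical names used only for this illustration, and the handlers are synchronous here for brevity; the point is that the load and the store each happen once per cycle rather than once per message.

```typescript
// Hypothetical stand-ins for external storage and the actor queue.
const storage = { value: 0, loads: 0, saves: 0 };
const queue: { delta: number }[] = [{ delta: 1 }, { delta: 2 }, { delta: 3 }];

interface Handlers<T> {
  onPrepare: () => void;
  onMessage: (message: T) => void;
  onCommit: () => void;
}

// One cycle: prepare once, handle every queued message, commit once.
function runCycle<T>(pending: T[], handlers: Handlers<T>): number {
  handlers.onPrepare();
  let handled = 0;
  let message = pending.shift();
  while (message !== undefined) {
    handlers.onMessage(message);
    handled += 1;
    message = pending.shift();
  }
  handlers.onCommit();
  return handled;
}

let context = 0;
const handledCount = runCycle(queue, {
  onPrepare: () => { storage.loads += 1; context = storage.value; },
  onMessage: (message) => { context += message.delta; },
  onCommit: () => { storage.saves += 1; storage.value = context; },
});
```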
With fire-and-forget producer and dedicated consumer
Sometimes we want a fire-and-forget producer and a dedicated consumer to improve overall latency. In this case, a bulk message handler works better than a single-message handler.
// To reduce code size, use an environment tailored to `post`.
await Actor.post(
{
id: `adder-1`,
awaiter: {
wait: subsys.awaiter.wait,
},
queue: {
push: subsys.queue.push,
},
logger: subsys.logger, // optional
},
{ item: { delta: 10 } }
)
.then(/* HAPPY */)
.catch(/* SAD */);
// Or you can use `enqueue`, which doesn't even need `awaiter`.
Actor.enqueue(
{
id: `adder-1`,
queue: {
push: subsys.queue.push,
},
logger: subsys.logger, // optional
},
{ item: { delta: 1 } }
)
.then(/* HAPPY */)
.catch(/* SAD */);
class Adder {
private value = 0;
constructor(public readonly id: string) {}
// It can process multiple messages at one time.
public onMessages = (messages: { delta: number }[]) => {
for (const message of messages) {
this.value += message.delta;
}
};
}
// This `bulk` processor stays alive for up to 60 seconds.
const env = { ...Actor.bulkConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.tryToProcess(env, { aliveMillis: 60 * 1000 });
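As a rough illustration of why a bulk handler helps, the sketch below drains everything that is currently queued and hands it to the handler in one call. `flush` is a hypothetical helper, not a function of this library, and the plain array stands in for the actor queue.

```typescript
type Delta = { delta: number };

// Hypothetical queue helper: remove and return everything currently queued.
function flush<T>(queue: T[]): T[] {
  return queue.splice(0, queue.length);
}

let total = 0;
let handlerCalls = 0;
const onMessages = (messages: Delta[]): void => {
  handlerCalls += 1;
  for (const message of messages) {
    total += message.delta;
  }
};

const pending: Delta[] = [{ delta: 10 }, { delta: 1 }, { delta: 5 }];
// Keep flushing until nothing is left; each flush is one handler call.
while (pending.length > 0) {
  onMessages(flush(pending));
}
```

Three queued messages cost a single handler invocation here, which matters when each invocation has fixed overhead such as a storage round trip.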
Shift
To keep a processor from becoming a victim that is stuck draining the queue, message processing supports `aliveMillis` and `shiftable`. If the timeout expires while `tryToProcess` is running, it stops processing and raises a `shift` event. This is useful in a container with a limited lifetime, such as AWS Lambda.
const subsysWithShift = {
...subsys,
shift: async (actorId: string) => {
// Invoke a new AWS Lambda to process remaining messages in this actor.
},
};
const env = { ...Actor.singleConsumer, ...subsysWithShift, ...new Adder(`adder-shift`) };
Actor.send(
env,
{
item: { delta: 10 },
},
{ aliveMillis: 5 * 1000, shiftable: true } // A usual timeout of API Gateway
);
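The shift behaviour can be modelled as a processing loop with a deadline. This is only a sketch under stated assumptions: `processWithDeadline` and its return values are hypothetical, and the injected fake clock exists purely to make the example deterministic.

```typescript
// Hypothetical sketch: process until the deadline, then hand over the rest.
function processWithDeadline<T>(
  queue: T[],
  onMessage: (message: T) => void,
  options: { aliveMillis: number; shiftable: boolean },
  shift: () => void,
  now: () => number = Date.now
): "drained" | "shifted" | "timed-out" {
  const deadline = now() + options.aliveMillis;
  while (queue.length > 0) {
    if (now() >= deadline) {
      if (options.shiftable) {
        shift(); // Ask another worker to continue with the remaining messages.
        return "shifted";
      }
      return "timed-out";
    }
    onMessage(queue.shift() as T);
  }
  return "drained";
}

// A fake clock that advances 2 seconds per reading.
let fakeTime = 0;
const fakeNow = () => (fakeTime += 2000);

const backlog = [1, 2, 3, 4, 5];
let sum = 0;
let shifts = 0;
const result = processWithDeadline(
  backlog,
  (n) => { sum += n; },
  { aliveMillis: 5000, shiftable: true },
  () => { shifts += 1; },
  fakeNow
);
```

In this run the deadline passes after two messages, so the remaining three stay in the queue for whichever worker the `shift` callback starts.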
It can easily be extended into a distributed actor system, provided that both the queue and the lock work correctly when shared across instances.
EventLoop
We can set `awaiter` aside for a moment and focus on the two most basic building blocks: `enqueue` and `eventLoop`. This is useful when we want to run other tasks in a loop, in addition to processing messages, while the actor's lock is held. For example, we could run the game logic in a loop that polls game messages from the actor queue.
import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";
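// The event loop needs only a queue and a lock; no awaiter is required.
const subsys = {
queue: new InMemorySupport.InMemoryQueue(),
lock: new InMemorySupport.InMemoryLock(),
};
// Illustrative message type; replace it with the application's own shape.
interface GameMessage {
type: string;
}
class Game {
private running = true;
constructor(public readonly id: string) {}
public loop = async (poll: () => Promise<GameMessage[]>) => {
while (this.running) {
this.processMessages(await poll());
this.tick();
}
};
private processMessages = (messages: GameMessage[]) => {
// Apply each polled message to the game state.
};
private tick = () => {
// Advance the game by one step; set `this.running = false` to stop the loop.
};
}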
async function main() {
const game = new Game("GAME_ID");
await Actor.eventLoop<GameMessage>({
...subsys,
...game,
});
}
async function sendMessage(gameId: string, message: GameMessage) {
await Actor.enqueue(
{
...subsys,
id: gameId,
},
{ item: message }
);
}
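The polling pattern itself can be shown without the library. In this self-contained sketch, `TinyGame`, `inbox`, `enqueue`, and `poll` are hypothetical stand-ins, and the loop is synchronous so that it terminates deterministically; it is meant only to show how per-iteration work and message handling share one loop.

```typescript
type GameMessage = { type: "move"; dx: number } | { type: "stop" };

// Hypothetical in-memory queue shared by producers and the loop.
const inbox: GameMessage[] = [];
const enqueue = (message: GameMessage): void => { inbox.push(message); };
const poll = (): GameMessage[] => inbox.splice(0, inbox.length);

class TinyGame {
  public x = 0;
  public ticks = 0;
  private running = true;

  public loop(pollMessages: () => GameMessage[]): void {
    while (this.running) {
      for (const message of pollMessages()) {
        if (message.type === "move") {
          this.x += message.dx;
        } else {
          this.running = false; // A "stop" message ends the loop.
        }
      }
      this.ticks += 1; // Work that happens every iteration, with or without messages.
    }
  }
}

enqueue({ type: "move", dx: 2 });
enqueue({ type: "move", dx: 3 });
enqueue({ type: "stop" });
const game = new TinyGame();
game.loop(poll);
```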
License
MIT