@yingyeothon/actor-system

v0.4.0

A basic actor system only using a queue and a lock.

Downloads

142

Readme

Actor system

A basic actor system only using a queue and a lock.

Usage

The simplest case

import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";

const subsys = {
  queue: new InMemorySupport.InMemoryQueue(),
  lock: new InMemorySupport.InMemoryLock(),
  awaiter: new InMemorySupport.InMemoryAwaiter(),
};

class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  public onMessage = (message: { delta: number }) => {
    this.value += message.delta;
  };
}

const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };

// `send` pushes a message to an actor's queue and then tries to process it if possible.
// If another thread already holds this actor, that thread processes my message, too.
await Actor.send(env, { item: { delta: 1 } });
await Actor.send(env, { item: { delta: 2 } });
await Actor.send(env, { item: { delta: 3 } });
await Actor.send(env, { item: { delta: 4 } });

// The `adder` actor processes all messages sequentially in the background.
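
To make the "queue and lock" idea concrete, here is a minimal, self-contained sketch of the pattern. It is not the library's implementation: `TinyActor`, its boolean lock, and `demoSend` are hypothetical names used only for illustration, and it assumes a single process where an in-memory flag is enough to act as the lock.

```typescript
// Hypothetical sketch of the queue-and-lock pattern; not the library's code.
type Message = { delta: number };

class TinyActor {
  private readonly queue: Message[] = [];
  private locked = false; // Stands in for a real lock.
  public value = 0;

  // Enqueue first, then try to become the consumer.
  public async send(message: Message): Promise<void> {
    this.queue.push(message);
    if (this.locked) {
      // Another caller holds the lock and will drain this message, too.
      return;
    }
    this.locked = true;
    try {
      // Drain the queue one message at a time.
      while (this.queue.length > 0) {
        const next = this.queue.shift() as Message;
        await this.onMessage(next);
      }
    } finally {
      this.locked = false;
    }
  }

  private async onMessage(message: Message): Promise<void> {
    this.value += message.delta;
  }
}

async function demoSend(): Promise<number> {
  const actor = new TinyActor();
  // All four sends start concurrently; only the first one acquires the lock.
  await Promise.all([1, 2, 3, 4].map((delta) => actor.send({ delta })));
  return actor.value;
}
```

In this sketch only the first caller drains the queue, so the messages are applied one by one even though the four `send` calls overlap.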

Await policy

It supports awaitPolicy, which determines how long send waits. Forget is the default and means the caller does not wait at all. We can also choose Act, which waits until onMessage has been called, or Commit, which waits until onCommit has been called. In those cases, awaitTimeoutMillis sets a timeout for the wait.

Actor.send(env, {
  item: { delta: 10 },
  awaitPolicy: Actor.AwaitPolicy.Act,
  awaitTimeoutMillis: 100,
})
  .then(/* HAPPY */) // It would be called after `onMessage`.
  .catch(/* SAD */);
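
Waiting with a timeout can be sketched in isolation as follows. This is only an illustration of the idea behind awaitTimeoutMillis, under the assumption that a timeout is reported as `false`; how the library itself reports a timeout is not stated here. `waitWithTimeout` and `demoAwait` are hypothetical names.

```typescript
// Hypothetical helper, not part of the library:
// wait for `signal`, but give up after `timeoutMillis`.
function waitWithTimeout(
  signal: Promise<void>,
  timeoutMillis: number
): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    // Resolve with false if the signal does not arrive in time.
    const timer = setTimeout(() => resolve(false), timeoutMillis);
    signal.then(() => {
      clearTimeout(timer);
      resolve(true);
    });
  });
}

async function demoAwait(): Promise<[boolean, boolean]> {
  // `fast` stands in for an `onMessage` that finishes quickly.
  const fast = new Promise<void>((resolve) => setTimeout(resolve, 10));
  // `slow` stands in for one that takes longer than the caller will wait.
  const slow = new Promise<void>((resolve) => setTimeout(resolve, 200));
  const first = await waitWithTimeout(fast, 100);
  const second = await waitWithTimeout(slow, 50);
  return [first, second];
}
```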

With prepare and commit

Loading an actor's context every time onMessage runs is expensive, and when many messages are waiting it leads to huge latency. To overcome this, the library supports onPrepare and onCommit, which give a processing cycle of onPrepare -> a loop of onMessage until the queue is empty -> onCommit.

class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  public onPrepare = async () => {
    // Load context from the outer storage.
  };
  public onCommit = () => {
    // Store context to the outer storage.
  };

  public onMessage = (message: { delta: number }) => {
    // Modify context in memory.
    this.value += message.delta;
  };
}

const env = { ...Actor.singleConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.send(env, {
  item: { delta: 10 },
  awaitPolicy: Actor.AwaitPolicy.Commit,
  awaitTimeoutMillis: 1000,
})
  .then(/* HAPPY */) // It would be called after `onCommit`.
  .catch(/* SAD */);
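
The cycle described above can be sketched without the library. `runCycle`, `CycleHandlers`, and the in-memory `storage` object are hypothetical, and skipping the cycle for an empty queue is an assumption of this sketch rather than documented behavior.

```typescript
// Hypothetical sketch of the prepare -> messages -> commit cycle.
type Delta = { delta: number };

interface CycleHandlers {
  onPrepare?: () => void | Promise<void>;
  onMessage: (message: Delta) => void | Promise<void>;
  onCommit?: () => void | Promise<void>;
}

async function runCycle(queue: Delta[], handlers: CycleHandlers): Promise<number> {
  // Assumption: nothing to do, so skip loading and storing altogether.
  if (queue.length === 0) return 0;
  await handlers.onPrepare?.(); // Load context once.
  let processed = 0;
  while (queue.length > 0) {
    await handlers.onMessage(queue.shift() as Delta); // Modify context in memory.
    processed++;
  }
  await handlers.onCommit?.(); // Store context once.
  return processed;
}

async function demoCycle() {
  // Stands in for the outer storage and counts how often it is touched.
  const storage = { value: 100, loads: 0, stores: 0 };
  let value = 0;
  const processed = await runCycle([{ delta: 1 }, { delta: 2 }, { delta: 3 }], {
    onPrepare: () => {
      value = storage.value;
      storage.loads++;
    },
    onMessage: (message) => {
      value += message.delta;
    },
    onCommit: () => {
      storage.value = value;
      storage.stores++;
    },
  });
  return { processed, storage };
}
```

Three messages are handled here with a single load and a single store, which is the latency saving the cycle is meant to provide.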

With fire-and-forget producer and dedicated consumer

Sometimes we want a fire-and-forget producer and a dedicated consumer to improve overall latency. In that case, a bulk message handler works better than a single message handler.

// To reduce code size, use an environment tailored to `post`.
await Actor.post(
  {
    id: `adder-1`,
    awaiter: {
      wait: subsys.awaiter.wait,
    },
    queue: {
      push: subsys.queue.push,
    },
    logger: subsys.logger, // optional
  },
  { item: { delta: 10 } }
)
  .then(/* HAPPY */)
  .catch(/* SAD */);

// Or you can use `enqueue`, which doesn't even need `awaiter`.
Actor.enqueue(
  {
    id: `adder-1`,
    queue: {
      push: subsys.queue.push,
    },
    logger: subsys.logger, // optional
  },
  { item: { delta: 1 } }
)
  .then(/* HAPPY */)
  .catch(/* SAD */);

class Adder {
  private value = 0;

  constructor(public readonly id: string) {}

  // It can process multiple messages at one time.
  public onMessages = (messages: { delta: number }[]) => {
    for (const message of messages) {
      this.value += message.delta;
    }
  };
}

// This `bulk` processor stays alive for up to 60 seconds.
const env = { ...Actor.bulkConsumer, ...subsys, ...new Adder(`adder-1`) };
Actor.tryToProcess(env, { aliveMillis: 60 * 1000 });

Shift

To keep a consumer from falling victim to its host's time limit, it supports aliveMillis and shiftable when processing messages from a queue. If the timeout elapses while tryToProcess is running, it stops processing and raises a shift event. This is useful in a container with a limited lifetime, such as AWS Lambda.

const subsysWithShift = {
  ...subsys,
  shift: async (actorId: string) => {
    // Invoke a new AWS Lambda to process remaining messages in this actor.
  },
};

const env = { ...Actor.singleConsumer, ...subsysWithShift, ...new Adder(`adder-shift`) };
Actor.send(
  env,
  {
    item: { delta: 10 },
  },
  { aliveMillis: 5 * 1000, shiftable: true } // A usual timeout of API Gateway
);

It can easily be extended into a distributed actor system, as long as both the queue and the lock work correctly across shared instances.
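
The shift idea can be sketched as a drain loop with a time budget. This is a hypothetical illustration, not the library's code: `drainWithBudget`, its injectable `now` clock, and `demoShift` are invented for this example, and the fake clock exists only to make the demo deterministic.

```typescript
// Hypothetical sketch: drain until the time budget is spent, then hand off.
async function drainWithBudget<T>(
  queue: T[],
  onMessage: (message: T) => Promise<void>,
  aliveMillis: number,
  shift: () => Promise<void>,
  now: () => number = Date.now
): Promise<"drained" | "shifted"> {
  const deadline = now() + aliveMillis;
  while (queue.length > 0) {
    if (now() >= deadline) {
      // Out of time: let someone else process the remaining messages.
      await shift();
      return "shifted";
    }
    await onMessage(queue.shift() as T);
  }
  return "drained";
}

async function demoShift() {
  let clock = 0; // Fake clock in milliseconds.
  let processed = 0;
  let shiftCalls = 0;
  const queue = [1, 2, 3, 4, 5];
  const result = await drainWithBudget(
    queue,
    async () => {
      clock += 20; // Pretend each message takes 20 ms.
      processed++;
    },
    50,
    async () => {
      shiftCalls++; // A real system might invoke a new AWS Lambda here.
    },
    () => clock
  );
  return { result, processed, remaining: queue.length, shiftCalls };
}
```

With a 50 ms budget and 20 ms per message, the sketch processes three messages, then shifts and leaves two in the queue for the next consumer.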

EventLoop

We can set the awaiter aside for a moment and focus on a more basic structure: enqueue and eventLoop. This is useful when we want to run other tasks in a loop, in addition to processing messages, while the actor's lock is held. For example, a game could run its logic in a loop while polling game messages from the actor queue.

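import * as Actor from "@yingyeothon/actor-system";
import * as InMemorySupport from "@yingyeothon/actor-system/lib/support/inmemory";

// Illustrative message type; the original snippet does not define it.
type GameMessage = { type: string };

class Game {
  // Illustrative state and stubs; the original snippet leaves these undefined.
  private running = true;

  constructor(public readonly id: string) {}

  public loop = async (poll: () => Promise<GameMessage[]>) => {
    while (this.running) {
      this.processMessages(await poll());
      this.tick();
    }
  };

  private processMessages(messages: GameMessage[]) {
    // Apply each polled message to the game state.
  }

  private tick() {
    // Advance the game by one step; set `this.running = false` to stop the loop.
  }
}

// Only a queue and a lock are used here, without an awaiter.
const subsys = {
  queue: new InMemorySupport.InMemoryQueue(),
  lock: new InMemorySupport.InMemoryLock(),
};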

async function main() {
  const game = new Game("GAME_ID");
  await Actor.eventLoop<GameMessage>({
    ...subsys,
    ...game,
  });
}

async function sendMessage(gameId: string, message: GameMessage) {
  await Actor.enqueue(
    {
      ...subsys,
      id: gameId,
    },
    { item: message }
  );
}

License

MIT