npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

paced-work-stream

v1.0.3

Published

Node.js stream working at constant pace and concurrent

Downloads

7

Readme

PacedWorkStream

NPM Version Node Build Status Coverage Status dependencies Status

Node.js transform stream working at constant pace and concurrent for object mode

Features

  • Work time at once can be specified. (workMS option)
  • Concurrent workers can be specified. (concurrency option)
  • Fires done event after when all workers have finished asynchrous -processes
  • Counting tag system to call this.countTag(<tag>) in _workPromise, you can get summarized results tagCounts grouped by tag.
  • Node.js 6.10 or later

Targets

  • API client that needs to handle the rate-limit
  • DB client that needs to handle the read/write capacity units like AWS DynamoDB

Install

$ npm install -save paced-work-stream

How to Use

Create a PacedWorkStream

new PacedWorkStream(options, workPromise)

  • options <Object>
    • concurrency is the number of concurrent processes.
    • workMS is milliseconds of work time at once that contains process-time and wait-time.
    • delay is enable to start concurrent process in order delay for a time that divided workMS by concurrency, default is false. workPromise must return functions wrap each promise. Refer to the following figure for detailed operation pattern.
    • highWaterMark is maximum object buffer size. If you use flow mode, you should set it at least concurrency.
  • workPromise is function(item): must return a Promise processing the item or a Function that returns a Promise.

Delay Figure

Create subclass that extends PacedWorkStream

  • super(options) must be called in the constructor.
  • _workPromise method must be overrided and return a Promise processing the item or a Function that returns a Promise.
class MyWorkStream extends PacedWorkStream {
  constructor(options) {
    super(options);
  }
  _workPromise(item) {
    return () => {
      this.countTag(item.tag);
      return Promise.resolve(item.value);
    };
  }
}

Examples

const es = require('event-stream');
const devnull = require('dev-null');
const PacedWorkStream = require('paced-work-stream');

const pwStream = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })
  .on('done', function() {
    console.log(this.tagCounts);
  }).on('error', (err) => {
    console.error(err);
  });

const reader = es.readArray([11, 12, 21, 22, 31])
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));
  • Pay attention to handling done event to get last tagCounts because workers haven't processed items on finish event.
  • If stream need not output, the stream must pipe dev-null.

Console output

$ node example.js
2016-09-11T03:17:50.000Z Begin 11
2016-09-11T03:17:50.003Z Begin 12
2016-09-11T03:17:50.605Z End 11
2016-09-11T03:17:50.605Z End 12
2016-09-11T03:17:51.009Z Begin 21
2016-09-11T03:17:51.009Z Begin 22
2016-09-11T03:17:51.606Z End 21
2016-09-11T03:17:51.606Z End 22
2016-09-11T03:17:52.004Z Begin 31
2016-09-11T03:17:52.607Z End 31
{ workDone: 5 }

Using with Promised Lifestream

Promised Lifestream is useful for stream pipeline. The following example gets the same result as above.

'use strict';

const es = require('event-stream');
const PromisedLife = require('promised-lifestream');

const PacedWorkStream = require('paced-work-stream');

const pacedWorker = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })

PromisedLife([
  es.readArray([11, 12, 21, 22, 31]),
  pacedWorker
])
.then(() => {
  console.log(pacedWorker.tagCounts);
})
.catch(err => {
  console.error(err);
});

License

MIT