CampbellCache

npm install campbellcache


Overview

The cache borrows heavily from the concepts laid out in spray-caching.

The equivalent library to CampbellCache for Java is herdcache. Originally I was going to name this herdcache-js. However, my White Campbell ducks were taken by a fox, so in honour of Henrietta and Puddle I renamed it (think of it as a Herd of Ducks :-) ).

CampbellCache is intended to be used when caching items in Memcached, especially AWS's implementation: ElastiCache.

It caches a value against a key, e.g. "Customer123", which you retrieve from the cache later on, rather than having to perform an expensive operation to obtain the information for "Customer123" again.

When using campbellcache, the concept is to execute a function that generates the value only if the item is not in the cache. If a lookup for the same key is already running, rather than having 2 lookups executing for that key, only 1 will execute, and both requests for that key are satisfied by that 1 execution (either a memcached cache hit or the execution of the function that generates the value).

Rather than the traditional method of performing the following yourself, campbellcache does this execution for you:

  1. lookup 'key' in cache
  2. 'key' is not in cache
  3. execute an expensive operation (call a 3rd party system)
  4. store item in cache under 'key'

When performing the above manually, if 2 requests are running concurrently for the 'key' and step 3 has not completed for the 1st request, then you would have 2 requests running step 3. With campbellcache, you would only have 1.

Having multiple consumers wait on 1 producer is achieved by the cache returning an RxJS Observable. The consumer then subscribes to be notified of the value (unsubscribing after they have received the value).

The value returned from the Observable is a CacheItem. This wrapper object provides methods that let you know whether the item came from the cache or not, or supply a default value if nothing was returned (a sketch follows the list below):

  • isFromCache()
  • isNotFromCache()
  • value("default if null") #returns the default if null
  • value() #will return null
  • getKey()
  • isError() #if there was an error
  • hasValue() #check if null
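
For example, here is a minimal sketch of inspecting the CacheItem returned by apply (the campbellcache instance and the expensiveLookup supplier are assumptions for illustration; apply itself is described below):

var obs = campbellcache.apply("Customer123", expensiveLookup, { ttl: 60 });

var sub = obs.subscribe(function(cacheItem) {
    if (cacheItem.isError()) {
        console.log("lookup failed for " + cacheItem.getKey());
    } else {
        // value("default") returns the supplied default if nothing was cached or generated
        console.log("from cache? " + cacheItem.isFromCache() + ", value: " + cacheItem.value("default"));
    }
});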

This scenario, of having many requests to the same cache key (e.g. a resource URI, such as a popular news story) arriving at the same time, is known as a stampeding/thundering herd. The result is that you have a lot of requests to the same memcached server, at the same time, to fetch the same value. This puts huge load on that 1 memcached server, which could result in an outage. If the item is not in the cache, all the requests end up hitting the upstream services that generate the cache value; which again could cripple that upstream service.

Returning an Observable from the cache means that multiple requests for a missing cache value that hasn't been calculated wait on the same Observable that is executing for the initial/first request for the cache key.

Returning an Observable also means that one request for a cache value from memcached can satisfy many requests.

CampbellCache is not a distributed caching library. The herd protection only works within the 1 running process. If you have 2 EC2 Instances/Processes running campbellcache, the protection mechanism is per Instance/Process.


Creating the Cache object

CampbellCache can be used against a local memcached or against AWS ElastiCache memcached (utilising the auto discovery of hosts).

No Memcached

If you wish to use the Cache as an in memory cache, and not use memcached, you can create the cache as follows. It uses caching-map as the in memory store:

const campbellcache = new CampbellCache({
    autodiscovery: false,
    in_memory: true
})

Local Memcached

The below shows how to connect to 2 local memcached instances by providing an array of connections:

const campbellcache = new CampbellCache({
    autodiscovery: false,
    hosts: ["127.0.0.1:11211","127.0.0.1:11212"],
})

AWS Elasticache

The easiest way to use ElastiCache is to set the environment variable EC_MEMCACHED_CONFIGURL to the configuration url of your ElastiCache cluster:

export EC_MEMCACHED_CONFIGURL=domtest.xxxxx.cfg.use1.cache.amazonaws.com

If you have configured your elasticache cluster on a different port do:

export EC_MEMCACHED_CONFIGURL=domtest.xxxxx.cfg.use1.cache.amazonaws.com:11211

Once you have the above set you create the client as follows:

const campbellcache = new CampbellCache({
    autodiscovery: true,
    autodiscovery_intervalInMs : 60000,
    autodiscovery_startIntervalInMs : 10000,
})

Memcached Hosts Discovery by DNS name

If you use some sort of service discovery, which constantly updates a DNS entry with the healthy memcached nodes, then this is the mode you want to use. The specified dns entry will be constantly polled for the memcached hosts. If the hosts change, the cache client will be updated to point to those entries automatically.

Set the value of the environment variable: EC_MEMCACHED_CONFIGURL to the DNS name that will resolve to your memcached hosts.

export EC_MEMCACHED_CONFIGURL=domsmemcached.service

If you have configured your memcached nodes on a different port do:

export EC_MEMCACHED_CONFIGURL=domsmemcached.service:11211

Once you have the above set you create the client as follows:

const campbellcache = new CampbellCache({
    autodiscovery: true,
    autodiscovery_intervalInMs : 60000,
    autodiscovery_startIntervalInMs : 10000,
    autodiscovery_by_dns: true
})

If the DNS Name isn't set in the environment variable, pass it as a constructor arg:

const campbellcache = new CampbellCache({
    autodiscovery: true,
    autodiscovery_url: "domsmemcached.service",
    autodiscovery_intervalInMs : 60000,
    autodiscovery_startIntervalInMs : 10000,
    autodiscovery_by_dns: true
})

Memcached Options

You can pass configuration options to the constructor that specify how the memcached client (https://www.npmjs.com/package/memcached) will work:

const campbellcache = new CampbellCache({
    autodiscovery: true,
    autodiscovery_intervalInMs : 60000,
    autodiscovery_startIntervalInMs : 10000,
    memcached_opts : {
        maxValue : 1048576,
        timeout: 2500,
        poolSize:  10,
        retries: 0,
        reconnect : 120000,
        retry: 30000,
        idle:  60000,
        remove : false,
        keyCompression : false,
        minTimeout: 1000,
        maxTimeout: 10000,
        factor: 3,
        randomize: false,
        debug: false
    }
})
  • maxValue: 1048576, the maximum size of a value.

  • poolSize: 10, the maximum size of the connection pool.

  • reconnect: 18000000, the time between reconnection attempts (in milliseconds).

  • timeout: 2500, the time after which Memcached sends a connection timeout (in milliseconds).

  • retries: 0, the number of socket allocation retries per request.

  • retry: 30000, the time between a server failure and an attempt to set it up back in service.

  • remove: false, if true, authorizes the automatic removal of dead servers from the pool.

  • keyCompression: true, whether to use md5 as hashing scheme when keys exceed maxKeySize (250).

  • idle: 5000, the idle timeout for the connections.

  • retries: 0, Connection pool retries to pull connection from pool

  • factor: 3, Connection pool retry exponential backoff factor

  • minTimeout: 1000, Connection pool retry min delay before retrying

  • maxTimeout: 10000, Connection pool retry max delay before retrying

  • randomize: false, Connection pool retry timeout randomization

  • debug: false, Output the commands and responses from memcached

It is recommended not to use key compression. The reason is that key compression involves hashing the key to an MD5 string. This makes it possible for two different key strings to produce an MD5 collision, and you end up caching a value against a key which you do not expect. This could have nasty consequences for your application.


Autodiscovery

Autodiscovery is the mechanism by which the memcached servers to connect to are discovered by polling a configuration url, or a dns entry.

The default auto discovery is primarily for AWS ElastiCache: (http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/AutoDiscovery.html)

If the list of memcached servers you wish to connect to is discovered via a dns address (ipv4) then you can use DNS Autodiscovery, by specifying the configuration parameter autodiscovery_by_dns as true.

The configuration server url is by default taken from the environment variable EC_MEMCACHED_CONFIGURL. However, this can be overridden and specified at construction time via autodiscovery_url.

There are several other options that control the autodiscovery, and the frequency at which it talks to the configuration server to find out if there are new nodes.

The following shows the options available:

const campbellcache = new CampbellCache({
    autodiscovery: true,
    autodiscovery_intervalInMs : 60000,
    autodiscovery_startIntervalInMs : 0,
    autodiscovery_timeoutInMs: 3000,
    autodiscovery_oldClientTTL : 120000,
    autodiscovery_url: "127.0.0.1:11211",
    autodiscovery_by_dns: false
})
  • autodiscovery : Whether autodiscovery is enabled. If not enabled, you need to specify the memcached servers via hosts.

  • autodiscovery_by_dns : Whether the list of memcached hosts to connect to is discovered by resolving a dns address (which will be constantly polled for changes)

  • autodiscovery_intervalInMs : The frequency, in millis, at which to poll for new memcached servers

  • autodiscovery_startIntervalInMs : The delay for the first polling request to take place.

  • autodiscovery_timeoutInMs : Checking for new servers is a network request. This is the maximum number of milliseconds that network request is allowed to take

  • autodiscovery_oldClientTTL : When new servers are discovered, the old memcached client is shut down and a new one created (under the campbell cache hood). This is the amount of time to wait, in milliseconds, before the old memcached client is shut down (allowing time for old operations to finish)

  • autodiscovery_url : url of the configuration server. Default is to use the environment variable: EC_MEMCACHED_CONFIGURL


Methods

There are a few methods in campbell cache that you might interact with, but by far the most likely and recommended is apply:

Apply

apply(key,supplier,options)

Supplier is a function that returns an Observable:

    function deathByBoom() {
        return new Rx.Observable.create(function(observer) {
                        observer.next("You Got Nuked by the BoomStick");
                    }).take(1).shareReplay();
    }

    campbellcache.apply("GearsOfWarDeath",deathByBoom);

The supplier function is only executed if no item is found in the cache that is deemed usable by the isCachedValueValidPredicate function of the options dict (if this function is not provided, any cached item is deemed usable).

The lookup in memcached for an item, and the execution of the Supplier, only happen when a subscriber registers, wishing to be notified of the outcome.

The supplier function returns an Rx.Observable that performs the operation of calculating the value that will be written to the cache. In the above example the value is the string "You Got Nuked by the BoomStick". This string will be passed to the isSupplierValueCachablePredicate function to determine if the value should be cached or not (if this function is not provided, the value is deemed cacheable).

If you do not register your interest in the outcome of the Observable returned by apply, nothing will execute. The Observable is lazy

var gearsObservable = apply("GearsOfWarDeath",supplier,options)

In the above, nothing will be executed: neither the lookup in memcached nor the execution of the supplier.

var gearsDeathObservable = apply("GearsOfWarDeath",supplier,options)

// Register Interest and kick off the Observable
var sub = gearsDeathObservable.subscribe(function(deathBy) {
    console.log('Died By:' + deathBy.value());
});

NOTE : you are responsible for unsubscribing the subscriber (more on this later)

The options argument is a dictionary that configures how the apply function operates:

     {
        ttl : 5,
        isSupplierValueCachablePredicate:<boolean function(item)>,
        isCachedValueValidPredicate: <boolean function(item)>,
        waitForMemcachedSet : true|false,
     };
  • ttl : The number of seconds the item should be stored in the cache.
  • isSupplierValueCachablePredicate : A function that takes the item from the observable and returns true if the item is cachable.
  • isCachedValueValidPredicate : A function that determines whether the item retrieved from the cache is usable.
  • waitForMemcachedSet : If the supplier was called to generate a value, should we wait for the item to be written to memcached before notifying observers of the value. A sketch using these options follows the list.
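
For example, a minimal sketch of passing these options to apply (the newsArticleSupplier function and the predicate logic are illustrative assumptions, not part of the library):

var obs = campbellcache.apply("news-story-123", newsArticleSupplier, {
    ttl: 30,
    // only write the supplier's value to memcached if it looks like a real article
    isSupplierValueCachablePredicate: function(item) {
        return item != null && item.body && item.body.length > 0;
    },
    // treat cached items with an empty body as unusable, forcing the supplier to run
    isCachedValueValidPredicate: function(item) {
        return item != null && item.body && item.body.length > 0;
    },
    // do not wait for the memcached write before notifying subscribers
    waitForMemcachedSet: false
});

obs.subscribe(function(cacheItem) {
    console.log("article: " + JSON.stringify(cacheItem.value()));
});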

Why is the supplier a function?

With the apply (or set) method you provide the supplier as a function. This function generates the Observable (or the Promise - from v0.0.3 onwards).

The reason for providing a function that generates the Observable (or Promise), rather than just providing the Observable or Promise, is that the Observable or Promise does not need to be created if the value is either:

  • Fetched from memcached (or elasticache)
  • Is a cached Observable (an Observable that is already executing - the herd protection)

In both these cases, the Observable (or Promise) does not need to be created. CampbellCache's apply or set method will only execute the supplier function that creates the Observable, if it needs to.

For example, in the following let's assume that slow_google_request is a request-promise call to www.google.co.uk that takes 1 second to execute.

If a second request comes in that calls google 500ms after the 1st request, the observable returned by campbellcache.apply will be the same observable. The slow_google_request function will not be called again.

    var subscribers_executed = 0;

    // write into cache
    var obs1 = campbellcache.apply("google",slow_google_request, {
        ttl: 10
    });

    // call it again 500ms after 1st request
    setTimeout( () => {
        var obs2 = campbellcache.apply("google",slow_google_request);
        obs2.subscribe((value) => subscribers_executed++);
    },500);

    var sub = obs1.subscribe(function(httpBody) {
        subscribers_executed++;
        setTimeout(() => {
            reply({
                success : false,
                google_requests: google_requests,
                google_function_requests : slow_google_request_function_calls,
                value: httpBody.value(),
                subscribers_executed: subscribers_executed
            });
        },1000);
    });

Promises

From version 0.0.3, you can provide a Promise to the apply method. The apply method still returns an Observable, but you can easily convert this to a promise via .toPromise().

Here is an example:

var flakeyGoogleCount = 0;
function flakeyGooglePromise() {
    if( (flakeyGoogleCount++)%2 == 0) {
        return new Promise( (resolve,reject) => {
            var promise = requestpromise(
                {
                    json: true,
                    resolveWithFullResponse: true,
                    uri: 'http://www.google.co.uk',
                    timeout:10000
                }
            );
            promise.then((data) => {
                resolve(data.body);
            }).catch((err) => {
                reject(err);
            });
        })
    } else {
        return new Promise(function(resolve,reject) {
            reject("boom!");
        })
    }
}

server.route({
    method: 'GET',
    path: '/flakeypromise/{name}',
    config: {
      validate: {
        params: {
          name: Joi.string()
        },
      },
    },
    handler: function (request, reply) {
        // write into cache
        var promise = campbellcache.apply(
            "flakeypromise",
            flakeyGooglePromise,
            {
                ttl: 5
            }
        ).toPromise();

        promise.then((cacheItem) => {
            reply({
                'isFromCache' : cacheItem.isFromCache(),
                'value' : cacheItem.value(),
                'isError' : false
            })
        }).catch((error) => {
            reply({
                'isFromCache' : error.isFromCache(),
                'value' : error.value(),
                'isError' : true
            })
        })
    }
});

Unsubscribing

In RxJS, when you subscribe to an observable you are returned a subscription:

var subscription = obs.subscribe(function(httpBody) {
    reply('Hello, ' + encodeURIComponent(request.params.name) + '!' + httpBody.value());
});

Observables are normally streams that continually return data (mouse movements, keyboard events). Such an Observable is non-finite, i.e. continuously emitting data. However, when working with Promises the data being generated is usually finite (an http request, a lookup for an item in a data store).

In RxJS it is important to unsubscribe from a non-finite observable in order to release memory associated with the subscription. However, this is not required for a finite Observable (i.e. a Promise); it is a SafeSubscription, which does not require unsubscribe() to be called.
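
For illustration, a small sketch of keeping the subscription and releasing it explicitly (using the deathByBoom supplier from the Apply section; as noted above, this is not strictly required for the finite Observables campbellcache returns):

var sub = campbellcache.apply("GearsOfWarDeath", deathByBoom).subscribe(function(cacheItem) {
    console.log("Died By: " + cacheItem.value());
});

// later, once you no longer care about the value, release the subscription
sub.unsubscribe();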


Set

set(key,supplier,options)

Sets a value in the cache from the given supplier.

This method always forces the setting of a value to the cache (if it is deemed cacheable).

The internal cache that contains observables will be replaced by this running supplier, so any other call to apply or get will be supplied this observable.

The key must be a string, and the supplier must be a function that returns an Observable or Promise that generates the value that is to be placed in the cache.

Like the apply method, the options argument is a dictionary that configures how the set function operates:

     {
        ttl : 5,
        isSupplierValueCachablePredicate:<boolean function(item)>,
        waitForMemcachedSet : true|false,
     };
  • ttl : The number of seconds the item should be stored in the cache.

  • isSupplierValueCachablePredicate : A function that takes the item from the observable and returns true if the item is cachable.

  • waitForMemcachedSet : If the supplier was called to generate a value, should we wait for the item to be written to memcached before notifying observers of the value. A small sketch follows the list.
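
For example, a minimal sketch of forcing a value into the cache with set (the latestScores supplier and the "football-scores" key are illustrative assumptions):

function latestScores() {
    // any function returning an Observable (or, from v0.0.3, a Promise) of the value to cache
    return Rx.Observable.of({ home: 2, away: 1 });
}

var setObs = campbellcache.set("football-scores", latestScores, {
    ttl: 60,
    waitForMemcachedSet: true
});

setObs.subscribe(function(cacheItem) {
    console.log("cached: " + JSON.stringify(cacheItem.value()));
});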


Get

get(key)

Returns an Observable. This could either be an Observable that is already running an existing apply or set supplier for the given key, or a new Observable that executes a lookup for the item in memcached.

The Observable returns a CacheItem (like all other methods), for which cacheItem.value() will return null if the item was not in memcached. cacheItem.isFromCache() will tell you if the item came from the cache or not.
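
A brief sketch of looking up a key with get (the "football-scores" key is just an example):

campbellcache.get("football-scores").subscribe(function(cacheItem) {
    if (cacheItem.hasValue()) {
        console.log("hit (fromCache=" + cacheItem.isFromCache() + "): " + cacheItem.value());
    } else {
        console.log("miss for key: " + cacheItem.getKey());
    }
});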


Clear

Removes an item from the cache, and clears any Observable from the internal cache.

Returns an Observable that emits true or false, indicating whether the delete ran ok.

var delObs = campbellcache.clear("NO_SUCH_THING");
delObs.subscribe(function(retrievedValue) {
    assert.equal(true,retrievedValue);
});

Logging

Version 1.0.0

Logging is performed via an EventEmitter. You can obtain the event emitter via the CampbellCache object, and then use your favourite logging library. Log line 'events' are emitted against the event name campbell-cache-log-event:

CampbellCache.logEmitter.on('campbell-cache-log-event', function(level, message, ...args) {
    // Log the message through your logging package here
    // logger.log(level, message, ...args)
});

Here is an example using winston:

const winston = require('winston');
const CampbellCache = require('../lib/campbellcache');

const logger = winston.createLogger({
    level: 'debug'
});
logger.add(new winston.transports.Console({
    format: winston.format.combine(
        winston.format.splat(),
        winston.format.simple()
    ),
}));

var initialized = false;
module.exports.initialize = function() {
    if (!initialized) {
        initialized = true;
        CampbellCache.logEmitter.on('campbell-cache-log-event', function(level, message, ...args) {
            logger.log(level, message, ...args)
        });
    }
}

Version 0.1.0

This version of Campbell Cache used Binford slf4j for logging (https://github.com/ivanplenty/binford-slf4j).

If you wish to see some logging output from campbell cache, you need to set up the following once (create a separate file and require it):

The below logs to the console:

var slf4j = require('binford-slf4j');
var binfordLogger = require('binford-logger');

slf4j.setLoggerFactory(binfordLogger.loggerFactory);
slf4j.loadConfig({
    level: slf4j.LEVELS.WARN,
    appenders:
        [{
            appender: binfordLogger.getDefaultAppender()
        }]
});

Examples

A set of examples that demonstrate the functionality of campbell cache.

Calling google.co.uk

The below demonstrates the calling of www.google.co.uk, caching the result for 10 seconds.

The www.google.co.uk request only starts after 1 second. Basically a simulation of a slow http request that has a 1+ second delay.

This means that if we run 50 concurrent connections, only 1 request will result in calling www.google.co.uk; the other 49 will be waiting on that 1 request completing, and so forth. Then for 10 seconds everything will be quick, until the same occurs again. However, www.google.co.uk is only called by 1 request every 10 seconds.

If we perform a load test for 120 seconds, google will be called only 11 or 12 times:

 wrk -c 50 -t 50 -d 120s http://127.0.0.1:3000/dom
Running 2m test @ http://127.0.0.1:3000/dom
  50 threads and 50 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   177.30ms  228.32ms   1.45s    91.74%
    Req/Sec     8.80      2.21    20.00     88.53%
  48201 requests in 2.00m, 2.05GB read
Requests/sec:    401.39
Transfer/sec:     17.52MB

Here is an example using Hapi.js and request-promise.

The example uses the onRequest and onPreResponse extension points of hapi.js to add the Observable's subscribers to a list of subscribers that are then closed when the request finishes (not required for finite observables, which this example uses; it just demonstrates how to close a subscriber if you need to).

const Joi = require('joi');

const requestpromise = require('request-promise');

const Hapi = require('hapi');

const Rx = require('rxjs');

var slf4j = require('binford-slf4j');
var binfordLogger = require('binford-logger');

slf4j.setLoggerFactory(binfordLogger.loggerFactory);
slf4j.loadConfig({
    level: slf4j.LEVELS.DEBUG,
    appenders:
        [{
            appender: binfordLogger.getDefaultAppender()
        }]
});

const CampbellCache = require('campbellcache');

const campbellcache = new CampbellCache({
    autodiscovery: false,
    autodiscovery_oldClientTTL: 2000,
    hosts: ["127.0.0.1:11211"],
})

const server = new Hapi.Server({
    connections: {
        routes: {
            timeout: {
                server: 5000 //ms
                //socket: 1000 ms
            }
        }
    }
}
);

function slowGoogleObservable() {
    return new Rx.Observable.create(function(observer) {
        setTimeout(() => {
            var rep = requestpromise(
                                    {
                                        json: true,
                                        resolveWithFullResponse: true,
                                        uri: 'http://www.google.co.uk',
                                        timeout:10000
                                    }
            );

            rep.then(function (responseObj) {
                var body = responseObj.body;
                observer.next(body);
                observer.complete();
            })
            rep.catch(function (err) {
                observer.error(err);
                observer.complete();
            });
        },1000)
    }).take(1).shareReplay();
}

function addSubscriptionToRequest(subscription, request) {
    request.app.observable_subscriptions.push(subscription);
    console.log(request.app.observable_subscriptions);
}

server.connection({ port: 3000, host: 'localhost' });

server.ext('onRequest',(request,reply) => {
    // add empty list of subscriptions
    request.app.observable_subscriptions = [];
    reply.continue();
});

server.route({
    method: 'GET',
    path: '/',
    handler: function (request, reply) {
        reply('Hello, world!');
    }
});

server.route({
    method: 'GET',
    path: '/{name}',
    config: {
      validate: {
        params: {
          name: Joi.string()
        },
      },
    },
    handler: function (request, reply) {
        const log = request.log.bind(request);

        // write into cache
        var obs = campbellcache.apply("google",slowGoogleObservable, {
            ttl: 10
        });

        var sub = obs.subscribe(function(httpBody) {
            reply('Hello, ' + encodeURIComponent(request.params.name) + '!' + httpBody.value());
        });

        addSubscriptionToRequest(sub,request);
    }
});

server.ext('onPreResponse',(request,reply) => {
    // Make sure we unsubscribe any subscriptions
    for (let subscription of request.app.observable_subscriptions) {
        if (subscription) {
            console.log("Unsubscribing: " + subscription);
            subscription.unsubscribe();
        }
    }
    reply.continue();
});

server.start((err) => {

    if (err) {
        throw err;
    }
    console.log(`Server running at: ${server.info.uri}`);
});

Example Of Same Observable being returned

const Joi = require('joi');
const requestpromise = require('request-promise');
const Hapi = require('hapi');
const Rx = require('rxjs');

var slf4j = require('binford-slf4j');
var binfordLogger = require('binford-logger');

slf4j.setLoggerFactory(binfordLogger.loggerFactory);
slf4j.loadConfig({
    level: slf4j.LEVELS.DEBUG,
    appenders:
        [{
            appender: binfordLogger.getDefaultAppender()
        }]
});


const CampbellCache = require('campbellcache');

const campbellcache = new CampbellCache({
    autodiscovery: false,
    autodiscovery_oldClientTTL: 2000,
    hosts: ["127.0.0.1:11211"],
})

const server = new Hapi.Server({
    connections: {
        routes: {
            timeout: {
                server: 5000 //ms
                //socket: 1000 ms
            }
        }
    }
}
);

var slow_google_request_function_calls = 0;
var google_requests = 0;
function slow_google_request() {
    slow_google_request_function_calls++;
    return new Promise( (resolve,reject) => {
        setTimeout( () => {
            var promise = requestpromise(
                {
                    json: true,
                    resolveWithFullResponse: true,
                    uri: 'http://www.google.co.uk',
                    timeout:10000
                }
            );
            promise.then((data) => {
                google_requests++;
                resolve("success");
            }).catch((err) => {
                reject("fail");
            });
        },1000);
    })

}

server.connection({ port: 3000, host: 'localhost' });

server.route({
    method: 'GET',
    path: '/',
    config: {
      validate: {
        params: {
          name: Joi.string()
        },
      },
    },
    handler: function (request, reply) {
        const log = request.log.bind(request);
        var subscribers_executed = 0;

        // write into cache
        var obs1 = campbellcache.apply("google",slow_google_request, {
            ttl: 10
        });

        // call it again
        setTimeout( () => {
            var obs2 = campbellcache.apply("google",slow_google_request);
            obs2.subscribe(() => subscribers_executed++);
        },500);

        var sub = obs1.subscribe(function(httpBody) {
            subscribers_executed++;
            setTimeout(() => {
                reply({
                    success : false,
                    google_requests: google_requests,
                    google_function_requests : slow_google_request_function_calls,
                    value: httpBody.value(),
                    subscribers_executed: subscribers_executed
                });
            },1000);
        });

    }
});


server.start((err) => {

    if (err) {
        throw err;
    }
    console.log(`Server running at: ${server.info.uri}`);
});

More Docs Coming Soon....