nodiak
v0.4.4
Published
Nodiak is a Node.js client for the Riak Distributed Database
Downloads
35
Readme
Table of Contents
- Overview
- Features
- Installation
- Getting Started
- Cluster info and status:
- Bucket Operations:
- RObject Operations
- Counter Operations
- Sibling Auto-Resolution
- Riak Search and Riak 2i's
- MapReduce
- Tests
- Todos
Overview
Nodiak is a node.js client to the Riak distributed database. The focus of Nodiak is to provide a client that mimics some of the patterns and functionality of Basho's official clients for other languages while still taking advantage of the benefits of asynchronous patterns that can be easily implemented in node.js.
Nodiak's design is split across two general concepts. The base client, which handles the base Riak HTTP API operations, and useful higher-level abstractions (Bucket, Counter, and RObject) that build on the base client functionality to make Riak easier to work with.
NOTE:
While the base client's methods are available in the event you need them. The _bucket and _object namespace won't be detailed in this document because you should use the convience classes and abstratctions whenever possible.
Features
- HTTP and HTTPS connections.
- Automatic exponential-backoff retries.
- Cluster info operations.
- Bulk/Batch object operations (Get, Save, Delete).
- Riak CRDT Counters.
- Sibling notification.
- Parallel async and 'at once' sibling fetching.
- Sibling resolution and auto-resolution support (default: client-side LWW).
- MapReduce phase chaining.
- Ad-hoc Javascript and Erlang MapReduce functions.
- Riak Search and Riak 2i support.
- Evented stream results handling.
- Easy multiple cluster support.
Installation
$ npm install nodiak
API
NOTE:
Version 0.4.0 includes minor, but breaking, changes to the streaming API. The 2i streaming no longer auto-fetches the Riak objects for the keys returned by the query, and the streaming API form now returns an instance of
EventEmitter
directly from the.stream()
calls rather than requiring an additional callback to be passed in.
#####standard callback form
bucket.objects.get(array_of_keys, function(err, objs){})
The argument signature of the callback for non-streaming results is always callback(error, result)
. Where error will always be either null
when there was no error, or an instance of Error
, or any array of Error
s with details of the errors when they occur. The result
varies depending on the API call. See the detailed API documentation below for details.
#####streaming callback form
bucket.objects.get(array_of_keys).stream()
.on('data', function(data) {})
.on('error', function(err) {})
.on('end', function(arg) {});
For streaming results the function .stream()
returns gets an instance of EventEmitter
, and this emitter fires 'data'
, 'error'
, and 'end'
events.
#Getting Started
####require('nodiak').getClient( [backend], [host], [port], [defaults] );
// get client instance (defaults to HTTP backend on localhost:8098)
var riak = require('nodiak').getClient();
w/ custom backend, host, and port:
var riak = require('nodiak').getClient('https', '10.0.0.2', 8443);
w/ non-default client settings, such as resource locations, maxSockets value, mime decoders/encoders, etc:
var my_defaults = {
connection: { maxSockets: 50 },
resources: {
riak_kv_wm_mapred: "/mapreduce",
riak_kv_wm_stats: "/statistics",
riak_solr_indexer_wm: "/search"
}
}
// the 'defaults' object should always be the last.
var riak = require('nodiak').getClient('https', my_defaults);
For multiple-clusters just create several instances:
var cache = require('nodiak').getClient('https', '10.0.0.2', 8443);
var sessions = require('nodiak').getClient('http', '192.168.2.20', 8098);
var db = require('nodiak').getClient();
##Cluster info and status: ####.ping( callback );
// check that you can reach Riak
riak.ping(function(err, response) {
console.log(response);
});
'OK'
####.stats( callback );
// get current stats from Riak
riak.stats(function(err, response) {
console.log(response); // prints the stats provided by the 'riak_kv_wm_stats' resource.
});
{ vnode_gets: 0, ring_num_partitions: 64, storage_backend: 'riak_kv_eleveldb_backend', ... }
####.resources( callback );
// ask Riak for its current resource endpoints.
NOTE:
These are what your Riak install is using, not necessarily what the nodiak client is using. If you want to synchronize these values, you'll have to manage that yourself. getClient() -> .resources(result) -> getClient(result), or something similar.
riak.resources(function(err, response) {
console.log(response);
});
{ riak_kv_wm_buckets: '/riak', riak_kv_wm_index: '/buckets', riak_kv_wm_keylist: '/buckets', riak_kv_wm_link_walker: '/riak', riak_kv_wm_mapred: '/mapred', ... }
Bucket Operations:
##Bucket instance attributes
- ####.name
The name of the bucket in Riak as a
String
.
- ####.props
An
Object
containing the Riak bucket properties. Defaults to{}
.
- ####.resolver
The
Function
used to do sibling resolution on.object.get()
requests. Defaults toBucket.siblingLastWriteWins
(see here).
- ####.getSiblingsSync
Controls whether or not fetching siblings happens using parallel async requests or as one big 'multipart/mixed' to be parsed at once. Defaults to
false
.
- ####.client
An instance of the underlying backend client that handles communication with Riak. Is set to the client that created the
Bucket
instance withclient.bucket()
.
##Bucket instance methods ####client.bucket( name );
// factory method on the client that returns an instance of a Bucket object.
var user_bucket = riak.bucket('users');
user_bucket.objects.all(function(err, r_objs) {
console.log(r_objs);
});
[[Object], [Object], [Object]] // Array of RObjects
w/ a simple chaining pattern:
riak.bucket('users').objects.all(function(err, r_objs) {
console.log(r_objs);
});
[[Object], [Object], [Object]] // Array of RObjects
####Bucket.getProps( [callback] );
// update the Bucket instance with its bucket props from Riak
var user_bucket = riak.bucket('users');
user_bucket.getProps(function(err, props) {
console.log(users.props);
});
{ name: 'users', allow_mult: false, basic_quorum: false, big_vclock: 50, ... }
####Bucket.saveProps( [merge], [callback] );
// save updated bucket properties back to Riak.
var users = riak.bucket('users');
users.props.n_val = 2;
users.props.allow_mult = true;
users.props.last_write_wins = false;
users.saveProps(true, function(err, props) {
console.log(users.props);
});
{ name: 'users', allow_mult: true, last_write_wins: false, n_val: 2, ... }
The optional merge
argument is a boolean indicating wether to first get the bucket's properties from Riak and merge your updates with them, or to simply write your updates to Riak right away. This defaults to true
and is helpful in preventing you from obliterating things like pre and post-commit hooks defined on the bucket.
####Bucket.object.new( key, [data], [metadata] )
// factory method that returns a new instance of an RObject.
var my_robj = riak.bucket('users').object.new('some_key', { store: 'this thing'}, { vclock: 'aBcDE10fgH92'});
It's important to understand that this new object has not been read from or written to Riak at this point. If you know this key already exists in, or you want to refresh its data from, Riak, then you can 'hydrate' it using the RObject's .fetch()
method.
####Bucket.object.exists( key, callback )
// checks if an object exists in Riak
riak.bucket('users').object.exists('some_key', function(err, result) {
console.log(result);
});
result
will be either true
or false
.
####Bucket.objects.get( keys, [options], [resolver_fn, callback] )
// get one or more objects from Riak as RObjects.
This method can take a single key or an Array
of keys as input. Each request happens asynchronously in parallel. The optional options
parameter is an object that can contain Riak query parameters. Additionally, the optional resolver_fn
parameter allows you to pass in a custom sibling resolution function in the event any siblings are found.
var my_keys = ['me', 'myself', 'andI'];
riak.bucket('users').objects.get(my_keys, { r: 1 }, function(errs, objs) {
console.log(objs);
console.warn(errs);
});
Results returned through objs
will be returned as an Array
of RObject
s if you passed in an Array
of keys to the request. If the request produced errors then the err
value will be an Array
of Error
's for each that occurred or null
if there were none. Error.data
will contain the key for the failed request. Keys that can't be found are considered errors, and will be returned in err
with a .status_code
of 404
. The value of objs
value will be null
none of the keys could be found in Riak or all requests produced errors.
If a single key is passed in (not wrapped in an Array
), then the single resulting RObject
will be returned directly through objs
for convenience, and will be null
if not found or another error occurred. In the case of an error err
will contain the single Error
directly or will be null
if none occurred. This is provided for convenience when you're not attempting to get multiple objects.
NOTE:
By default nodiak fetches object siblings using parallel async GET requests for each sibling. This behavior can be overridden by setting the
.getSiblingsSync
property on theBucket
instance totrue
. This will cause nodiak to get all the siblings as a single large 'multipart/mixed' document and parse the contents. Further details in the Sibling Auto-Resolution section.
####Bucket.objects.get( keys, [options] ).stream( [resolver_fn] )
// get one or more objects from Riak as a stream of RObjects.
All the same details apply here as in the standard Bucket.objects.get()
method, except to signal that you want to stream the objects back from Riak you drop the callback and the resolver from .get()
, place them in .stream()
, and attach your listeners to the parameter passed to the callback.
var my_keys = ['me', 'myself', 'andI'];
riak.bucket('users').objects.get(my_keys, { r: 1 }).stream()
.on('data', function(obj) {
console.log(obj);
})
.on('error', function(err) {
console.warn(err);
})
.on('end', function() {
// do something else after we're all done getting the objects.
});
####Bucket.objects.save( r_objects, [callback])
// save one or more RObjects to Riak.
You can pass a single RObject
or an Array
of RObject
s into this method. Just like with the .get()
and .delete()
methods, each save request will happen asynchronously in parallel.
var things_to_save = [];
for(var i = 0; i < 5; i++) {
var robj = riak.bucket('users').objects.new('me'+i, { about_me: "I write software." });
robj.addMeta('extra_info', 'data goes up in here');
robj.addToIndex('numbers_i_like', [20, 3, 6]);
things_to_save.push(robj);
}
riak.bucket('users').objects.save(things_to_save, function(errs, objs) {
console.log(objs);
console.warn(errs);
});
Results returned through objs
will be returned as an Array
of successfully saved RObject
s if you passed in an Array
of RObject
s to the request. If the request produced errors then the err
value will be an Array
of Error
's for each that occurred or null
if there were none. Error.data
will contain the RObject
for the failed save. The value of objs
value will be null
none of the RObjects
could be saved in Riak or all requests produced errors.
If a single RObject
is passed in (not wrapped in an Array
), then the single successfully saved RObject
will be returned directly through objs
for convenience, and will be null
if an error occurred. In the case of an error err
will contain the single Error
directly or will be null
if none occurred. This is provided for convenience when you're not bulk saving multiple objects.
####Bucket.objects.save( r_objects ).stream()
// save one or more RObjects to Riak and get streamed results.
All the same details apply here as in the standard Bucket.objects.save()
method, except to signal that you want to stream the result notifications back from Riak you drop the callback from .save()
, and append .stream()
, and attach your listeners to the returned EventListener
.
var things_to_save = [];
for(var i = 0; i < 5; i++) {
var robj = riak.bucket('users').objects.new('me'+i, { about_me: "I write software." });
robj.addMeta('extra_info', 'data goes up in here');
robj.addToIndex('numbers_i_like', [20, 3, 6]);
things_to_save.push(robj);
}
riak.bucket('users').objects.save(things_to_save).stream()
.on('data', function(obj) {
console.log(obj);
})
.on('error', function(err) {
console.warn(err);
})
.on('end', function() {
// do something else after we're all done getting the objects.
});
####Bucket.objects.delete( r_objects, [callback] )
// delete one or more RObjects from Riak.
You can pass a single RObject
or an Array
of RObject
s into this method. Just like with the .get()
and .save()
methods, each delete request will happen asynchronously in parallel.
riak.bucket('users').objects.delete(array_of_robjs, function(errs, objs) {
console.log(objs);
console.warn(errs);
});
Results returned through objs
will be returned as an Array
of successfully deleted RObject
s if you passed in an Array
of RObject
s to the request. If the request produced errors then the err
value will be an Array
of Error
's for each that occurred or null
if there were none. Error.data
will contain the RObject
for the failed delete. The value of objs
value will be null
none of the RObjects
could be deleted from Riak or all requests produced errors.
If a single RObject
is passed in (not wrapped in an Array
), then the single successfully deleted RObject
will be returned directly through objs
for convenience, and will be null
if an error occurred. In the case of an error err
will contain the single Error
directly or will be null
if none occurred. This is provided for convenience when you're not bulk deleting multiple objects.
####Bucket.objects.delete( r_objects ).stream()
// delete one or more RObjects to Riak and get streamed results.
All the same details apply here as in the standard Bucket.objects.delete()
method, except to signal that you want to stream the result notifications back from Riak you drop the callback from .save()
, and append .stream()
, and attach your listeners to the returned EventListener
.
riak.bucket('users').objects.delete(array_of_robjs).stream()
.on('data', function(obj) {
console.log(obj);
})
.on('error', function(err) {
console.warn(err);
})
.on('end', function() {
// do something else after we're all done deleting the objects.
});
####Bucket.objects.all( callback )
// traverse a bucket to get all the RObjects in it. DO NOT USE IN PRODUCTION, okay? OK.
This method is the equivalent of the dreaded list keys operation… except even more deadly! This will not only perform a list keys on the bucket, but it will also .get()
every single key it finds. Use this only for development/testing purposes. If you think you need this in production, you're doing it wrong!
riak.bucket('users').objects.all(function(err, objs) {
console.warn("I BETTER NOT BE IN PRODUCTION");
console.warn("NO, SERIOUSLY.");
console.log(objs);
});
#RObject Operations ##RObject instance attributes
- ####.bucket
The
Bucket
instance that created thisRObject
.
- ####.key
The key for this object in Riak. Defaults to
null
, which will cause Riak to generate a key when saving the RObject.
- ####.data
The data stored in Riak for this object. Defaults to
{}
.
- ####.metadata
The metadata on the object stored in Riak, ie. vclock, content_type, last_modified, etc. Defaults to
{}
. A fully populatedmetadata
object might look like this:
{ vclock: 'a85hYGBgzGDKBVIcR4M2cgcY8xzJYEpkzGNlcJ708CRfFgA=', meta: { info: 'you might want to know', extra: 'stuff you can store here' }, index: { words: { bin: ['other', 'that', 'the', 'this'] }, mynumbers: { int: ['5', '37', '4234'] } }, link: '</buckets/test>; rel="up"', last_modified: 'Thu, 04 Oct 2012 04:53:23 GMT', etag: '"BTYdk4S0eisdrIYeA1OkM"', content_type: 'application/json', status_code: 200 }
- ####.options
An
Object
for setting Riak query parameters. Defaults to{}
.
- ####.siblings
An
Array
of siblingRObjects
for this object in Riak. Defaults toundefined
.
##RObjects instance methods ####RObject.save( [callback] )
// saves this RObject instance to Riak. Uses RObject's .options
for the request if they exist.
var my_obj = riak.bucket('users').objects.new('my_key', { my_profile: "Is Amazing!" });
my_obj.save(function(err, obj) {
console.log(obj);
});
####RObject.delete( [callback] )
// deletes this RObject instance from Riak. Uses RObject's .options
for the request if they exist.
my_obj.delete(function(err, obj) {
console.log(obj);
});
####RObject.fetch( [resolver_fn], callback )
// fetch/refresh object data from Riak and update this RObject's data, metadata, siblings. Uses RObject's .options
for the request if they exist.
var my_obj = riak.bucket('users').objects.new('my_key');
my_obj.fetch(function(err, obj) {
console.log(obj);
});
Notice that this method can also take a custom resolver_fn
in the event that reading updated data from Riak results in finding siblings. If this isn't set it defaults to the resolver function on the Bucket
instance that produced this RObject
.
####RObject.setMeta( name, value )
// set an X-Riak-Meta entry on this RObject.
riak.bucket('users').object.get('me', function(err, obj) {
obj.setMeta('extra', 'stuff can go here'); // sets metedata.meta.extra to 'stuff can go here'
});
####RObject.getMeta( name )
// get an X-Riak-Meta entry to this RObject.
riak.bucket('users').object.get('me', function(err, obj) {
obj.getMeta('extra'); // will get the value of metedata.meta.extra
});
####RObject.removeMeta( name )
// remove an X-Riak-Meta entry from this RObject.
riak.bucket('users').object.get('me', function(err, obj) {
obj.removeMeta('extra'); // will delete metedata.meta.extra
});
####RObject.addToIndex( name, value )
// add an X-Riak-Index entry to this RObject.
The name
parameter sets the base name of the index. The type, _int
or _bin
, of the index will be determined by the data type of value
. Anything passed into value
that is an integer will be set in metadata.index.name.int
, and anything else will be set in metadata.index.name.bin
.
riak.bucket('users').object.get('me', function(err, obj) {
obj.addToIndex('things', 'apple'); // will add 'apple' to metadata.index.things.bin
obj.addToIndex('counts', [2, 4, 6]); // will add 2, 4, and 6 to metadata.index.counts.int
});
In the second example an Array
of values was passed into the value
parameter. This will add all the values in the array to the named index. All the values should be of the type in the array.
####RObject.getIndex( name )
// get all values as an Array
in an X-Riak-Index entry on this RObject.
riak.bucket('users').object.get('me', function(err, obj) {
obj.getIndex('things'); // value of metadata.index.things.bin or .int, whichever exists
});
####RObject.removeFromIndex( name, value )
// remove a specific value from an X-Riak-Index entry on this RObject.
Just like with the .addToIndex()
method, the actual type of the underlying index will be determined based on the data type of the value
you pass in.
riak.bucket('users').object.get('me', function(err, obj) {
obj.removeFromIndex('things', 'apple'); // removes 'apple' from metadata.index.things.bin
});
####RObject.clearIndex( name )
// clears all values out of an X-Riak-Index entry on this RObject.
riak.bucket('users').object.get('me', function(err, obj) {
obj.clearIndex('things'); // deletes metadata.index.things completely
});
#Counter Operations ##Counter instance attributes
- ####.bucket
The
Bucket
instance that thisCounter
belongs to.
- ####.name
The name of the
Counter
that will be worked on in theBucket
##Counter instance methods
NOTE:
In order to use Riak Counters you must set
allow_mult: true
on the bucket properties for the bucket where you'll create your counters; as the CRDT counter functionality in Riak depends on siblings being created on write.
####client.counter( bucket_name, counter_name );
// factory method on the client that returns an instance of a Counter object.
var likes = riak.counter('the_counts', 'likes');
####Counter.add( amount, callback )
// adds amount
to the current value of a Riak counter.
var likes = riak.counter('the_counts', 'likes');
likes.add(5, function(err) {
console.log(err);
});
####Counter.subtract( amount, callback )
// subtracts amount
from the current value of a Riak counter.
var likes = riak.counter('the_counts', 'likes');
likes.subtract(5, function(err) {
console.log(err);
});
####Counter.value( callback )
// gets the current value of a Riak counter.
var likes = riak.counter('the_counts', 'likes');
likes.value(function(err, count) {
console.log(count); // will be the value of the count as an integer
});
##Sibling Auto-Resolution
By default nodiak's Bucket class contains a client-side implementation of Last-Write-Wins for managing resolution. It takes an array of sibling RObjects and returns the resolved sibling. Implemented as follows:
Bucket.siblingLastWriteWins = function(siblings) {
function siblingLastModifiedSort(a, b) {
if(!a.metadata.last_modified || new Date(a.metadata.last_modified) < new Date(b.metadata.last_modified)) {
return 1;
}
else {
return -1;
}
}
siblings.sort(siblingLastModifiedSort);
return siblings[0];
};
The returned object then has the original set of siblings attached to it and is sent up to the user through a callback on the original get
or fetch
request. This is true of bulk/batch operations as well. Each RObject in the batch will be its resolved version with all its original siblings attached on a .siblings
property.
When you write your own auto-resolution functions they should conform to this spec, taking an Array
of RObjects
as input and returning a single resolved RObject
as output.
You can provide your own auto-resolution functions either by overriding a Bucket instance's default resolver, like:
var my_bucket = riak.bucket('some_bucket');
my_bucket.resolver = function(conflicted_siblings) {
// If they don't shut-up you'll turn this car around!
var resolved = conflicted_siblings.pop();
return resolved;
};
and/or on every request for an object, like:
var my_resolver = function(conflicted_siblings) {
// If they don't shut-up you'll turn this car around!
var resolved = conflicted_siblings.pop();
return resolved;
};
riak.bucket('some_bucket').object.get(['key1', 'key2'], my_resolver, function(err, r_objs) {
// Any RObject in the r_objs Array that had siblings will have been
// resolved using my_resolver, and will have it's originally siblings
// available through a .siblings property.
});
The RObject .fetch()
method also accepts a user defined resolver function preceding the callback in its arguments list. See the RObject.fetch()
documentation above for details.
NOTE:
Additionally, the
Bucket
class has aboolean
property.getSiblingsSync
that controls the behavior of how siblings will be retrieved from Riak. The default isfalse
which will cause nodiak to make parallel async requests to get all the siblings at once. If set totrue
then nodiak will attempt to retrieve all the siblings as a single large 'multipart/mixed' document.
The tradeoff is in the overhead in making lots and lots of smaller requests compared to one monolithic request. The former can impact your network and cluster more negatively as it has to deal with the concurrent requests, but the latter also induces additionally time blocking while lots of data is being read and processed for a single request.
##Riak Search and Riak 2i's
####Bucket.search.solr( query, callback )
// perform a Riak Search operation and retrieve the standard Riak Search response object.
The query
should be an Object
containing properties that map to the URL query params specified in Querying via the Solr Interface. The wt
parameter defaults to JSON.
var query = {
q: 'field1:been',
'q.op': 'and',
start: 25
};
riak.bucket('test').search.solr(query, true, function(err, response) {
console.log(response);
});
{ responseHeader: { QTime: 54, params: { q: 'field1:been', 'q.op': 'or', filter: '', wt: 'json' }, status: 0 }, response: { maxScore: '0.353553', numFound: 100, start: 0, docs: [ { props: {}, index: 'test', fields: { field1: 'has been set' }, id: '11OAOmQHJmn9fcCfO5gPZatHdyL' }, ... ] } }
####Bucket.search.solr( query ).stream()
// perform a Riak Search operation and stream back the docs
elements as RObject
s
Just as with the non-streaming version of this call, the query
should be an Object
containing properties that map to the URL query params specified in Querying via the Solr Interface. The wt
parameter defaults to JSON.
var query = {
q: 'field1:been',
'q.op': 'and',
start: 25
};
var compiled_results = [];
riak.bucket('test').search.solr(query).stream()
.on('data', function(obj) {
compiled_results.push(obj);
})
.on('error', function(err) {
console.warn(err);
})
.on('end', function() {
// we're all done fetching results.
});
This is simply provided as a convenience for when you know specifically that you want to fetch the actual objects in Riak referenced by the search result.
####Bucket.search.twoi( query, index, [options], callback )
// a 2i's range query that returns the list of matching keys (options is for Riak 1.4+).
The query format in nodiak for a 2i's search is an Array
tuple containing the beginning and end of the range you want to search ['a','zzzzzzzzz']
, or just a scalar value when you want to do an exact match 'match_this'
.
You do not need to provide the _int
or _bin
suffix to the index
name. This is derived for you from the type of data you pass in to your query. If the type is explicitly an integer Number
then _int
will be used, otherwise _bin
will be automatically assumed.
riak.bucket('test').search.twoi([0,10000], 'my_numbers', {max_results:50}, function(err, keys, continuation) {
console.log(keys);
// if you're using 2i paging support in Riak 1.4 then you can use
// the `continuation` to send into your next request in the options object.
});
The response will be an Array
of the matching keys with the duplicates removed. Riak by default adds a key to the matching set for every match, so if you have multiple entries in an index that match your query, then you'll get duplicate entries of that key in the results. If for some reason you need those duplicates then you can use the underlying backend adapter client directly.
####Bucket.search.twoi( query, index, options ).stream()
// a 2i's range query that streams back the matched keys (options is for Riak 1.4+).
var compiled_results = [];
riak.bucket('test').search.twoi([0,10000], 'my_numbers', ).stream()
.on('data', function(obj) {
compiled_results.push(obj);
});
.on('error', function(err) {
console.warn(err);
})
results.on('end', function(continuation) {
// we're all done fetching results.
// if you're using 2i paging support in Riak 1.4 then you can use
// the `continuation` to send into your next request in the options object.
});
NOTE:
If you try to use the
return_terms: true
option on the streaming form it currently breaks because I don't have a good idea in mind what a use case forreturn_terms: true
is, and so I'm having a hard time thinking up what the API or resulting data should look like.
My current thoughts are to iterate through the result set and stream back
RObject
s that simply have a.term
property set on them that has the original matched-term set on it.
##MapReduce
MapReduce queries in nodiak are performed by chaining map
, link
, and reduce
phases together on a set of inputs
and then finally by running the chain using execute
.
Each of these phases, including the inputs conforms to the spec set forward by Basho in their documentation.
Also, nodiak allows you supply your own ad-hoc native functions as the source
parameter on your map
and reduce
phase specifications. If your phase has 'language' : 'javascript'
set, then it will simply convert your function to it's string source representation before attaching it to the source
parameter.
For ad-hoc Erlang functions to work in Riak you have to add {allow_strfun, true}
to the riak_kv
section of your app.config files. When send in a native Javascript function as the source
parameter of your phase specification, and 'language' : 'erlang'
is set, then that native function must return from it a string representation of a valid Erlang MapReduce function.
NOTE:
Only use ad-hoc queries in development, they're slower and open up potential security risks compared to queries pre-deployed in your cluster.
###client.mapred.inputs( inputs, [include_data] ) ####.map( phase ) .link( phase ) .reduce( phase ) .execute( callback )
// MapReduce w/ pre-aggregated results.
riak.mapred.inputs([['a_bucket','key1'], ['b_bucket','key2']])
.map({
language: 'erlang',
module: 'riak_kv_mapreduce',
function: 'map_object_value',
arg: 'filter_notfound'})
.reduce({
language: 'erlang',
module: 'riak_kv_mapreduce',
function: 'reduce_count_inputs'})
.execute(function(err, results) {
if(!err) console.log(results);
}
);
####.execute( ).stream()
// MapReduce w/ streaming results.
var compiled_results = [];
riak.mapred.inputs('test')
.map({
language: 'erlang',
module: 'riak_kv_mapreduce',
function: 'map_object_value',
arg: 'filter_notfound'})
.reduce({
language: 'erlang',
module: 'riak_kv_mapreduce',
function: 'reduce_count_inputs'})
.execute().stream()
.on('data', function(result) {
compiled_results.push(result);
})
.on('end', function() {
console.log(compiled_results);
})
.on('error', function(err) {
// Handle this however you like.
});
NOTE:
You can also pass an
Array
ofRObject
s into theinputs
section of a MapReduce query. Nodiak will automatically run through each object and convert the RObject into the[bucket, key]
pairing that Riak expects. Additionally, if you setinclude_data
totrue
as the secondinputs
argument, then nodiak will also include the RObject's data, like[bucket, key, data]
.
Tests
The test suite can be run by simply:
$ cd /path/to/nodiak
$ npm install -d
$ npm test
The test suite can take several environment variables to modify what gets tested. This is useful for when you don't have dependent Riak features enabled (eg. skipping the 2i tests if you don't have a 2i capable Riak KV storage backend defined). The options and their defaults are as follows:
$ NODIAK_BACKEND=http NODIAK_HOST=localhost NODIAK_PORT=8098 NODIAK_SEARCH=false NODIAK_2I=false npm test`
#Todos
- Add a protobufs backend implementation.
License
(The MIT License)
Copyright (c) 2012 Coradine Aviation Systems
Copyright (c) 2013 Nathan Aschbacher
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.