cassmask
v1.7.8
An Observable Driven Apache Cassandra ORM
CassMask (tested with Apache-Cassandra 3.x)
DataTypes Thoroughly tested
- TIMEUUID
- UUID
- TIMESTAMP
- INT
- TEXT
DataTypes not tested (will probably work)
- ASCII
- BIGINT
- SMALLINT
- TINYINT
- VARCHAR
- VARINT
- FLOAT
- DOUBLE
- DECIMAL
- BOOLEAN
DataTypes not tested (will probably NOT work)
- BLOB
- INET
DataTypes that WILL not work
- COUNTER
- MAP
- SET
- LIST
- User Defined Data types
Quick Start
$ npm install cassmask
Connect to Cassandra with a ClientOptions object.
import { connect } from 'cassmask';
// ClientOptions Object
const config = {
contactPoints: ['127.0.0.1'],
protocolOptions: { port: 9042 },
queryOptions: { consistency: 1 },
keyspace: 'demo-space'
};
connect(config, (err, result) => {
// callback after connection
});
Create a model (click here for an ES5 example)
import * as cassmask from 'cassmask';
import { toTimeStamp, now, uuid } from 'cassmask';
// An interface for your Schema is not necessary but
// it enables TypeScript to type-check the key-value pairs
// placed into cassmask's query functions
interface IModelSchema extends cassmask.ISchema {
col1?: cassmask.UUID;
col2?: cassmask.TIMEUUID;
col3?: cassmask.TIMESTAMP;
col4?: cassmask.TEXT;
col5?: cassmask.INT;
}
class ModelSchema extends cassmask.Schema {
// declare schema type values
col1 = {
type: cassmask.UUID,
default: uuid()
};
col2 = {
type: cassmask.TIMEUUID,
default: now()
};
col3 = {
type: cassmask.TIMESTAMP,
default: toTimeStamp(now())
};
col4 = {
type: cassmask.TEXT,
required: true
};
col5 = {
type: cassmask.INT,
required: 'Col5 must be present!!'
};
// Primary Keys (Partition & Clustering Column)
keys = ['col1', 'col2'];
constructor() {
super();
}
// Define validate_[column name] function to create
// a validate function for that key
validate_col4(col4, next) {
if(col4.length >= 5) next();
else next('col4 is not long enough!');
}
validate_col5(col5, next) {
if(col5 >= 5) next();
else next('col5 value is not large enough!');
}
// Define pre_create, pre_update, pre_remove, pre_find
// post_create, post_update, post_remove, post_find
// functions to set hook events
pre_create(next, err) {
this.col4 += ' test pre hook!';
next();
}
// Other defined functions will be integrated into Entity scope
}
export default cassmask.model<IModelSchema>('Model', new ModelSchema());
Find, create, remove, update...
import Model from 'path/to/model.ts';
let holder = [];
/*
Existing rows in Model Table
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before1 test pre hook!', col5: 6 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before2 test pre hook!', col5: 7 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before3 test pre hook!', col5: 8 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before4 test pre hook!', col5: 9 }
*/
Model.create([{
col4: 'test1',
col5: 49
}, {
col4: 'test2',
col5: 23
}, {
col4: 'test3',
col5: 97
}, {
col4: 'test4',
col5: 57
}]).remove([{
col1: 'generated uuid', // before2
col2: 'generated timeuuid'
}, {
col1: 'generated uuid', // before4
col2: 'generated timeuuid'
}]).find().seam().subscribe( // Every next argument will be an Entity object for every query executed
x => holder.push(x),
err => console.log(err),
() => {}
);
/*
Model Table
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before1 test pre hook!', col5: 6 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before3 test pre hook!', col5: 8 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test1 test pre hook!', col5: 49 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test2 test pre hook!', col5: 23 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test3 test pre hook!', col5: 97 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test4 test pre hook!', col5: 57 }
*/
Why Observables?
Observables are like promises that operate on event streams, which lets us seam these streams together and filter their results. For more information about observables, see the ReactiveX RxJS library documentation.
Everything is an Observable.
In CassMask, every query, event hook, and validation is executed inside an observable, and RxJS concat is used to seam them together so that they run in the proper sequence on subscribe.
Event Sequence Per Query Function/Query Object
| REQUIRE -> | VALIDATION -> | PRE -> | QUERY -> | POST |
|:-------:|:-------------:|:------:|:--------:|:----:|
| condition | function | function | function | function |
NOTE: require is create-specific and validation is create/update-specific.
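The event sequence above can be pictured with a small, dependency-free sketch. This is not CassMask's implementation (which, per this README, runs inside RxJS observables); `Stage`, `StageResult`, and `runStages` are hypothetical names, and plain callbacks stand in for observables. Each stage calls `next()` to continue or `next(message)` to stop the sequence, mirroring the `next` convention of the validation functions.

```typescript
// Hypothetical names throughout; plain callbacks stand in for observables.
type Next = (error?: string) => void;
interface Stage { name: string; run: (next: Next) => void; }
interface StageResult { error: string | null; ran: string[]; }

// Runs the stages strictly one after another and stops at the first error.
function runStages(stages: Stage[], done: (result: StageResult) => void): void {
  const ran: string[] = [];
  const step = (index: number): void => {
    if (index === stages.length) {
      done({ error: null, ran });
      return;
    }
    const stage = stages[index];
    stage.run(error => {
      if (error !== undefined) {
        done({ error: `${stage.name}: ${error}`, ran });
        return;
      }
      ran.push(stage.name);
      step(index + 1);
    });
  };
  step(0);
}

const row: { col4: string; col5?: number } = { col4: 'test1', col5: 49 };

const stages: Stage[] = [
  { name: 'REQUIRE', run: next => { if (row.col5 !== undefined) next(); else next('Col5 must be present!!'); } },
  { name: 'VALIDATION', run: next => { if (row.col4.length >= 5) next(); else next('col4 is not long enough!'); } },
  { name: 'PRE', run: next => { row.col4 += ' test pre hook!'; next(); } },
  { name: 'QUERY', run: next => next() }, // a real query would execute here
  { name: 'POST', run: next => next() },
];

const outcome: StageResult = { error: null, ran: [] };
runStages(stages, result => { outcome.error = result.error; outcome.ran = result.ran; });
console.log(outcome.ran.join(' -> '));
// REQUIRE -> VALIDATION -> PRE -> QUERY -> POST
```

If any stage passes a message to `next`, the later stages never run, which is the behaviour the validation section relies on to stop a query before it reaches the database.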
The Entity.
Once a query is executed in the stream, an Entity object (or array of Entities) is passed as the observer.next() argument, giving the developer an opportunity to interact with it. The Entity object gives us access to two useful functions, save() and remove(). save() takes the current attributes of the Entity and creates a query string to UPDATE (INSERT) a row in the table. Similarly, remove() takes the current attributes and DELETEs the row from the table. An Entity (or array of Entities) is created from the response row array of the corresponding query or, if the query does not respond with data (UPDATE, INSERT, DELETE), from the key-value pairs that were used in the original Schema query function.
Important Limitations.
In order to use CassMask well, it's important to understand CQL. If the query does not respond with data (that is, it is not a SELECT), a particular Entity instance may not have the attributes needed to INSERT, UPDATE, and/or DELETE rows in a table, specifically the Partition Key and Clustering Columns required to identify the row to UPDATE/DELETE. The remove() and update() querying functions in your Schema object require your Primary Keys in order to function correctly, so the problem is less apparent there than with create(), where certain keys may be generated on the database itself (uuid, timeuuid, timestamp, etc.).
An event hook could potentially be used to solve this dilemma. We could hook into the 'create' event and execute a SELECT query to find the most recently appended row, then pass this object through the observer.next() argument so it can be used in your endpoint. This is not done automatically because of the level of involvement required to create tables with a certain clustering order.
Entity Example
let holder = [];
// findOne with no argument will find the first row in the
// table and pass an Entity with the found row to next()
// find with no argument will find all the rows in the
// table and pass an Entity with the found rows to next()
Model.findOne().find().seam().subscribe( // two queries are executed and two next() functions are called
model => holder.push(model),
err => console.log(err),
() => {
holder[0].col4 = 'awesome_example'; // index 0 is the findOne query argument
holder[0].col5 = 49;
/*
Model Table after holder[0].save() but before holder[1] interaction
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example', col5: 49 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'before3', col5: 2 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test1', col5: 49 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test2', col5: 23 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test3', col5: 97 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'test4', col5: 57 }
*/
holder[0].save().subscribe(model => {
holder[1].forEach(entity => { // holder[1] is an array of Entity Object from find()
entity.col4 = 'awesome_example2';
entity.col5 = 99;
entity.save().subscribe(x => {}, err => console.log(err));
});
}, err => console.log(err));
});
/*
Model Table after holder[1] interaction
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
{ col1: '[uuid]', col2: '[timeuuid]', col3: '[created time]', col4: 'awesome_example2', col5: 99 }
*/
Validation
Validation in CassMask is configured on a per-schema-property basis. Define a validation function that passes an error message and stops the event stream when its conditions are not met.
Validation Example (click here for an ES5 example)
interface IModelSchema extends cassmask.ISchema {
id?: cassmask.UUID;
name?: cassmask.TEXT;
points?: cassmask.INT;
}
class ModelSchema extends cassmask.Schema {
// declare schema type values
id = {
type: cassmask.UUID,
default: uuid()
};
name = {
type: cassmask.TEXT,
required: 'Must have name!'
};
points = cassmask.INT;
// Primary Keys (Partition & Clustering Column)
keys = ['id', 'name'];
constructor() {
super();
}
// Define validate_[column name] function to create
// a validate function for that key
validate_name(name, next) {
if(name.length >= 5) next();
else next('name is not long enough!');
}
}
export default cassmask.model<IModelSchema>('Model', new ModelSchema());
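To see what a `validate_[column name]` function reports, the sketch below calls one directly, outside of CassMask. The `runValidator` helper is hypothetical (it is not part of the library) and only handles validators that call `next` synchronously; it returns the message passed to `next`, or `null` when validation passed.

```typescript
type Next = (message?: string) => void;

// Same shape as validate_name in the schema above.
function validate_name(name: string, next: Next): void {
  if (name.length >= 5) next();
  else next('name is not long enough!');
}

// Hypothetical helper: returns the message the validator passed to next(),
// or null when the validator called next() with no argument.
function runValidator(validator: (value: string, next: Next) => void, value: string): string | null {
  const result: { message: string | null } = { message: null };
  validator(value, message => {
    result.message = message === undefined ? null : message;
  });
  return result.message;
}

console.log(runValidator(validate_name, 'Bob'));     // name is not long enough!
console.log(runValidator(validate_name, 'Roberta')); // null
```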
Event Hooks
In CassMask there are PRE and POST event hooks for 'create', 'update', 'remove', and 'find'. These hooks are embedded inside each Entity (Instance) scope and they can be used to interact with the Entity object in the stream before/after a corresponding query execution/response.
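The calling convention for a hook (a callback that receives `next` and `err`, with the entity bound as `this`) can be sketched without CassMask or Cassandra. `HookRegistry` below is a hypothetical stand-in, not the library's Model class; it only illustrates registering one callback under one or several hook names and invoking it in an entity's scope.

```typescript
type HookName = 'create' | 'update' | 'remove' | 'find';
type Next = (value?: unknown) => void;
type Err = (message?: string) => void;
type HookCallback = (this: Record<string, unknown>, next: Next, err: Err) => void;

// Hypothetical stand-in for the hook bookkeeping; not CassMask's own code.
class HookRegistry {
  private hooks: { [name: string]: HookCallback } = {};

  // Accepts a single hook name or an array of names.
  post(hook: HookName | HookName[], cb: HookCallback): void {
    const names = Array.isArray(hook) ? hook : [hook];
    names.forEach(name => { this.hooks[name] = cb; });
  }

  // Runs the hook with the entity bound as `this`; passes the entity
  // straight through when no hook is registered for that name.
  run(name: HookName, entity: Record<string, unknown>, next: Next, err: Err): void {
    const cb = this.hooks[name];
    if (cb) cb.call(entity, next, err);
    else next(entity);
  }
}

const registry = new HookRegistry();
const emitted: unknown[] = [];
const passed: unknown[] = [];

registry.post(['create', 'update'], function (next) {
  emitted.push(this.col4); // `this` is the entity the query produced
  next(this);
});

registry.run('create', { col4: 'test1' }, value => { passed.push(value); }, message => { throw new Error(message); });
console.log(emitted); // [ 'test1' ]
```

A regular `function` is used for the callback rather than an arrow function, because an arrow function would not receive the entity as `this`.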
Socket.io example
events.ts
import Model from './path/to/model.ts';
let EventEmitter = require('events').EventEmitter;
let ModelEvents = new EventEmitter();
// Set max event listeners (0 == unlimited)
ModelEvents.setMaxListeners(0);
/*
The lastEntity; for create(), update(), and remove(): Entity object that was originally passed into the query
for find() and findOne(): the rows found in the table
the next; this is a callback that expects zero or one argument.
callback will call observer next() and complete() functions
argument will be passed into next()
The client; this is the cassandra client, which can be used to execute additional queries
*/
Model.post('create', function(next, err) {
// callback code goes here
ModelEvents.emit('create', this);
next(this);
});
export default ModelEvents;
socket.ts
/**
* Broadcast updates to client when the model changes
*/
import ModelEvents from './path/to/events.ts';
// Model events to emit
let events = ['create'];
function modelRegister(socket) {
// Bind model events to socket events
for (let i = 0, eventsLength = events.length; i < eventsLength; i++) {
let event = events[i];
let listener = createListener('Model:' + event, socket);
// the Emitter will listen for changes in the model
ModelEvents.on(event, listener);
// when a disconnect comes from the socket then remove the listener
socket.on('disconnect', removeListener(event, listener));
}
}
// create listener function
function createListener(event, socket) {
return function(row) {
socket.emit(event, row);
};
}
// remove listener function
function removeListener(event, listener) {
return function() {
ModelEvents.removeListener(event, listener);
};
}
export default modelRegister;
Execute modelRegister(socket) in the onConnect function of your socketio.config.
CassMask API
connect(config: ClientOptions, cb: Function): void
- connects to the cassandra nodes
- cb is the function fired once the connection attempt returns
model(tableName: string, schema: Schema, indexes?: Array<string | Array<string>>): Model
- creates a Model object with querying capabilities
- indexes is an array of secondary indexes you can create for this model's table
- NOTE: to change/add the indexes you will need to drop the table for cassmask to create it again
Model API
seam(): Rx.Observable<any>
- returns a single concatenated observable
find(items?: Object, opts?: FindOptions): Schema
- first argument can be empty or an object
- If no arguments or empty object, will SELECT all rows in the table
- first argument should contain the columns for the WHERE clause
- columns should be in the same order as the primary keys
- second argument is a find options object
- options.attributes: Array | Object, columns to be included in the SELECT response
- if attributes is an object, exclude: Array will exclude columns from the SELECT response
- query will return an array of objects or a single object
findOne(items?: Object, opts?: FindOptions): Schema
- first argument can be empty or an object
- If no arguments or empty object, will SELECT the first row in the table
- first argument should contain the columns for the WHERE clause
- columns should be in the same order as the primary keys
- second argument is a find options object
- options.attributes: Array | Object, columns to be included in the SELECT response
- if attributes is an object, exclude: Array will exclude columns from the SELECT response
- query will return a single object
findById(id: string, opts?: FindOptions): Schema
- first argument must be an id
- second argument is a find options object
- options.attributes: Array | Object, columns to be included in the SELECT response
- if attributes is an object, exclude: Array will exclude columns from the SELECT response
- query will return a single object or an array of objects, depending on schema design
create(items: Object | Array<Object>, opts?: SchemaOptions): Schema
- first argument can be an object or array of objects
- objects must contain all columns to be inserted into the row
- primary keys are mandatory
update(object: UpdateObject | Array<UpdateObject>, opts?: SchemaOptions): Schema
- first argument can be an object or array of objects
- objects must contain two subobjects, 'set' and 'where'
- 'set' should contain all the columns you wish to SET (primary keys not allowed)
- 'where' should contain the primary key columns to find the row to UPDATE
remove(object?: Object | Array<Object>, opts?: SchemaOptions): Schema
- first argument can be empty, an object, or array of objects
- objects must contain the primary keys for the WHERE clause to DELETE the row
post(hook: string | Array<string>, cb: Function): void;
- specify one or multiple hooks ('create', 'update', 'remove', or 'find') as the first argument
- specify the callback as the function to execute
- callback receives two functions, next(object?) and err(message?), along with the entity (Entity | Array)
- callback must call one of them
- sets the corresponding hook function in the Schema/Entity scopes
pre(hook: string | Array<string>, cb: Function): void;
- specify one or multiple hooks ('create', 'update', 'remove', or 'find') as the first argument
- specify the callback as the function to execute
- callback receives two arguments, next(object?) and err(message?), which are both functions, along with the entity
- callback must call one of them
- sets the corresponding hook function in the Schema/Entity scopes
methods(scope: Object): void;
- scope object containing properties that will be integrated into all instantiated Entity objects
createIndex(indexes: string | Array<string | Array<string>>): void
- creates a secondary index for the table on table creation
- NOTE: to change/add the indexes you will need to drop the table for cassmask to create it again
UpdateObject
- set: Object; object with table attribute key value pairs to update
- where: Object; object with PRIMARY KEY key value pairs
- ex: { set: {name: 'test'}, where: {id: '123456'}}
SchemaOptions
- using?: string; is the CQL update parameter(s) appended in the INSERT, DELETE, UPDATE string
- ex: {using: 'TTL 86400'}
- if?: string; the CQL condition(s) under which the INSERT, DELETE, or UPDATE executes
- ex: {if: 'exists'}
- ex: {if: 'played > 10'}
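As a rough illustration of how an UpdateObject and SchemaOptions relate to CQL, the sketch below assembles a parameterized UPDATE statement. `buildUpdate` is a hypothetical helper written for this example; the query strings CassMask actually generates may differ. It follows the standard CQL clause order: `UPDATE ... USING ... SET ... WHERE ... IF ...`.

```typescript
interface UpdateObject {
  set: Record<string, unknown>;
  where: Record<string, unknown>;
}

interface SchemaOptions {
  using?: string;
  if?: string;
}

// Hypothetical helper, not part of CassMask: builds a parameterized CQL UPDATE.
function buildUpdate(
  table: string,
  update: UpdateObject,
  opts: SchemaOptions = {}
): { query: string; params: unknown[] } {
  const setKeys = Object.keys(update.set);
  const whereKeys = Object.keys(update.where);

  const parts: string[] = [`UPDATE ${table}`];
  if (opts.using) parts.push(`USING ${opts.using}`);
  parts.push(`SET ${setKeys.map(key => `${key} = ?`).join(', ')}`);
  parts.push(`WHERE ${whereKeys.map(key => `${key} = ?`).join(' AND ')}`);
  if (opts.if) parts.push(`IF ${opts.if}`);

  // Parameters follow the order of the placeholders: SET values, then WHERE values.
  const params = setKeys
    .map(key => update.set[key])
    .concat(whereKeys.map(key => update.where[key]));

  return { query: parts.join(' '), params };
}

const built = buildUpdate(
  'model',
  { set: { name: 'test' }, where: { id: '123456' } },
  { using: 'TTL 86400', if: 'exists' }
);
console.log(built.query);
// UPDATE model USING TTL 86400 SET name = ? WHERE id = ? IF exists
```

Values are kept out of the query string and returned as `params`, so they can be bound by the driver instead of being concatenated into the CQL.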
FindOptions
- attributes?: Array<string | Object>; "columns to select or exclude from SELECT query"
- ex: {attributes: ['name', 'created', 'id']}
- ex: {attributes: {exclude: ['catagory']}}
- groupBy?: string; CQL group by string to append to SELECT query
- ex: {groupBy: 'created, name'}
- orderBy?: string; CQL order by string to append to SELECT query
- ex: {orderBy: 'created desc'}
- perPartitionLimit?: number; limit select per partition
- ex: {perPartitionLimit: 3}
- limit?: number; limit the SELECT response to certain number of rows
- ex: {limit: 1}
- allowFiltering?: boolean; append ALLOW FILTERING to query string
- ex: {allowFiltering: true}
- It is recommended that this not be used, as it causes a cluster-wide search
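The FindOptions fields map onto clauses of a CQL SELECT. The sketch below shows one plausible mapping; `buildSelect` and its `columns` parameter (the table's full column list, needed to resolve `exclude`) are assumptions made for this example and are not CassMask's API. The `attributes` type is simplified to a string array or an `exclude` object.

```typescript
interface FindOptions {
  attributes?: string[] | { exclude: string[] };
  groupBy?: string;
  orderBy?: string;
  perPartitionLimit?: number;
  limit?: number;
  allowFiltering?: boolean;
}

// Hypothetical helper: `columns` lists every column in the table so that
// an `exclude` list can be turned into an explicit column selection.
function buildSelect(
  table: string,
  columns: string[],
  where: Record<string, unknown>,
  opts: FindOptions = {}
): string {
  const attributes = opts.attributes;
  let selected = '*';
  if (Array.isArray(attributes)) {
    selected = attributes.join(', ');
  } else if (attributes) {
    const excluded = attributes.exclude;
    selected = columns.filter(column => excluded.indexOf(column) === -1).join(', ');
  }

  const parts: string[] = [`SELECT ${selected} FROM ${table}`];
  const whereKeys = Object.keys(where);
  if (whereKeys.length > 0) {
    parts.push(`WHERE ${whereKeys.map(key => `${key} = ?`).join(' AND ')}`);
  }
  if (opts.groupBy) parts.push(`GROUP BY ${opts.groupBy}`);
  if (opts.orderBy) parts.push(`ORDER BY ${opts.orderBy}`);
  if (opts.perPartitionLimit !== undefined) parts.push(`PER PARTITION LIMIT ${opts.perPartitionLimit}`);
  if (opts.limit !== undefined) parts.push(`LIMIT ${opts.limit}`);
  if (opts.allowFiltering) parts.push('ALLOW FILTERING');
  return parts.join(' ');
}

const allColumns = ['id', 'name', 'created', 'category'];

const q1 = buildSelect('model', allColumns, { id: '123456' }, {
  attributes: ['name', 'created', 'id'],
  orderBy: 'created desc',
  limit: 1,
});
console.log(q1);
// SELECT name, created, id FROM model WHERE id = ? ORDER BY created desc LIMIT 1

const q2 = buildSelect('model', allColumns, {}, {
  attributes: { exclude: ['category'] },
  perPartitionLimit: 3,
});
console.log(q2);
// SELECT id, name, created FROM model PER PARTITION LIMIT 3
```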
Entity API
new Entity(item: Object, model: Schema)
- item should be an object with the key value pairs for a row in a table
- model should be the Schema whose state this Entity belongs to
- every Schema will have a state which contains all the important information regarding the model, table, and more
import { Entity } from 'cassmask';
import Model from '/path/to/model.ts';
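const object = {
col1: '[uuid]',
col2: '[timeuuid]',
col3: '[created time]',
col4: 'awesome',
col5: 67
};
let entity = new Entity(object, Model);
// save() returns an observable; the query is executed on subscribe
entity.save().subscribe(x => {}, err => console.log(err));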
isEmpty(): boolean
- will return true if no column attributes are set in the Entity object
- else false
merge(item: Object): Entity
- will merge the object with the Entity, overriding any matching attributes in the Entity with the object's
save(): Rx.Observable
- creates a query string based off this Entity's attributes
- will distinguish if the query is an update or insert and execute the appropriate post callback
- will NOT execute a pre callback as the object
- executes UPDATE query on subscribe
remove(): Rx.Observable
- creates a query string based off this Entity's attributes
- will NOT execute a pre callback as the object
- executes DELETE query on subscribe
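The behaviour described for isEmpty() and merge() can be pictured with a small stand-in class. `SketchEntity` is hypothetical and is not the Entity exported by cassmask: it keeps attributes in a private map and exposes a `get` accessor purely for illustration, and it leaves out save() and remove() because those need a live Cassandra connection.

```typescript
// Hypothetical stand-in for an Entity; only models isEmpty() and merge().
class SketchEntity {
  private attributes: Record<string, unknown> = {};

  constructor(item: Record<string, unknown> = {}) {
    this.merge(item);
  }

  // True when no column attributes are set.
  isEmpty(): boolean {
    return Object.keys(this.attributes).length === 0;
  }

  // Copies the object's key-value pairs in, overriding matching attributes.
  merge(item: Record<string, unknown>): this {
    Object.keys(item).forEach(key => {
      this.attributes[key] = item[key];
    });
    return this;
  }

  // Illustrative accessor; not part of the Entity API listed above.
  get(key: string): unknown {
    return this.attributes[key];
  }
}

const sketch = new SketchEntity();
console.log(sketch.isEmpty()); // true

sketch.merge({ col4: 'awesome', col5: 67 }).merge({ col5: 99 });
console.log(sketch.isEmpty());   // false
console.log(sketch.get('col4')); // awesome
console.log(sketch.get('col5')); // 99
```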
TODO
- Data Type support (Map, Sets, Lists, custom)