ksqldb-js
v1.1.2
Published
Javascript KsqlDB client for Node.js
Downloads
7
Readme
ksqlDB-JS
Table of Contents
About the Project
ksqlDB-JS - a ksqlDB client for Node.js
Prerequisites
Node.js - https://nodejs.org/en/
ksqlDB - https://ksqldb.io/
Docker (for tests) -https://www.docker.com/
Getting Started
Install package from Node package manager
npm install ksqldb-js
Usage
Create a client in the application file:
const ksqldb = require('ksqldb-js');
const client = new ksqldb({ksqldbURL: '<url to ksqlDB server>'})
To run tests initiate Docker containers included in yaml file:
docker-compose up
npm test
Features
Create a pull query
client.pull("SELECT * FROM myTable;");
Create a push query
client.push('SELECT * FROM myTable EMIT CHANGES;',
(data) => {
console.log(data);
});
Terminate persistent query
e.g. a push query
client.terminate(queryId);
Insert rows of data into a stream
client.insertStream('myTable', [
{ "name": "jack", "email": "[email protected]", "age": 25 },
{ "name": "john", "email": "[email protected]", "age": 20 }
]);
List streams/queries
client.ksql('LIST STREAMS;');
Create table/streams
client.createStream('testStream',
columnsType = ['name VARCHAR', 'email varchar', 'age INTEGER'],
topic = 'testTopic',
value_format = 'json',
partitions = 1);
For custom SQL statements including complex joins use the .ksql method
client.ksql('DROP STREAM IF EXISTS testStream;');
SQL Query builder
Please use the built-in query builder to parametrize any SQL query to avoid SQL injection.
const builder = new queryBuilder();
const query = 'SELECT * FROM table WHERE id = ? AND size = ?';
const finishedQuery = builder.build(query, 123, "middle");
client.ksql(finishedQuery);
Create table as
Generating a materialized view that can be
client.createTableAs('testTable', 'sourceStream', selectArray = ['name', 'LATEST_BY_OFFSET(age) AS recentAge'],
propertiesObj = {topic:'newTestTopic'},
conditionsObj = {WHERE: 'age >= 21', GROUP_BY: 'name'});
Create stream as
client.createStreamAs('testStream', selectColumns = ['name', 'age'], 'sourceStream',
propertiesObj = {
kafka_topic: 'testTopic',
value_format: 'json',
partitions: 1
},
conditions = 'age > 50');
Pull from to
Pull stream data between two time points
client.pullFromTo('TESTSTREAM', 'America/Los_Angeles',
from = ['2022-01-01', '00', '00', '00'],
to = ['2022-01-01', '00', '00', '00']);
Troubleshooting methods (.inspectServerStatus, .inspectQueryStatus, .inspectClusterStatus )
Developers
Contributions
Contributions to the code, examples, documentation, etc. are very much appreciated.
- Please report issues and bugs directly in this GitHub project.
License
This product is licensed under the MIT License - see the LICENSE.md file for details.
This is an open source product.
This product is accelerated by OS Labs.
ksqlDB is licensed under the Confluent Community License.
Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation.