@specialblend/kafka-pipe
v0.0.9
Published
A functional/fluent utility for kafka
Downloads
3
Maintainers
Readme
kafka-pipe
A functional/fluent utility for kafka, built on top of kafka-node
.
Install
npm install @specialblend/kafka-pipe
Classes
Constants
PipeConsumer
Callable kafka Consumer with pipe and error helper methods
Kind: global class
pipeConsumer.pipe(handler) ⇒ PipeConsumer
Pipe incoming messages to provided handler
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
| Param | Type | Description | | --- | --- | --- | | handler | function | message handler function |
pipeConsumer.error(handler) ⇒ PipeConsumer
Alias for this.on('error')
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
| Param | Type | Description | | --- | --- | --- | | handler | function | error handler function |
pipeConsumer.__call__(handler) ⇒ PipeConsumer
Make instance callable alias of this.pipe
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
| Param | Type | Description | | --- | --- | --- | | handler | function | message handler function |
PipeProducer
Callable kafka Producer when instance is called directly, acts like PipeProducer.send
Kind: global class
- PipeProducer
- new PipeProducer(client, options)
- .send(payload) ⇒ Promise.<*>
- .call(payload) ⇒ Promise.<*>
new PipeProducer(client, options)
Create
| Param | Type | Description | | --- | --- | --- | | client | Client | kafka client | | options | Object | opyions |
pipeProducer.send(payload) ⇒ Promise.<*>
Send a payload
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
| Param | Type | Description | | --- | --- | --- | | payload | Array.<String> | payload |
pipeProducer.__call__(payload) ⇒ Promise.<*>
Make instance callable alias of this.send
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
| Param | Type | Description | | --- | --- | --- | | payload | Array.<String> | payload |
PipeSender
Callable kafka PipeProducer which allows presetting a destination topic and options
Kind: global class
- PipeSender
- new PipeSender(client, topic, payloadOptions, producerOptions)
- .send(messages) ⇒ Promise.<*>
- .call(payload) ⇒ Promise.<*>
new PipeSender(client, topic, payloadOptions, producerOptions)
Curry topic and payload options
| Param | Type | Description | | --- | --- | --- | | client | Client | kafka client | | topic | String | kafka topic name | | payloadOptions | Object | options to include with outgoing payloads | | producerOptions | Object | producer options |
pipeSender.send(messages) ⇒ Promise.<*>
Send messages to preset topic, with preset options
Kind: instance method of PipeSender
Returns: Promise.<*> - returned Promise
| Param | Type | Description | | --- | --- | --- | | messages | Array.<String> | an array of messages to send |
pipeSender.__call__(payload) ⇒ Promise.<*>
Make instance callable alias of this.send
Kind: instance method of PipeSender
Returns: Promise.<*> - result
| Param | Type | Description | | --- | --- | --- | | payload | Array.<String> | payload |
PipeTransformer
Consumer/producer mixin that
pipes messages from sourceTopic
into transformer
function,
and sends result to destinationTopic
,
or deadLetterTopic
on error
Kind: global class
new PipeTransformer(transformer, client, sourceTopic, destinationTopic, deadLetterTopic)
create a PipeTransformer
| Param | Type | Description | | --- | --- | --- | | transformer | function | the transformer function | | client | Client | kafka Client | | sourceTopic | String | name of topic to read from | | destinationTopic | String | name of topic to send to | | deadLetterTopic | String | name of topic to send failed payloads |
Client
Kafka Client
Kind: global class
new Client(kafkaHost, options)
Create a kafka Client
| Param | Type | Description | | --- | --- | --- | | kafkaHost | String | kafka host | | options | Object | options |
createConsumer ⇒ PipeConsumer
Curried factory of PipeConsumer
Kind: global constant
createProducer ⇒ PipeProducer
Curried factory of PipeProducer
Kind: global constant
createSender ⇒ PipeSender
Curried factory of PipeProducer
Kind: global constant
createTransformer ⇒ PipeTransformer
Curried factory of PipeTransformer
Kind: global constant