@nypl/nypl-streams-client
v2.0.0
Published
Internal NYPL client for reading/writing avro encoded streams
Downloads
90
Maintainers
Keywords
Readme
NYPL Streams Client
This is a helper module for reading and writing to NYPL streams with/without Avro encoding.
Installation
Install it via npm for use inside your project:
npm i @nypl/nypl-streams-client --save
Usage
const NyplStreamsClient = require('@nypl/nypl-Streams-client')
const streamsClient = new NyplStreamsClient({ nyplDataApiClientBase: 'https://example.com/api/v0.1/' })
See docs/usage.md for complete documentation of Client methods and use.
Example 1: Writing data to a stream
To write a single record to a stream (encoded to "MyStream" schema):
try {
const response = await streamsClient.write('MyStream', { id: 'id1', field1: 1, field2: 2 })
console.log('Finished writing to stream ' + response.Records.length)
} catch (e) {
console.error('Error writing to stream: ', e)
}
To write multiple records to a stream, batched and rate-limited to avoid write errors:
const records = [ { id: 'id1', field1: 1, field2: 2 }, { id: 'id2', field1: 1 }, ... ] // Array of any length
const options = {
recordsPerSecond: 500 // This is the default and well below the 1000/s AWS constraint
}
try {
const response = await streamsClient.write('MyStream', records, options)
console.log('Finished writing to stream ' + resp.Records.length)
console.log(`Failed to write: ${resp.FailedRecordCount} record(s)`)
} catch (e) {
console.error('Error writing to stream: ', e)
}
Above will resolve after records.length / 500
seconds. The resolved value is a hash merged from the hashes returned from each putRecords call.
Example 2: Decoding data obtained from a stream
The streams client can be used for decoding data obtained directly from a stream (i.e. via a Lambda Kinesis source).
Example lambda handler with a kinesis trigger:
exports.handler = async (event, context, callback) => {
// Initialize streams client:
const streamsClient = new NyplStreamsClient({ nyplDataApiClientBase: 'http://example.com/api/v0.1/' })
const record = event.Records[0]
if (record.kinesis) {
const encoded = event.Records.map(record => record.kinesis.data)
try {
const decoded = await streamsClient.decodeData('SchemaName', encoded)
} catch (e) => {
console.error('Error decoding event: ', e)
}
}
}
CLI
The library includes a CLI for writing arbitary events to streams. Care should be taken to construct events that confirm to the relevant schema.
For example, to write a SierraBibRetrievalRequest
encoded event to the SierraBibRetriever-qa
stream:
./cli/nypl-streams.js --envfile config/qa.env --profile nypl-digital-dev write SierraBibRetriever-qa --schemaName SierraBibRetrievalRequest '{ "id": "21747246" }'
Git workflow
- Cut feature branch from main.
- Create PR to merge feature branch into main
- After PR approved by multiple co-workers, the author merges the PR.
Publishing to NPMJS
Once the PR has been approved and merged, check out the target branch locally and:
- Bump the version:
- Bump the version number in
package.json
- Run
nvm use; npm i
to updatepackage-lock.json
- Commit changes
- Git tag it (e.g.
git tag -a v2.1.1
) - Push changes to origin (including tags via
git push --tags
)
- Publish changes to NPMJS:
- Run npm publish --dry-run to verify nothing is being packaged that should not be!
npm publish
Testing
npm test