wrapper-andela-pubsub
v1.0.33
Published
a new library built on the old pubsub implementation
Downloads
16
Readme
PubSub
This is the core andela library that manages all asyncronous communications via the google pubsub service, between microservices in the andela ecosystem.
The library provides the interfaces necessary to publish and subscribe to messages
Setup
Add the library via yarn using the command
$ yarn add wrapper-andela-pubsub@latest
This should include the latest version in your application.
To use the library add it to your application.
const { Pubsub } = require('wrapper-andela-pubsub')
const pubsubClient = new Pubsub()
Please note that the following values must exist in your environment for the client to be initialized.
ELASTICSEARCH_URL=
REDIS_URL=
POD_NAME=
Incase the app is running in kubernetes, the POD_NAME
will be set automatically
The following interfaces are available:
Publishing
This endpoint can be accessed by calling the emit
or publish
function, and it allows one to push messages to a message queue.
It is an async function and accepts two properties, payload which is an object that has the properties of data
and attributes
, and the topic to which the event should be published to.
If for example we want to publish an event named UserCreatedEvent
to the serviceA
topic, we will call the function as follows:
await pubsubClient.publish({
data: { Firstname: test1, Lastname: test2, Age: 30 },
attributes: { eventType: 'UserCreatedEvent' }
},
'serviceA')
or
await pubsubClient.emit({
data: { Firstname: test1, Lastname: test2, Age: 30 },
attributes: { eventType: 'UserCreatedEvent' }
},
'serviceA')
This will emit an event named UserCreatedEvent and will be received by any service that suscribes to that topic and event type. This can happen at any point in the runtime, so long as the pubsub client has been initialzed correctly at startup or during runtime.
Subscribing
This endpoint is exposed to allow one to receive messages from a specific topic, and to also specify how it will be handled. To receive messages from the queue, one has to specify the topic and subscription names, and the specific handlers for the different event types that they desire to track.
If a service, lets say serviceB
wants to receive messages from another service, lets say serviceA
, for a specific event type e.g. UserCreatedEvent
, it will subscribe as follows:
const handler = {
UserCreatedEvent: (payload) => {
// do something with payload
}
}
pubsubClient.subscribe(
[{ topicName: `serviceA', subscription: 'serviceB'}]
handler,
'serviceB'
true
)
The third parameter is the service name of the service that is subscribing to the event, and the fourth parameter is a boolean flag that the pubsub client registers to determine if it should publish schema for subscribed events. The service name resolves to an empty string by default, while the flag resolves to boolean by default.
The library validates published payloads based on rules defined in elastic search. When a payload is being published the following happens:
- The library checks is there is a schema definition for this service and event type in redis. If it doesn't find a schema definition in redis, it checks for the defition in elastoc search.
- If it finds a schema definition it, runs it against the rule engine to build the rules, and then validates the schema.
- If the schema is valid, the event is pushed to the messaging queue.
- If the event is invalid, it is rejected.
- If there is no schema definition, an error is logged, and then the event is published to the queue. This is to provide backward compatibility, and to allow for the teams to migrate gracefully.
Schema Definitions
The schema for all events running through the queue is stored in elastic search. This includes both published and subscribed events. To visualize the schema's for staging use https://kibana-staging.andela.com and for production use https://kibana-prod.andela.com.
To add a schema for a service, push the actual payload to https://api.andela.com for production environment and https://api-staging.andela.com for staging environment. To create for example for UserCreatedEvent
this is request to be made:
curl -X POST -H 'jwt-token: Bearer {insert token here}' -d '
{
"serviceName": "serviceA",
"emittedEventType": "UserCreatedEvent",
"timestamp": "2016-01-25 14:21:35.255+00",
"payload": "{\"firstName\":\"test\",\"middleName\":\"test2\",\"Age\":\"30\"}"
}
'
https://api.andela.com/api/v1/pubsub-registry
To view all the schema's you can use kibana
using the URL's provided above or make a get reques to the schemas endpoint e.g.
curl -X GET -H 'jwt-token: Bearer {insert token here}'
'https://api.andela.com/api/v1/pubsub-registry?serviceName?=xxx&eventType=xxx'