pubsubplus-opentelemetry-js-integration
v1.0.1
Published
Solace PubSub+ OpenTelemetry Integration for Solace JS API
Downloads
9
Maintainers
Readme
Solace PubSub+ OpenTelemetry Integration for Solace JS API
This module provides manual instrumentation for the solclientjs
(Solclientjs) API.
Compatible with OpenTelemetry JS API and SDK 1.0+.
Installation
npm install --save pubsubplus-opentelemetry-js-integration
Supported Versions
- OpenTelemetry SDK:
>=0.5.5
- Node: 16+
Usage
Solace PubSub+ OpenTelemetry Integration for Solace JS API allows the user to manually collect trace data and export it to the backend of choice, providing observability for distributed systems that use the Solace PubSub+ broker.
|
Manual instrumentation example: inject context with a setter on publish:
import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import {
context,
propagation,
trace,
} from '@opentelemetry/api';
import {
CompositePropagator,
W3CTraceContextPropagator,
W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
SemanticAttributes,
MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapSetter, Version } from 'pubsubplus-opentelemetry-js-integration';
// example of a manually instrumented publish function
/**
 * Publishes `solaceMessage` to the durable queue `queueName`, wrapped in a
 * manually created OpenTelemetry "send" span.
 *
 * The span context and two demo baggage entries are injected into the message
 * through a `SolaceW3CTextMapSetter` before it is sent. A failure while sending
 * is recorded on the span (exception + ERROR status) and is not rethrown.
 *
 * @param solaceSession - session used to send the message
 * @param queueName - name of the durable queue the message is addressed to
 * @param solaceMessage - message to publish; when it is missing nothing is
 *   sent and no span is created
 */
const publishMessage = async function (
  solaceSession: solace.Session,
  queueName: string,
  solaceMessage: solace.Message|undefined
) {
  // check whether valid message present
  if (!solaceMessage) {
    console.log('No valid Solace message found!');
    return; // skip instrumentation if no message
  }

  // set global propagator: baggage + W3C trace context
  // NOTE(review): propagator and tracer provider are registered on every call;
  // a real application would presumably do this once at start-up.
  const compositePropagator = new CompositePropagator({
    propagators: [
      new W3CBaggagePropagator(),
      new W3CTraceContextPropagator(),
    ],
  });
  propagation.setGlobalPropagator(compositePropagator);

  // Create and register a provider for activating and tracking spans
  const tracerProvider = new BasicTracerProvider();
  tracerProvider.register();

  // Get a tracer
  const tracer = opentelemetry.trace.getTracer('solace-pubsub-publisher-test', '1.0.0');

  // uses currently active context
  let ctx = context.active();

  // create a new publish span; PRODUCER is the span kind for sending a message to a broker
  const span = tracer.startSpan(queueName + ' send', { kind: opentelemetry.SpanKind.PRODUCER }, ctx);

  try {
    // Create a new context from the current context which has the span "active"
    ctx = trace.setSpan(ctx, span);

    if (span.isRecording()) {
      span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, 'SolacePubSub+');
      span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, queueName);
      // the destination set below is a durable queue, so report it as a queue
      span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE);
      span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
      span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
      span.setAttribute('messaging.api.lang', 'nodejs');
      span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'send');
    }

    // added baggage entries just for api demo purposes
    const baggage = propagation.createBaggage({
      "baggageEntry1": {
        value: 'test1',
        metadata: undefined,
      },
      "baggageEntry2": {
        value: 'test2',
        metadata: undefined,
      },
    });
    ctx = propagation.setBaggage(ctx, baggage);

    // new instance of the SolaceW3CTextMapSetter to be used for context/baggage injection
    const setter = new SolaceW3CTextMapSetter();
    // inject context using the global propagator and SolaceW3CTextMapSetter
    propagation.inject(ctx, solaceMessage, setter);

    try {
      console.log('Sending message to queue "' + queueName + '"...');
      solaceMessage.setDestination(solace.SolclientFactory.createDurableQueueDestination(queueName));
      solaceMessage.setDeliveryMode(solace.MessageDeliveryModeType.PERSISTENT);
      // dump message before sending to broker here
      // console.log('SOLACE MESSAGE DUMP: ', solaceMessage.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');
      // use publisher session to publish message to Queue on Solace broker
      solaceSession.send(solaceMessage);
      // Message published.
      span.setStatus({
        code: opentelemetry.SpanStatusCode.OK,
        message: 'Message Sent'
      });
    } catch (error: unknown) {
      // record publisher error on the span; it is intentionally not rethrown
      span.recordException(error instanceof Error ? error : String(error));
      span.setStatus({
        code: opentelemetry.SpanStatusCode.ERROR,
        message: String(error)
      });
    }
  } finally {
    // always end the span, even if baggage creation or injection throws
    span.end();
  }
}; // the semicolon is required: the next statement in the example starts with "("
// publish instrumented message to the broker
(async function instrument() {
  // Set up the Solace factory using the version10_5 profile
  const solaceFactoryProps = new solace.SolclientFactoryProperties();
  solaceFactoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
  solace.SolclientFactory.init(solaceFactoryProps);

  // Log to the JavaScript console at TRACE level.
  // NOTICE: works only with "solclientjs-debug.js"; the
  // "pubsubplus-opentelemetry-js-integration" module also uses the log level set here.
  solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

  // Assemble the version banner line by line; leading/trailing empty entries
  // produce the blank lines around the banner.
  const versionBanner = [
    '',
    '',
    '>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>',
    `Version: ${Version.version}`,
    `Date: ${Version.date}`,
    `Target: ${JSON.stringify(Version.target)}`,
    `FormattedDate: ${Version.formattedDate}`,
    `Summary: ${Version.summary}`,
    `toString: ${Version.toString()}`,
    '>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>',
    '',
    '',
  ].join('\n');
  console.log(versionBanner);

  const pubSession: solace.Session|null = await createPubSession();

  // Runs once the session has had about 4 seconds to come up
  const sendAfterSessionUp = async () => {
    console.log('Session 4 seconds TimeOut ended, sending message now...');
    // nothing to do when no session was created
    if (!pubSession) {
      return;
    }
    // create a new Solace.Message instance and publish it with OTEL support
    const outgoingMessage = await createMessage();
    await publishMessage(pubSession, QUEUE_NAME_ON_BROKER, outgoingMessage);
    // about 5 seconds later: disconnect the session
    const closeSession = async () => {
      await disconnectSession(pubSession);
    };
    setTimeout(closeSession, 5000);
  };
  setTimeout(sendAfterSessionUp, 4000);
})();
Manual instrumentation example: extract context with a getter for message processing on subscribe:
import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import { propagation } from '@opentelemetry/api';
import {
CompositePropagator,
W3CTraceContextPropagator,
W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
SemanticAttributes,
MessagingOperationValues,
MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapGetter, Version } from 'pubsubplus-opentelemetry-js-integration';
// example of a manually instrumented consume function
/**
 * Starts a client-acknowledged message consumer on the queue `queueName` and
 * wraps the handling of every received message in a manually created
 * OpenTelemetry "process" span.
 *
 * The parent context for each span is extracted from the received message
 * through a `SolaceW3CTextMapGetter`. A message is acknowledged only when
 * processing completes without throwing; a processing failure is recorded on
 * the span (exception + ERROR status) and the message is left unacknowledged.
 *
 * NOTE(review): no tracer provider is registered here; presumably one is
 * registered elsewhere in the application — confirm, otherwise the spans
 * created below may not be recorded.
 *
 * @param solaceSession - session on which the message consumer is created
 * @param queueName - name of the queue to consume from
 */
const consumeMessages = async function (
  solaceSession: solace.Session,
  queueName: string
) {
  // check whether valid session
  if (!solaceSession) {
    // failed to start the queue consumer because not connected to Solace message router.
    console.log('Failed to start the queue consumer because not connected to Solace router');
    return; // skip instrumentation if no valid session
  }

  // One-time tracing setup, shared by every received message:
  // global composite propagator (baggage + W3C trace context) ...
  const compositePropagator = new CompositePropagator({
    propagators: [
      new W3CBaggagePropagator(),
      new W3CTraceContextPropagator(),
    ],
  });
  propagation.setGlobalPropagator(compositePropagator);
  // ... a tracer ...
  const tracer = opentelemetry.trace.getTracer('example-basic-tracer-node', '1.0.0');
  // ... and the getter used to read span context and baggage from a Solace message
  const getter = new SolaceW3CTextMapGetter();

  try {
    // Create a message consumer
    const messageConsumer = solaceSession.createMessageConsumer({
      queueDescriptor: { name: queueName, type: solace.QueueType.QUEUE },
      acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // Enabling Client ack
    });

    // Define message consumer event listeners:
    messageConsumer.on(solace.MessageConsumerEventName.UP, function () {
      console.log('consumer is up');
    });
    messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function () {
      console.log('connection failed, consumer is not connected');
    });
    messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () {
      console.log('consumer is now down');
    });
    messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function () {
      console.log('consumer is down due to error situation');
    });

    // register callback based message listener
    messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message: solace.Message) {
      // dump received message from broker here
      // console.log('RECEIVED SOLACE MESSAGE DUMP: ', message.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');

      // extract the remote context from the message, starting from the root context
      const parentContext = propagation.extract(opentelemetry.ROOT_CONTEXT, message, getter);
      // read and dump the baggage extracted from the Solace message here
      // const baggage = propagation.getBaggage(parentContext);
      // console.log('RECEIVED PROPAGATION BAGGAGE: ', baggage);

      // Create new 'process' span which could have a linked remote parent;
      // CONSUMER is the span kind for handling a message received from a broker
      const span = tracer.startSpan(
        MessagingOperationValues.PROCESS,
        {
          kind: opentelemetry.SpanKind.CONSUMER
        },
        parentContext
      );

      try {
        const messageDestination = message.getDestination();
        if (messageDestination) {
          span.setAttribute(
            SemanticAttributes.MESSAGING_DESTINATION,
            messageDestination.getName()
          );
        }
        span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, 'SolacePubSub+');
        span.setAttribute(
          SemanticAttributes.MESSAGING_DESTINATION_KIND,
          MessagingDestinationKindValues.QUEUE
        );
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
        span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'process');

        try {
          // processMessage(message); // - this is a made up client method for message processing, it can throw an error
          // doMoreWork(message); // - this is another made up client method

          // Need to explicitly ack otherwise it will not be deleted from the message router
          message.acknowledge();
          // report success only after the acknowledgement went through
          span.setStatus({
            code: opentelemetry.SpanStatusCode.OK,
            message: 'Message processed'
          });
        } catch (error: unknown) {
          // record processing error; the message stays unacknowledged
          span.recordException(error instanceof Error ? error : String(error));
          span.setStatus({
            code: opentelemetry.SpanStatusCode.ERROR,
            message: String(error)
          });
        }
      } finally {
        // always end the span, even if setting attributes throws
        span.end();
      }
    });

    // Connect the message consumer
    messageConsumer.connect();
  } catch (error: unknown) {
    // failed to connect
    console.log('Failed to connect: ', error);
  }
};
// consume instrumented message(s) from the broker
(async function instrument() {
  // Set up the Solace factory using the version10_5 profile
  const solaceFactoryProps = new solace.SolclientFactoryProperties();
  solaceFactoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
  solace.SolclientFactory.init(solaceFactoryProps);

  // Log to the JavaScript console at TRACE level.
  // NOTICE: works only with "solclientjs-debug.js"; the
  // "pubsubplus-opentelemetry-js-integration" module also uses the log level set here.
  solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

  // Assemble the version banner line by line; leading/trailing empty entries
  // produce the blank lines around the banner.
  const versionBanner = [
    '',
    '',
    '>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>',
    `Version: ${Version.version}`,
    `Date: ${Version.date}`,
    `Target: ${JSON.stringify(Version.target)}`,
    `FormattedDate: ${Version.formattedDate}`,
    `Summary: ${Version.summary}`,
    `toString: ${Version.toString()}`,
    '>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>',
    '',
    '',
  ].join('\n');
  console.log(versionBanner);

  const consumerSession: solace.Session|null = await createConsumerSession();
  // nothing to do when no session was created
  if (!consumerSession) {
    return;
  }

  await consumeMessages(consumerSession, QUEUE_NAME_ON_BROKER);

  // about 10 seconds later: disconnect the session
  const closeSession = async () => {
    await disconnectSession(consumerSession);
  };
  setTimeout(closeSession, 10000);
})();
|
Useful links
- For more information on OpenTelemetry, visit: https://opentelemetry.io/
- For more about OpenTelemetry JavaScript: https://github.com/open-telemetry/opentelemetry-js
- For help or feedback on this project, join us in Solace Community Discussions
License
Apache 2.0 - See LICENSE for more information.