npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

pubsubplus-opentelemetry-js-integration

v1.0.1

Published

Solace PubSub+ OpenTelemetry Integration for Solace JS API

Downloads

9

Readme

Solace PubSub+ OpenTelemetry Integration for Solace JS API

NPM Published Version Apache License

This module provides manual instrumentation for the solclientjs (Solclientjs) API.

Compatible with OpenTelemetry JS API and SDK 1.0+.

Installation

npm install --save pubsubplus-opentelemetry-js-integration

Supported Versions

  • OpenTelemetry SDK: >=0.5.5
  • Node: 16+

Usage

Solace PubSub+ OpenTelemetry Integration for Solace JS API allows the user to manually collect trace data and export them to the backend of choice, to give observability to distributed systems when working with the Solace PubSub+ broker.

                                       |

Manual instrumentation example: inject context with a setter on publish:

import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import {
    context,
    propagation,
    trace,
} from '@opentelemetry/api';
import {
    CompositePropagator,
    W3CTraceContextPropagator,
    W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
    SemanticAttributes,
    MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapSetter, Version } from 'pubsubplus-opentelemetry-js-integration';


// example of a manually instrumented publish function
const publishMessage = async function (
    solaceSession: solace.Session,
    queueName: string,
    solaceMessage: solace.Message|undefined
) {
    // check whether valid message present
    if(!solaceMessage) {
        console.log('No valid Solace message found!');
        return; // skip instrumentation if no message
    }

    // set global propagator
    // setup composite propagator
    const compositePropagator = new CompositePropagator({
        propagators: [
            new W3CBaggagePropagator(),
            new W3CTraceContextPropagator(),
        ],
    });
    propagation.setGlobalPropagator(compositePropagator);

    // Create a provider for activating and tracking spans
    const tracerProvider = new BasicTracerProvider();

    // Register the tracer
    tracerProvider.register();

    // Get a tracer
    const tracer = opentelemetry.trace.getTracer('solace-pubsub-publisher-test', '1.0.0');

    // uses currently active context
    let ctx = context.active();

    // create a new publish span
    const span = tracer.startSpan(queueName + ' send', { kind: opentelemetry.SpanKind.CLIENT }, ctx);

    // Create a new context from the current context which has the span "active"
    ctx = trace.setSpan(ctx, span);

    if(span.isRecording()) {
        span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, 'SolacePubSub+');
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, queueName);
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC);
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
        span.setAttribute('messaging.api.lang', 'nodejs');
        span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'send');
    }

    // added baggage entries just for api demo purposes
    const baggage = propagation.createBaggage({
        "baggageEntry1": {
          value: 'test1',
          metadata: undefined,
        },
        "baggageEntry2": {
          value: 'test2',
          metadata: undefined,
      },
    });
    ctx = propagation.setBaggage(ctx, baggage);
 
    // new instance of the SolaceW3CTextMapSetter to be used for context/baggage injection
    const setter = new SolaceW3CTextMapSetter();

    // inject context using w3c context propagator and SolaceW3CTextMapSetter
    propagation.inject(ctx, solaceMessage, setter);

    try {
        console.log('Sending message to queue "' + queueName + '"...');
        solaceMessage.setDestination(solace.SolclientFactory.createDurableQueueDestination(queueName));
        solaceMessage.setDeliveryMode(solace.MessageDeliveryModeType.PERSISTENT);

        // dump message before sending to broker here
        // console.log('SOLACE MESSAGE DUMP: ', solaceMessage.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');

        // use publisher session to publish message to Queue on Solace broker
        solaceSession.send(solaceMessage);

        // Message published.
        span.setStatus({
            code: opentelemetry.SpanStatusCode.OK,
            message: 'Message Sent'
        });
 
    } catch (error: any) {
        // log publisher error
        span.recordException(error);
        span.setStatus({
            code: opentelemetry.SpanStatusCode.ERROR,
            message: String(error)
        });
    }
 
    span.end();
}

// publish instrumented message to the broker
(async function instrument() {
    // Initialize factory with the most recent API defaults
    const factoryProps: solace.SolclientFactoryProperties = new solace.SolclientFactoryProperties();
    factoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
    solace.SolclientFactory.init(factoryProps);

    // enable logging to JavaScript console at TRACE level
    // NOTICE: works only with "solclientjs-debug.js"
    // also the "pubsubplus-opentelemetry-js-integration" module uses the log level set here
    solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

    console.log(
        '\n'
        + '\n>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>'
        + `\nVersion: ${Version.version}`
        + `\nDate: ${Version.date}`
        + `\nTarget: ${JSON.stringify(Version.target)}`
        + `\nFormattedDate: ${Version.formattedDate}`
        + `\nSummary: ${Version.summary}`
        + `\ntoString: ${Version.toString()}`
        + '\n>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>'
        + '\n\n'
    );

    const pubSession: solace.Session|null = await createPubSession();

    // wait for about 4 sec for session to come up
    setTimeout(async () => {
        console.log('Session 4 seconds TimeOut ended, sending message now...');
        // if created session
        if(pubSession) {
            const message = await createMessage(); // create new Solace.Message instance here
            await publishMessage(pubSession, QUEUE_NAME_ON_BROKER, message); // publish with OTEL support
            // wait for about 5 sec, then close, destroy and clean up session resources
            setTimeout(async () => {
                await disconnectSession(pubSession); // disconnect session after 5 seconds 
            }, 5000);
        }
    }, 4000);

})();

Manual instrumentation example: Fetch context with getter for message processing on subscribe:

import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import { propagation } from '@opentelemetry/api';
import {
    CompositePropagator,
    W3CTraceContextPropagator,
    W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
    SemanticAttributes,
    MessagingOperationValues,
    MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapGetter, Version } from 'pubsubplus-opentelemetry-js-integration';


// example of a manually instrumented consume function
const consumeMessages = async function (
    solaceSession: solace.Session,
    queueName: string
) {
    // check whether valid session
    if(!solaceSession) {
        // failed to start the queue consumer because not connected to Solace message router.
        console.log('Failed to start the queue consumer because not connected to Solace router');
        return; // skip instrumentation if no valid session
    }

    try {
        // Create a message consumer
        const messageConsumer = solaceSession.createMessageConsumer({
            queueDescriptor: { name: queueName, type: solace.QueueType.QUEUE },
            acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // Enabling Client ack
        });
        // Define message consumer event listeners:
        messageConsumer.on(solace.MessageConsumerEventName.UP, function () {
            // consumer is up
            console.log('consumer is up');
        });
        messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function () {
            // connection failed, consumer is not connected
            console.log('connection failed, consumer is not connected');
        });
        messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () {
            // consumer is now down
            console.log('consumer is now down');
        });
        messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function () {
            // consumer is down due to error situation
            console.log('consumer is down due to error situation');
        });
        // register callback based message listener
        messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message: solace.Message) {
            // dump received message from broker here
            // console.log('RECEIVED SOLACE MESSAGE DUMP: ', message.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');

            // set as root context
            opentelemetry.ROOT_CONTEXT

            // Get a tracer
            const tracer = opentelemetry.trace.getTracer('example-basic-tracer-node', '1.0.0');

            // set global propagator
            // setup composite propagator
            const compositePropagator = new CompositePropagator({
                propagators: [
                    new W3CBaggagePropagator(),
                    new W3CTraceContextPropagator(),
                ],
            });
            propagation.setGlobalPropagator(compositePropagator);

            // Use to extract the span context and baggage from the Solace message
            const getter = new SolaceW3CTextMapGetter();

            // extract context using w3c context propagator and Solace getter
            const parentContext = propagation.extract(opentelemetry.ROOT_CONTEXT, message, getter);

            const baggage = propagation.getBaggage(parentContext);
            // dump received baggage extracted from Solace message
            // console.log('RECEIVED PROPAGATION BAGGAGE: ', baggage);

            // Create New 'process' Span which could have a linked remote parent
            let span = tracer.startSpan(
                MessagingOperationValues.PROCESS,
                {
                    kind: opentelemetry.SpanKind.CLIENT
                },
                parentContext || undefined
            );  

            const messageDestination = message.getDestination();
            if(messageDestination) {
                span.setAttribute(
                    SemanticAttributes.MESSAGING_DESTINATION,
                    (messageDestination as solace.Destination).getName()
                );
            }
            span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM,'SolacePubSub+');
            span.setAttribute(
                SemanticAttributes.MESSAGING_DESTINATION_KIND,
                MessagingDestinationKindValues.QUEUE
            );
            span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
            span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
            span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'process');

            try {
                // processMessage(message); // - this is a made up client method for message processing, it can throw an error

                // doMoreWork(message); // - this is another made up client method

                span.setStatus({
                    code: opentelemetry.SpanStatusCode.OK,
                    message: 'Message processed'
                });

                // Need to explicitly ack otherwise it will not be deleted from the message router
                message.acknowledge();

            }  catch (error: any) {
                // log processing error
                span.recordException(error);
                span.setStatus({
                    code: opentelemetry.SpanStatusCode.ERROR,
                    message: String(error)
                });
            }

            span.end();

        });

        // Connect the message consumer
        messageConsumer.connect();
    } catch (error: any) {
        // failed to connect
        console.log('Failed to connect: ', error);
    }
};

// consume instrumented message(s) from the broker
(async function instrument() {
    // Initialize factory with the most recent API defaults
    const factoryProps: solace.SolclientFactoryProperties = new solace.SolclientFactoryProperties();
    factoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
    solace.SolclientFactory.init(factoryProps);

    // enable logging to JavaScript console at TRACE level
    // NOTICE: works only with "solclientjs-debug.js"
    // also the "pubsubplus-opentelemetry-js-integration" module uses the log level set here
    solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

    console.log(
        '\n'
        + '\n>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>'
        + `\nVersion: ${Version.version}`
        + `\nDate: ${Version.date}`
        + `\nTarget: ${JSON.stringify(Version.target)}`
        + `\nFormattedDate: ${Version.formattedDate}`
        + `\nSummary: ${Version.summary}`
        + `\ntoString: ${Version.toString()}`
        + '\n>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>'
        + '\n\n'
    );

    const consumerSession: solace.Session|null = await createConsumerSession();
    // if created session
    if(consumerSession) {
        await consumeMessages(consumerSession, QUEUE_NAME_ON_BROKER);
        // wait for about 10 sec, then close, destroy and clean up session resources
        setTimeout(async () => {
            await disconnectSession(consumerSession); // disconnect session after 10 seconds
        }, 10000);
    }

})();
                                       |

Useful links

License

Apache 2.0 - See LICENSE for more information.