@raphaelmanke/aws-cdk-pipes-rfc

v0.0.16

![](https://img.shields.io/github/v/release/raphaelmanke/aws-cdk-pipes-rfc-473?style=for-the-badge) ![](https://img.shields.io/npm/v/@raphaelmanke/aws-cdk-pipes-rfc?style=for-the-badge) ![](https://img.shields.io/github/license/raphaelmanke/aws-cdk-pipes-

Readme

AWS CDK RFC-473 - EventBridge Pipes

cdk-constructs: Experimental

Installation

NPM

npm install @raphaelmanke/aws-cdk-pipes-rfc --save-dev

Yarn

yarn add -D @raphaelmanke/aws-cdk-pipes-rfc

This is an RFC for a CDK construct library for AWS EventBridge Pipes.

CDK RFC Issue - 473

Amazon EventBridge Pipes Construct Library

This library contains constructs for working with Amazon EventBridge Pipes.

EventBridge Pipes let you create source-to-target connections between several AWS services. While transporting messages from a source to a target, the messages can be filtered, transformed and enriched.

diagram of pipes

For more details see the service documentation and the CloudFormation docs.

Pipe

An EventBridge Pipe is a fully managed service that does the heavy lifting of polling a source and can filter out payloads based on filter criteria. Filtering reduces the number of target invocations and can reduce costs. After filtering, the resulting events can be enriched in the enrichment phase of a Pipe. The result of the enrichment is then pushed to the target. Before a payload is passed to the enrichment or to the target, it can be transformed using an input transformation. To give the EventBridge Pipe access to the services that are connected in a pipe, each Pipe assumes an IAM role. This role must have IAM policies attached that allow it to read from the source, invoke the enrichment service and finally push to the target service.

So a Pipe has the following components:

Besides these core components, which are used while processing data, there are additional attributes that describe a Pipe:

  • Name
    • This is the physical identifier of the AWS resource, the actual Pipe. It is used in the ARN of the provisioned resource.
  • Description
    • This is a text field for humans to identify what the pipe does.
  • Tags
    • AWS tags for the resource

graph LR
classDef required fill:#00941b
classDef optional fill:#5185f5

Source:::required
Filter:::optional
Enrichment_Input_Transformation[Input transformation]:::optional
Enrichment:::optional
Target_Input_Transformation[Input transformation]:::optional
Target:::required

Source --> Filter --> Enrichment_Input_Transformation --> Enrichment --> Target_Input_Transformation --> Target
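The order of the phases in the graph above can be sketched with plain functions. This is only an illustration of the data flow, not part of the proposed construct API: `PipeStages`, `runPipe` and the sample events are hypothetical names invented for this sketch, and the real service does the polling and invocation itself.

```typescript
// Hypothetical stand-in types; none of these names come from the construct library.
type Payload = Record<string, unknown>;

interface PipeStages {
  filter?: (event: Payload) => boolean; // optional
  enrichmentInput?: (event: Payload) => Payload; // optional input transformation
  enrichment?: (event: Payload) => Payload; // optional
  targetInput?: (event: Payload) => Payload; // optional input transformation
  target: (event: Payload) => void; // required
}

// Pushes each polled event through the phases in the order shown in the graph.
function runPipe(events: Payload[], stages: PipeStages): void {
  for (const event of events) {
    // Events rejected by the filter never reach the enrichment or the target.
    if (stages.filter && !stages.filter(event)) continue;

    let current = event;
    if (stages.enrichment) {
      const input = stages.enrichmentInput ? stages.enrichmentInput(current) : current;
      current = stages.enrichment(input);
    }
    if (stages.targetInput) current = stages.targetInput(current);
    stages.target(current);
  }
}

// Usage: only the "eu" event passes the filter and is enriched before delivery.
const delivered: Payload[] = [];
runPipe(
  [
    { region: "eu-central-1", body: "a" },
    { region: "us-east-1", body: "b" },
  ],
  {
    filter: (e) => String(e.region).indexOf("eu") === 0,
    enrichment: (e) => ({ ...e, enriched: true }),
    target: (e) => {
      delivered.push(e);
    },
  }
);
```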

Example implementation

interface PipeProps {
  readonly source: PipeSource;
  readonly target: PipeTarget;

  readonly filter?: PipeFilter;
  readonly enrichment?: PipeEnrichment;
  readonly role?: IRole; // role is optional, if not provided a new role is created
  readonly description: string;
  readonly tags?: Tags;
}

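// Sketch of the construct's public surface. It is declared as a class because
// an interface cannot declare a constructor. Construct is assumed to be the
// base class from the "constructs" package.
declare class Pipe extends Construct {
  readonly role: IRole;
  readonly source: PipeSource;
  readonly target: PipeTarget;

  readonly filter?: PipeFilter;
  readonly enrichment?: PipeEnrichment;
  readonly description: string;
  readonly tags?: Tags;

  constructor(scope: Construct, id: string, props: PipeProps);
}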

Open questions

  1. Should the input transformation be part of the PipeProps (alternative: a property of the PipeEnrichment and PipeTarget props)?
    1. Pro PipeProps:
      1. In the case of a refactoring, for example replacing the target, the input transformation doesn't have to be touched or moved.
    2. Con PipeProps:
      1. Input transformation can occur twice in a Pipe definition. The naming needs to make clear which phase the transformation is meant for, e.g. EnrichmentInputTransformation and TargetInputTransformation.
      2. Setting the EnrichmentInputTransformation without a PipeEnrichment makes no sense and needs additional validation code. This validation can be omitted if the inputTransformation is a property of the PipeEnrichment or PipeTarget classes.
  2. Should the PipeFilter be part of the PipeSource property definition instead of an attribute on the Pipe class?
    1. Pro:
      1. The possible filter keys depend on the source.
      2. CloudFormation itself puts the FilterCriteria into the PipeSourceParameters.
    2. Con:
      1. To align with the AWS console, it should be on the same level as the source itself. Users who have tested pipes in the console can understand the API more easily.
      2. It would be more robust to future AWS changes, because the filter can always be defined based on the CloudFormation-generated type definitions and doesn't have to be explicitly built for a new source.
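The "additional validation code" mentioned in the first open question could look roughly like the following. It is a minimal sketch under the assumption that both input transformations live on the Pipe props; `SketchPipeProps` and `validatePipeProps` are hypothetical names, not part of the proposal.

```typescript
// Hypothetical, reduced props shape used only for this sketch.
interface SketchPipeProps {
  enrichment?: { enrichmentArn: string };
  enrichmentInputTransformation?: string;
  targetInputTransformation?: string;
}

// Returns a list of validation errors; an empty list means the props are consistent.
function validatePipeProps(props: SketchPipeProps): string[] {
  const errors: string[] = [];
  // An enrichment input transformation is meaningless without an enrichment step.
  if (props.enrichmentInputTransformation !== undefined && props.enrichment === undefined) {
    errors.push("enrichmentInputTransformation requires an enrichment to be set");
  }
  return errors;
}
```

If the transformation were instead a property of the enrichment class, this check would disappear because the invalid combination could not be expressed at all.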

Source

A source is an AWS service that needs to be polled. The following sources are possible:

The CfnPipe resource references the source only by its ARN. Right now there is no validation in the CDK framework that checks whether an ARN is valid. To overcome this shortcoming, a PipeSource class representing a source is needed. This PipeSource is then implemented by all the supported sources.

export abstract class PipeSource {
  public readonly sourceArn: string;

  public readonly sourceParameters?:
    | CfnPipe.PipeSourceParametersProperty
    | IResolvable;

  constructor(sourceArn: string, props?: CfnPipe.PipeSourceParametersProperty) {
    this.sourceArn = sourceArn;
    this.sourceParameters = props;
  }

  public abstract grantRead(grantee: IRole): void;
}

This PipeSource class has a sourceArn that is mapped to the CfnPipe sourceArn attribute. The sourceParameters are the configuration options for the source. Depending on the source, these attributes are present under a different key. For example, for an SQS queue the configuration attributes are:

{
	sqsQueueParameters : {...}
}

The specific source class implementation hides this detail from the user and provides an interface containing only the configuration options that are possible for that source.

interface PipeSourceSqsQueueParametersProperty {
  readonly batchSize?: number;
  readonly maximumBatchingWindowInSeconds?: number;
}

This interface, for example, is provided by the CloudFormation specification and can be used as a base for the possible configurations (additional validation can be added if useful).
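As an illustration of such additional validation, the sketch below checks the two SQS options and nests them under the source-specific key. The names `SqsSourceOptions` and `toSqsSourceParameters` are hypothetical, and the numeric limits are assumptions that should be verified against the current CloudFormation documentation before relying on them.

```typescript
// Hypothetical mirror of the SQS source options shown above.
interface SqsSourceOptions {
  readonly batchSize?: number;
  readonly maximumBatchingWindowInSeconds?: number;
}

// Assumed limits; verify them against the CloudFormation documentation.
const MAX_BATCH_SIZE = 10000;
const MAX_BATCHING_WINDOW_SECONDS = 300;

function isIntegerInRange(value: number, min: number, max: number): boolean {
  return Math.floor(value) === value && value >= min && value <= max;
}

// Validates the options and nests them under the SQS-specific key.
function toSqsSourceParameters(
  options: SqsSourceOptions = {}
): { sqsQueueParameters: SqsSourceOptions } {
  const { batchSize, maximumBatchingWindowInSeconds } = options;
  if (batchSize !== undefined && !isIntegerInRange(batchSize, 1, MAX_BATCH_SIZE)) {
    throw new Error(`batchSize must be an integer between 1 and ${MAX_BATCH_SIZE}`);
  }
  if (
    maximumBatchingWindowInSeconds !== undefined &&
    !isIntegerInRange(maximumBatchingWindowInSeconds, 0, MAX_BATCHING_WINDOW_SECONDS)
  ) {
    throw new Error(
      `maximumBatchingWindowInSeconds must be an integer between 0 and ${MAX_BATCHING_WINDOW_SECONDS}`
    );
  }
  return { sqsQueueParameters: options };
}
```

Failing at synth time with a readable message is usually friendlier than a deployment error, which is the main argument for adding checks like these.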

To be able to consume a source, the EventBridge Pipe has an IAM role. This role needs a policy that allows reading from the source. The grantRead method needs to be implemented for that purpose. For example, the SQS source can leverage the queue's L2 .grantConsumeMessages() method.

Example implementation

An example API for a source that polls an SQS queue could then look like this:

export class SqsSource extends PipeSource {
  private queue: IQueue;

  constructor(
    queue: IQueue,
    props?: CfnPipe.PipeSourceSqsQueueParametersProperty
  ) {
    super(queue.queueArn, { sqsQueueParameters: props });
    this.queue = queue;
  }

  public grantRead(grantee: IRole): void {
    this.queue.grantConsumeMessages(grantee);
  }
}

It takes an existing SQS queue and the polling properties that are possible for that kind of source, and implements a grantRead method that creates the required IAM policy for the Pipe role.

Role

An IAM role is required that can be assumed by the pipes.amazonaws.com principal. This role needs IAM policies attached to read from a PipeSource, invoke a PipeEnrichment and push to a PipeTarget. Users can bring their own role. If the user does not provide a role, a new role is created. In both cases the role should be exposed by the Pipe class so that it is transparent to the user which role is used within the Pipe.
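For reference, a trust policy that lets the pipes.amazonaws.com principal assume the role can be sketched as a plain object. The helper name `pipesTrustPolicy` is hypothetical; in a CDK construct the same effect would normally come from the role's assumed-by principal rather than from a hand-written document.

```typescript
// Builds an IAM trust policy document that allows EventBridge Pipes to assume the role.
function pipesTrustPolicy() {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Principal: { Service: "pipes.amazonaws.com" },
        Action: "sts:AssumeRole",
      },
    ],
  };
}

const trustPolicyJson = JSON.stringify(pipesTrustPolicy(), null, 2);
```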

Open questions

  1. How can it be ensured that the pipes service has access to encrypted sources and targets? The role or pipes principal needs access to KMS.
  2. Can we allow IRole, or do we need to restrict the property to Role only?
    1. We have to make sure the generated policies are attached to the role in both cases. If restricted to Role, this can easily be done by using the L2 construct methods of the role or of the source, enrichment or target and passing the role along. If an IRole is provided, the role policies cannot be extended.

Filter

A filter does pattern matching on the incoming payload using the specified filter criteria. The matching is done in the same way EventBridge patterns are matched. The fields that can be used to filter incoming payloads depend on the source.
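To make the matching idea concrete, here is a deliberately simplified matcher. It is a sketch that only supports exact values, the prefix operator and nested objects; the real EventBridge pattern language has more operators and additional rules for array values. `Pattern`, `matchesPattern` and `matchesValue` are hypothetical names.

```typescript
// A leaf rule is a list of allowed values; a value is either a literal or a prefix matcher.
type PatternValue = string | number | boolean | null | { prefix: string };

interface Pattern {
  [key: string]: PatternValue[] | Pattern;
}

function matchesValue(candidate: PatternValue, actual: unknown): boolean {
  if (typeof candidate === "object" && candidate !== null) {
    // Prefix matcher: only strings can match.
    return typeof actual === "string" && actual.indexOf(candidate.prefix) === 0;
  }
  return candidate === actual;
}

// Every key in the pattern must match; keys missing from the pattern are ignored.
function matchesPattern(pattern: Pattern, event: Record<string, unknown>): boolean {
  for (const key of Object.keys(pattern)) {
    const rule = pattern[key];
    const actual = event[key];
    if (Array.isArray(rule)) {
      // A leaf matches when any one of the listed candidates matches.
      if (!rule.some((candidate) => matchesValue(candidate, actual))) return false;
    } else {
      // Nested pattern: descend into the nested object of the event.
      if (typeof actual !== "object" || actual === null) return false;
      if (!matchesPattern(rule, actual as Record<string, unknown>)) return false;
    }
  }
  return true;
}

// Usage with a prefix pattern on awsRegion.
const euOnly: Pattern = { awsRegion: [{ prefix: "eu" }] };
const euMatches = matchesPattern(euOnly, { awsRegion: "eu-central-1", body: "Hello" });
const usMatches = matchesPattern(euOnly, { awsRegion: "us-east-1", body: "Hello" });
```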

Example Implementation

The implementation is split into two types.

  1. Generic filter
    1. This filter is the basic class for defining a filter. It represents the CloudFormation filter specification 1:1.
  2. Source-specific filter
    1. This filter gives the user guidance on which attributes of the specific source a filter can be created for. It then takes care of the actual data key, e.g. data, body, dynamodb (see docs).

interface IPipeFilterPattern {
	pattern: string;
}

class PipeGenericFilterPattern {
	static fromJson(patternObject: Record<string, any>) :IPipeFilterPattern {
		return { pattern: JSON.stringify(patternObject) };
	}
}

interface SqsMessageAttributes {
	messageId?: string;
	receiptHandle?: string;
	body?: any;
	attributes?: {
		ApproximateReceiveCount?: string;
		SentTimestamp?: string;
		SequenceNumber?: string;
		MessageGroupId?: string;
		SenderId?: string;
		MessageDeduplicationId?: string;
		ApproximateFirstReceiveTimestamp?: string;
	};
	messageAttributes?: any;
	md5OfBody?: string;
}

class PipeSqsFilterPattern extends PipeGenericFilterPattern {
	static fromSqsMessageAttributes(attributes: SqsMessageAttributes) :IPipeFilterPattern {
		return {
			pattern: JSON.stringify( attributes ),
		};

	}
}
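How such patterns could end up in the filter criteria of the pipe is sketched below. The `filters` and `pattern` keys follow the CloudFormation-generated property names; `sqsBodyFilter` and `toFilterCriteria` are hypothetical helpers, and nesting the pattern under `body` is an assumption about the SQS data key.

```typescript
interface FilterPattern {
  readonly pattern: string;
}

// Wraps a pattern on the message content under the "body" key (assumed SQS data key).
function sqsBodyFilter(bodyPattern: Record<string, unknown>): FilterPattern {
  return { pattern: JSON.stringify({ body: bodyPattern }) };
}

// Collects individual patterns into the filter criteria structure of the pipe.
function toFilterCriteria(patterns: FilterPattern[]): { filters: FilterPattern[] } {
  return { filters: patterns.map((p) => ({ pattern: p.pattern })) };
}

const criteria = toFilterCriteria([sqsBodyFilter({ status: ["OPEN"] })]);
```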

Target

A target is the end of the Pipe. After the payload from the source is pulled, filtered and enriched, it is forwarded to the target. For now the following targets are supported:

  • API destination
  • API Gateway
  • Batch job queue
  • CloudWatch log group
  • ECS task
  • Event bus in the same account and Region
  • Firehose delivery stream
  • Inspector assessment template
  • Kinesis stream
  • Lambda function (SYNC or ASYNC)
  • Redshift cluster data API queries
  • SageMaker Pipeline
  • SNS topic
  • SQS queue
  • Step Functions state machine
    • Express workflows (ASYNC)
    • Standard workflows (SYNC or ASYNC)

The CfnPipe resource references the target only by its ARN. Right now there is no validation in the CDK framework that checks whether an ARN is valid. To overcome this shortcoming, a PipeTarget class representing a target is needed. This PipeTarget is then implemented by all the supported targets.

The implementation is similar to the source implementation:

Example implementation

interface IPipeTarget {
  targetArn: string;
  targetParameters: CfnPipe.PipeTargetParametersProperty;

  grantPush(grantee: IRole): void;
}

export interface SqsTargetProps {
  queue: IQueue;
  sqsQueueParameters?: CfnPipe.PipeTargetSqsQueueParametersProperty;
}

export class SqsTarget implements IPipeTarget {
  private queue: IQueue;
  targetArn: string;
  targetParameters: CfnPipe.PipeTargetParametersProperty;

  constructor(props: SqsTargetProps) {
    this.queue = props.queue;
    this.targetArn = props.queue.queueArn;
    this.targetParameters = { sqsQueueParameters: props.sqsQueueParameters };
  }

  public grantPush(grantee: IRole): void {
    this.queue.grantSendMessages(grantee);
  }
}

Enrichment

In the enrichment step the filtered payloads can be used to invoke one of the following services:

  • API destination
  • Amazon API Gateway
  • Lambda function
  • Step Functions state machine
    • only express workflows

The invocation is a synchronous call to the service. The result of the enrichment step can then be combined with the filtered payload and sent to the target. The enrichment has two main properties for all types of supported services:

  • enrichment ARN
  • input transformation

The enrichment ARN is the ARN of the AWS resource that should be invoked. The role must have access to invoke this ARN. The input transformation maps values from the output of the filter step to the input of the enrichment step. For API destination and API Gateway enrichments, additional request parameters such as headers and query parameters can be set. These properties can either be static or dynamic, based on the payload from the previous step or extracted from the input transformation.
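The difference between static and dynamic request parameters can be illustrated with a small resolver. This is a simplified sketch, not how the service is implemented: it treats every value starting with `$.` as a dot-separated path into the payload and everything else as a static string. `readPath` and `resolveParameters` are hypothetical names.

```typescript
// Reads a dot-separated path such as "$.detail.id" from a payload.
function readPath(payload: unknown, path: string): unknown {
  const segments = path.slice(2).split(".");
  let current: unknown = payload;
  for (const segment of segments) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

// Resolves header or query parameters: "$."-prefixed values are looked up, others are static.
function resolveParameters(
  params: Record<string, string>,
  payload: unknown
): Record<string, string> {
  const resolved: Record<string, string> = {};
  for (const name of Object.keys(params)) {
    const value = params[name];
    resolved[name] = value.indexOf("$.") === 0 ? String(readPath(payload, value)) : value;
  }
  return resolved;
}

const headers = resolveParameters(
  { Dynamic: "$.detail.id", FooBAr: "Static" },
  { detail: { id: "42" } }
);
```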

Example implementation

export abstract class PipeEnrichment {
  public readonly enrichmentArn: string;
  public enrichmentParameters: CfnPipe.PipeEnrichmentParametersProperty;

  constructor(
    enrichmentArn: string,
    props: CfnPipe.PipeEnrichmentParametersProperty
  ) {
    this.enrichmentParameters = props;
    this.enrichmentArn = enrichmentArn;
  }

  abstract grantInvoke(grantee: IRole): void;
}

export class LambdaEnrichment extends PipeEnrichment {
  private lambda: IFunction;

  constructor(
    lambda: IFunction,
    props: { inputTransformation?: PipeInputTransformation }
  ) {
    super(lambda.functionArn, {
      inputTemplate: props.inputTransformation?.inputTemplate,
    });
    this.lambda = lambda;
  }

  grantInvoke(grantee: IRole): void {
    this.lambda.grantInvoke(grantee);
  }
}

Input Transformation

Input transformations are used to transform or extend payloads to a desired structure. This transformation mechanism can be used prior to the enrichment or target step.

There are two types of mappings. Both types can be either static values or use values from the output of the previous step. Additionally there are a few values that come from the pipe itself (see reservedVariables enum).

  • string
    • static
    • dynamic
  • json
    • static
    • dynamic

Example implementation

enum reservedVariables {
  PIPES_ARN = "<aws.pipes.pipe-arn>",
  PIPES_NAME = "<aws.pipes.pipe-name>",
  PIPES_TARGET_ARN = "<aws.pipes.target-arn>",
  PIPE_EVENT_INGESTION_TIME = "<aws.pipes.event.ingestion-time>",
  PIPE_EVENT = "<aws.pipes.event>",
  PIPE_EVENT_JSON = "<aws.pipes.event.json>",
}

type StaticString = string;
type JsonPath = `<$.${string}>`;
type KeyValue = Record<string, string | reservedVariables>;
type StaticJsonFlat = Record<string, StaticString | JsonPath | KeyValue>;
type InputTransformJson = Record<
  string,
  StaticString | JsonPath | KeyValue | StaticJsonFlat
>;

type PipeInputTransformationValue = StaticString | InputTransformJson;

export interface IInputTransformationProps {
  inputTemplate: PipeInputTransformationValue;
}

export class PipeInputTransformation {
  static fromJson(inputTemplate: Record<string, any>): PipeInputTransformation {
    return new PipeInputTransformation({ inputTemplate });
  }

  readonly inputTemplate: string;

  constructor(props: IInputTransformationProps) {
    // Serialize only the template itself, not the wrapping props object.
    this.inputTemplate = JSON.stringify(props.inputTemplate);
  }
}

Open Question

  1. The EventBridge L2 construct has an InputTransformation as well (see cdk docs). Should this be reused or extended?
  2. Should there be InputTransformation helpers that are specific to a source, similar to the source-specific filter?

Example config

  • SQS -> Filter -> API Gateway -> SQS

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/PipeSqsToSqs",
      "CreationTime": "2023-01-04T10:57:58+01:00",
      "CurrentState": "RUNNING",
      "DesiredState": "RUNNING",
      "Enrichment": "arn:aws:execute-api:eu-central-1:XXXXXXXXXXX:yia5vn3gz0/prod/GET/pets",
      "EnrichmentParameters": {
        "HttpParameters": {
          "HeaderParameters": {
            "FoHEader": "Bar"
          },
          "QueryStringParameters": {
            "FooQuery": "$.detail.price"
          }
        }
      },
      "LastModifiedTime": "2023-01-20T09:02:00+01:00",
      "Name": "PipeSqsToSqs",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_PipeSqsToSqs_26d9f0aa",
      "Source": "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:PipeSource",
      "SourceParameters": {
        "FilterCriteria": {
          "Filters": [
            {
              "Pattern": "{\"awsRegion\":[{\"prefix\":\"eu\"}]}"
            }
          ]
        },
        "SqsQueueParameters": {
          "BatchSize": 1
        }
      },
      "StateReason": "USER_INITIATED",
      "Tags": {},
      "Target": "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:PipeTarget",
      "TargetParameters": {}
    }
  • SQS -> Filter -> API Destination -> SQS

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/PipeSqsToSqs",
      "CreationTime": "2023-01-04T10:57:58+01:00",
      "CurrentState": "UPDATING",
      "DesiredState": "RUNNING",
      "Enrichment": "arn:aws:events:eu-central-1:XXXXXXXXXXX:api-destination/Foo/fe7e2cbd-43da-435c-8e86-a2fa1a83f467",
      "EnrichmentParameters": {
        "HttpParameters": {
          "HeaderParameters": {
            "Dynamic": "$.detail.id",
            "FooBAr": "Static"
          },
          "QueryStringParameters": {
            "FooQueryDynamic": "$.detail.bar",
            "FooStaticQuery": "FooStaticQuery"
          }
        }
      },
      "LastModifiedTime": "2023-01-20T09:10:01+01:00",
      "Name": "PipeSqsToSqs",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_PipeSqsToSqs_26d9f0aa",
      "Source": "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:PipeSource",
      "SourceParameters": {
        "FilterCriteria": {
          "Filters": [
            {
              "Pattern": "{\"awsRegion\":[{\"prefix\":\"eu\"}]}"
            }
          ]
        },
        "SqsQueueParameters": {
          "BatchSize": 1
        }
      },
      "StateReason": "USER_INITIATED",
      "Tags": {},
      "Target": "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:PipeTarget",
      "TargetParameters": {}
    }

    Source Policy:

    {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "sqs:ReceiveMessage",
                  "sqs:DeleteMessage",
                  "sqs:GetQueueAttributes"
              ],
              "Resource": [
                  "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:ExampleApiQueue"
              ]
          }
      ]
    }
  • Kinesis -> CloudWatch Logs

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/KinesisPipe",
      "CreationTime": "2023-03-25T13:46:14+01:00",
      "CurrentState": "RUNNING",
      "DesiredState": "RUNNING",
      "EnrichmentParameters": {},
      "LastModifiedTime": "2023-03-25T13:46:21+01:00",
      "Name": "KinesisPipe",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_KinesisPipe_1b3885bc",
      "Source": "arn:aws:kinesis:eu-central-1:XXXXXXXXXXX:stream/KinesisPipe",
      "SourceParameters": {
          "KinesisStreamParameters": {
              "BatchSize": 1,
              "MaximumBatchingWindowInSeconds": 1,
              "OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
              "ParallelizationFactor": 1,
              "StartingPosition": "LATEST"
          }
      },
      "StateReason": "No records processed",
      "Tags": {},
      "Target": "arn:aws:logs:eu-central-1:XXXXXXXXXXX:log-group:/aws/events/rabbitmq",
      "TargetParameters": {}
    }

    Source Policy:

    {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:DescribeStream",
                  "kinesis:DescribeStreamSummary",
                  "kinesis:GetRecords",
                  "kinesis:GetShardIterator",
                  "kinesis:ListStreams",
                  "kinesis:ListShards"
              ],
              "Resource": [
                  "arn:aws:kinesis:eu-central-1:XXXXXXXXXXX:stream/KinesisPipe"
              ]
          }
      ]
    }
  • Amazon MQ RabbitMQ -> CloudWatch Logs

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/RabbitMqPipe",
      "CreationTime": "2023-03-25T13:58:42+01:00",
      "CurrentState": "RUNNING",
      "DesiredState": "RUNNING",
      "EnrichmentParameters": {},
      "LastModifiedTime": "2023-03-25T13:59:31+01:00",
      "Name": "RabbitMqPipe",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_RabbitMqPipe_20c02f10",
      "Source": "arn:aws:mq:eu-central-1:XXXXXXXXXXX:broker:AcrivMqVpc:b-0303c682-33a0-4bbd-98ea-b0133df0c55f",
      "SourceParameters": {
          "RabbitMQBrokerParameters": {
              "BatchSize": 1,
              "Credentials": {
                  "BasicAuth": "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
              },
              "QueueName": "Pipes",
              "VirtualHost": "/"
          }
      },
      "StateReason": "USER_INITIATED",
      "Tags": {},
      "Target": "arn:aws:logs:eu-central-1:XXXXXXXXXXX:log-group:/aws/events/RabbitMq",
      "TargetParameters": {}
    }

    Source Policy:

    {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "mq:DescribeBroker"
              ],
              "Resource": [
                  "arn:aws:mq:eu-central-1:XXXXXXXXXXX:broker:AcrivMqVpc:b-0303c682-33a0-4bbd-98ea-b0133df0c55f"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups",
                  "ec2:DescribeVpcs"
              ],
              "Resource": "*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DeleteNetworkInterface"
              ],
              "Resource": "*",
              "Condition": {
                  "StringEqualsIfExists": {
                      "ec2:SubnetID": [
                          "subnet-08a5ecbe78da46956"
                      ]
                  }
              }
          }
      ]
    }
  • Amazon MQ ActiveMQ -> CloudWatch Logs

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/ApacheMqPipe",
      "CreationTime": "2023-03-25T14:09:40+01:00",
      "CurrentState": "RUNNING",
      "DesiredState": "RUNNING",
      "EnrichmentParameters": {},
      "LastModifiedTime": "2023-03-25T14:10:44+01:00",
      "Name": "ApacheMqPipe",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_ApacheMqPipe_2481bec9",
      "Source": "arn:aws:mq:eu-central-1:XXXXXXXXXXX:broker:ActiveMq-Broker1:b-045db734-ca3e-4f03-acb5-8191beeb009d",
      "SourceParameters": {
          "ActiveMQBrokerParameters": {
              "BatchSize": 1,
              "Credentials": {
                  "BasicAuth": "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
              },
              "QueueName": "Pipes"
          }
      },
      "StateReason": "USER_INITIATED",
      "Tags": {},
      "Target": "arn:aws:logs:eu-central-1:XXXXXXXXXXX:log-group:/aws/events/ApacheMqPipe",
      "TargetParameters": {}
    }

    Source Policy:

    {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "mq:DescribeBroker"
              ],
              "Resource": [
                  "arn:aws:mq:eu-central-1:XXXXXXXXXXX:broker:ActiveMq-Broker1:b-045db734-ca3e-4f03-acb5-8191beeb009d"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups",
                  "ec2:DescribeVpcs"
              ],
              "Resource": "*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DeleteNetworkInterface"
              ],
              "Resource": "*",
              "Condition": {
                  "StringEqualsIfExists": {
                      "ec2:SubnetID": [
                          "subnet-05c76962865a020d0"
                      ]
                  }
              }
          }
      ]
    }
  • MSK Cluster -> CloudWatch Logs

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/MSKPipe",
      "CreationTime": "2023-03-25T14:35:12+01:00",
      "CurrentState": "RUNNING",
      "DesiredState": "RUNNING",
      "EnrichmentParameters": {},
      "LastModifiedTime": "2023-03-25T14:37:59+01:00",
      "Name": "MSKPipe",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_MSKPipe_6f65c436",
      "Source": "arn:aws:kafka:eu-central-1:XXXXXXXXXXX:cluster/demo-cluster-1/5c830310-fea4-4adc-9a67-2da4bcf9130b-s2",
      "SourceParameters": {
          "ManagedStreamingKafkaParameters": {
              "BatchSize": 1,
              "ConsumerGroupID": "pipes-consumer-group-id",
              "StartingPosition": "TRIM_HORIZON",
              "TopicName": "pipes-topic"
          }
      },
      "StateReason": "USER_INITIATED",
      "Tags": {},
      "Target": "arn:aws:logs:eu-central-1:XXXXXXXXXXX:log-group:/aws/events/MskPipe",
      "TargetParameters": {}
    }

    Source Policy. Note: the policy has no connect or read permissions on the cluster. I think this policy is not working.

    {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kafka:DescribeCluster",
                  "kafka:DescribeClusterV2",
                  "kafka:GetBootstrapBrokers"
              ],
              "Resource": [
                  "arn:aws:kafka:eu-central-1:XXXXXXXXXXX:cluster/demo-cluster-1/5c830310-fea4-4adc-9a67-2da4bcf9130b-s2"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups",
                  "ec2:DescribeVpcs"
              ],
              "Resource": "*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DeleteNetworkInterface"
              ],
              "Resource": "*",
              "Condition": {
                  "StringEqualsIfExists": {
                      "ec2:SubnetID": [
                          "subnet-085642ba241b81254",
                          "subnet-08a5ecbe78da46956",
                          "subnet-05c76962865a020d0"
                      ]
                  }
              }
          }
      ]
    }
  • Self-managed Kafka cluster -> CloudWatch Logs

    {
      "Arn": "arn:aws:pipes:eu-central-1:XXXXXXXXXXX:pipe/KafkaPipe",
      "CreationTime": "2023-03-25T15:44:18+01:00",
      "CurrentState": "CREATE_FAILED",
      "DesiredState": "RUNNING",
      "EnrichmentParameters": {},
      "LastModifiedTime": "2023-03-25T15:44:28+01:00",
      "Name": "KafkaPipe",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXX:role/service-role/Amazon_EventBridge_Pipe_KafkaPipe_0b2197cf",
      "Source": "smk://some.bootstrap.server:12345",
      "SourceParameters": {
          "SelfManagedKafkaParameters": {
              "BatchSize": 1,
              "Credentials": {
                  "ClientCertificateTlsAuth": "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
              },
              "ServerRootCaCertificate": "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk",
              "StartingPosition": "LATEST",
              "TopicName": "PipesTopic"
          }
      },
      "Tags": {},
      "Target": "arn:aws:logs:eu-central-1:XXXXXXXXXXX:log-group:/aws/events/KafkaPipe",
      "TargetParameters": {}
    }

    Source Policy. Note: the policy has no connect or read permissions on the cluster. I think this policy is not working.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue"
                ],
                "Resource": [
                    "arn:aws:secretsmanager:eu-central-1:XXXXXXXXXXX:secret:rabbitmq-gdPMEk"
                ]
            }
        ]
    }

    Target Payloads

  • DynamoDB stream --> EventBus (--> CloudWatch)

    {
      "version": "0",
      "id": "e200e70a-2e54-ebe4-f426-c9207fcab3b3",
      "detail-type": "Event from aws:dynamodb",
      "source": "Pipe DynamoEventbus",
      "account": "XXXXXXXXXXX",
      "time": "2023-03-29T19:51:10Z",
      "region": "eu-central-1",
      "resources": [],
      "detail": {
          "eventID": "f8fb797b24108ef3903d18847f099b56",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "eu-central-1",
          "dynamodb": {
              "ApproximateCreationDateTime": 1680119470,
              "Keys": {
                  "id": {
                      "S": "3"
                  }
              },
              "NewImage": {
                  "id": {
                      "S": "3"
                  }
              },
              "SequenceNumber": "18538200000000012739337629",
              "SizeBytes": 6,
              "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:eu-central-1:XXXXXXXXXXX:table/E2EPipesDynamoDb-SourceTable70380C26-17A6A67APO6JG/stream/2023-03-25T21:18:12.753"
      }
    }
  • SQS --> EventBus (--> CloudWatch)

    {
      "version": "0",
      "id": "98c24e95-444a-73bd-d447-4ddfe514e0de",
      "detail-type": "Event from aws:sqs",
      "source": "Pipe SqsEventBridge",
      "account": "XXXXXXXXXXX",
      "time": "2023-03-29T19:41:01Z",
      "region": "eu-central-1",
      "resources": [],
      "detail": {
          "messageId": "e8d76dc5-4cf3-4e71-9ede-dfc3f9b70232",
          "receiptHandle": "AQEB6w1YWZKOeLkBu4bngtpwGeWzsbJFrk5bTQgcWXAQIhAMATuf9YZBfKvdxsE5CbnIK9q1wq+tiVc82CB1wE3VlLya/J3QJe56lnSB7eAu1fEkWRVz/EtEz4Ro126q3aD+HX94iJiJKL+JftW+r48b3MaUsXUCqXz7pjODVQlKc1mW8GxQ4BhYqCJODsBOsL+A6gR3QqPke+mYt0g+Q1AIhC2V/4IbYkiEq8FKhyyC9+/KqIj7Jfzc+TAldYjEQa1eTLiqqFWUSkW4tCu6g0VyqFjhHGnLw5tk8iypG9YYkj2YLODnjnuZvyYUnOmt0jjtKFPUJNfVe4Ms6/5P1se2clqga0JR0pHW0zVdrgHp+lW1w0bAa1WrFDOAbx3HG6hyherCAL22x1ZlBu1RVfbXog==",
          "body": "Hello",
          "attributes": {
              "ApproximateReceiveCount": "1",
              "SentTimestamp": "1680118861660",
              "SenderId": "AROAUHJD7O7N7OKFHRVWS:raphael.manke",
              "ApproximateFirstReceiveTimestamp": "1680118861665"
          },
          "messageAttributes": {},
          "md5OfBody": "8b1a9953c4611296a827abf8c47804d7",
          "eventSource": "aws:sqs",
          "eventSourceARN": "arn:aws:sqs:eu-central-1:XXXXXXXXXXX:PipeSource",
          "awsRegion": "eu-central-1"
      }
    }
  • Kinesis --> EventBus (--> CloudWatch)

    {
      "version": "0",
      "id": "2d2603b7-8171-6d15-c41b-de1854e0c781",
      "detail-type": "Event from aws:kinesis",
      "source": "Pipe Pipe7793F8A1-MZ6Uu60BB42E",
      "account": "XXXXXXXXXXX",
      "time": "2023-03-29T20:55:47Z",
      "region": "eu-central-1",
      "resources": [],
      "detail": {
          "eventSource": "aws:kinesis",
          "eventVersion": "1.0",
          "eventID": "shardId-000000000000:49639344670955332939983043889605004987500509634079227906",
          "eventName": "aws:kinesis:record",
          "invokeIdentityArn": "arn:aws:iam::XXXXXXXXXXX:role/E2EPipesKinesisEventBus-PipeRole7D4AFC73-I0RSRHWMKUTC",
          "awsRegion": "eu-central-1",
          "eventSourceARN": "arn:aws:kinesis:eu-central-1:XXXXXXXXXXX:stream/E2EPipesKinesisEventBus-KinesisStream46752A3E-SSL4iQQD77p7",
          "kinesisSchemaVersion": "1.0",
          "partitionKey": "my-partition-key",
          "sequenceNumber": "49639344670955332939983043889605004987500509634079227906",
          "data": "bXktZXZlbnQtZGF0YQo=",
          "approximateArrivalTimestamp": 1680123347.074
      }
    }
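
The data field of the Kinesis payload above is base64 encoded. The following sketch decodes it without relying on any runtime-specific API. `decodeBase64Ascii` is a hypothetical helper that only handles single-byte characters; in Node.js the built-in Buffer would normally be used instead.

```typescript
const BASE64_ALPHABET =
  "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Decodes a base64 string into text, treating every decoded byte as one character.
function decodeBase64Ascii(encoded: string): string {
  let bits = 0; // rolling buffer of not yet emitted bits
  let bitCount = 0; // number of valid bits in the buffer
  let output = "";
  for (let i = 0; i < encoded.length; i++) {
    const char = encoded.charAt(i);
    if (char === "=") break; // padding marks the end of the data
    const value = BASE64_ALPHABET.indexOf(char);
    if (value < 0) throw new Error(`Invalid base64 character: ${char}`);
    bits = ((bits << 6) | value) & 0xffff;
    bitCount += 6;
    if (bitCount >= 8) {
      bitCount -= 8;
      output += String.fromCharCode((bits >> bitCount) & 0xff);
    }
  }
  return output;
}

// The record data from the Kinesis example payload.
const kinesisData = decodeBase64Ascii("bXktZXZlbnQtZGF0YQo=");
```

For this payload the decoded value is the text my-event-data followed by a newline.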