etl_pod_nest
v1.0.1
Published
ETL pod节点
Downloads
3
Readme
一,ETL监控系统架构
二,etl pod 节点设计哲学
1,ETL的过程
2,关键逻辑称呼
- 数据输入:
input
- 数据输出:
output
- 数据流类型:
tag
一次数据流包括:数据流种类,输入 和 输出
3,ETL pod 接入方式推荐
- 给ETL的数据分类
比如flaw,panel, sheet, ....
- 找出数据流入, 和数据流出的部分
- 接入 输入 和 输入的监控信息
三,使用方法
1,下载包npm i etl_pod_nest
2,在app.module.ts中注册
import { EtlPodModule } from 'etl_pod_nest';
EtlPodModule.forRoot({
// ETL 节点名称【对应监控系统中的ETL节点名称】
etlPodName: 'pol_data_extractor',
// ETL 监控中心服务IP
masterPodIp: '127.0.0.1',
// ETL 监控中心服务ws 端口【默认4001】
masterPodPort: 4001,
// ETL 描述
desc: 'pol 数据抽取',
// 数据流列表
tags: {
// 数据流标识key
doff: {
// 源节点名称
sourcePod: 'kafka',
// 目标节点名称
targetPod: 'mysql',
// 数据流功能描述
desc: 'doff卷数据同步通道',
},
flaw: {
sourcePod: 'kafka',
targetPod: 'mysql',
desc: 'flaw缺陷数据 同步通道',
},
other: {
sourcePod: 'kafka',
targetPod: 'redis',
desc: '其他数据同步通道',
},
},
}),
3,在业务中 将ETL信息到 监控节点
(1)使用装饰器
使用装饰器,默认会拦截函数的执行时间
tag: 数据流类型
// 数据输入
@DataInput({ tag: 'other' })
async getData(data) {
// 数据流入,清洗
const data: any = ...;
return data.mock.slice(0, count);
}
// 数据输出
@DataOutput({ tag: 'other' })
async uploadFlawHasIndex(data) {
try {
// 数据上传
await 。。。
return data;
} catch (error) {
console.log(error);
}
}
上传自定义监控信息
函数 返回值实现指定接口
CustomMonitor
import { CustomMonitor } from 'src/etl-pod/etl.pod.class';
import { DataInput } from 'src/main';
// 数据输入
@DataInput({tag: 'flaw'})
function dataInput(): CustomMonitor {
return {
// 其他业务信息
a: 1,
// 自定义监控信息
monitorData: {
id: {
value: 1,
// 描述
desc: 'asdas',
// 单位
suffix: '%',
// 是否要形成折线图
charts: true,
},
},
};
}
(2)使用注入Sender
发送
import { Injectable } from '@nestjs/common';
import { DataInput, DataOutput, Sender, EtlSender } from 'etl_pod_nest';
@Injectable()
export class DemoService {
// 注入Sender
@Sender()
private etlSender: EtlSender;
constructor( ) {}
async getData2() {
let startTime = Date.now();
let count = Math.floor(Math.random() * 1000);
const data: any = ...;
const curData = data.mock.slice(0, count);
// 自定义 发送ETL信息到监控节点
this.etlSender.send({
tag: 'doff',
// 输入 | 输出 , input | output
type: 'input',
timeStart: startTime,
timeEnd: Date.now(),
// 自定义的监控信息
customMsg: {
curId: {
value: curData[curData.length - 1]?.uid,
desc: '当前doffId',
suffix: '',
charts: true,
},
curFactory: {
value: curData[curData.length - 1]?.factory,
desc: '当前工厂',
suffix: '',
},
counts: {
value: curData.length,
desc: '同步数量',
charts: true,
suffix: '个',
},
},
});
return data.mock;
}
}