@x12/rx-pool
v0.0.1
Published
Rx Generic Pool
Downloads
4
Readme
@x12/rx-pool
/**
 * Public surface of `RxPool<T>`: a pool of resources of type `T` that exposes
 * Observable-based and operator-based methods next to the Promise-based
 * methods of the Generic Pool it holds in `pool`.
 */
export declare class RxPool<T> {
  // The Generic Pool instance held by this wrapper.
  readonly pool: GenericPool; // Pool<T>

  // Builds the pool from a resource factory and optional pool options.
  constructor(factory: PoolFactory, poolOpts?: PoolOption);

  // ---- Methods adapted to return Observables ----

  // Acquire one T from the connection pool.
  // NOTE(review): `priority` is presumably forwarded to the underlying pool's
  // acquire call - confirm against the implementation.
  acquire(priority?: number): Observable<T>;

  // Hands a resource to `cb`, which may answer with a plain value, a
  // PromiseLike or an Observable; the result type is Observable<U>.
  // NOTE(review): acquisition/release around `cb` is not visible from this
  // declaration - confirm against the implementation.
  use<U>(cb: (resource: T) => U | PromiseLike<U> | Observable<U>): Observable<U>;

  // ---- The following methods return operators (OperatorFunction) ----

  // Added method: use the T flowing through the pipe to run a query.
  run(cb: (resource: T) => any): OperatorFunction<T, T>;

  // Release the resource so that it can be used again.
  release(): OperatorFunction<T, T>;

  // Destroy the resource; the connection will be created again.
  destroy(): OperatorFunction<T, T>;

  // ---- The following are the original Generic Pool methods ----

  release(resource: T): PromiseLike<void>;
  destroy(resource: T): PromiseLike<void>;
  start(): void;
  drain(): PromiseLike<void>;
  clear(): PromiseLike<void>;
  isBorrowedResource(resource: T): boolean;
}
Demo:
import { RxPool, PoolFactory, PoolOption } from '@x12/rx-pool';
import { MongoClient } from 'mongodb';
// Factory given to the pool: how to create, destroy and validate a MongoDB client.
const poolFactory: PoolFactory = {
  // Construct a client for 'url' and return the result of connect().
  create: () =>
    new MongoClient('url', {
      /* mongodb options */
    }).connect(),

  // Close the client when the pool discards it.
  destroy: (client) => client.close(),

  // Ping the admin interface of the 'test' database:
  // a fulfilled ping maps to true, a rejected ping maps to false.
  validate: (client) =>
    client
      .db('test')
      .admin()
      .ping()
      .then(
        () => true,
        () => false
      )
};
// Pool configured with empty options.
const poolOpts: PoolOption = {};
const pool = new RxPool<MongoClient>(poolFactory, poolOpts);

// Query executed with the acquired client: ping the admin interface of the
// 'test' database and print whatever the ping resolves to.
const pingAndLog = async (client: MongoClient): Promise<void> => {
  const pong = await client.db('test').admin().ping();
  console.log(pong);
};

// Pipeline: acquire a client, run the query with it, then release it.
const s$ = pool.acquire().pipe(pool.run(pingAndLog), pool.release());

// Subscribing starts the pipeline; only completion is reported.
s$.subscribe({
  complete: () => {
    console.log('complete');
  }
});