wy-mongo-es
v3.4.12
A MongoDB to Elasticsearch connector
Mongo-ES
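This project is a fork of mongo-es. In the original, mapping a MongoDB collection into Elasticsearch requires writing a mapping entry for every field. This fork adds a task parameter, transAll, that maps all fields; it defaults to false.
The fork also changes how the scan task fetches data: the original wrapped the id as new ObjectID(id), while this fork uses the id directly as a string.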
Installation
npm i -g mongo-es
Usage
Command Line
# normal mode
mongo-es ./config.json
# debug mode, with debug info printed
NODE_ENV=dev mongo-es ./config.json
Programmatically
const fs = require('fs')
const Redis = require('ioredis')
const { Config, Task, run } = require('mongo-es')
const redis = new Redis('localhost')
Task.onSaveCheckpoint((name, checkpoint) => {
return redis.set(`mongo-es:${name}`, JSON.stringify(checkpoint))
})
// this will overwrite task.from in config file
Task.onLoadCheckpoint((name) => {
return redis.get(`mongo-es:${name}`).then(JSON.parse)
})
run(new Config(fs.readFileSync('config.json', 'utf8')))
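The Redis callbacks above can be swapped for any store that returns a promise. As a minimal sketch, the helpers below keep checkpoints in JSON files instead. The directory and the function names `saveCheckpoint` and `loadCheckpoint` are hypothetical and not part of mongo-es. When nothing has been saved yet, the loader resolves to null, which is also what the Redis version resolves to for a missing key.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical directory for checkpoint files; not part of mongo-es.
const checkpointDir = path.join(os.tmpdir(), 'mongo-es-checkpoints')

// One JSON file per task name.
function checkpointPath(name) {
  return path.join(checkpointDir, `${encodeURIComponent(name)}.json`)
}

// Same shape as the Redis callback: takes the task name and checkpoint, returns a promise.
function saveCheckpoint(name, checkpoint) {
  return fs.promises
    .mkdir(checkpointDir, { recursive: true })
    .then(() => fs.promises.writeFile(checkpointPath(name), JSON.stringify(checkpoint)))
}

// Resolves to the stored checkpoint, or null when no file exists for this task.
function loadCheckpoint(name) {
  return fs.promises
    .readFile(checkpointPath(name), 'utf8')
    .then((text) => JSON.parse(text))
    .catch((err) => {
      if (err.code === 'ENOENT') return null
      throw err
    })
}
```

These would be registered the same way as the Redis callbacks, with `Task.onSaveCheckpoint(saveCheckpoint)` and `Task.onLoadCheckpoint(loadCheckpoint)`, before calling `run`.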
Concepts
Scan phase
Scan the entire database for existing documents.
Tail phase
Tail the oplog for document creates, updates, and deletes.
Configuration
Structure:
{
  "controls": {},
  "mongodb": {},
  "elasticsearch": {},
  "tasks": [
    {
      "extract": {},
      "transform": {},
      "load": {}
    }
  ]
}
controls
mongodbReadCapacity - Max docs read per second (default: 10000). (optional)
elasticsearchBulkInterval - Max bulk interval per request (default: 5000). (optional)
elasticsearchBulkSize - Max bulk size per request (default: 5000). (optional)
indexNameSuffix - Index name suffix, for index version control. (optional)
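To make the two bulk controls concrete, here is a rough sketch of a buffer that releases a batch when it reaches a size limit or when enough time has passed since the last flush. It is not the connector's actual implementation: the `BulkBuffer` class is hypothetical, and treating elasticsearchBulkInterval as milliseconds is an assumption the README does not confirm.

```javascript
// Hypothetical buffer illustrating size-based and time-based flushing.
class BulkBuffer {
  constructor(controls = {}, now = Date.now()) {
    const { elasticsearchBulkSize = 5000, elasticsearchBulkInterval = 5000 } = controls
    this.maxSize = elasticsearchBulkSize
    this.maxIntervalMs = elasticsearchBulkInterval // assumption: milliseconds
    this.docs = []
    this.lastFlushAt = now
  }

  // Adds a document at time `now` (ms). Returns the batch to send, or null while buffering.
  add(doc, now = Date.now()) {
    this.docs.push(doc)
    const full = this.docs.length >= this.maxSize
    const stale = now - this.lastFlushAt >= this.maxIntervalMs
    if (!full && !stale) return null
    const batch = this.docs
    this.docs = []
    this.lastFlushAt = now
    return batch
  }
}
```

Passing `now` explicitly keeps the sketch deterministic; a real connector would more likely use a timer so that a partially filled batch is still sent when no new documents arrive.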
mongodb
url - The connection URI string, e.g. mongodb://user:password@localhost:27017/db?replicaSet=rs0. Notice: you must use an admin user to access the oplog.
options - Connection settings, see: MongoClient. (optional)
elasticsearch
options - Elasticsearch Config Options, see: Configuration.
indices - If set, indices are created automatically when the program starts, see: Indices Create. (optional)
task.from
phase - scan or tail
time - tail oplog with query: { ts: { $gte: new Timestamp(0, new Date(time).getTime() / 1000) } }
id - scan collection with query: { _id: { $gte: id } }
Changed in this fork: id is used directly as the string passed in, rather than being wrapped as ObjectID(id).
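The two starting points described above can be sketched without a database connection. The helper names below are hypothetical. They only reproduce the arithmetic and the filter shape quoted in this section, leaving out the Timestamp wrapper from the MongoDB driver.

```javascript
// Seconds since the Unix epoch for task.from.time, as in the tail query above.
function tailStartSeconds(time) {
  return new Date(time).getTime() / 1000
}

// Scan filter as used by this fork: id stays a plain string, no ObjectID wrapper.
function scanFilter(id) {
  return { _id: { $gte: id } }
}

// Hypothetical helper that turns a task.from object into a description of where to start.
function describeStart(from) {
  if (from.phase === 'tail') {
    return { phase: 'tail', sinceSeconds: tailStartSeconds(from.time) }
  }
  return { phase: 'scan', filter: scanFilter(from.id) }
}
```

Because MongoDB range comparisons such as $gte generally match only values of the same BSON type, the string form should suit collections whose _id values are strings. Collections with ObjectID keys would likely need the original behaviour.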
task.extract
db - Database name.
collection - Collection name in database.
projection - Projection selector, see Projection.
task.transform
mapping - The field mapping from mongodb's collection to elasticsearch's index.
parent - The field in mongodb's collection to use as the _parent in elasticsearch's index. (optional)
transAll - If set to true, all fields of each document are mapped, without listing every field in mapping (default: false). (optional)
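The difference between an explicit mapping and transAll can be illustrated with a small sketch. It makes several assumptions that the README does not confirm: mapping is treated as an object of source field name to target field name, _id is left out because it is assumed to become the Elasticsearch document id, and explicit mapping entries are applied on top of the copied fields. The `transformDoc` function is hypothetical and is not the connector's code.

```javascript
// Hypothetical transform: explicit field mapping, optionally preceded by a copy of all fields.
function transformDoc(doc, { mapping = {}, transAll = false } = {}) {
  const out = {}
  if (transAll) {
    // Copy every field under its own name, except _id (assumed to be the document id).
    for (const [key, value] of Object.entries(doc)) {
      if (key !== '_id') out[key] = value
    }
  }
  // Apply explicit entries: source field in MongoDB -> target field in Elasticsearch.
  for (const [source, target] of Object.entries(mapping)) {
    if (Object.prototype.hasOwnProperty.call(doc, source)) {
      out[target] = doc[source]
    }
  }
  return out
}
```

With transAll left at false, only the fields named in mapping reach the output, which is the behaviour this fork set out to relax.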
task.load
index - The name of the index.
type - The name of the document type.
body - The request body, see Put Mapping.
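Putting the sections together, a complete configuration might look roughly like the object below, serialized to the JSON that config.json would contain. All concrete values are placeholders: the collection, index, and field names, the starting id, the `host` option, and the projection are assumptions for illustration, not values taken from this project.

```javascript
// Illustrative configuration using the keys documented above; values are placeholders.
const config = {
  controls: {
    mongodbReadCapacity: 10000,
    elasticsearchBulkInterval: 5000,
    elasticsearchBulkSize: 5000,
  },
  mongodb: {
    url: 'mongodb://user:password@localhost:27017/db?replicaSet=rs0',
  },
  elasticsearch: {
    options: { host: 'localhost:9200' }, // assumption: option name from the Elasticsearch client
  },
  tasks: [
    {
      // Placeholder starting point; in this fork the id is compared as a string.
      from: { phase: 'scan', id: '000000000000000000000000' },
      extract: { db: 'db', collection: 'articles', projection: { title: 1, body: 1 } },
      // transAll: true maps every field, so mapping can stay empty.
      transform: { mapping: {}, transAll: true },
      load: {
        index: 'articles',
        type: 'article',
        body: { properties: { title: { type: 'text' }, body: { type: 'text' } } },
      },
    },
  ],
}

// The text that would be saved as config.json.
const configJson = JSON.stringify(config, null, 2)
```

Writing `configJson` to a file gives something that can be passed on the command line or to `new Config(...)` as in the usage section, subject to the assumptions above.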