# MongoDB to BigQuery Streamer
This package allows you to stream data from a MongoDB collection to a Google BigQuery table in batches. It is designed to handle large datasets by processing them in chunks, transforming the data as needed before inserting it into BigQuery, and providing progress logs during the operation.
## Features
- Batch processing: Efficiently processes large datasets by streaming in batches of configurable size.
- Data transformation: Supports custom transformation functions to modify documents before insertion.
- Filtered queries: Accepts a MongoDB query filter so that only matching records are synced.
- Sorting: Supports sorting on a MongoDB field, which is used to page through the collection.
- Progress tracking: Logs progress and returns the last inserted record for monitoring or resumption.
## Requirements
Before using this package, ensure you have:
- A MongoDB instance with a collection of documents.
- A Google BigQuery project, dataset, and table where the data will be inserted.
- A Google Cloud service account with permission to write data to BigQuery.
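If you want to verify these prerequisites before running a sync, a quick connectivity check along the lines of the sketch below can help. It uses the official `mongodb` and `@google-cloud/bigquery` client libraries, which are not part of this package's documented API, and all connection details are placeholders.

```js
const { MongoClient } = require('mongodb');
const { BigQuery } = require('@google-cloud/bigquery');

async function preflightCheck() {
  // Verify the MongoDB instance is reachable.
  const mongo = new MongoClient('mongodb://localhost:27017');
  await mongo.connect();
  await mongo.db('my_database').command({ ping: 1 });
  await mongo.close();

  // Verify the BigQuery dataset and table exist and the key file has access.
  const bigquery = new BigQuery({
    projectId: 'my-gbq-project',
    keyFilename: './path/to/keyfile.json',
  });
  const [tableExists] = await bigquery.dataset('my_dataset').table('my_table').exists();
  console.log('BigQuery table exists:', tableExists);
}

preflightCheck().catch(console.error);
```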
## Installation

To install the package, use npm or yarn:

```bash
npm install mongodb-to-bigquery-streamer
```

or

```bash
yarn add mongodb-to-bigquery-streamer
```
## Usage
Here's an example of how to use the package to stream data from MongoDB to BigQuery:
```js
const { streamDataInBatchesFromMongoDB } = require('mongodb-to-bigquery-streamer');

const options = {
  mongodbUri: 'mongodb://localhost:27017',  // MongoDB URI
  mongoDbName: 'my_database',               // MongoDB database name
  mongoCollectionName: 'my_collection',     // MongoDB collection name
  bigQueryDataset: 'my_dataset',            // BigQuery dataset name
  bigQueryTable: 'my_table',                // BigQuery table name
  transformFn: (doc) => doc,                // Optional transformation function (default: identity function)
  chunkSize: 100,                           // Optional batch size (default: 10)
  mongoQueryParams: { active: true },       // Optional query parameters for MongoDB (default: {})
  sortField: '_id',                         // Optional sorting field for MongoDB (default: '_id')
  gbqProjectId: 'my-gbq-project',           // Google BigQuery project ID
  gbqKeyFile: './path/to/keyfile.json'      // Path to BigQuery service account key
};

streamDataInBatchesFromMongoDB(options)
  .then((result) => {
    if (result.success) {
      console.log('Data successfully streamed to BigQuery.');
      console.log('Last Inserted Record:', result.lastInsertedRecord);
    } else {
      console.error('Data streaming failed.');
    }
  })
  .catch((error) => {
    console.error('Error:', error);
  });
```
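The same call also works with `async`/`await`. This variant simply reuses the `options` object from the example above:

```js
async function runSync() {
  try {
    const result = await streamDataInBatchesFromMongoDB(options);
    if (result.success) {
      console.log('Data successfully streamed to BigQuery.');
      console.log('Last Inserted Record:', result.lastInsertedRecord);
    } else {
      console.error('Data streaming failed.');
    }
  } catch (error) {
    console.error('Error:', error);
  }
}

runSync();
```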
## Parameters

| Parameter | Type | Description | Default Value |
|---|---|---|---|
| `mongodbUri` | String | MongoDB connection URI. | |
| `mongoDbName` | String | Name of the MongoDB database. | |
| `mongoCollectionName` | String | Name of the MongoDB collection. | |
| `bigQueryDataset` | String | BigQuery dataset name. | |
| `bigQueryTable` | String | BigQuery table name. | |
| `transformFn` | Function | (Optional) Function to transform documents before inserting into BigQuery. | `(doc) => doc` |
| `chunkSize` | Number | (Optional) The number of documents to process in each batch. | `10` |
| `mongoQueryParams` | Object | (Optional) Query parameters to filter documents from MongoDB. | `{}` |
| `sortField` | String | (Optional) Field to sort the documents by (for paging through the data). | `_id` |
| `gbqProjectId` | String | Google BigQuery project ID. | |
| `gbqKeyFile` | String | Path to the Google BigQuery service account key file. | |
## Example Transformation Function
You can define a custom transformation function to modify the MongoDB documents before they are inserted into BigQuery. This is useful for changing field names, formatting, or adding new fields. For example:
```js
const transformFn = (doc) => {
  return {
    id: doc._id.toString(),
    name: doc.name.toUpperCase(),
    createdAt: doc.created_at,
  };
};
```
In this example, the function:

- Converts the MongoDB `_id` to a string.
- Uppercases the `name` field.
- Copies the `created_at` value to `createdAt` without modifying it.
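Transformation functions are also a convenient place to flatten nested sub-documents and format dates, since BigQuery rows are usually flat and timestamp columns expect a recognizable format. The field names below (`address`, `created_at`) are illustrative only, not part of the package's API:

```js
const transformFn = (doc) => ({
  id: doc._id.toString(),
  name: doc.name,
  // Flatten a nested sub-document into top-level columns.
  city: doc.address ? doc.address.city : null,
  country: doc.address ? doc.address.country : null,
  // Convert a BSON Date into an ISO 8601 string for a TIMESTAMP column.
  createdAt: doc.created_at ? doc.created_at.toISOString() : null,
});
```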
## Error Handling

The `streamDataInBatchesFromMongoDB` function returns an object indicating the result of the sync process:

```js
{
  "success": true,
  "lastInsertedRecord": { ... }
}
```

If the streaming process succeeds, `success` will be `true`, and `lastInsertedRecord` will contain the last document that was inserted into BigQuery. If an error occurs, `success` will be `false`, and you can inspect the logs to understand what went wrong.
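Because the result includes the last inserted record, one possible recovery pattern after a partial failure is to re-run the sync with a filter that starts after that record. The package does not document built-in resumption, so treat the following as a sketch: it assumes the default `_id` sort order and that the transformed record still exposes the original `_id` (here as `id`).

```js
const { ObjectId } = require('mongodb');

// Hypothetical resume helper: re-run the sync, skipping documents up to and
// including the last one that reached BigQuery.
async function resumeSync(options, lastInsertedRecord) {
  const resumeOptions = {
    ...options,
    mongoQueryParams: {
      ...options.mongoQueryParams,
      _id: { $gt: new ObjectId(lastInsertedRecord.id) },
    },
  };
  return streamDataInBatchesFromMongoDB(resumeOptions);
}
```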
## Handling Large Data Sets

To efficiently stream large datasets, the process is divided into batches. You can adjust the `chunkSize` parameter to control how many documents are processed in each batch. The default batch size is `10`, but you can set it to a higher number for faster processing if needed:

```js
chunkSize: 500, // Process 500 documents in each batch
```
## BigQuery Configuration
Before using the package, make sure your Google Cloud service account has sufficient permissions to write to the specified BigQuery dataset and table.
- Set up a service account in Google Cloud with the necessary roles (e.g., `BigQuery Data Editor`).
- Download the JSON key file for the service account.
- Use the `gbqKeyFile` option in the configuration to provide the path to the service account key:

```js
gbqKeyFile: './path/to/keyfile.json'
```
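For reference, a service-account key file like this is the same kind of credential the official `@google-cloud/bigquery` client accepts. The snippet below shows that general pattern; it is not this package's internal code.

```js
const { BigQuery } = require('@google-cloud/bigquery');

// General pattern for authenticating the official BigQuery client with a
// service-account key file; shown for reference only.
const bigquery = new BigQuery({
  projectId: 'my-gbq-project',
  keyFilename: './path/to/keyfile.json',
});
```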
## License
This package is licensed under the MIT License.