# @wholebuzz/mapreduce

Communication-free MapReduce for the 99%
`@wholebuzz/mapreduce` is a novel MapReduce [1] implementation that trades shuffle latency for liberated scheduling. It works with your existing infrastructure. For a full example, see mapreduce-example.
`@wholebuzz/mapreduce` operates on databases of rows, usually expressed as SSSS-of-NNNN sharded flat files in Apache Parquet or (gzipped) JSON Lines format. The API and CLI mostly follow Hadoop MapReduce, except that each MapContext and ReduceContext uses `keyProperty` and `valueProperty` to dereference an underlying Object, instead of requiring the data to be shaped like `{ key: Key, value: Value }`.
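For example, with `-D keyProperty=guid` a plain row object is keyed directly by its `guid` field (illustrative rows, not from the test data):

```typescript
// With -D keyProperty=guid, the row itself is the value and row.guid is the key.
const row = { guid: '4a1f0a76-0000-0000-0000-000000000000', id: 42, name: 'example' }

// The Hadoop-style envelope below is NOT required:
const envelope = { key: row.guid, value: row }
```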
Custom Mappers and Reducers are typically compiled with webpack and deployed to a cloud storage bucket. The path is then supplied to the Container or CLI (e.g. `--plugins s3://my-bucket/index.js`). See mapreduce-example.
## Mapper API

```typescript
interface Mapper<Key, Value> extends Base<Key, Value> {
  map: (key: Key, value: Value, context: MapContext<Key, Value>) => void | Promise<void>
}
```
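As a sketch (not taken from the package), a word-count-style Mapper could look like the following. It assumes MapContext exposes a Hadoop-style `write(key, value)` method; check the package's MapContext and Base types for the real contract.

```typescript
// Hypothetical Mapper sketch: emits one record per whitespace-separated word of value.text.
// The context type here is an assumption standing in for the package's MapContext.
class WordCountMapper {
  async map(_key: string, value: { text: string }, context: { write: (k: string, v: unknown) => void }) {
    for (const word of value.text.split(/\s+/)) {
      if (word) context.write(word, { word, count: 1 })
    }
  }
}
```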
## Reducer API

```typescript
interface Reducer<Key, Value> extends Base<Key, Value> {
  reduce: (key: Key, values: Value[], context: ReduceContext<Key, Value>) => void | Promise<void>
}
```
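A matching Reducer sketch (again hypothetical, assuming a Hadoop-style `write(key, value)` on the context) would sum the per-key counts emitted by the Mapper above:

```typescript
// Hypothetical Reducer sketch: sums the counts for each key.
// The context type here is an assumption standing in for the package's ReduceContext.
class WordCountReducer {
  async reduce(key: string, values: Array<{ count: number }>, context: { write: (k: string, v: unknown) => void }) {
    const total = values.reduce((sum, v) => sum + v.count, 0)
    context.write(key, { word: key, count: total })
  }
}
```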
## Sharding

The default Shard Function is modulo identity for numbers and `md5lsw` for strings. MD5 is supported by MySQL, PostgreSQL, and SQL Server, allowing sharded database queries. However, another hash (e.g. `fnv-plus`) may be preferred.
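As a rough sketch of the idea (illustrative only; the package's `md5lsw` may use different bytes or a different reduction), the shard can be derived from the least-significant word of the key's MD5 digest:

```typescript
import { createHash } from 'crypto'

// Sketch of an md5-least-significant-word style shard function for string keys.
function md5lswShard(key: string, numShards: number): number {
  const digest = createHash('md5').update(key).digest()
  // Take the last 4 bytes (least-significant 32-bit word) of the 16-byte digest.
  const lsw = digest.readUInt32LE(12)
  return lsw % numShards
}

// Numeric keys shard by identity modulo the shard count.
function identityShard(key: number, numShards: number): number {
  return Math.abs(key) % numShards
}
```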
## Example

Sort (and shard) the supplied test data by `guid`. This also converts from JSON to JSON Lines.

```console
$ yarn mapreduce -v \
  --map IdentityMapper \
  --reduce IdentityReducer \
  --inputPaths ./test/test-SSSS-of-NNNN.json.gz \
  --outputPath ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
  --outputShards 8 \
  -D keyProperty=guid
```
Re-sort (and shard) the output of the previous command by `id`, converting from JSON Lines back to JSON.

```console
$ yarn mapreduce -v \
  --map IdentityMapper \
  --reduce IdentityReducer \
  --inputPaths ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
  --outputPath ./test-id-sorted-SSSS-of-NNNN.json.gz \
  --outputShards 4 \
  -D keyProperty=id
```
And we're back to where we started:

```console
$ diff ./test-id-sorted-0000-of-0004.json.gz ./test/test-0000-of-0004.json.gz
$ diff ./test-id-sorted-0001-of-0004.json.gz ./test/test-0001-of-0004.json.gz
$ diff ./test-id-sorted-0002-of-0004.json.gz ./test/test-0002-of-0004.json.gz
$ diff ./test-id-sorted-0003-of-0004.json.gz ./test/test-0003-of-0004.json.gz
```
## Same example using an arbitrary number of workers

Sort the supplied test data by `guid` using three workers.

```console
$ export MY_JOB_ID=`yarn --silent mapreduce job --new | tail -1 | jq -r ".jobid"`
$ for ((i = 0; i < 3; i++)); do yarn mapreduce -v \
    --jobid $MY_JOB_ID \
    --map IdentityMapper \
    --reduce IdentityReducer \
    --inputPaths ./test/test-SSSS-of-NNNN.json.gz \
    --outputPath ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
    --outputShards 8 \
    --numWorkers 3 \
    --workerIndex $i \
    -D keyProperty=guid & done
```
Sort the supplied test data by `guid` using three workers and a job config.

```console
$ export MY_JOB_CONFIG=`yarn --silent mapreduce job --new \
    --map IdentityMapper \
    --reduce IdentityReducer \
    --inputPaths ./test/test-SSSS-of-NNNN.json.gz \
    --outputPath ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
    --outputShards 8 \
    --numWorkers 3 \
    -D keyProperty=guid \
    | tail -1`
$ for ((i = 0; i < 3; i++)); do yarn mapreduce -v --jobConfig "$MY_JOB_CONFIG" --workerIndex $i & done
```
Sort the supplied test data by `guid` using three containerized workers.

```console
$ export MY_JOB_ID=`yarn --silent mapreduce job --new | tail -1 | jq -r ".jobid"`
$ for ((i = 0; i < 3; i++)); do docker run -d -e "RUN_ARGS= \
    --jobid $MY_JOB_ID \
    --map IdentityMapper \
    --reduce IdentityReducer \
    --inputPaths /mnt-cwd/test/test-SSSS-of-NNNN.json.gz \
    --outputPath /mnt-cwd/test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
    --shuffleDirectory /mnt-cwd/ \
    --outputShards 8 \
    --numWorkers 3 \
    --workerIndex $i \
    -D keyProperty=guid" \
    -v $PWD:/mnt-cwd \
    --rm -it wholebuzz/mapreduce; done
```
## Same example running in the cloud

- Supply `s3://` or `gs://` URLs for `--inputPaths`, `--outputPath`, and `--shuffleDirectory`.
- Use your preferred scheduler to start the workers (e.g. Airflow, Hadoop, Kubeflow, Kubernetes, EC2, or GCE). See mapreduce-example.
## Example of transforming the data with TransformMapper

Configuration variables ending in `Code` will be `eval()`'d.

```console
$ yarn mapreduce -v \
  --map TransformMapper \
  --inputPaths ./test/test-SSSS-of-NNNN.json.gz \
  --outputPath ./test-md5-SSSS-of-NNNN.jsonl.gz \
  --outputShards 4 \
  -D keyProperty=guid \
  -D transformCode="const { md5 } = require('@wholebuzz/fs/lib/util'); (x) => ({ guid: x.guid, hash: md5(x.guid) })"
```
## Example of logging the data with TransformMapper

The special path `/dev/null` writes no output.

```console
$ yarn mapreduce -v \
  --map TransformMapper \
  --unpatchReduce \
  --inputPaths ./test/test-SSSS-of-NNNN.json.gz \
  --outputPath /dev/null \
  -D transformCode="(x) => console.log('Hello row', x)"
```
## Example merging two sharded datasets with streaming K-way merge sort

```console
$ yarn mapreduce -v \
  --unpatchMap \
  --reduce MergePropertiesReducer \
  --inputPaths ./test-md5-SSSS-of-NNNN.jsonl.gz ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
  --outputPath ./test-merge-SSSS-of-NNNN.jsonl.gz \
  --outputShards 4 \
  -D keyProperty=guid
```
## Same example but aggregating distinct records instead of combining

```console
$ yarn mapreduce -v \
  --unpatchMap \
  --reduce MergeNamedValuesReducer \
  --inputPaths ./test-md5-SSSS-of-NNNN.jsonl.gz ./test-guid-sorted-SSSS-of-NNNN.jsonl.gz \
  --outputPath ./test-merge-SSSS-of-NNNN.jsonl.gz \
  --outputShards 4 \
  -D keyProperty=guid
```
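The reducer names suggest the distinction; the sketch below is a hedged reading rather than the package's documented behavior: MergePropertiesReducer combines matching records' properties into one object per key, while MergeNamedValuesReducer keeps each input's record distinct.

```typescript
// Hedged illustration of the presumed difference (verify against the package source).
// Two input datasets contribute records with the same key guid = 'abc':
const a = { guid: 'abc', hash: '9e107d9d372bb682' } // from test-md5-*
const b = { guid: 'abc', id: 42 }                   // from test-guid-sorted-*

// Combining (MergePropertiesReducer, roughly): properties merged into one record per key.
const combined = { ...a, ...b }

// Aggregating (MergeNamedValuesReducer, roughly): each input's record kept distinct.
const aggregated = { guid: 'abc', values: [a, b] }
```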
## Example using a Postgres database for input

Add `DEBUG=knex:query` to your environment to see the (sharded) queries.

```console
$ yarn mapreduce -v \
  --inputType postgresql \
  --inputName postgres \
  --inputHost localhost \
  --inputPort 5433 \
  --inputUser postgres \
  --inputPassword postgres \
  --inputTable dbcptest \
  --inputShards 4 \
  --inputShardBy guid \
  --outputPath ./dbcptest-SSSS-of-NNNN.jsonl.gz \
  --outputShards 4 \
  -D keyProperty=guid
```
## Technical overview

Instead of starting a Master which starts simultaneous Mappers and Reducers on a cluster, we decouple the Mappers' dependency on the Reducers by using cloud storage as an intermediary. We can run a large MapReduce job with a single thread and zero communication. Or we can run the usual many parallel Mappers, synchronizing only the completion of the Shuffle and Reduce stages (via file IPC).
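A minimal sketch of that decoupling (hypothetical helpers, not the package's API): every stage reads and writes files, so a single worker can run the stages back-to-back, and multiple workers only need to agree that all map outputs for an output shard exist before that shard is reduced.

```typescript
// Pseudocode sketch of communication-free scheduling (not the package's real API).
// Hypothetical helpers standing in for the real map/external-sort and merge/reduce stages.
declare function mapAndSortShard(inputShard: number, outputShards: number): Promise<void>
declare function mergeAndReduceShard(outputShard: number, inputShards: number): Promise<void>

async function singleWorkerJob(inputShards: number, outputShards: number) {
  for (let i = 0; i < inputShards; i++) {
    await mapAndSortShard(i, outputShards)    // writes shuffle-*.inputshard-i files
  }
  for (let s = 0; s < outputShards; s++) {
    await mergeAndReduceShard(s, inputShards) // merges shuffle-s.inputshard-* into fileout-s
  }
}
```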
Top-level:

- API: `MapReduce(filein, fileout, mapperClass, reducerClass, combinerClass)`
- Input: `filein-SSSS-of-NNNN.jsonl.gz`, e.g. `filein-0000-of-0004.jsonl.gz`, `filein-0001-of-0004.jsonl.gz`, ..., `filein-0003-of-0004.jsonl.gz`
- Output: `fileout-SSSS-of-NNNN.jsonl.gz`, e.g. `fileout-0000-of-0004.jsonl.gz`, `fileout-0001-of-0004.jsonl.gz`, ..., `fileout-0003-of-0004.jsonl.gz`
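The SSSS-of-NNNN suffix is plain zero-padded shard numbering; a small helper (illustrative only, not exported by the package) makes the naming concrete:

```typescript
// Illustrative helper: expand a SSSS-of-NNNN shard filename template.
function shardedName(prefix: string, shard: number, numShards: number, suffix = '.jsonl.gz'): string {
  const pad = (n: number) => n.toString().padStart(4, '0')
  return `${prefix}-${pad(shard)}-of-${pad(numShards)}${suffix}`
}

// shardedName('filein', 0, 4)  === 'filein-0000-of-0004.jsonl.gz'
// shardedName('fileout', 3, 4) === 'fileout-0003-of-0004.jsonl.gz'
```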
Mapper step:

For each input shard:

- Map() is called for each line of the JSONL
- Outputs `numOutputShards` files
- The outputs are externally sorted by the new key (if changed)

Output is written (according to the shard function) to `shuffle-SSSS-of-NNNN.inputshard-SSSS-of-NNNN.jsonl.gz`, e.g.

- `shuffle-0000-of-0004.inputshard-0000-of-0004.jsonl.gz`, `shuffle-0000-of-0004.inputshard-0001-of-0004.jsonl.gz`, ..., `shuffle-0000-of-0004.inputshard-0003-of-0004.jsonl.gz`
- `shuffle-0001-of-0004.inputshard-0000-of-0004.jsonl.gz`, `shuffle-0001-of-0004.inputshard-0001-of-0004.jsonl.gz`, ..., `shuffle-0001-of-0004.inputshard-0003-of-0004.jsonl.gz`
- ...
- `shuffle-0003-of-0004.inputshard-0000-of-0004.jsonl.gz`, `shuffle-0003-of-0004.inputshard-0001-of-0004.jsonl.gz`, ..., `shuffle-0003-of-0004.inputshard-0003-of-0004.jsonl.gz`

The number of shuffle files equals InputShards * OutputShards (4 * 4 = 16 in this example). Each input shard makes one contribution (one file) to each output shard, e.g. processing `filein-0003-of-0004.jsonl.gz` produces `shuffle-0000-of-0004.inputshard-0003-of-0004.jsonl.gz`, `shuffle-0001-of-0004.inputshard-0003-of-0004.jsonl.gz`, ..., `shuffle-0003-of-0004.inputshard-0003-of-0004.jsonl.gz`.
Shuffle step:

- If the output of the external sort was streamed to cloud storage, this step is already done.
- Otherwise, upload the files with an AnyFileSystem copy between the temporary directory (e.g. local) and the reduce directory (e.g. `s3://`).
Reducer step:

For each output shard:

- Streaming K-way merge sort on `shuffle-SSSS-of-NNNN.inputshard-*-of-0004.jsonl.gz`, e.g. merge `shuffle-0002-of-0004.inputshard-0000-of-0004.jsonl.gz`, `shuffle-0002-of-0004.inputshard-0001-of-0004.jsonl.gz`, ..., `shuffle-0002-of-0004.inputshard-0003-of-0004.jsonl.gz` to produce `fileout-0002-of-0004.jsonl.gz`
- Calls Reduce() for each Key and Value set

Output is written to `fileout-SSSS-of-NNNN.jsonl.gz` for each shard, e.g. `fileout-0000-of-0004.jsonl.gz`, `fileout-0001-of-0004.jsonl.gz`, ..., `fileout-0003-of-0004.jsonl.gz`.
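As an illustration of the streaming K-way merge the Reducer step performs (a generic sketch, not the package's implementation), K already-sorted streams can be merged by repeatedly emitting the row with the smallest pending key:

```typescript
// Generic streaming K-way merge sketch: merges K async iterators that are each
// already sorted by `key`, yielding a single globally sorted stream.
async function* kWayMerge<T>(
  sources: AsyncIterator<T>[],
  key: (row: T) => string
): AsyncGenerator<T> {
  // Prime one pending row per source.
  const heads = await Promise.all(sources.map((s) => s.next()))
  while (heads.some((h) => !h.done)) {
    // Pick the source whose pending row has the smallest key (linear scan; fine for small K).
    let min = -1
    for (let i = 0; i < heads.length; i++) {
      if (heads[i].done) continue
      if (min < 0 || key(heads[i].value as T) < key(heads[min].value as T)) min = i
    }
    yield heads[min].value as T
    heads[min] = await sources[min].next()
  }
}
```

As described above, Reduce() is then called once per key with the set of values that the merge brings together.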
Cleanup:
- The shuffle files can be removed
References:
- [1] Dean, Ghemawat. 2004. MapReduce: Simplified Data Processing on Large Clusters
- [2] Rahn, Sanders, Singler. 2010. Scalable Distributed-Memory External Sorting
- [3] Ding, Wang, Xin. 2013. ComMapReduce: An Improvement of MapReduce with Lightweight Communication Mechanisms
- [4] Chen. 2020. External Sorting Algorithm: State-of-the-Art and Future