Bodo supports a subset of Python that is commonly used for data analytics and machine learning. This section describes this subset and explains how parallelization is performed. The supported data structures for parallel/distributed datasets are Numpy arrays, and Pandas Dataframe, Series and Index objects.
Bodo parallelizes programs automatically based on the map-reduce parallel pattern. Put simply, this means the compiler analyzes the program to determine whether each parallelizable data structure and operation should be distributed or not. This analysis uses the semantics of operations as the program below demonstrates:
This program reads a one-dimensional array called A from file and sums its values. Array A is the output of an I/O operation and is the input to np.sum. Based on the semantics of I/O and np.sum, Bodo determines that A can be distributed, since I/O can output a distributed array and np.sum can take a distributed array as input. In map-reduce terminology, A is the output of a map operator and the input of a reduce operator. Hence, Bodo distributes A and all operations associated with A (i.e. I/O and np.sum) and generates a parallel binary. This binary replaces the example_1D function in the Python program automatically.
Bodo can only analyze and parallelize the supported data-parallel operations of Numpy and Pandas (listed in this manual). Hence, only the supported operations can be used for distributed datasets and computations. The sequential computation on other data structures can be any code that Numba supports.
Data is distributed in one-dimensional block (1D_Block) manner among processors by default. This means that processors own equal chunks of each distributed array, DataFrame or Series, except possibly the last processor. Dataframes and multi-dimensional arrays are distributed along their first dimension. For example, chunks of rows are distributed for dataframes and 2D matrices. The figure below illustrates the distribution of a 9-element one-dimensional Numpy array, as well as a 9 by 2 array, on three processors:
Bodo replicates the arrays that are not distributed. This is called REP distribution for consistency.
Argument and Return Variables
Bodo assumes argument and return variables of jitted functions are replicated. However, the user can annotate these variables to indicate distributed data. In this case, the user is responsible for handling the distributed data chunks outside the Bodo scope. For example, the data can come from other jitted functions:
The distributions found by Bodo can be printed either by setting BODO_DISTRIBUTED_DIAGNOSTICS=1 or calling distributed_diagnostics() on the compiled function. For example, consider example code below:
Here is the diagnostics output:
This report shows that the function has an array that is distributed in 1D_Block fashion. The variable A is renamed to $A.39.101 by the optimization passes. The report also shows that there is a parfor (data-parallel for loop) that is 1D_Block distributed.
Explicit Parallel Loops
Sometimes explicit parallel loops are required since a program cannot easily be written in terms of data-parallel operators. In this case, one can use Bodo's prange in place of range to specify that a loop can be parallelized. The user is required to make sure the loop does not have cross-iteration dependencies, except for supported reductions.
The example below demonstrates a parallel loop with a reduction:
Currently, reductions using max operators are supported.
For Parquet and CSV, the syntax is the same as Pandas:
For HDF5, the syntax is the same as the h5py package. For example:
Numpy's fromfile and tofile are supported as below:
Bodo automatically parallelizes I/O of different nodes in a distributed setting without any code changes.
Bodo needs to know the types of input arrays. If the file name is a constant string, Bodo tries to look at the file at compile time and recognize the types. Otherwise, the user is responsible for providing the types, similar to Numba's typing syntax. For example:
Bodo assigns REP to distributable arguments of jitted functions by default.
Bodo provides a limited number of parallel APIs to support advanced cases that may need them.
bodo.get_rank: Get the rank of the process (same as MPI_Comm_rank).
bodo.get_size: Get the number of processes (same as MPI_Comm_size).
bodo.barrier: Blocks until all processes have reached this call (same as MPI_Barrier).
bodo.gatherv: Gathers all data chunks into process 0 (same as MPI_Gatherv).
bodo.allgatherv: Gathers all data chunks and delivers them to all processes (same as MPI_Allgatherv).