7.2. Collective Communication: scatter and gather arrays

The mpi4py library provides several collective communication functions designed to work with arrays created using numpy, the Python library for numerical computing.

If you are unfamiliar with numpy and want to know more about its features and methods, you will need to consult a separate tutorial. Even without that background, it should be possible to follow the scatter-then-gather example below by observing the printed results, whether or not you know the numpy functions used to create the 1-D array.

The numpy library provides a data structure called an array, similar to the arrays found in many other programming languages. A 1-dimensional array of integers can be pictured much like a list of integers, with each value stored at a particular index. The mpi4py Scatter function, with a capital S, can be used to send equal-sized portions of a larger array on the conductor to every process, the conductor included, like this:

../_images/Scatter_array.png

The result looks like this, with each process holding a portion of the original array that it can then work on:

../_images/after_Scatter_array.png
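
The partitioning pictured above can be mimicked without MPI. The following is a minimal sketch using only numpy, assuming 4 processes and 3 elements per process; the helper name scatter_layout is hypothetical and is not part of mpi4py. It only shows which elements each rank would end up with, not how the data is actually sent.

```python
import numpy as np


def scatter_layout(data, num_processes):
    """Return the equal-sized chunks of data; chunk i is what rank i would hold."""
    if data.size % num_processes != 0:
        raise ValueError("array length must be a multiple of the process count")
    # np.split with an integer cuts the array into that many equal pieces
    return np.split(data, num_processes)


full = np.arange(1, 13, dtype='u4')   # 12 elements: 1 through 12
parts = scatter_layout(full, 4)       # pretend there are 4 processes

for rank, part in enumerate(parts):
    print("rank", rank, "would receive", part)
```

The chunks are handed out in rank order, so rank 0 gets the first 3 elements, rank 1 the next 3, and so on.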

The reverse of this process can be done using the Gather function.

In this example, a 1-D array is created by the conductor, then scattered using Scatter (capital S). After each process changes its smaller array, the Gather (capital G) function collects the changed pieces, in rank order, back into the full array on the conductor.
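
To preview what the round trip should produce, here is a sketch that simulates it in a single process with numpy alone. It assumes 4 processes and uses np.split and np.concatenate as stand-ins for Scatter and Gather; the function simulate_scatter_gather is hypothetical and involves no real communication.

```python
import numpy as np


def simulate_scatter_gather(data, num_processes, work):
    """Split data into equal chunks, apply work to each, and rejoin in rank order."""
    parts = np.split(data, num_processes)   # stand-in for Scatter
    changed = [work(p) for p in parts]      # each "process" does its work
    return np.concatenate(changed)          # stand-in for Gather


original = np.arange(1, 13, dtype='u4')
result = simulate_scatter_gather(original, 4, lambda a: a * 10)
print(result)
```

With real MPI the chunks live in separate processes, but the gathered array on the conductor should hold the same values in the same order.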

Program file: 16ScatterGather.py

Example usage:

python run.py ./16ScatterGather.py N

Here N is the number of MPI processes to start.

run.py executes this program within mpirun using the number of processes given.
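
The contents of run.py are not shown here, so the following is only a guess at what such a launcher might do: assemble an mpirun command line from the script path and the process count. The function build_mpirun_command is hypothetical, and the sketch assumes that mpirun accepts -np for the number of processes.

```python
import sys


def build_mpirun_command(script_path, num_processes, python_exe=sys.executable):
    """Build (but do not run) an mpirun command for the given script."""
    n = int(num_processes)
    if n < 1:
        raise ValueError("number of processes must be at least 1")
    # -np tells mpirun how many copies of the program to start
    return ["mpirun", "-np", str(n), python_exe, script_path]


cmd = build_mpirun_command("./16ScatterGather.py", 4, python_exe="python")
print(" ".join(cmd))
```

A launcher could then pass the resulting list to subprocess.run(cmd, check=True) to actually start the processes.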

Exercises:

7.2.1. Explore the code

Note

In the code below, note how all processes must call the Scatter and Gather functions.

from mpi4py import MPI
import numpy as np


# Create a 1D array to be scattered.
def genArray(numProcesses, numElementsPerProcess):

    data = np.linspace(1, #start
                numProcesses*numElementsPerProcess, #stop
                numElementsPerProcess*numProcesses, #total elements
                dtype='u4')  # 4-byte unsigned integer data type
    return data

def timesTen(a):
    return a * 10

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    # in mpi4py, the uppercase Scatter method works on arrays generated by
    # numpy routines.
    #
    # Here we will create a single array designed to then scatter 3 elements
    # of it in a smaller array to each process.

    numDataPerProcess = 3

    if id == 0:
        data = genArray(numProcesses, numDataPerProcess)
        print("Conductor {} of {} on {} has created array: {}"\
        .format(id, numProcesses, myHostName, data))
    else:
        data = None
        print("Worker Process {} of {} on {} starts with {}"\
        .format(id, numProcesses, myHostName, data))

    # Scatter one small array from a part of the large array
    # on process 0 to each of the processes.
    # Every process allocates space for the part it will receive.
    smallerPart = np.empty(numDataPerProcess, dtype='u4')
    comm.Scatter(data, smallerPart, root=0)

    if id == 0:
        print("Conductor {} of {} on {} has original array after Scatter: {}"\
        .format(id, numProcesses, myHostName, data))

    print("Process {} of {} on {} has smaller part after Scatter {}"\
    .format(id, numProcesses, myHostName, smallerPart))

    # do some work on each element
    newValues = timesTen(smallerPart)

    print("Process {} of {} on {} has smaller part after work {}"\
    .format(id, numProcesses, myHostName, newValues))

    # All processes participate in gathering each of their parts back at
    # process 0, where the original data is now overwritten with new values
    # from each process.
    comm.Gather(newValues, data, root=0)

    if id == 0:
        print("Conductor {} of {} on {} has new data array after Gather:\n {}"\
        .format(id, numProcesses, myHostName, data))


########## Run the main function
main()
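
Two details of the listing above can be checked with numpy alone. This sketch assumes 4 processes; gen_array is a renamed copy of genArray used here for illustration. It shows that the linspace call produces the integers 1 through 12, the same values np.arange would give, and that multiplying a 'u4' array by 10 leaves the data type unchanged, so the worked-on values have the same type as the data array that Gather writes into.

```python
import numpy as np


def gen_array(num_processes, num_per_process):
    """Same construction as genArray in the listing: the integers 1..total."""
    total = num_processes * num_per_process
    return np.linspace(1, total, total, dtype='u4')


data = gen_array(4, 3)
alt = np.arange(1, 13, dtype='u4')   # an equivalent way to build the same values

part = data[:3]          # the slice rank 0 would receive
new_values = part * 10   # the work done by timesTen

print(data)
print(new_values, new_values.dtype)
```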