2.4 Other Communication Patterns

There are many cases when a conductor process obtains or creates data that needs to be sent or received from all the other processes. In this section, we will discuss some special communication constructs specifically for those purposes.

Broadcast

A broadcast sends data from one process to all other processes. A common use of boradcasting is to send user input to all the processes in a parallel program, as shown in the program below:

Program file: 09broadcastUserInput.py

from mpi4py import MPI
import sys

# Determine if user provided a string to be broadcast.
# If not, quit with a warning.
def checkInput(id):
    numArguments = len(sys.argv)
    if numArguments == 1:
        #no extra argument was given- conductor warns and all exit
        if id == 0:
            print("Please add a string to be broadcast from conductor to workers")
        sys.exit()

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    if numProcesses > 1 :
        checkInput(id)

        if id == 0:        # conductor
            #conductor: get the command line argument
            data = sys.argv[1]
            print("Conductor Process {} of {} on {} broadcasts \"{}\""\
            .format(id, numProcesses, myHostName, data))

        else :
            # worker: start with empty data
            data = 'No data'
            print("Worker Process {} of {} on {} starts with \"{}\""\
            .format(id, numProcesses, myHostName, data))

        #initiate and complete the broadcast
        data = comm.bcast(data, root=0)
        #check the result
        print("Process {} of {} on {} has \"{}\" after the broadcast"\
        .format(id, numProcesses, myHostName, data))

    else :
        print("Please run this program with the number of processes \
greater than 1")

########## Run the main function
main()

To run the above example use the following command:

python run.py ./09broadcastUserInput.py N dataString

Here the N signifies the number of processes to start up in MPI, which must be greater than one. The dataString must be supplied and represents the string that will be broadcast from the conductor process to the workers.

For example, in this special instance, you can send a string with spaces and other special characters it it in it like this:

python run.py ./09broadcastUserInput.py 2 "hello\ world\!"

Exercise:

  • Run, using N = from 1 through 8 processes, with a string of your choosing.

  • Find the place in this code where the data is being broadcast to all of the processes. Match the prints to the output you observe when you run it.

Broadcasting a list

It is also possible to broadcast more complex data structures, like a list. The following program illustrates how to broadcast a list to every process:

Program file: 11broadcastList.py

from mpi4py import MPI
import numpy as np

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    if numProcesses > 1 :

        if id == 0:        # conductor
            #conductor: generate a dictionary with arbitrary data in it
            data = np.array(range(numProcesses))
            print("Conductor Process {} of {} on {} broadcasts {}"\
            .format(id, numProcesses, myHostName, data))

        else :
            # worker: start with empty data
            data = None
            print("Worker Process {} of {} on {} starts with {}"\
            .format(id, numProcesses, myHostName, data))

        #initiate and complete the broadcast
        data = comm.bcast(data, root=0)

        #check the result
        print("Process {} of {} on {} has {} after the broadcast"\
        .format(id, numProcesses, myHostName, data))

    else :
        print("Please run this program with the number of processes greater than 1")

########## Run the main function
main()

To run the above example use the following command (N signifies the number of processes):

python run.py ./11broadcastList.py N

Exercise:

  • Run, using N = from 1 through 8 processes.

Scatter and Gather

There are often cases when each process can work on some portion of a larger data structure. This can be carried out by having the conductor process maintain the larger structure and send parts to each of the worker processes, keeping part of the structure on the conductor. Each process then works on their portion of the data, and then the conductor can get the completed portions back.

This is so common in message passing parallel processing that there are two special collective communication functions called Scatter() and Gather() that handle this.

The mpi4py Scatter function, with a capital S, can be used to send portions of a larger array on the conductor to the workers, like this:

../_images/Scatter_array.png

The result of doing this then looks like this, where each process has a portion of the original that they can then work on:

../_images/after_Scatter_array.png

The reverse of this process can be done using the Gather function.

In this example, a 1-D array is created by the conductor, then scattered, using Scatter (capital S). After each smaller array used by each process is changed, the Gather (capital G) function brings the full array with the changes back into the conductor.

Note

In the code below, note how all processes must call the Scatter and Gather functions.

Program file: 16ScatterGather.py

from mpi4py import MPI
import numpy as np


# Create a 1D array to be scattered.
def genArray(numProcesses, numElementsPerProcess):

    data = np.linspace(1, #start
                numProcesses*numElementsPerProcess, #stop
                numElementsPerProcess*numProcesses, #total elements
                dtype='u4')  # 4-byte unsigned integer data type
    return data

def timesTen(a):
    return(a*10);

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    # in mpi4py, the uppercase Scatter method works on arrays generated by
    # numpy routines.
    #
    # Here we will create a single array designed to then scatter 3 elements
    # of it in a smaller array to each process.

    numDataPerProcess = 3

    if id == 0:
        data = genArray(numProcesses, numDataPerProcess)
        #genListOfLists(numElements)
        print("Conductor {} of {} on {} has created array: {}"\
        .format(id, numProcesses, myHostName, data))
    else:
        data = None
        print("Worker Process {} of {} on {} starts with {}"\
        .format(id, numProcesses, myHostName, data))

    #scatter one small array from a part of the large array
    # on node 0 to each of the processes
    smallerPart = np.empty(numDataPerProcess, dtype='u4') # allocate space for result on each process
    comm.Scatter(data, smallerPart, root=0)

    if id == 0:
        print("Conductor {} of {} on {} has original array after Scatter: {}"\
        .format(id, numProcesses, myHostName, data))

    print("Process {} of {} on {} has smaller part after Scatter {}"\
    .format(id, numProcesses, myHostName, smallerPart))

    # do some work on each element
    newValues = timesTen(smallerPart)

    print("Process {} of {} on {} has smaller part after work {}"\
    .format(id, numProcesses, myHostName, newValues))

    # All processes participate in gathering each of their parts back at
    # process 0, where the original data is now overwritten with new values
    # from eqch process.
    comm.Gather(newValues, data, root=0)

    if id == 0:
        print("Conductor {} of {} on {} has new data array after Gather:\n {}"\
        .format(id, numProcesses, myHostName, data))


########## Run the main function
main()

Example usage:

python run.py ./16ScatterGather.py N

Exercises:

  • Run, using N = from 2 through 8 processes.

  • If you want to study the numpy part of the code, look up the numpy method linspace() used in genArray().

Applying Gather to PopulateArray

Let’s use the Gather() function to simplify the code in the PopulateArray program. The revised code is shown below:

Program file: PopulateArrayGather.py

from mpi4py import MPI
import numpy as np 

#array size


def populateArray(rank, nprocs):

    nElems = N/nprocs #number of elemes to generate
    isTail = N % nprocs

    start = rank*nElems
    end = (rank+1)*nElems
    if rank == (nprocs-1) and isTail > 0:
        end = N

    length = end-start

    local_array = np.empty(length)
    for i in range(length):
        local_array[i] = 1+start+i
    return local_array



def main():
    comm = MPI.COMM_WORLD
    myId = comm.Get_rank()
    numProcesses = comm.Get_size()

    #SPMD pattern 
    local = populateArray(myId, numProcesses) #generate an empty array of size N

    if myId == 0: #initialize global array on conductor process only
        global_array = np.empty(N)
    else:
        global_array = None

    #gather the local results into global_array
    comm.Gather(local, global_array, root=0) 

    if myId == 0:
        #print out final array
        total = sum(global_array)
        print(total)
        print((N*(N+1)/2))


########## Run the main function
main()

Notice that the Send/Receive pattern used in the Point-to-Point communication section have now been replaced with a single comm.Gather() statement. Additional code however is needed to initialize the global array that holds the final results.

Run this code using the following command (N is the number of processes):

python run.py ./populateArrayGather.py N

Even as we increase the number of processes, the result stays the same.

Exercises

  • Modify the PopulateArrayGather program to do array addition. Each processor should compute a local sum of the array it produces. The global array should be the length of the number of processes, since the Gather() function is gathering a number of sums.

  • Modify the Integration example frome earlier to use the Gather() function.

Reduction

There are often cases when every process needs to complete a partial result of an overall computation. For example if you want to process a large set of numbers by summing them together into one value (i.e. reduce a set of numbers into one value, its sum), you could do this faster by having each process compute a partial sum, then have all the processes communicate to add each of their partial sums together.

This is so common in parallel processing that there is a special collective communication function called reduce that does just this.

The type of reduction of many values down to one can be done with different types of operators on the set of values computed by each process.

Reduce all values using sum and max

In this example, every process computes the square of (id+1). Then all those values are summed together and also the maximum function is applied.

Program file: 12reduction.py


from mpi4py import MPI

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    square = (id+1) * (id+1)

    if numProcesses > 1 :
        #initiate and complete the reductions
        sum = comm.reduce(square, op=MPI.SUM)
        max = comm.reduce(square, op=MPI.MAX)
    else :
        sum = square
        max = square

    if id == 0:        # conductor/root process will print result
        print("The sum of the squares is  {}".format(sum))
        print("The max of the squares is  {}".format(max))

########## Run the main function
main()

Example usage:

python run.py ./12reduction.py N

Exercises:

  • Run, using N = from 1 through 8 processes.

  • Try replacing MPI.MAX with MPI.MIN(minimum) and/or replacing MPI.SUM with MPI.PROD (product). Then save and run the code again.

  • Find the place in this code where the data computed on each process is being reduced to one value. Match the prints to the output you observe when you run it.

Reduction on a list of values

We can try reduction with lists of values, but the behavior matches Python semantics regarding lists.

Note

There are two ways in Python that you might want to sum a set of lists from each process: 1) concatenating the elements together, or 2) summing the element at each location from each process and placing the sum in that location in a new list. In the latter case, the new list is the same length as the original lists on each process.

Program file: 13reductionList.py

from mpi4py import MPI

# Exercise: Can you explain what this function returns,
#           given two lists as input?
def sumListByElements(x,y):
    return [a+b for a, b in zip(x, y)]

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()            #number of the process running the code
    numProcesses = comm.Get_size()  #total number of processes running
    myHostName = MPI.Get_processor_name()  #machine name running the code

    srcList = [1*id, 2*id, 3*id, 4*id, 5*id]

    destListMax = comm.reduce(srcList, op=MPI.MAX)
    destListSum = comm.reduce(srcList, op=MPI.SUM)
    #destListSumByElement = comm.reduce(srcList, op=sumListByElements)

    if id == 0:        # conductor/root process will print result
        print("The resulting reduce max list is  {}".format(destListMax))
        print("The resulting reduce sum list is  {}".format(destListSum))
        #print("The resulting reduce sum list is  {}".format(destListSumByElement))

########## Run the main function
main()

Example usage:

python run.py ./13reductionList.py N

Exercises:

  • Run, using N = from 1 through 4 processes.

  • Uncomment the two lines of runnable code that are commented in the main() function. Observe the new results and explain why the MPI.SUM (using the + operator underneath) behaves the way it does on lists, and what the new function called sumListByElements is doing instead.

  • In this code, try to explain what the function called sumListByElements does. If you are unfamiliar with the zip function, look up what it does.

Returning to the Array Example

Let’s return to the problem of array addition, where the goal is to sum all the elements in an array together in parallel. We build on our earlier PopulateArray program. After populating the array in parallel, we will use the scatter() method first to re-distribute the contents of the global_array to each process:

    #gather the local results into global_array (old code)
    comm.Gather(local, global_array, root=0)


    #step 2: compute local sums (new code)
    #now scatter the global array to each process
    comm.Scatter(global_array, local, root=0)

    #compute local sums
    local_sum = sum(local)

Here, the local array is overwritten with the scattered results of global_array. Each process then computes its local sum (stored in local_sum).

Computing the final total using Gather

One way to compute the final total is to use a second invocation of the Gather() function as shown below:

    #gather the local results into global_array (old code)
    comm.Gather(local, global_array, root=0)


    #step 2: compute local sums (new code)
    #now scatter the global array to each process
    comm.Scatter(global_array, local, root=0)

    #compute local sums
    local_sum = sum(local)

    if myId == 0: #initialize all_sums array on conductor only
        all_sums = np.empty(numProcesses)
    else:
        all_sums = None

    #gather the local sums into all_sums array
    comm.Gather(local_sum, all_sums, root=0)


    if myId == 0:
        #print out final array
        finalTotal = sum(all_sums)
        print(finalTotal)
        print((N*(N+1)/2))

The conductor process allocates a new array called all_sums, which is then populated by a second Gather() call. Finally, the conductor process computes the final total by summing together all the subtotals located in the all_sums array.

Computing the final total using Reduce

Recall that the Reduce() function combines all the local values using a common function (i.e. Sum, Max, Min, Prod). Since our goal is to add together all the elments of our array, we can use the Reduce() function as follows:

    #gather the local results into global_array (old code)
    comm.Gather(local, global_array, root=0)


    #step 2: compute local sums
    #now scatter the global array to each process
    comm.Scatter(global_array, local, root=0)

    #compute local sums (SPMD)
    local_sum = sum(local)

    #reduction (new change)
    finalTotal = comm.reduce(local_sum, op=MPI.SUM)


    if myId == 0:
        #print out final array
        print(finalTotal)
        print((N*(N+1)/2))

In addition to being shorter, this code snippet is much simpler than the one employing Gather, as all the conductor process is doing is printing out the result.

Exercise:

Exercise:

Now modify the integration example to use Reduce(). Compare the performance of the integration example with the earlier one that uses Gather(). Which is faster?

You have attempted of activities on this page