8.1. Dynamic Load Balancing
In this example we combine the master-worker pattern with message passing. The master has many tasks that need to be completed. The master starts by sending some data needed to complete a task to each worker process. Then the master loops and waits to hear back from each worker by receiving a message from any of them. When the master receives a message from a worker, it sends that worker more data for its next task, unless there are no more tasks to complete, in which case it sends a special message to the worker to stop running.
In this simple example, each worker is sent the number of seconds it should ‘sleep’, which varies from 1 to 8 and stands in for workloads of different sizes. Because the code is so simple, the number of tasks each worker completes doesn’t vary by much. In real applications, the time for one task may be quite different from the time for another, so some workers could complete several short tasks while others are still busy with long ones.
This approach can sometimes be an improvement over assigning an equal number of tasks to every process, since a worker that finishes early immediately receives more work.
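To see why handing out work on demand can help, the following sketch compares the two strategies without using MPI at all. It is only a rough model under stated assumptions: communication cost is ignored, the function names `static_makespan` and `dynamic_makespan` are hypothetical, and the task list is deliberately uneven and made up rather than taken from the program's output.

```python
import heapq

def static_makespan(durations, num_workers):
    """Finish time when every worker gets a fixed, (nearly) equal share up front.

    Tasks are dealt out round-robin: worker w gets tasks w, w + num_workers, ...
    """
    return max(sum(durations[w::num_workers]) for w in range(num_workers))

def dynamic_makespan(durations, num_workers):
    """Finish time when each task goes to whichever worker becomes free first."""
    free_at = [0] * num_workers          # time at which each worker is next idle
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)   # earliest available worker
        heapq.heappush(free_at, start + d)
    return max(free_at)

# Made-up, deliberately uneven workload: every third task is long.
uneven = [8, 1, 1, 8, 1, 1, 8, 1, 1, 8, 1, 1]
print("static :", static_makespan(uneven, 3))
print("dynamic:", dynamic_makespan(uneven, 3))

# With identical tasks the two strategies finish at the same time.
uniform = [2] * 12
print("uniform:", static_makespan(uniform, 3), dynamic_makespan(uniform, 3))
```

With the uneven list, the round-robin split gives one worker all four long tasks, while the on-demand version spreads them out. With identical tasks there is nothing to gain, which is why the improvement is only "sometimes".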
Program file: 17dynamicLoadBalance.py
Example usage:
python run.py ./17dynamicLoadBalance.py N
Here N is the number of processes to start up in MPI.
run.py executes this program within mpirun using the number of processes given.
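The contents of run.py are not shown here, so the following is only a guess at the kind of command such a wrapper assembles. It assumes the common form `mpirun -np N python script.py`; the helper name `build_mpirun_command` is hypothetical, and the real run.py may pass additional options.

```python
def build_mpirun_command(script, num_processes, python="python"):
    """Return the argument list for launching `script` on `num_processes` MPI processes.

    Assumption: the launcher is `mpirun` and accepts `-np` for the process count.
    """
    return ["mpirun", "-np", str(num_processes), python, script]

cmd = build_mpirun_command("./17dynamicLoadBalance.py", 4)
print(" ".join(cmd))
# To actually launch it, the list could be passed to subprocess.run(cmd, check=True).
```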
Exercises:
Run, using N = 4 processes
Study the execution carefully. Note that with 4 processes, 3 are workers. The total number of tasks is 3*4, or 12. Which process does the most work? You can count by looking for the lines that end with “… from X”, where X is a worker process id.
Try with N = 8 (7 workers).
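For the counting step in these exercises, a small script can tally the "… from X" lines instead of counting by eye. This is a minimal sketch: the function name `summarize` is hypothetical, and the sample lines are made up to match the format of the program's print statements, not copied from a real run.

```python
import re
from collections import Counter

# Matches both "master received 4 from 2" and "end: master received 4 from 2".
RECEIVED = re.compile(r"master received (\d+) from (\d+)\s*$")

def summarize(output_lines):
    """Return (tasks, seconds): per-worker task counts and total sleep seconds."""
    tasks = Counter()
    seconds = Counter()
    for line in output_lines:
        match = RECEIVED.search(line)
        if match:
            sleep_time, worker_id = int(match.group(1)), int(match.group(2))
            tasks[worker_id] += 1
            seconds[worker_id] += sleep_time
    return tasks, seconds

# Made-up sample lines in the program's output format (not real output).
sample = [
    "master sent 4 to 1",
    "worker 1 got 4",
    "master received 4 from 1",
    "master received 1 from 1",
    "end: master received 8 from 2",
]
tasks, seconds = summarize(sample)
for worker_id in sorted(tasks):
    print("worker {}: {} tasks, {} seconds".format(
        worker_id, tasks[worker_id], seconds[worker_id]))
```

To use it on a real run, the program's output could be saved to a file and its lines passed to `summarize`. Reporting both the task count and the total seconds is a design choice: the worker with the most tasks is not necessarily the one that slept the longest.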
8.1.1. Dive into the code
Note in this case how the master, whose id is 0, handles the assignment of tasks, while the workers simply do what they are sent until they are told to stop.
from mpi4py import MPI
import numpy as np
import time

def genTasks(numTasks):
    np.random.seed(1000)  # fixed seed: run the same set of timed tasks
    # high is exclusive, so the sleep times fall in the range 1..8
    return np.random.randint(low=1, high=9, size=numTasks)

# tags that can be applied to messages
WORKTAG = 1
DIETAG = 2

def main():
    comm = MPI.COMM_WORLD
    id = comm.Get_rank()                   # number of the process running the code
    numProcesses = comm.Get_size()         # total number of processes running
    myHostName = MPI.Get_processor_name()  # machine name running the code

    if (id == 0) :
        # create an arbitrary array of numbers for how long each
        # worker task will 'work', by sleeping that amount of seconds
        numTasks = (numProcesses-1)*4  # avg 4 tasks per worker process
        workTimes = genTasks(numTasks)
        print("master created {} values for sleep times:".format(workTimes.size), flush=True)
        print(workTimes, flush=True)
        handOutWork(workTimes, comm, numProcesses)
    else:
        worker(comm)

def handOutWork(workTimes, comm, numProcesses):
    totalWork = workTimes.size
    workcount = 0
    recvcount = 0
    print("master sending first tasks", flush=True)
    # send out the first tasks to all workers
    for id in range(1, numProcesses):
        if workcount < totalWork:
            work = workTimes[workcount]
            comm.send(work, dest=id, tag=WORKTAG)
            workcount += 1
            print("master sent {} to {}".format(work, id), flush=True)

    # while there is still work,
    # receive result from a worker, which also
    # signals they would like some new work
    while (workcount < totalWork) :
        # receive next finished result
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("master received {} from {}".format(workTime, workerId), flush=True)
        # send next work (update 'work' so the message below reports the value just sent)
        work = workTimes[workcount]
        comm.send(work, dest=workerId, tag=WORKTAG)
        workcount += 1
        print("master sent {} to {}".format(work, workerId), flush=True)

    # Receive results for outstanding work requests.
    while (recvcount < totalWork):
        stat = MPI.Status()
        workTime = comm.recv(source=MPI.ANY_SOURCE, status=stat)
        recvcount += 1
        workerId = stat.Get_source()
        print("end: master received {} from {}".format(workTime, workerId), flush=True)

    # Tell all workers to stop
    for id in range(1, numProcesses):
        comm.send(-1, dest=id, tag=DIETAG)

def worker(comm):
    # keep receiving messages and do work, unless tagged to 'die'
    while(True):
        stat = MPI.Status()
        waitTime = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat)
        print("worker {} got {}".format(comm.Get_rank(), waitTime), flush=True)
        if (stat.Get_tag() == DIETAG):
            print("worker {} dying".format(comm.Get_rank()), flush=True)
            return
        # simulate work by sleeping
        time.sleep(waitTime)
        # indicate done with work by sending to Master
        comm.send(waitTime, dest=0)

########## Run the main function
main()
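
For readers who want to experiment with the hand-out logic without an MPI installation, the same pattern can be sketched with threads and queues from the standard library. This is an analogy rather than the program above: each worker's `inbox` queue stands in for messages from the master, a `STOP` sentinel plays the role of DIETAG, sleep times are scaled down to keep the run short, and all names here (`thread_worker`, `hand_out_work`) are hypothetical.

```python
import queue
import threading
import time

STOP = None  # sentinel that plays the role of the DIETAG message

def thread_worker(worker_id, inbox, results):
    """Do tasks from inbox until told to stop, reporting each one when done."""
    while True:
        task = inbox.get()
        if task is STOP:
            return
        time.sleep(task * 0.01)          # simulate work, scaled down
        results.put((worker_id, task))   # report back, which also requests more work

def hand_out_work(work_times, num_workers):
    """Give each worker one task, then refill whichever worker reports back."""
    inboxes = [queue.Queue() for _ in range(num_workers)]
    results = queue.Queue()
    threads = [threading.Thread(target=thread_worker, args=(i, inboxes[i], results))
               for i in range(num_workers)]
    for t in threads:
        t.start()

    completed = {i: [] for i in range(num_workers)}
    next_task = 0
    # first round: one task per worker
    for i in range(num_workers):
        if next_task < len(work_times):
            inboxes[i].put(work_times[next_task])
            next_task += 1

    # collect every result; hand out remaining tasks as workers free up
    received = 0
    while received < len(work_times):
        worker_id, task = results.get()
        received += 1
        completed[worker_id].append(task)
        if next_task < len(work_times):
            inboxes[worker_id].put(work_times[next_task])
            next_task += 1

    # tell all workers to stop, then wait for them to exit
    for inbox in inboxes:
        inbox.put(STOP)
    for t in threads:
        t.join()
    return completed

print(hand_out_work([3, 1, 2, 1, 4, 1], 2))
```

Which worker ends up with which task depends on timing, so the printed dictionary can differ between runs. The total set of completed tasks always matches the input list.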