Multiprocessing in Python

Most of the codes I develop run in parallel using MPI (Message Passing Interface) using the python wrapper, mpi4py. There is a reason why highly scalable programs use this approach, and that is because each processor handles its own chunk of memory and communicates with other processors only when it’s needed. PETSc, for example, is a behemoth computing framework entirely written in the MPI computing philosophy. Despite MPI’s efficiency, there are some barriers:

  • MPICH or OpenMPI must be already compiled on the system
  • Python needs mpi4py to communicate in parallel
  • Programming with MPI is sometimes extremely tedious
  • The problem may not benefit from distributed memory

If any of these are true, then OpenMP is a simple alternative. The OpenMP philosophy is to use a “master” process to spawn a bunch of worker processes that each share the same memory allocation. Clearly, this is approach is unsuitable if computation spans more than one computation node. On the positive side, most programming languages already have some implementation of OpenMP without the need for additional software. We explore such an implementation withihn the multiprocessing module in Python.

Multiprocessing module

There are 2 main objects in the multiprocessing module, which can be imported as:

from multiprocessing import Pool, Queue

I have found Queue to be the most intuitive. It sets up a queue of tasks to be executed by each processor. When one task completes, the free processor takes another task from the queue until there are no tasks remaining.

from multiprocessing import Queue, Process, cpu_count

n = n_evaluations
processes = []
q_in = Queue(1)
q_in = Queue()

nprocs = cpu_count()

# initialise the processes
for i in range(nprocs):
    p = Process(target=function, args=args, kwargs=kwargs)
    processes.append(p)

for p in processes:
    p.daemon = True
    p.start()

# put items in the queue
sent = [q_in.put((i, var)) for i in range(n)]

# get the results
for i in range(len(sent)):
    i, res = q_out.get()

# wait until each processor has finished
[p.join() for p in processes]

Set nprocs to the number of processes (the number of CPUs) and pass i to the queue to reconstruct the ordering of the results.

In the above example, the Queue object can only be defined from the __main__ namespace. However, after some tinkering I got this to work within a Python class:

from multiprocessing import Queue, Process, cpu_count

class UptownFunc:
    def __init__(self):
        pass

    def _func_queue(self, func, q_in, q_out, var, *args, **kwargs):
        """ Retrive processes from the queue """
        while True:
            pos, var = q_in.get()
            if pos is None:
                break

            pass_args = [var]
            pass_args.extend(args)
            res = func(*pass_args, **kwargs)
            q_out.put((pos, res))
        return

    def parallelise_function(self, var, func, *args, **kwargs):
        """ Split evaluations of func across processors """
        n = len(var)

        processes = []
        q_in = Queue(1)
        q_out = Queue()

        nprocs = cpu_count()

        for i in range(nprocs):
            pass_args = [func, q_in, q_out]
            pass_args.extend(args)

            p = Process(target=self._func_queue,\
                        args=tuple(pass_args),\
                        kwargs=kwargs)

            processes.append(p)

        for p in processes:
            p.daemon = True
            p.start()

        # put items in the queue
        sent = [q_in.put((i, var[i])) for i in range(n)]
        [q_in.put((None, None)) for _ in range(nprocs)]

        # get the results
        results = []
        ordering = []
        for j in range(len(sent)):
            i, res = q_out.get()
            results.append(res)
            ordering.append(i)

        # wait until each processor has finished
        [p.join() for p in processes]

        # reorder results
        return results[ordering]

And that’s it! Pass any function to parallelise_function and supply a list of var to evaulate in parallel along with optional arguments and keywords.

If this looks familiar it’s because I’ve taken it directly from PyCurious: a tool to calculate the Curie depth from windows of the magnetic anomaly. The above code snippet parallelises the computation of Curie depth across each window.

Related