[python] Dead simple example of using Multiprocessing Queue, Pool and Locking

I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html but I'm still struggling with multiprocessing Queue, Pool and Locking. And for now I was able to build the example below.

Regarding Queue and Pool, I'm not sure if I understood the concept in the right way, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at time ( data list have 8 in this example ) so, what should I use? Pool to create 2 processes that can handle two different queues ( 2 at max ) or should I just use Queue to process 2 inputs each time? The lock would be to print the outputs correctly.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)

This question is related to python python-2.7 multiprocessing

The answer is


This might be not 100% related to the question, but on my search for an example of using multiprocessing with a queue this shows up first on google.

This is a basic example class that you can instantiate and put items in a queue and can wait until queue is finished. That's all I needed.

from multiprocessing import JoinableQueue
from multiprocessing.context import Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()

Here is my personal goto for this topic:

Gist here, (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...



    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]


    try:
        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
    pool.close()

if __name__ == '__main__':
    main()

Here is an example from my code (for threaded pool, but just change class name and you'll have process pool):

def execute_run(rp): 
   ... do something 

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.join()

Basically:

  • pool = ThreadPoolExecutor(6) creates a pool for 6 threads
  • Then you have bunch of for's that add tasks to the pool
  • pool.submit(execute_run, rp) adds a task to pool, first arogument is a function called in in a thread/process, rest of the arguments are passed to the called function.
  • pool.join waits until all tasks are done.

For everyone using editors like Komodo Edit (win10) add sys.stdout.flush() to:

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

or as first line to:

    if __name__ == '__main__':
       sys.stdout.flush()

This helps to see what goes on during the run of the script; in stead of having to look at the black command line box.


Examples related to python

programming a servo thru a barometer Is there a way to view two blocks of code from the same file simultaneously in Sublime Text? python variable NameError Why my regexp for hyphenated words doesn't work? Comparing a variable with a string python not working when redirecting from bash script is it possible to add colors to python output? Get Public URL for File - Google Cloud Storage - App Engine (Python) Real time face detection OpenCV, Python xlrd.biffh.XLRDError: Excel xlsx file; not supported Could not load dynamic library 'cudart64_101.dll' on tensorflow CPU-only installation

Examples related to python-2.7

Numpy, multiply array with scalar Not able to install Python packages [SSL: TLSV1_ALERT_PROTOCOL_VERSION] How to create a new text file using Python Could not find a version that satisfies the requirement tensorflow Python: Pandas pd.read_excel giving ImportError: Install xlrd >= 0.9.0 for Excel support Display/Print one column from a DataFrame of Series in Pandas How to calculate 1st and 3rd quartiles? How can I read pdf in python? How to completely uninstall python 2.7.13 on Ubuntu 16.04 Check key exist in python dict

Examples related to multiprocessing

Passing multiple parameters to pool.map() function in Python Dead simple example of using Multiprocessing Queue, Pool and Locking Using multiprocessing.Process with a maximum number of simultaneous processes Multiprocessing a for loop? RuntimeError on windows trying python multiprocessing How to use multiprocessing queue in Python? Shared-memory objects in multiprocessing Python multiprocessing PicklingError: Can't pickle <type 'function'> multiprocessing.Pool: When to use apply, apply_async or map? How to troubleshoot an "AttributeError: __exit__" in multiproccesing in Python?