I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html but I'm still struggling with multiprocessing Queue, Pool and Locking. For now I was able to build the example below.
Regarding Queue and Pool, I'm not sure if I understood the concept correctly, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 entries in this example). So what should I use: a Pool with 2 processes that can handle two different queues (2 at max), or should I just use Queue to process 2 inputs each time? The lock would be there to print the outputs correctly.
import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'])

def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()

def mp_worker(inputs, the_time):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
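For example, would something like this be the right direction? This is just a sketch of what I imagine the Pool version would look like; I haven't verified this is the intended usage:

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'])

def mp_worker(args):
    # Pool.map passes one item per call, so unpack the pair here.
    inputs, the_time = args
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)   # at most 2 worker processes run at a time
    pool.map(mp_worker, data)        # blocks until all 8 items are processed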
Tags: python, python-2.7, multiprocessing
This might not be 100% related to the question, but in my search for an example of using multiprocessing with a queue this shows up first on Google.
This is a basic example class that you can instantiate, put items into its queue, and wait until the queue is finished. That's all I needed.
from multiprocessing import JoinableQueue, Process

class Renderer:
    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            # process your item here
            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
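A small variant, if you'd rather avoid terminate(): since upload() already breaks on None, you can push one None sentinel per worker after the queue drains and let the processes exit on their own. This is my own tweak, not part of the class above:

def terminate(self):
    """ wait until the queue is empty, then let the workers exit cleanly """
    self.queue.join()            # every item that was put has been task_done'd
    for _ in self.processes:
        self.queue.put(None)     # one sentinel per worker; upload() breaks on None
    for p in self.processes:
        p.join()                 # wait for the worker processes to finish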
For everyone using editors like Komodo Edit (on Windows 10): add sys.stdout.flush() to:

import sys

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

or as the first line of:

if __name__ == '__main__':
    sys.stdout.flush()

This helps you see what goes on during the run of the script, instead of having to stare at the black command-line box.
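Alternatively (not from the answer above, just another option): running the script with python -u disables output buffering entirely, or you can wrap the print-and-flush pair in a small helper so you can't forget the flush:

import sys

def log(msg):
    # write and flush immediately so output appears in the editor's console
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()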
Here is an example from my code (for a thread pool; swap ThreadPoolExecutor for ProcessPoolExecutor and you'll have a process pool):
from concurrent.futures import ThreadPoolExecutor

def execute_run(rp):
    ...  # do something

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.shutdown(wait=True)
Basically:

pool = ThreadPoolExecutor(6) creates a pool for 6 threads.

pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to be called in a thread/process, and the rest of the arguments are passed to that function.

pool.shutdown(wait=True) waits until all tasks are done (ThreadPoolExecutor has no join() method, which is why the last line above uses shutdown).
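For reference, here is a minimal self-contained version of the same pattern. The RunParams/TESTED_* names above come from my project; the names below are made up for illustration, and this assumes Python 3 or the futures backport on 2.7. Swapping in ProcessPoolExecutor gives you a process pool, as mentioned:

from concurrent.futures import ProcessPoolExecutor

def execute_run(n):
    # placeholder work: square the input
    return n * n

if __name__ == '__main__':
    pool = ProcessPoolExecutor(6)                  # 6 worker processes
    futures = [pool.submit(execute_run, n) for n in range(10)]
    pool.shutdown(wait=True)                       # wait until all tasks are done
    print([f.result() for f in futures])           # [0, 1, 4, ..., 81]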
Here is my personal goto for this topic:
Gist here (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple workers from mixing their output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each worker.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of worker processes you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',),
        ('Goodbye', 'World',),
    ]

    try:
        # Use map_async plus get() with a huge timeout (in seconds)
        # instead of a plain map(), so that KeyboardInterrupt (^C)
        # can actually be delivered to the main process.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interrupt\n')
    pool.close()

if __name__ == '__main__':
    main()
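Side note: multiprocessing.Lock also works as a context manager, so the acquire()/release() pair in func_worker can be written as a with block, which releases the lock even if a print raises:

with GLOBALLOCK:
    print(str1)
    print(str2)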