Python多进程执行统一任务队列
03 August 2016
说明
现在有这样一个需求,我想开启多个线程去执行一个队列,当当队列没有元素的时候,所有进程阻塞,放入元素到队列中,唤醒一个线程进行执行
代码
# !/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Queue, Process
import time
class WorkManager(object):
def __init__(self, work_num=1000, thread_num=2):
self.work_queue = Queue()
self.processes = []
self.__init_work_queue(work_num)
self.__init_thread_pool(thread_num)
def __init_thread_pool(self, thread_num):
"""
初始化进程
:param thread_num 进程数
"""
for i in range(thread_num):
self.processes.append(Work(self.work_queue))
def __init_work_queue(self, jobs_num):
"""
初始化工作队列
:param jobs_num 初始任务
"""
for i in range(jobs_num):
self.add_job(do_job, i)
def add_job(self, func, *args):
"""
添加一项工作入队
:param func 要执行的任务名
:param args 参数
"""
self.work_queue.put((func, list(args))) # 任务入队,Queue内部实现了同步机制
def wait_allcomplete(self):
"""
等待所有线程运行完毕
"""
for item in self.processes:
if item.is_alive():
item.join()
def stop_all(self):
"""
停止所有进程
:return:
"""
for item in self.processes:
item.stop = True
item.terminate()
class Work(Process):
def __init__(self, work_queue):
Process.__init__(self)
self.work_queue = work_queue
self.stop = False
self.start()
def run(self):
# 死循环,从而让创建的进程程在一定条件下关闭退出
while not self.stop:
try:
print 'before is -- ', get_now_time()
do, args = self.work_queue.get()
print 'after is -- ', get_now_time()
do(args)# 具体要做的任务
except Exception, e:
print e
break
def do_job(args):
time.sleep(2) # 模拟处理时间
import os
print 'do_job: pid is %s, args--is %s' % (os.getpid(), list(args))
def get_now_time():
return time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
if __name__ == '__main__':
start = time.time()
work_manager = WorkManager(5, 5)
print 'main sleep 20'
time.sleep(20)
print 'main wake and add job to queue'
work_manager.add_job(do_job, 4, 5)
print 'main sleep 10'
time.sleep(10)
print 'stop all prograss'
work_manager.stop_all()
work_manager.wait_allcomplete()
end = time.time()
print "cost all time: %s" % (end - start)