请稍侯

Python多进程执行统一任务队列

03 August 2016

说明

现在有这样一个需求,我想开启多个线程去执行一个队列,当当队列没有元素的时候,所有进程阻塞,放入元素到队列中,唤醒一个线程进行执行

代码

# !/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Queue, Process
import time


class WorkManager(object):
    def __init__(self, work_num=1000, thread_num=2):
        self.work_queue = Queue()
        self.processes = []
        self.__init_work_queue(work_num)
        self.__init_thread_pool(thread_num)

    def __init_thread_pool(self, thread_num):
        """
        初始化进程
        :param thread_num 进程数
        """
        for i in range(thread_num):
            self.processes.append(Work(self.work_queue))

    def __init_work_queue(self, jobs_num):
        """
        初始化工作队列
        :param jobs_num 初始任务
        """
        for i in range(jobs_num):
            self.add_job(do_job, i)

    def add_job(self, func, *args):
        """
        添加一项工作入队
        :param func 要执行的任务名
        :param args 参数
        """
        self.work_queue.put((func, list(args)))  # 任务入队,Queue内部实现了同步机制

    def wait_allcomplete(self):
        """
        等待所有线程运行完毕
        """
        for item in self.processes:
            if item.is_alive():
                item.join()

    def stop_all(self):
        """
        停止所有进程
        :return:
        """
        for item in self.processes:
            item.stop = True
            item.terminate()


class Work(Process):
    def __init__(self, work_queue):
        Process.__init__(self)
        self.work_queue = work_queue
        self.stop = False
        self.start()

    def run(self):
        # 死循环,从而让创建的进程程在一定条件下关闭退出
        while not self.stop:
            try:
                print 'before is -- ', get_now_time()
                do, args = self.work_queue.get()
                print 'after is -- ', get_now_time()
                do(args)# 具体要做的任务
            except Exception, e:
                print e
                break


def do_job(args):
    time.sleep(2)  # 模拟处理时间
    import os
    print 'do_job: pid is %s, args--is %s' % (os.getpid(), list(args))


def get_now_time():
    return time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))

if __name__ == '__main__':
    start = time.time()
    work_manager = WorkManager(5, 5)

    print 'main sleep 20'
    time.sleep(20)
    print 'main wake and add job to queue'
    work_manager.add_job(do_job, 4, 5)
    print 'main sleep 10'

    time.sleep(10)
    print 'stop all prograss'
    work_manager.stop_all()
    work_manager.wait_allcomplete()
    end = time.time()
    print "cost all time: %s" % (end - start)