最近一个项目上使用线程池,设定处理项1W,10线程,但是需要检测线程进行状态。出现错误N次,就自动终止线程。实现如下:
线程池代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 ERRORCOUNT=0 IS_EXIT=True class Worker (Thread) : worker_count=0 timeout=1 def __init__ (self, workQueue,resultQueue,**kwds) : Thread.__init__(self,**kwds) self.id=Worker.worker_count Worker.worker_count+=1 self.setDaemon(True ) self.workQueue=workQueue self.resultQueue=resultQueue self.start() def run (self) : while IS_EXIT: try : callable,args,kwds=self.workQueue.get(timeout=Worker.timeout) res=callable(*args,**kwds) self.resultQueue.put(res) except Queue.Empty: break except : pass class WorkerManager : def __init__ (self, num_of_workers=10 ,timeout=2 ) : self.workQueue=Queue.Queue() self.resultQueue=Queue.Queue() self.workers=[] self.timeout=timeout self._recruitThreads(num_of_workers) def _recruitThreads (self,num_of_workers) : for i in range(num_of_workers): worker=Worker(self.workQueue,self.resultQueue) self.workers.append(worker) def wait_for_complete (self) : while len(self.workers): worker=self.workers.pop() worker.join(10 ) if worker.isAlive() and not self.workQueue.empty(): self.workers.append(worker) def add_job (self,callable,*args,**kwds) : self.workQueue.put((callable,args,kwds)) def get_result (self,*args,**kwds) : return self.resultQueue.get(*args,**kwds)
在WorkerManager中使用到一个全局变量IS_EXIT 用来判断是否需要退出线程
调用线程代码 1 2 3 4 wm=WorkerManager(10 ) for i in range(10000 ): wm.add_job(do_job) wm.wait_for_complete()
具体工作代码 1 2 3 4 5 6 7 8 9 def do_job () : global ERRORCOUNT global IS_EXIT try : do anything except : ERRORCOUNT+=1 if ERRORCOUNT>5 : IS_EXIT=False
此处使用了全局变量ERRORCOUNT 统计错误数量,超过指定次数,则设置IS_EXIT=False 通知线程停止执行。 至此基本上满足项目所需,但并不友好,应有更好的方式。