# -*- coding: utf-8 -*-
import threading
import sys
from .defaults import get_logger
try:
import Queue as queue
except ImportError:
import queue
import traceback
[文档]class TaskQueue(object):
def __init__(self, producer, consumers):
self.__producer = producer
self.__consumers = consumers
self.__threads = []
# must be an infinite queue, otherwise producer may be blocked after all consumers being dead.
self.__queue = queue.Queue()
self.__lock = threading.Lock()
self.__exc_info = None
self.__exc_stack = ''
[文档] def run(self):
self.__add_and_run(threading.Thread(target=self.__producer_func))
for c in self.__consumers:
self.__add_and_run(threading.Thread(target=self.__consumer_func, args=(c,)))
# give KeyboardInterrupt chances to happen by joining with timeouts.
while self.__any_active():
for t in self.__threads:
t.join(1)
if self.__exc_info:
get_logger().debug('An exception was thrown by producer or consumer, backtrace: {0}'.format(self.__exc_stack))
raise self.__exc_info[1]
[文档] def put(self, data):
assert data is not None
self.__queue.put(data)
[文档] def get(self):
return self.__queue.get()
[文档] def ok(self):
with self.__lock:
return self.__exc_info is None
def __add_and_run(self, thread):
thread.daemon = True
thread.start()
self.__threads.append(thread)
def __any_active(self):
return any(t.is_alive() for t in self.__threads)
def __producer_func(self):
try:
self.__producer(self)
except:
self.__on_exception(sys.exc_info())
self.__put_end()
else:
self.__put_end()
def __consumer_func(self, consumer):
try:
consumer(self)
except:
self.__on_exception(sys.exc_info())
def __put_end(self):
for i in range(len(self.__consumers)):
self.__queue.put(None)
def __on_exception(self, exc_info):
with self.__lock:
if self.__exc_info is None:
self.__exc_info = exc_info
self.__exc_stack = traceback.format_exc()