Python Threading и KeyboardInterrupt.
Как организовать пул потоков?
Допустим у вас есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков.
За всё время работы скрипта стартует только 10 потоков. Каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:
# -*- coding: utf-8 -*-
import threading
inporty Queue
import time
import traceback
class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue
def run(self):
while True:
try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break # данные закончились, прекращаем работу
try: self.work(item) # работа с данными
except Exception: traceback.print_exc()
time.sleep(0.5)
self.__queue.task_done() # задача завершена
return
def work(self,item):
print item
#pass
def main():
# выводим в 5 потоков цифры от 1 до 100.
queue = Queue.Queue()
num_threads = 5 # 5 потоков
for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем поток
t.start() # стартуем
time.sleep(0.1) # чтобы в консоли друг на друга не накладывались
queue.join() #блокируем выполнение программы, пока не будут израсходованы данные
print "Done!"
if __name__ == '__main__':
main()
Хотелось бы еще, чтобы было можно все это в любой момент остановить. Но поскольку выполнение программы блокируем с помощью queue.join(), то скрипт не обратит внимание на нажатия клавиш Ctrl+C для перерывания программы. Воспользуемся данным решением.
# -*- coding: utf-8 -*-
import threading
import Queue
import time
import traceback
class Worker(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.__queue = queue
self.kill_received = False # флаг прекращения работы
def run(self):
while not self.kill_received:
try: item = self.__queue.get_nowait() # ждём данные
except Queue.Empty: break
try: self.work(item)
except Exception: traceback.print_exc()
time.sleep(0.5)
self.__queue.task_done() # задача завершена
self.__queue.put(item) # зациклим
return
def work(self,item):
print item
def main():
queue = Queue.Queue()
num_threads = 5 # 5 потоков
threads = []
for x in xrange(100):
queue.put(x) # заносим данные в очередь
for i in xrange(num_threads):
t = Worker(queue) # создаем нить
threads.append(t)
t.start() # стартуем
time.sleep(0.1)
# ждем пока в живых не останется только главный поток
while threading.activeCount()>1:
try:
time.sleep(1)
except KeyboardInterrupt:
print 'Ctrl+C received! Sending kill to threads...'
for t in threads:
t.kill_received = True # даем сигнал о завершении всем потокам
print "Done!"
if __name__ == '__main__':
main()
Комментариев нет:
Отправить комментарий