пятница, 14 декабря 2012 г.

Python Keyboardinterrupt

Python Threading и KeyboardInterrupt.

Как организовать пул потоков?

Допустим у вас есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков.

За всё время работы скрипта стартует только 10 потоков. Каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:

# -*- coding: utf-8 -*-
import threading
inporty Queue
import time
import traceback

class Worker(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.__queue = queue

    def run(self):
        while True:

            try: item = self.__queue.get_nowait() # ждём данные
            except Queue.Empty: break # данные закончились, прекращаем работу

            try: self.work(item) # работа с данными
            except Exception: traceback.print_exc()

            time.sleep(0.5)
            self.__queue.task_done() # задача завершена
        return
    def work(self,item):
        print item
        #pass


def main():
    # выводим в 5 потоков цифры от 1 до 100.
    queue = Queue.Queue()
    num_threads = 5 # 5 потоков

    for x in xrange(100):
        queue.put(x) # заносим данные в очередь
    for i in xrange(num_threads):
        t = Worker(queue) # создаем поток
        t.start() # стартуем
        time.sleep(0.1) # чтобы в консоли друг на друга не накладывались

    queue.join() #блокируем выполнение программы, пока не будут израсходованы данные

    print "Done!"
if __name__ == '__main__':
    main()

Хотелось бы еще, чтобы было можно все это в любой момент остановить. Но поскольку выполнение программы блокируем с помощью queue.join(), то скрипт не обратит внимание на нажатия клавиш Ctrl+C для перерывания программы. Воспользуемся данным решением.

# -*- coding: utf-8 -*-
import threading
import Queue
import time
import traceback



class Worker(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.__queue = queue

        self.kill_received = False # флаг прекращения работы
    def run(self):
        while not self.kill_received:

            try: item = self.__queue.get_nowait() # ждём данные
            except Queue.Empty: break

            try: self.work(item)
            except Exception: traceback.print_exc()

            time.sleep(0.5)
            self.__queue.task_done() # задача завершена
            self.__queue.put(item) # зациклим
        return
    def work(self,item):
        print item


def main():
    queue = Queue.Queue()
    num_threads = 5 # 5 потоков

    threads = []
    for x in xrange(100):
        queue.put(x) # заносим данные в очередь
    for i in xrange(num_threads):
        t = Worker(queue) # создаем нить
        threads.append(t)
        t.start() # стартуем
        time.sleep(0.1)


    # ждем пока в живых не останется только главный поток
    while threading.activeCount()>1:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            print 'Ctrl+C received! Sending kill to threads...'
            for t in threads:
                t.kill_received = True # даем сигнал о завершении всем потокам
    print "Done!"
if __name__ == '__main__':
    main()

Комментариев нет:

Отправить комментарий