вторник, 11 декабря 2012 г.

Python Многопоточный сервер

Прежде чем мы приступим к разбору кода, работающего с потоками, нам нужно рассмотреть наиболее важную вещь — глобальную блокировку интерпретатора (global interpreter lock, GIL) Python. Если два или более потока попытаются манипулировать одним и тем же объектом в одно и то же время, то неизбежно возникнут проблемы. Глобальная блокировка интерпретатора исправляет это. В любой момент времени действия может выполнять только один поток. Python автоматически переключается между потоками, когда в этом возникает необходимость.

Использование модуля Threading.

Модуль threading предоставляет нам простой способ работы с потоками. Его класс Thread может быть унаследован (subclassed) для создания потока или нескольких потоков. Метод run должен содержать код, который вы желаете выполнить при выполнении потока. Звучит просто, не так ли? Вот, посмотрите:

import threading
class MyThread(threading.Thread):
    def run(self):
        print 'Insert some thread stuff here.'
        print 'It'll be executed...yeah....'
        print 'There's not much to it.'

Выполнить поток также просто. Всё, что нам нужно сделать, это создать экземпляр нашего класса потока, после чего вызвать его метод start:

import threading
class MyThread(threading.Thread):

    def run(self):
        print 'You called my start method, yeah.'
        print 'Were you expecting something amazing?'

 MyThread().start()

Конечно, всего один поток это не бог весть что. Как и люди, потоки через некоторое время остаются в одиночестве. Давайте создадим группу потоков:

import threading

theVar = 1

class MyThread(threading.Thread):
    def run ( self ):
        global theVar
        print 'This is thread ' + str(theVar) + ' speaking.'
        print 'Hello and good bye.'
        theVar = theVar + 1

for x in xrange ( 20 ):
    MyThread().start()

Давайте теперь сделаем с помощью модуля threading нечто условно-полезное. Сервера часто используют потоки для работы в одно и то же время с несколькими клиентами. Давайте создадим простой, но расширяемый сервер. Когда клиент подключится к нему, сервер создаст новый поток для обслуживания этого клиента. Чтобы отправлять потоку данные клиента нам понадобится перекрыть метод __init__ класса Thread, чтобы он принимал параметры. Отныне сервер будет отправлять поток своей дорогой и ждать новых клиентов. Каждый поток будет посылать упакованный (pickled) объект соответствующему клиенту, после чего печатать не более десяти строк, полученных от клиента. (Упакованный объект в общем случае является объектом, уменьшенным до нескольких символов. Это полезно при сохранении объектов для последующего использования или для передачи объектов по сети).

import pickle
import socket
import threading

# We'll pickle a list of numbers:
someList = [1, 2, 7, 9, 0]
pickledList = pickle.dumps(someList)

# Our thread class:
class ClientThread(threading.Thread):

    # Override Thread's __init__ method to accept the parameters needed:
    def __init__(self, channel, details):
        self.channel = channel
        self.details = details
        threading.Thread.__init__(self)

    def run(self):
        print 'Received connection:', self.details[0]
        self.channel.send(pickledList)
        for x in xrange(10):
            print self.channel.recv(1024)
        self.channel.close()
        print 'Closed connection:', self.details[0]

# Set up the server:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 2727))
server.listen(5)

# Have the server serve "forever":
while True:
    channel, details = server.accept()
    ClientThread(channel, details).start()

Теперь нам нужно создать клиента, который будет подключаться к серверу, получать от него упакованный объект, распаковывать (reconstructs) объект и, наконец, посылать десять сообщений и закрывать соединение:

import pickle
import socket

# Connect to the server:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 2727))

# Retrieve and unpickle the list object:
print pickle.loads(client.recv(1024))

# Send some messages:
for x in xrange(10):
    client.send('Hey. ' + str(x) + '\n')

# Close the connection
client.close()

Конечно, приведённый выше клиент не в состоянии воспользоваться всеми преимуществами многопоточности нашего сервера. Клиент порождает только один поток, что делает многопоточность бессмысленной. Давайте добавим клиенту многопоточности, чтобы сделать всё более интересным. Каждый поток будет подключаться к серверу и выполнять приведённый выше код:

import pickle
import socket
import threading

# Here's our thread:
class ConnectionThread(threading.Thread):

    def run(self):
        # Connect to the server:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect (('localhost', 2727))

        # Retrieve and unpickle the list object:
        print pickle.loads(client.recv(1024))

        # Send some messages:
        for x in xrange(10):
            client.send('Hey. ' + str(x) + '\n')

        # Close the connection
        client.close()

# Let's spawn a few threads:
for x in xrange(5):
    ConnectionThread().start()

Пулы потоков (pooling threads).

Важно помнить, что потоки не появляются мгновенно. Создание большого их числа может замедлить ваше приложение. Чтобы создать поток и, позднее, уничтожить его, требуется время. Потоки могут также потреблять много ценных системных ресурсов в больших приложениях. Эта проблема легко решается путём создания ограниченного числа потоков (set number of threads) (пула потоков) и назначения им новых задач, в общем, повторного их использования. Соединения будут приниматься и передаваться тому потоку, который раньше всех закончит работу с предыдущим клиентом.

Если вы по-прежнему не понимаете, сравните это с больницей. Скажем, у нас есть пятеро врачей. Это наши потоки. Пациенты (клиенты) приходят в больницу и, если врачи заняты, сидят в приёмном покое.

Очевидно, нам нужно нечто, что сможет передавать данные клиента в наши потоки, не вызывая при этом проблем (оно должно быть потокобезопасным). Модуль Queue Python делает это для нас. Клиентская информация сохраняется в объекте Queue, откуда потоки извлекают её по мере надобности.

Давайте переделаем наш сервер, чтобы оценить преимущества пула потоков:

import pickle
import Queue
import socket
import threading

# We'll pickle a list of numbers, yet again:
someList = [1, 2, 7, 9, 0]
pickledList = pickle.dumps(someList)

# A revised version of our thread class:
class ClientThread(threading.Thread):

# Note that we do not override Thread's __init__ method.
# The Queue module makes this not necessary.

    def run(self):
        # Have our thread serve "forever":
        while True:
            # Get a client out of the queue
            client = clientPool.get()

            # Check if we actually have an actual client in the client variable:
            if client != None:
                print 'Received connection:', client[1][0]
                client[0].send(pickledList)
                for x in xrange(10):
                    print client[0].recv(1024)
                client[0].close()
                print 'Closed connection:', client[1][0]

# Create our Queue:
clientPool = Queue.Queue(0)

# Start two threads:
for x in xrange(2):
    ClientThread().start()
# Set up the server:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 2727))
server.listen(5)

# Have the server serve "forever":
while True:
    clientPool.put(server.accept())

Как вы можете увидеть, он немного сложнее нашего предыдущего сервера, но не усложнён до полной непонятности. Для проверки этого сервера, так же, как и предыдущего, можно воспользоваться клиентом из предыдущего раздела.

Дополнительные хитрости.

Работа с потоками не заключается только в их создании и уничтожении. Класс Thread из модуля threading содержит ещё несколько методов, которые могут вам пригодиться. Первые два предназначены для именования потоков. Метод setName присваивает потоку имя, а метод getName возвращает имя потока:

import threading

class TestThread(threading.Thread):
    def run(self):
        print 'Hello, my name is', self.getName()

cazaril = TestThread()
cazaril.setName('Cazaril')
cazaril.start()

ista = TestThread()
ista.setName('Ista')
ista.start()

TestThread().start()

Ничего удивительного. Также, как вы можете видеть, у потоков есть имена, даже если вы их не задавали.

Мы также можем проверить, является ли поток живым, воспользовавшись методом isAlive. Если поток ещё не закончил выполняться, независимо от того, что происходит в его методе run, то он классифицируется как живой:

import threading
import time

class TestThread(threading.Thread):

    def run(self):
        print 'Patient: Doctor, am I going to die?'

class AnotherThread(TestThread):

    def run (self):
        TestThread.run(self)
        time.sleep(10)

dying = TestThread()
dying.start()
if dying.isAlive():
    print 'Doctor: No.'
else:
    print 'Doctor: Next!'

living = AnotherThread()
living.start()
if living.isAlive():
    print 'Doctor: No.'
else:
    print 'Doctor: Next!'

Второй поток остаётся в живых, поскольку мы заставили его ждать, воспользовавшись методом sleep модуля time.

Если нам нужно, чтобы поток дождался завершения другого потока, можно воспользоваться методом join:

import threading
import time

class ThreadOne(threading.Thread):

    def run(self):
        print 'Thread', self.getName(), 'started.'
        time.sleep ( 5 )
        print 'Thread', self.getName(), 'ended.'

class ThreadTwo(threading.Thread):

    def run(self):
        print 'Thread', self.getName(), 'started.'
        thingOne.join()
        print 'Thread', self.getName(), 'ended.'

thingOne = ThreadOne()
thingOne.start()

thingTwo = ThreadTwo()
thingTwo.start()

Мы также можем использовать метод setDaemon. Если при вызове в него передаётся значение True и другие потоки завершили своё исполнение, то из основной программы будет произведён выход, а поток продолжит работу:

import threading
import time

class DaemonThread(threading.Thread):

    def run(self):
        self.setDaemon(True)
        time.sleep(10)

DaemonThread().start()
print 'Leaving.'

Python также содержит модуль thread, работающий на более низком уровне, чем threading. Хочу обратить ваше внимание на одну особенность: это содержащаяся в нём функция start_new_thread. Используя её мы можем превратить обычную функцию в поток:

import thread

def thread(stuff):
    print "I'm a real boy!"
    print stuff

thread.start_new_thread(thread, ('Argument'))

Заключение.

О многопоточности можно рассказать значительно больше, чем я сделал в этой статье, но я не буду пытаться объять необъятное. Кроме того, как упомянул Гвидо ван Россум, преимущества, которые даёт сложная многопоточность в Python могут быть сведены на нет последствиями. Однако, небольшая доза здравого смысла может устранить большинство проблем в простой многопоточности.

Многопоточность очень важна, когда дело касается компьютерных приложений и, как я упоминал раньше, Python её поддерживает. При условии правильного использования, эффект от применения потоков может быть очень благотворным и часто даже критическим, как я подчеркивал в этой статье.

Комментариев нет:

Отправить комментарий