вторник, 13 ноября 2012 г.

Grab:Spider - Python фреймворк для парсинга сайтов

Модуль Spider это фреймворк позволяющий описать парсер сайта как набор функций обработчиков, где каждый обработчик отвечает за специфичный тип запроса. Например, при парсинге форума у вас будут обработчики для главной страницы, страницы подфорума, страницы топика, страницы профиля участника. Изначально такая структура парсера была разработана в силу ограничений асинхронного режима, но впоследствии оказалось, что писать парсеры в таком структурированном виде (один запрос - одна функция) очень удобно.

Модуль Spider работает асинхронно. Это значит что всегда есть только один рабочий поток программы. Для множественных запросов не создаются ни треды, ни процессы. Все созданные запросы обрабатываются библиотекой multicurl. Суть асинхронного подхода в том, что программа создаёт сетевые запросы и ждёт сигналы о готовности ответа на эти запроссы. Как только готов ответ, то вызывается функция-обработчик, которую мы привязали к конкретному запросу. Асинхронный подход позволяет обрабатывать большее количество одновременных соединений чем подход, связанный с созданием тредов или процессов т.к. память занята всего одним процессом и процессору не нужно постоянно переключаться между множество процессов.

Есть один нюанс, который будет очень непривычен тем, кто привык работать в синхронном стиле. Асинхронный подход позволяет вызывать функции обработчики при готовности сетевого ответа. Если алгоритм парсинга состоит из нескольких последовательных сетевых запросов, то нужно где-то хранить информацию о том, для чего мы создали сетевой запрос и что с ним делать. Spider позволяет достаточно удобно решать эту проблему.

Каждая функция-обработчки получает два входных аргумента. Первый аргумент — это объект Grab, в котором хранится информация о сетевом ответе. Вся прелесть Spider модуля в том, что он сохранил знакомый вам интерфейс для работы с синхронными запросами. Второй аргумент функции-обработчика это Task объект. Task объекты создаются в Spideer для того, чтобы добавить в очередь сетевых запросов новое задание. С помощью Task объекта можно сохранять промежуточные данные между множественными запросами.

У threading-подхода к парсинга сайтов есть плюсы и минусы. Плюс в том, что мы запускаем отдельный поток(thread) и делаем в нём, что хотим: можем делать последовательно несколько сетевых вызовов и всё это в пределах одного контекста - никуда не надо переключаться, что-то запоминать и вспоминать. Минус в том, что треды тормозят и жрут память.

Какие альтернативы?

Работать с сетевыми ресурсами асинхронно. Есть только один поток выполнения программы, в которм выполняется вся логика обработки данных по мере готовности этих данных, сами данные загружаются асинхронно. На практике это позволяет не особо напрягаясь работать с сетью в несколько сотен потоков, если вы попробуете запустить столько тредов, то они будут нешуточно тормозить.

Так вот, я написал интерфейс к multicurl - это часть библиотеки pycurl, которая позволяет работать с сетью асинхронно. Я выбрал multicurl, потому что Grab использует pycurl и я подумал, что мне удастся использовать его и для работы с multicurl. Архитектура парсеров на базе Grab:Spider весьма похожа на парсеры на базе фреймворка scrapy, что, в общем, не удивительно и логично.

Приведу пример простейшего паука:

# coding: utf-8
from grab.spider import Spider, Task

class SimpleSpider(Spider):
    initial_urls = ['http://ya.ru']

    def task_initial(self, grab, task):
        grab.set_input('text', u'ночь')
        grab.submit(make_request=False)
        yield Task('search', grab=grab)

    def task_search(self, grab, task):
        for elem in grab.xpath_list('//h2/a'):
            print elem.text_content()


if __name__ == '__main__':
    bot = SimpleSpider()
    bot.run()
    print bot.render_stats()

Что тут происходит? Для каждого URL в "self.initial_urls" создаётся задание с именем "initial". После того, как multicurl скачивает документ, вызывается обработчик с именем "task_initial". Самое главное, это то, что внутри обработчика мы получаем Grab-объект связанный с запрошенным документом. В результате мы можем использовать практические любые функции из Grab API. В данном примере, мы используем его работу с формами. Обратите внимание, нам нужно указать параметр "make_request=False", чтобы форма не отсылалась тут же, ибо мы хотим, чтобы этот сетевой запрос был обработан асинхронно.

Работа с Grab:Spider сводится к генерации запросов с помощью Task объектов и дальнейшей их обработке в специальных методах. У каждого задания есть имя, именно по нему потом выбирается метод для обработки запрошенного сетевого документа.

Создать Task объект можно двумя способами. Простой способ:

Task('foo', url='http://google.com')

После того, как документ будет полностью скачан из сети, будет вызван метод с именем "task_foo".

Более сложный способ:

g = Grab()
g.setup(....настраиваем запрос как угодно...)
Task('foo', grab=g)

Этим способом мы можем настроить параметры запроса в соответствии с нашими нуждами: выставить куки, специальные заголовки, сгенерировать POST-запрос.

В каких местах можно создавать запросы?
В любом методе-обработчике можно сделать yield Task объекта и он будет добавлен в асинхроннную очередь для скачивания. Также можно вернуть Task объект через return. Кроме того есть ещё два пути генерации Task объектов.

1) Можно указать в аттрибуте "self.initial_urls" список адресов и для них будут созданы задания с именем "initial".

2) Можно определить метод "task_generator" и yield'ить в нём сколько угодно запросов. Причём новые запросы из него будут браться по мере выполнения старых. Это позволяет без проблем проитерировать по миллиону строк из файла и не засорять ими всю память.

Первоначально я планировал сделать обработку извлечённых данных как в scrapy. Там это сделано с помощю Pipeline-объектов. Например, вы получили страницу с фильмом, пропарсили её и вернули Pipeline объект с типом Movie. А ещё предварительно вы написали в конфиге что Movie Pipeline должен сохраняться в базу данных или в CSV-файл. Как-то так. На практике оказалось, что проще не заморачиваться с дополнительной обёрткой и писать данные в БД или в файл сразу в методе обработчике запроса. Конечно, это не будет работать в случае распараллеливания методов по облаку машин, но до этого момента ещё надо дожить, а пока удобнее делать всё непосредственно в методе обработчике.

Task-объекту можно передавать дополнительные аргументы. Например, мы делаем запрос в google поиск. Формируем нужный url и создаём Task объект: Task('search', url='...', query=query) Далее в методе "task_search" мы сможем узнать какой именно запрос мы искали, обратившись к аттрибуту "task.query"

Grab:spider автоматически пытается исправить сетевые ошибки. В случае network timeout он выполняет задание ещё раз. Количество попыток вы можете настраивать с помощью опции "network_try_limit" при создании Spider объекта.

Надо сказать, что писать парсеры в асинхронном стиле мне очень понравилось. И дело не только в том, что асинхронный подход меньше нагружает ресурсы системы, но также в том, что исходный код парсера приобретает чёткую и понятную структуру.

Если вы используете Grab, то оцените поглядите модуль spider. Возможно, вам понравится. Если вы не знаете, что такое Grab, возможно вам лучше обратить внимание на фреймворк scrapy, так как он документирован в сто крат краше нежели Grab.

Рассмотрим пример простого парсера. Допустим, мы хотим зайти на сайт habrahabr.ru, считать заголовки последних новостей, далее для каждого заголовка найти картинку с помощью images.yandex.ru и сохранить полученные данные в файл:

# coding: utf-8
import urllib
import csv
import logging

from grab.spider import Spider, Task

class ExampleSpider(Spider):
    # Список страниц, с которых Spider начнёт работу
    # для каждого адреса в этом списке будет сгенерировано
    # задание с именем initial
    initial_urls = ['http://habrahabr.ru/']

    def prepare(self):
        # Подготовим файл для записи результатов
        # Функция prepare вызываетя один раз перед началом
        # работы парсера
        self.result_file = csv.writer(open('result.txt', 'w'))
        # Этот счётчик будем использовать для нумерации
        # найденных картинок, чтобы создавать им простые имена файлов.
        self.result_counter = 0

    def task_initial(self, grab, task):
        print 'Habrahabr home page'

        # Это функция обработчик для заданий с именем initial
        # т.е. для тех заданий, чтобы были созданы для
        # адреов указанных в self.initial_urls

        # Как видите интерфейс работы с ответом такой же
        # как и в обычном Grab
        for elem in grab.xpath_list('//h1[@class="title"]/a[@class="post_title"]'):
            # Для каждой ссылки-заголовка создадим новое задание
            # с именем habrapost
            # Обратите внимание, что мы создаём задания с помощью
            # вызова yield - это сделано исключительно ради красоты
            # По-сути это равносильно следующему коду:
            # self.add_task(Task('habrapost', url=...))
            yield Task('habrapost', url=elem.get('href'))

    def task_habrapost(self, grab, task):
        print 'Habrahabr topic: %s' % task.url

        # Эта функция, как вы уже догадываетесь
        # получает результаты обработки запросов, кооторые
        # мы создали для кадого хабратопика, найденного на
        # главной странице хабры

        # Для начала сохраним адрес и заголовк топика в массив
        post = {
            'url': task.url,
            'title': grab.xpath_text('//h1/span[@class="post_title"]'),
        }

        # Теперь создадим запрос к поиску картинок яндекса, обратите внимание,
        # что мы передаём объекту Task информацию о хабрапосте. Таким образом
        # в функции обработки поиска картинок мы будем знать, для какого именно
        # хабрапоста мы получили результат поиска картинки. Дело в том, что все
        # нестандартные аргументы конструктора Task просто запоминаются в созданном
        # объекте и доступны в дальнейшем как его атррибуты
        query = urllib.quote_plus(post['title'].encode('utf-8'))
        search_url = 'http://images.yandex.ru/yandsearch?text=%s&rpt=image' % query
        yield Task('image_search', url=search_url, post=post)

    def task_image_search(self, grab, task):
        print 'Images search result for %s' % task.post['title']

        # В этой функции мы получили результат обработки поиска картинок, но
        # это ещё не сама картинка! Это только список найденных картинок,
        # Теперь возьмём адрес первой картинки и создадим задание для её
        # скачивания. Не забудем передать информацию о хабрапосте, для которого
        # мы ищем картинку, эта информация хранится в `task.post`.
        image_url = grab.xpath_text('//div[@class="b-image"]/a/img/@src')
        yield Task('image', url=image_url, post=task.post)

    def task_image(self, grab, task):
        print 'Image downloaded for %s' % task.post['title']

        # Это последнняя функция в нашем парсере.
        # Картинка получена, можно сохранить результат.
        path = 'images/%s.jpg' % self.result_counter
        grab.response.save(path)
        self.result_file.writerow([
            task.post['url'].encode('utf-8'),
            task.post['title'].encode('utf-8'),
            path
        ])
        # Не забудем увеличить счётчик ответов, чтобы
        # следующая картинка записалась в другой файл
        self.result_counter += 1


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # Запустим парсер в многопоточном режиме - два потока
    # Можно больше, только вас яндекс забанит
    # Он вас и с двумя то потоками забанит, если много будете его беспокоить
    bot = ExampleSpider(thread_number=2)
    bot.run()


Пример кода реального парсера для парсинга сайта.

from grab.spider import Spider, Task, Data
from grab.tools.logs import default_logging
from grab import Grab
import pymongo
from hashlib import sha1
import os
from grab.tools.rex import rex_cache

db = pymongo.Connection()['bestflashgames']

class FlashSpider(Spider):
    initial_urls = ['http://www.bestflashgames.com/categorieslist/']

    def prepare(self):
        self.parsed_games = []

    def get_id(self, url):
        return url.rstrip('/').split('/')[-1]

    def task_initial(self, grab, task):
        for url in grab.tree.xpath('//div[@class="figure"]/a/@href'):
            yield Task('category', url=grab.make_url_absolute(url))

    def task_category(self, grab, task):
        # Integrity
        cid = self.get_id(task.url)
        category = {'_id': cid}
        category = db.category.find_one(category) or category
        category['title'] = grab.css_text('.gallery b font')

        # Save, logging
        db.category.save(category)
        print category['title']

        # Task to parse list of games
        yield Task('category_page', url=task.url, category=category)

    def task_category_page(self, grab, task):
        # Games list
        for url in grab.tree.xpath('//div[@class="figure"]/a/@href'):
            url = grab.make_url_absolute(url, resolve_base=True)
            yield Task('game', url=url, category=task.category)

        ## Next page
        nav = grab.css('.navigation-list a.right', None)
        if nav is not None:
            yield Task(
               'category_page',
               url=grab.make_url_absolute(nav.get('href'), resolve_base=True),
               category=task.category)

    def task_game(self, grab, task):
        # Integrity
        if grab.xpath_exists('//strong[contains(text(), "Not found")]'):
            print 'GAME NOT FOUND'
            return
        gid = self.get_id(task.url)
        game = {'_id': gid}
        if game['_id'] in self.parsed_games:
            print 'Already parsed in this session'
            return
        game = db.game.find_one(game) or game
        game['title'] = grab.css_text('.head p span')

        # Parse categories
        cats = grab.tree.xpath('//div[@class="head"]/ul[1]/li/a/@href')
        game['categories'] = [self.get_id(x) for x in cats]
        game['description'] = grab.xpath_text(
            '//div[@class="post"]/b[text()="Description:"]/../text()', '')
        game['image_url'] = grab.xpath_text('//div[@class="code"]//img/@src', '')
        game['gameid'] = grab.rex_text(rex_cache('gameid=(\d+)'))
        game['url'] = task.url

        # Logging
        print 'GAME', game['title']
        print game['categories']
        print game['description']
        print game['image_url']

        # Save
        db.game.save(game)
        self.parsed_games.append(game['_id'])

        # Task to save game's image
        if not 'image' in game:
            yield Task('game_image', url=game['image_url'], game=game,
                       disable_cache=True)

        yield Task('swf', url='http://www.bestflashgames.com/site/getgame.php?id=%s' % game['gameid'],
                   game=game)

    def task_game_image(self, grab, task):
        # Show activity
        print 'DOWNLOAD %s' % task.url

        # Calculate hash from URL
        img_hash = sha1(task.url).hexdigest()
        img_dir = 'static/game/%s/%s' % (img_hash[:2], img_hash[2:4])

        # Prepare directory
        try:
            os.makedirs(img_dir)
        except OSError:
            pass

        # Find extension
        ext = task.url.split('.')[-1]
        if len(ext) > 4:
            ext = 'bin'

        # Save file
        img_path = os.path.join(img_dir, '%s.%s' % (img_hash, ext))
        grab.response.save(img_path)

        task.game['image'] = img_path
        db.game.save(task.game)

    def task_swf(self, grab, task):
        print 'SWF GATE', task.url
        try:
            url = grab.rex_text(rex_cache(
                'show_flash\(\'([^\']+)'))
        except IndexError:
            try:
                url = grab.rex_text(rex_cache(
                    'name="movie" value="(http://[^"]+)'))
            except IndexError:
                try:
                    url = grab.rex_text(rex_cache(
                        '<embed src="(http[^"]+)'))
                except IndexError, ex:
                    try:
                        url = grab.rex_text(rex_cache(
                            '<iframe src="(http[^"]+)'))
                    except IndexError, ex:
                        url = ''


        task.game['swf'] = url
        db.game.save(task.game)

class SwfSizeSpider(FlashSpider):
    initial_urls = None
    size = 0

    def task_generator(self):
        for game in db.game.find({'swf': {'$ne': ''}}):
            g = Grab()
            g.setup(url=game['swf'], method='head')
            yield Task('swf', grab=g, game=game, disable_cache=True)

    def task_swf(self, grab, task):
        size = int(grab.response.headers.get('Content-Length', 0))
        task.game['swf_size'] = size
        db.game.save(task.game)
        print size

    def shutdown(self):
        print 'Total size', self.size / float((1024 * 1024))

if __name__ == '__main__':
    default_logging()
    bot = SwfSizeSpider(
        thread_number=10,
        cache_db='bestflashgames',
        use_cache=True)
    bot.setup_proxylist('var/proxy.txt', 'http', auto_change=True)
    try:
        bot.run()
    except KeyboardInterrupt:
        pass
    print bot.render_stats()


Немного про инструментарий который помогает в работе.

В качестве рабочего браузера я использую FireFox с плагинами HttpFox (позволяет анализировать входящий/исходящий http-трафик), XPather (позволяет проверять xpath выражения), SQLite Manager (просмотр sqlite таблиц), код набираю в emacs, где активно использую сниппеты (YASnippets) для часто встречающихся конструкций.

Из-за специфики фрэймворка  на первом этапе сайт полностью (или если данных много то частично) сохраняется в локальный кэш на базе mongodb, что очень экономит время, так как считывание страниц идет из кэша.

Для работы с sql базами куда, как правило (реже в json/xml), нужно разложить данные мы используем ORM - SQLAlchemy.

Собственно сам фрэймворк Grab предполагает большую гибкость в построении проекта и контроль за своими действиями. Однако, последние несколько проектов хорошо ложились в следующую структуру, отлично знакомую тем кто занимается веб-разработкой:

1) models.py - описываю модели данных.
2) config.py - аналог settings.py из мира Django: настройки, инициализация orm.
3) /spiders/*.py - код пауков.
4) spider.py или project_name.py - главный файл проекта, по совместительству обычно реализует command-line интерфейс для запуска различных пауков, так как зачастую сайт парсится по частям.

В качестве примера не сильно оторванного от реальной жизни напишем парсер «Trending projects» и «Most popular Python projects» c open-source цитадели GitHub.

Сперва нужно описать модель.

class Item(Base):
    __tablename__ = 'item'

    sqlite_autoincrement = True
    id = Column(Integer, primary_key=True)

    title = Column(String(160))
    author = Column(String(160))
    description = Column(String(255))
    url = Column(String(160))

    last_update = Column(DateTime, default=datetime.datetime.now)

Далее, в файле config.py выполняется начальная инициализация orm, создание таблиц, константы и находится функция которая конструирует параметры запуска паука в зависимости от настроек (default_spider_params), которая обычно общая для всех пауков в проекте.

def init_engine():
    db_engine = create_engine(
        'sqlite+pysqlite:///data.sqlite', encoding='utf-8')
    Base.metadata.create_all(db_engine)
    return db_engine
   
db_engine = init_engine()
Session = sessionmaker(bind=db_engine)

def default_spider_params():
    params = {
        'thread_number': MAX_THREADS,
        'network_try_limit': 20,
        'task_try_limit': 20,
    }
    if USE_CACHE:
        params.update({
            'thread_number': 3,
            'use_cache': True,
            'cache_db': CACHE_DB,
            'debug_error' :True,
        })
       
    return params

В большинстве случаев нет необходимости использовать mongodb на сервере, поэтому удобно сделать кэш отключаемым. При деплое проекта я просто ставлю USE_CACHE = False и все отлично работает. SAVE_TO_DB используется чтобы резрешить/запретить запись данных в базу данных.

Собственно переходим к самому интересному у нас будет 2-а паука: первый будет парсить 5 репозиториев «Top Trending» проектов, а второй «Most watched Python».

Явно видно, что у этих пауков есть общие части, которые можно и нужно вынести в отдельный, базовый класс и наследовать уже от него, что уменьшает код, упрощает поддержку и делает программу более удобной для восприятия. В более-менее сложном проекте, где есть большое кол-во немного отличающихся друг от друга страниц необходимость выносить часть функционала в суперклассы возникает постоянно.

Не будем пренебрегать ООП и напишем BaseHubSpider в котором определим 2-а метода save() и log_progress().

class BaseHubSpider(Spider):
    initial_urls = ['http://github.com']

    items_total = 0

    def save(self, data):
        if not SAVE_TO_DB:
            return
           
        session = Session()

        if not session.query(Item).filter_by(title=data['title']).first():
            obj = Item(**data)
            session.add(obj)
        session.commit()

    def log_progress(self, str):
        self.items_total += 1
        print "(%d) Item scraped: %s" % (self.items_total, str)

В реальном приложении весьма вероятно наличие функции разбора страницы в зависимости от каких-то параметров - названий полей которые на каждой странице разные в то время как xpath путь к ним практически одинаковый и так далее.

Например, как-нибудь так (это не рабочий пример, а просто иллюстрация для лучшего понимания):

    XPATH = u'//table[@class="standart-table table"]' + \
            u'//tr[th[text() = "%s"]]/td'

    values = (
        ('title', u'Наименование товара'),
        ('rating', u'Рейтинг'),
        ('categories', u'Категория товара'),
        ('description', u'Описание'),       
    )
   
    for db_field, field_title in values:
        try:
            data[db_field] = get_node_text(grab.xpath(
                XPATH % field_title, None)).strip()
        except AttributeError:
            data[db_field] = ''

https://github.com/istinspring/grab-default-project-example/blob/master/spiders/lang_python.py

Код паука который парсит и сохраняет в базу данных 20 самых популярных python проектов.

Обратите внимание на

        repos = grab.xpath_list(
            '//table[@class="repo"]//tr/td[@class="title"]/..')
        for repo in repos:
            data = {
                'author': repo.xpath('./td[@class="owner"]/a/text()')[0],
                'title': repo.xpath('./td[@class="title"]/a/text()')[0],}

repos = grab.xpath_list('') - возвращает список lxml объект, в то время как например grab.xpath('') возвращает первый элемент, так как xpath в данном случае метод объекта grab, то есть оперируя в цикле repo.xpath('./h3/a[1]/text()') - мы получаем список или исключение если lxml не смог найти xpath. Проще говоря, xpath от объекта grab и xpath от lxml объекта — разные вещи, в первом случае вернется первый элемент (или default или бросит exception), а во втором вернется список элементов ['something'].

Комментариев нет:

Отправить комментарий