Внимание! Прочитайте, пожалуйста, текст в правой колонке (внизу).
Внимание! Прочитайте, пожалуйста, текст в правой колонке (внизу). Внимание! Прочитайте, пожалуйста, текст в правой колонке (внизу). Homepage Карта сайта Версия для печати

Джентльменский набор Web-разработчика   Ларри Уолл о Perl6   Наблы Система Orphus
 

53. PostgreSQL и задача параллельной многопроцессной обработки очереди

[14 декабря 2008 г.] обсудить статью в форуме

Чайник 

Иногда встречается ситуация, которую можно обобщить фразой "задача параллельной обработки очереди". В данной статье я кратко расскажу, почему для эффективного решения этой задачи удобнее всего использовать PostgreSQL, а не какую-либо другую СУБД (типа MySQL или FireBird).

Предположим, у нас есть очередь из большого количества элементов (несколько десятков миллионов). Требуется обработать каждый элемент очереди, затратив на это минимальное время, при следующих допущениях:

  • аппаратные ресурсы (количество машин или процессоров) у нас не ограничены и могут наращиваться;
  • среда выполнения ненадежна (обработчик элемента может в любой момент "умереть", что не должно привести к остановке обработки очереди или потере элементов).

Пример очереди: подписчики

Представим очередь таблицей PostgreSQL. Пусть это будет, например, таблица подписчиков на различные рассылки проекта, содержащая около 10 млн элементов:

Листинг 1
CREATE TABLE subscription(
  id INTEGER,                — первичный ключ
  user_id INTEGER,           — ссылка на пользователя
  subscription_type INTEGER, — тип подписки: новости, события друзей и т. д.
  last_mail_at TIMESTAMP     — когда было отправлено письмо в предущий раз?
);

Обработка каждого элемента — это отправка письма соответствующего типа очередному пользователю. Ясно, что ускорить работу можно, запустив сразу несколько процессов-обработчиков, каждый из которых будет брать очередной элемент и посылать для него письмо, сохраняя в last_mail_at время отправки.

Пусть мы инициируем новую рассылку в момент времени MAILING_STARTED_AT. Давайте предположим, что мы запускаем 50 параллельных процессов, каждый из которых берет очередной необработанный элемент из очереди (last_mail_at < MAILING_STARTED_AT; считаем, что каждый из процессов имеет возможность получить значение MAILING_STARTED_AT, т.е. оно константно после инициации рассылки). Далее он обрабатывает его, а затем — записывает в last_mail_at текущее время.

Чайник 

Дело пойдет быстрее, если эти 50 процессов будут запущены сразу на нескольких физических машинах. Но с точки зрения архитектуры и отладки все равно, работают ли они в рамках одной машины или нескольких.

SQL-запрос на выборку очередного элемента может выглядеть так:

Листинг 2
SELECT id
FROM subscription
WHERE
  last_mail_at < MAILING_STARTED_AT
  AND subscription_type = <тип подписки>
LIMIT 1

Самое сложное в этой задаче — добиться с достаточной степенью производительности, чтобы один и тот же элемент в очереди не был захвачен несколькими процессами одновременно. В противном случае возможна ситуация отправки одного и того же письма дважды, что недопустимо.

Как делать не надо

Давайте сразу отбросим плохо работающие варианты.

  • Идея извлекать на каждой итерации по одному элементу довольно плоха с точки зрения нагрузки на СУБД. Представьте, что у вас 50 процессов, каждый рассылает по 5 писем в секунду. Мы получаем в сумме 250 запросов в секунду на СУБД, что хоть и не так много, но все же уже ощутимо.
  • Не годится блокировать всю таблицу (LOCK TABLE) на время извлечения элемента, т.к. это заставляет остальные процессы ждать.
  • Не подходит пометка очередного элемента обработанным (last_mail_at := CURRENT_TIMESTAMP) сразу же после его извлечения, потому что:
    1. Между извлечением элемента и его пометкой может "вклиниться" другой процесс.
    2. Чтобы избежать (1), можно делать запрос вида SELECT FOR UPDATE, блокирующий запись до момента ее обновления. Однако в этом случае все другие процессы, "наткнувшись" на заблокированный элемент, будут ждать, пока он не разблокируется, хотя он им вовсе и не нужен.
    3. Вообще, помечать элемент обработанным мы имеем право только после того, как убедились: письмо было реально отправлено. Вспомните о ненадежности среды: если процесс внезапно "умрет" (а это всегда рано или поздно происходит на практике: мир несовершенен, пакеты в сети теряются, сисадмин выполняет команду killall или перезапускает машину и т. д.), то рассылка не должна пострадать.
  • Довольно плох вариант брать элементы из очереди по принципу "расчески". Например, если у нас 10 процессов, то первый мог бы взять те элементы, номера которых при делении на 10 дают 0, второй — которые дают 1 и т. д. Это плохо потому, что процессы могут в любой момент "умереть" (помните о ненадежности среды?), а также запускаются на разных физических машинах, часть из которых может быть в любой момент выключена без ущерба для рассылки (но с ущербом только для ее скорости). В общем, довольно сложно точно контролировать текущее количество процессов-обработчиков, а именно точность и нужна для метода "расчески".

Работа с очередями в PostgreSQL

И вот тут на сцену выходит PostgreSQL. В нем есть одна возможность, которой нет, например, в MySQL и FireBird — множественная рекомендательная блокировка объектов (функции pg_try_advisory_lock() и pg_advisory_unlock()).

Для начала договоримся, что будем извлекать элементы не по одному, а целыми пачками по 100 шт. Это снизит число SQL-запросов на выборку в 100 раз и примерно во столько же раз улучшит производительность:

Листинг 3
SELECT id
FROM subscription
WHERE
  last_mail_at < MAILING_STARTED_AT
  AND subscription_type = <тип подписки>
LIMIT 100

Не забудьте создать индекс!

Естественно, чтобы этот запрос отрабатывал мгновенно, а не сканировал всю базу, нужно создать правильный индекс:

Листинг 4
CREATE INDEX subscription_idx ON subscription
  USING btree (subscription_type, last_mail_at, id)

После создания индекса проверим, что он действительно используется в нашем запросе, запустив EXPLAIN SELECT:

Листинг 5
QUERY PLAN
Limit (cost=0.00..679.00 rows=100 width=4)
  -> Index Scan using subscription_idx on subscription 
     (cost=0.00..490539.67 rows=722449 width=4)
     Index Cond: ((subscription_type = <тип подписки>) AND 
     (last_mail_at < MAILING_STARTED_AT))

Итак, PostgreSQL выбирает первые 100 элементов индекса, а не сканирует всю таблицу целиком. Чего мы и добивались.

Собственно блокировка: pg_try_advisory_lock()

Теперь модифицируем наш запрос так, чтобы при его запуске на нескольких машинах результаты никогда не пересекались. Это делается так (псевдо-операция @IDS := ... означает, что данные нужно сохранить в некоторый массив вызывающего скрипта):

Листинг 6
/* Исходный запрос: получаем элементы-кандидаты на обработку. */
@IDS := ARRAY(
  SELECT id
  FROM subscription
  WHERE
    last_mail_at < MAILING_STARTED_AT
    AND subscription_type = <тип подписки>
    AND pg_try_advisory_lock(tableoid::INTEGER, id) /* вот она, блокировка! */
  LIMIT 100
);

/* Пост-проверка: отсеиваем тех, кто успел пометиться обработанным за */
/* время работы предыдущего запроса (см. ниже подробности про это). */
@IDS := ARRAY(
  SELECT id
  FROM subscription
  WHERE
    last_mail_at < MAILING_STARTED_AT
    AND id IN (@IDS)
);

Чайник 

Функция pg_try_advisory_lock() пытается "повесить" исключительную блокировку на некоторый "виртуальный" идентификтор, который описывается парой ее параметров. Если этот идентификатор уже заблокирован, она ничего не делает и возвращает false. Если же блокировку удалось установить, то функция возвращает true. Блокировка снимается, когда вызывается pg_advisory_unlock(), либо когда клиент отсоединяется от базы данных.

Посмотрим, насколько эффективно PostgreSQL выполняет этот запрос, запустив EXPLAIN ANALYZE:

Листинг 7
QUERY PLAN
Limit (cost=0.00..2047.39 rows=100 width=4) 
  (actual time=0.163..4.948 rows=100 loops=1)
  -> Index Scan using subscription_idx on subscription 
       (cost=0.00..493513.69 rows=241046 width=4) (actual time=0.161..4.393 rows=100 loops=1)
     Index Cond: ((subscription_type = <тип подписки>) AND (last_mail_at < MAILING_STARTED_AT))
     Filter: pg_try_advisory_lock((tableoid)::integer, id)
Total runtime: 5.270 ms

Мы видим, что запрос все еще "хороший": PostgreSQL идет по индексу, отбрасывая элементы, которые не удалось заблокировать.

Чайник 

В качестве первого параметра pg_try_advisory_lock() мы используем tableoid::integer, что является внутренним идентификатором таблицы subscription. Это значение отлично подходит в качестве изолированного "пространства имен" для блокировки.

Если теперь, не закрывая соединения, запустить этот же запрос в другом процессе, он вернет элементы, которые не были заблокированы в первом процессе, что нам и нужно.

Обработка элемента

Итак, у нас есть 100 значений id из таблицы subscription. В цикле для каждого из них выполняем следующие действия:

  1. Обрабатываем элемент (посылаем письмо).
  2. Обновляем поле last_mail_at у элемента, записывая туда текущее время.
  3. Вызываем pg_advisory_unlock() для элемента, чтобы освободить ресурсы.

Постпроверка при выборке: внимание, возможны проблемы!

Если вы реализуете алгоритм в точности таком виде, как описано выше, то с удивлением обнаружите, что иногда один и тот же элемент будет обработан дважды. Удивительно, но это действительно так! Давайте посмотрим на картинке, почему это происходит.

Итак:

  1. Процесс (А) запускает SELECT-запрос, который выбирает очередной элемент с условием last_mail_at < MAILING_STARTED_AT.
  2. В это самое мгновение ОС решает переключить процессор на другой процесс (B). Причем ОС совершенно наплевать на то, что она разорвала SQL-запрос в середине его исполнения.
  3. Новый процесс (B) радостно помечает очередной элемент обработанным. Он меняет значение last_mailing_at, а также (внимание!) снимает блокировку с элемента (pg_advisory_unlock).
  4. ОС наконец-то возвращает управление в первый процесс (A), который в середине SQL-запроса проверяет, может ли он получить блокировку на элемент (pg_try_advisory_lock). Конечно же, операция завершается успешно: ведь (B) ранее разблокировал запись (см. стрелку 1)

Чайник 

В действительности не обязательно, чтобы SQL-запрос разорвался прямо в середине, потому что PostgreSQL поддерживает версионирование записей в таблицах. Процесс (B) может перехватить управление после старта охватывающей транзакции, но до запуска SELECT. Т.е. вероятность сбоя в алгоритме возрастает.

К счастью, решить проблему очень легко: нужно после извлечения пачки элементов из очереди сделать повторную проверку, удовлетворяют ли они условию last_mail_at < MAILING_STARTED_AT:

Давайте посмотрим, почему это работает:

  • Стрелка 1: pg_advisory_unlock всегда предшествует успешному pg_try_advisory_lockСтрелка 2: запрос A1 всегда предшествует запросу A2 (т.к. они идут в одном соединеннии).
  • Вывод: запрос A2 гарантировано выполнится после окончания запроса B.

А значит, пост-проверка пройдет успешно, и в конце запроса A2 мы гарантировано получим необработанные элементы.

Обработка элементов в соответствии с их приоритетом

В примере выше мы обрабатываем элементы в произвольном порядке. (Этот порядок на практике идет по возрастанию last_mail_at, однако вряд ли такое поведение можно назвать полезным.) Как же быть, если элементы нужно брать в соответствии с определенным критерием — например, по порядку следования их id?

Вообще говоря, как только мы запускаем параллельную обработку, само понятие "порядка" становится весьма условным. Если же нужен порядок "приблизительный" (к примеру, по возрастанию id), напрашивается идея слегка изменить SQL-запрос выборки, добавив в него ORDER BY:

Листинг 8
SELECT id
FROM subscription
WHERE
  last_mail_at < MAILING_STARTED_AT
  AND subscription_type = <тип подписки>
  AND pg_try_advisory_lock(tableoid::INTEGER, id)
ORDER BY id /* не делайте так! */
LIMIT 100

К сожалению, этот способ не работает: невозможно создать такой индекс, который одновременно учитывал бы и условие "last_mail_at < MAILING_STARTED_AT" (либо даже "last_mail_at <> MAILING_STARTED_AT"), и критерий сортировки "ORDER BY id". (Подробности см. в документации PostgreSQL.)

Можно предложить следующее решение:

  1. Добавляем в таблицу поле mailing_num. В нем будем хранить номер рассылки, которая должна будет обработать этот элемент.
  2. Каждый раз, начиная очередную рассылку, мы выбираем для нее номер MAILING_NUM, отличный от номеров всех предыдущих рассылок. Затем выполняем запрос:

    Листинг 9
    UPDATE subscription SET mailing_num=MAILING_NUM
    subscription_type = <тип подписки>;

    Т.е. устанавливаем для всех элементов новый номер рассылки MAILING_NUM. Обратите внимание, что это очень "тяжелый" запрос: он затрагивает миллионы элементов, но зато выполняется только однократно — при старте новой рассылки.
  3. Формулируем запрос на выборку элементов так:

    Листинг 10
    SELECT id
    FROM subscription
    WHERE
      mailing_num = MAILING_NUM
      AND pg_try_advisory_lock(tableoid::INTEGER, id)
    ORDER BY id /* теперь так можно! */
    LIMIT 100

    Итак, мы переформулировали запрос, чтобы по нему можно было создать эффективный индекс:

    Листинг 11
    CREATE INDEX subscription_idx ON subscription
      USING btree (mailing_num, id)

Все, что мы сделали, — это избавились от условия "меньше" или "не равно" в WHERE-части запроса, за счет чего получили возможность создания эффективного индекса для ORDER BY.

Выводы

Обратите внимание, что ни одна из операций, используемых выше, не является блокирующей всю работу. Т.е. 50 наших процессов могут работать параллельно и никогда не ждут друг друга. Даже операция UPDATE не блокирует другие процессы, т.к. в PostgreSQL запись никогда не блокирует чтение. А значит, архитектура допускает практически неограниченное масштабирование: достаточно увеличить количество машин в 2 раза, чтобы примерно во столько же ускорить рассылку.

Нужно также заметить, что реализация данного алгоритма в MySQL заведомо обречена на провал. В этой СУБД хотя и есть операция GET_LOCK, но она может блокировать только одну запись в соединении (если вы попытаетесь заблокировать вторую, то первая будет освобождена).

Ну и последнее. Операция pg_try_advisory_lock() принципиально отличается от SELECT ... FOR UPDATE тем, что она не блокирует выполнение SELECT-операций в других процессах и, согласно документации, ничего не пишет на диск. А значит, работает в нашей задаче очень эффективно.

обсудить статью в форуме

 
Рекламный блок
   

На странице:
    53. PostgreSQL и задача параллельной многопроцессной обработки очереди
Пример очереди: подписчики
Как делать не надо
Работа с очередями в PostgreSQL
     Не забудьте создать индекс!
     Собственно блокировка: pg_try_advisory_lock()
     Обработка элемента
     Постпроверка при выборке: внимание, возможны проблемы!
Обработка элементов в соответствии с их приоритетом

Важное объявление:
    автор категорически против копирования и распространения в Интернете всех статей «Куроводства» с возрастом, меньшим 6 месяцев. Печальный опыт «расползания» чрезвычайно устаревших ошибочных версий статьи про Apache действительно объясняет такое решение.

Орфография на «Куроводстве»:
    если вы заметили орфографическую, стилистическую или другую ошибку на этой странице, просто выделите ошибку мышью и нажмите Ctrl+Enter. Выделенный текст будет немедленно отослан вебмастеру, а Вы даже ничего и не заметите — настолько быстро все произойдет.

На заметку:
    если вы уже вскипели насчет дизайна этой страницы, то присмотритесь повнимательнее к названию, почитайте FAQ, сходите по лебедевским местам, как это уже предлагалось выше. Можно ли считать пародию плагиатом? Надеюсь, что нет.

Параметры этой страницы
   
GZip

Ссылки от спонсоров
   


Дмитрий Котеров | 14 декабря 2008 г. ©1999-2016 | Генеральный спонсор: Хостинг «Джино» | Контакт Вернуться к оглавлению