Технические данные
- Опубликовано:
- 22.05.2025
- Версия:
- 0.0.8
- Установлено:
- Менее 50 раз
- Подходящие редакции:
- «Первый сайт», «Старт», «Стандарт», «Малый бизнес», «Бизнес»
- Адаптивность:
- Нет
- Поддержка Композита:
- Нет
- Совместимо с Сайты24
- Нет
- Совместимо с PHP 8.1
- Да
Пользовательское соглашение
Описание
Обеспечивает интеграцию Битрикс с брокером сообщений RabbitMQ (AMQP) либо, при его отсутствии, реализует менеджер на базе используемой продуктом СуБД.
Разработан на основе пакета yii3 queue
Модуль предназначен для разработчиков, никаких компонентов/шаблонов для публичной части не предполагается.
Детали использования см. на вкладке "Установка"
Решение устанавливается на любые редакции БУС:
- Установите модуль штатным образом. Запись сообщений
- По умолчанию очередь обслуживается используемой продуктом СуБД (в процессе установки модуль создает таблицу в базе данных, в которую пишутся сообщения и обрабатываются в дальнейшем)
Для использования на базе брокера сообщений RabbitMQ необходимо внести изменение в конфигурацию приложения (файл .settings.php), добавив новое соединение в секцию `connections`.'rabbitmq' => [ 'className' => '\\InterMotion\\Queue\\Expansion\\Bitrix\\Main\\Data\\RabbitMQConnection', 'host' => "{rabbitmq_host}", 'port' => {rabbitmq_port}, 'username' => "{rabbitmq_username}", 'password' => "{rabbitmq_password}", ],
- Обработка сообщений
Предпочтительным является обработка сообщений, используя процесс-менеджер `supervisor`, альтернативным вариантом является использование менеджера фоновых задач - `crontab`.
$ sudo apt update && sudo apt install supervisor -y $ sudo systemctl enable --now supervisor $ sudo nano /etc/supervisor/conf.d/bitrix-worker.conf |
[program:bitrix-worker] process_name=%(program_name)s_%(process_num)02d command=php /var/www/site.tld/bitrix/modules/intermotion.queue/tools/console.php queue:listen autostart=true autorestart=true stopasgroup=true killasgroup=true numprocs=5 redirect_stderr=true stdout_logfile=/var/log/bitrix-worker.log |
Параметр `numproc` отвечает за количество активных единомоментно процессов (воркеров) параллельно обрабатывающих очередь.
применяем конфигурацию:$ sudo supervisorctl reread $ sudo supervisorctl update # запускаем воркер $ sudo supervisorctl start bitrix-worker |
$ sudo supervisorctl status |
* * * * * /usr/bin/flock -w 1 php /var/www/site.tld/bitrix/modules/intermotion.queue/tools/console.php queue:run |
Использование штатных событий
Модуль реализует отложенное выполнения ряда штатных событий для элементов, разделов инфоблоков, товаров, их остатков и цен.
use Bitrix\Main; // элементы инфоблоков iblock:OnAfterIBlockElement<Add,Update,Delete>(array $fields) -> intermotion.queue:onAfterIblockElement<Add,Update,Delete>(new Main\Event $e { parameters: [elementId: {id}, iblockId: {iblockId}] }) -> intermotion.queue:onAfterIblockElementAction(Main\Event $e { parameters: [elementId: {id}, iblockId: {iblockId}, action: {add,update,delete}] }) // разделы инфоблоков iblock:OnAfterIBlockSection<Add,Update,Delete>(array $fields) -> intermotion.queue:onAfterIblockSection<Add,Update,Delete>(Main\Event $e { parameters: [elementId: {id}, iblockId: {iblockId}] }) -> intermotion.queue:onAfterIblockSectionAction(new Main\Event $e { parameters: [elementId: {id}, iblockId: {iblockId}, action: {add,update,delete}] }) // товары catalog:Bitrix\Catalog\Model\Product::OnAfter<Add,Update,Delete>(Main\Event $e) -> intermotion.queue:onAfterCatalogProduct<Add,Update,Delete>(Main\Event $e { parameters: [productId: {id}] }) -> intermotion.queue:onAfterCatalogProductAction(new Main\Event $e { parameters: [productId: {id}, action: {add,update,delete}] }) // цены catalog:Bitrix\Catalog\Model\Price::OnAfter<Add,Update,Delete>(Main\Event $e) -> intermotion.queue:onAfterCatalogProductPrice<Add,Update,Delete>(Main\Event $e { parameters: [productPriceId: {id}] }) -> intermotion.queue:onAfterCatalogProductPriceAction(new Main\Event $e { parameters: [productPriceId: {id}, action: {add,update,delete}] }) // остатки catalog:OnStoreProduct<Add,Update,Delete>(int $id) -> intermotion.queue:onAfterCatalogStorePrice<Add,Update,Delete>(Main\Event $e { parameters: [productStoreId: {id}] }) -> intermotion.queue:onAfterCatalogProductStoreAction(new Main\Event $e { parameters: [productStoreId: {id}, action: {add,update,delete}] }) |
Например, при создании элемента инфоблока необходимо выполнить сложные, ресурсозатратные операции. Если подписываться на стандартные события модуля "информационные блоки", то время, которое необходимо на просчет этих операций, будет "затормаживать" систему администрирования и прочие api-вызовы, подразумевающие сохранение элементов.
Это идеальный случай для использования "отложенных" выполнений событий.
use Bitrix\Main; use Bitrix\Iblock; use InterMotion\Queue; // вместо Main\EventManager::getInstance()->addEventHandler( 'iblock', 'OnAfterIBlockElementAdd', function(array $fields): void { try { if (!isset($fields['RESULT']) || !$fields['RESULT']) { throw new Main\NotSupportedException('Element not really saved'); } $elementId = (int) $fields['ID']; $iblockId = (int) $fields['IBLOCK_ID']; // сложные вычисления. } catch (\Throwable $exception) { } } ); // используем Main\EventManager::getInstance()->addEventHandler( Queue\Config::getModuleName(), 'OnAfterIBlockElementAdd', function(Main\Event $event): Main\EventResult { try { // обязательно необходимо проверить что элемент, соответсвующий идентификатору существует $elementId = $event->getParameter('elementId'); $iblockId = $event->getParameter('iblockId'); // сложные вычисления. } catch (\Throwable $exception) { } return (new Main\EventResult(Main\EventResult::SUCCESS); } ); |
ВАЖНО: все обработчики событий имеют равный приоритет выполнения и попадают в один общий канал (`channel`). Поэтому, следует не допускать перекоса по времени выполнения обработчиков. Например, есть 10 типов обработчиков, 9 из которых выполняются по 5-10 секунд, а 10-ый требует для выполнения одну минуту. Таким образом, в перспективе, 9 обработчиков будут ждать когда выполнится 10-ый. При условии, что событий очень много, может возникнуть ситуация, когда все время выполняются только 10-ые обработчики, а до первых 9 очередь просто не дойдет.
Решить данную проблему можно либо,
- увеличением количества процессов (см `supervisor`, параметр `numprocs`;
- созданием своих собственных событий, которые будут отправлять сообщения в отдельный канал и в последующем выделять отдельные воркеры на обработку только этого канала
Если предоставляемых модулем событий недостаточно для реализации необходимого функционала, есть возможность создавать свои собственные.
Для этого необходимо:
- создать задачу (унаследоваться от абстрактного класса `InterMotion\Queue\Job\Base`)
- зарегистрировать задание на событии `registerJob`
Рабочий пример задания - `lib/job/example.php`
namespace InterMotion\Queue\Job; use Psr; use YiiSoft\Queue as YiiQueue; // описание класса задания class Example extends Base { /** * @return string */ public function getChannel(): string { // в какой канал публикуем сообщение return 'default'; } /** * @return string */ public function getJobId(): string { // внутренний идентификатор задания, для того, чтобы при чтении сообщений из канала можно было идентифицировать обработчик // УНИКАЛЕН для каждого типа задания !! return 'intermotion-queue-job-example'; } /** * @param YiiQueue\Message\MessageInterface $message * @return void * @throws \Exception */ public function execute(YiiQueue\Message\MessageInterface $message): void { $logger = $this->getLogger(); try { $logger->info('job started', [ 'data' => $message->getData(), 'metadata' => $message->getMetadata(), 'handlerName' => $message->getHandlerName() ]); // do nothing // выполнение самой задачи $logger->info('job complete'); } catch (\Exception $exception) { $logger->error( $exception->getMessage(), [ 'trace' => $exception->getTrace() ] ); throw $exception; } } } // регистрируем задание Main\EventManager::getInstance()->addEventHandler( Queue\Config::getModuleName(), 'registerJob', function(Main\Event $event): Main\EventResult { return (new Main\EventResult(Main\EventResult::SUCCESS, [ 'JOBS' => [ // класс задания, можно зарегистрировать сразу несколько [ 'CLASS' => Queue\Job\Example::class ], ] ])); } ); |
Решение поддерживает консольные команды `Symfony Console`:
1. Обработка всех существующих сообщений в определенных каналах. Завершается, когда заканчиваются сообщения.
Используется для разработки / отладки / запуске на менеджере запуска заданий `crontab`
php bitrix/modules/intermotion.queue/tools/console.php queue:run [channel1 [channel2 [...]]] --maximum 100 |
Аргументы и опции:
- channel - перечень каналов, сообщения из которых необходимо извлечь. По умолчанию - из всех каналов
- maximum - максимальное количество для обработки В КАЖДОМ канале. По умолчанию 0 - все сообщений
Используется для процесс-менеджера `supervisor`
php bitrix/modules/intermotion.queue/tools/console.php queue:listen [channel1 [channel2 [...]]] |
Аргументы:
- channel - перечень каналов, сообщения в которых необходимо слушать. По умолчанию - из всех каналов
По умолчанию создается лог-файл `queue.log` в корне модуля.