Асинхронное выполнение процедур или триггера в MS SQL Server

После изменений данных иногда требуется дополнительно произвести какие то действия, это может быть пересчет каких то агрегаций, отправка сообщения и пр. Процессу вносившему изменения как правило важно знать только результат транзакции и не очень интересно ожидать выполнения дополнительных операций.
Асинхронный вызов можно организовать через очередь. В следующий раз напишу как это сделать через обычную таблицу и job, а сегодня о компоненте Service Broker.
У Service Broker много возможностей, он позволяет взаимодействовать и между сервера, но местами не так удобен и гибок, в итоге часто эффективней выносить такую логику за СУБД.
Но для простых и понятных задач вполне годная штука.

1. Создаем БД

IF DB_ID (N'DB_001') IS NOT NULL  
DROP DATABASE DB_001;  
GO  
CREATE DATABASE DB_001 

2. Включаем Service Broker

ALTER DATABASE [DB_001]
SET ENABLE_BROKER
with rollback IMMEDIATE; 

3. Создаем два типа сообщений

CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

4. Создаем контракт между ними

CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

5. Создаем очередь и сервисы

CREATE QUEUE SyncDataQueue;

CREATE SERVICE [SyncDataServiceInitiator]  ON QUEUE SyncDataQueue ([AsyncContract]);
CREATE SERVICE [SyncDataServiceTarget]  ON QUEUE SyncDataQueue ([AsyncContract]);

6. Процедура общего вида, с помощью которой можно отправлять сообщения на нужный сервис без ожидания ответа

CREATE PROCEDURE SendBrokerMessageNoreply
 @FromService SYSNAME,
 @ToService   SYSNAME,
 @Contract    SYSNAME,
 @MessageType SYSNAME,
 @MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
   BEGIN DIALOG CONVERSATION @conversation_handle
  FROM SERVICE @FromService
  TO SERVICE @ToService
  ON CONTRACT @Contract
  WITH ENCRYPTION = OFF;
 
   SEND ON CONVERSATION @conversation_handle
  MESSAGE TYPE @MessageType(@MessageBody);

   END CONVERSATION @conversation_handle;
 
  COMMIT TRANSACTION;
END

7. Добавим таблицу для тестирования куда будут падать сообщения

CREATE TABLE [dbo].[log_audit](
 [ID] [int] NOT NULL,
 [CDate] [date] NOT NULL DEFAULT (getdate())
) ON [PRIMARY]

8. Процедура которая будет принимать сообщения и обрабатывать их, в нашем случае писать в таблицу log_audit

CREATE PROCEDURE [dbo].[SyncDataQueueActivation]
AS
BEGIN
  SET NOCOUNT ON;

  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM [SyncDataQueue]
    ), TIMEOUT 500;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END


    IF @message_type_name = N'AsyncRequest'
    BEGIN
  DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
  
  INSERT INTO [dbo].[log_audit]([ID])
  VALUES(@AccountNumber)
    END
 
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
  --TODO
   --log errors
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END

9. Добавляем новую процедуру к очереди как обработчик

ALTER QUEUE [dbo].[SyncDataQueue]
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = SyncDataQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );

10. Отправляем сообщения

EXECUTE [SendBrokerMessage]
    @FromService = N'SyncDataServiceInitiator',
    @ToService   = N'SyncDataServiceTarget',
    @Contract    = N'AsyncContract',
    @MessageType = N'AsyncRequest',
    @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';


Сообщения можно отправлять из триггера или другой процедуры.


Комментарии

Популярные сообщения из этого блога

Рекурсивные SQL запросы

Кратко про SQLAlchemy Core