Database
 sql >> Base de données >  >> RDS >> Database

Configuration de Service Broker pour le traitement asynchrone

Dans mon dernier article, j'ai parlé des avantages de la mise en œuvre du traitement asynchrone à l'aide de Service Broker dans SQL Server par rapport aux autres méthodes qui existent pour le traitement découplé des tâches longues. Dans cet article, nous allons passer en revue tous les composants qui doivent être configurés pour une configuration de base de Service Broker dans une seule base de données, et les considérations importantes pour la gestion des conversations entre les services de courtier. Pour commencer, nous devrons créer une base de données et l'activer pour l'utilisation de Service Broker :

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Configuration des composants du courtier

Les objets de base qui doivent être créés dans la base de données sont les types de message pour les messages, un contrat qui définit comment les messages seront envoyés entre les services, une file d'attente et le service initiateur, et une file d'attente et le service cible. De nombreux exemples en ligne pour Service Broker montrent une dénomination d'objet complexe pour les types de messages, les contrats et les services pour Service Broker. Cependant, il n'est pas nécessaire que les noms soient complexes, et des noms d'objets simples peuvent être utilisés pour n'importe lequel des objets.

Pour les messages, nous devrons créer un type de message pour la requête, qui s'appellera AsyncRequest , et un type de message pour le résultat, qui s'appellera AsyncResult . Les deux utiliseront XML qui sera validé comme étant correctement formé par les services du courtier pour envoyer et recevoir les données requises par les services.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

Le contrat spécifie que le AsyncRequest sera envoyé par le service initiateur au service cible et que le service cible renverra un AsyncResult message au service initiateur. Le contrat peut également spécifier plusieurs types de message pour l'initiateur et la cible, ou qu'un type de message spécifique peut être envoyé par n'importe quel service, si le traitement spécifique l'exige.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Pour chacun des services, une file d'attente doit être créée pour assurer le stockage des messages reçus par le service. Le service cible où la requête sera envoyée doit être créé en spécifiant le AsyncContract pour permettre l'envoi de messages au service. Dans ce cas, le service est nommé ProcessingService et sera créé sur la ProcessingQueue au sein de la base de données. Le service initiateur ne nécessite pas la spécification d'un contrat, ce qui lui permet uniquement de recevoir des messages en réponse à une conversation initiée à partir de lui.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Envoi d'un message pour traitement

Comme je l'ai expliqué dans l'article précédent, je préfère implémenter une procédure stockée wrapper pour envoyer un nouveau message à un service de courtage, afin qu'il puisse être modifié une fois pour adapter les performances si nécessaire. Cette procédure est un simple wrapper autour de la création d'une nouvelle conversation et de l'envoi du message au ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

En utilisant la procédure stockée wrapper, nous pouvons maintenant envoyer un message de test au ProcessingService pour valider que nous avons correctement configuré les services de courtage.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Traitement des messages

Bien que nous puissions traiter manuellement les messages de la ProcessingQueue , nous voudrons probablement que les messages soient traités automatiquement lorsqu'ils sont envoyés au ProcessingService . Pour ce faire, une procédure stockée d'activation doit être créée que nous testerons puis lierons ultérieurement à la file d'attente pour automatiser le traitement lors de l'activation de la file d'attente. Pour traiter un message, nous devons RECEIVE le message de la file d'attente dans une transaction, ainsi que le type de message et le descripteur de conversation pour le message. Le type de message garantit que la logique appropriée est appliquée au message en cours de traitement, et le descripteur de conversation permet de renvoyer une réponse au service initiateur lorsque le message a été traité.

Le RECEIVE La commande permet à un seul message ou à plusieurs messages au sein du même descripteur ou groupe de conversation d'être traités en une seule transaction. Pour traiter plusieurs messages, une variable de table doit être utilisée, ou pour effectuer un traitement de message unique, une variable locale peut être utilisée. La procédure d'activation ci-dessous récupère un seul message de la file d'attente, vérifie le type de message pour déterminer s'il s'agit d'un AsyncRequest message, puis exécute le processus de longue durée en fonction des informations de message reçues. S'il ne reçoit pas de message dans la boucle, il attendra jusqu'à 5 000 ms, ou 5 secondes, qu'un autre message entre dans la file d'attente avant de quitter la boucle et de terminer son exécution. Après avoir traité un message, il construit un AsyncResult message et le renvoie à l'initiateur sur le même descripteur de conversation que celui à partir duquel le message a été reçu. La procédure vérifie également le type de message pour déterminer si un EndDialog ou Error message a été reçu pour nettoyer la conversation en y mettant fin.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

La RequestQueue devra également traiter les messages qui lui sont envoyés, donc une procédure supplémentaire pour traiter le AsyncResult les messages renvoyés par la procédure ProcessingQueueActivation doivent être créés. Puisque nous savons que le message AsnycResult signifie que tout le travail de traitement est terminé, la conversation peut être terminée une fois que nous avons traité ce message, qui enverra un message EndDialog au ProcessingService, qui sera ensuite traité par sa procédure d'activation pour mettre fin au conversation en nettoyant tout et en évitant le feu et oubliez les problèmes qui surviennent lorsque les conversations se terminent correctement.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Tester les procédures

Avant d'automatiser le traitement de la file d'attente pour nos services, il est important de tester les procédures d'activation pour s'assurer qu'elles traitent les messages de manière appropriée et pour éviter qu'une file d'attente ne soit désactivée en cas d'erreur non gérée correctement. Puisqu'il y a déjà un message sur la ProcessingQueue le ProcessingQueueActivation procédure peut être exécutée pour traiter ce message. Gardez à l'esprit que le WAITFOR entraînera la fin de la procédure en 5 secondes, même si le message est traité immédiatement à partir de la file d'attente. Après traitement du message, nous pouvons vérifier que la procédure a bien fonctionné en interrogeant la RequestQueue pour voir si un AsyncResult message existe, puis nous pouvons vérifier que le RequestQueueActivation la procédure fonctionne correctement en l'exécutant.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatisation du traitement

À ce stade, tous les composants sont complets pour automatiser entièrement notre traitement. La seule chose qui reste est de lier les procédures d'activation à leurs files d'attente appropriées, puis d'envoyer un autre message de test pour valider qu'il est traité et qu'il ne reste rien dans les files d'attente par la suite.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Résumé

Les composants de base du traitement asynchrone automatisé dans SQL Server Service Broker peuvent être configurés dans une configuration de base de données unique pour permettre le traitement découplé des tâches de longue durée. Cela peut être un outil puissant pour améliorer les performances de l'application, à partir de l'expérience de l'utilisateur final, en dissociant le traitement des interactions de l'utilisateur final avec l'application.