Dans mon dernier article, j'ai parlé des avantages de la mise en œuvre du traitement asynchrone à l'aide de Service Broker dans SQL Server par rapport aux autres méthodes qui existent pour le traitement découplé des tâches longues. Dans cet article, nous allons passer en revue tous les composants qui doivent être configurés pour une configuration de base de Service Broker dans une seule base de données, et les considérations importantes pour la gestion des conversations entre les services de courtier. Pour commencer, nous devrons créer une base de données et l'activer pour l'utilisation de Service Broker :
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Configuration des composants du courtier
Les objets de base qui doivent être créés dans la base de données sont les types de message pour les messages, un contrat qui définit comment les messages seront envoyés entre les services, une file d'attente et le service initiateur, et une file d'attente et le service cible. De nombreux exemples en ligne pour Service Broker montrent une dénomination d'objet complexe pour les types de messages, les contrats et les services pour Service Broker. Cependant, il n'est pas nécessaire que les noms soient complexes, et des noms d'objets simples peuvent être utilisés pour n'importe lequel des objets.
Pour les messages, nous devrons créer un type de message pour la requête, qui s'appellera AsyncRequest
, et un type de message pour le résultat, qui s'appellera AsyncResult
. Les deux utiliseront XML qui sera validé comme étant correctement formé par les services du courtier pour envoyer et recevoir les données requises par les services.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Le contrat spécifie que le AsyncRequest
sera envoyé par le service initiateur au service cible et que le service cible renverra un AsyncResult
message au service initiateur. Le contrat peut également spécifier plusieurs types de message pour l'initiateur et la cible, ou qu'un type de message spécifique peut être envoyé par n'importe quel service, si le traitement spécifique l'exige.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Pour chacun des services, une file d'attente doit être créée pour assurer le stockage des messages reçus par le service. Le service cible où la requête sera envoyée doit être créé en spécifiant le AsyncContract
pour permettre l'envoi de messages au service. Dans ce cas, le service est nommé ProcessingService
et sera créé sur la ProcessingQueue
au sein de la base de données. Le service initiateur ne nécessite pas la spécification d'un contrat, ce qui lui permet uniquement de recevoir des messages en réponse à une conversation initiée à partir de lui.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Envoi d'un message pour traitement
Comme je l'ai expliqué dans l'article précédent, je préfère implémenter une procédure stockée wrapper pour envoyer un nouveau message à un service de courtage, afin qu'il puisse être modifié une fois pour adapter les performances si nécessaire. Cette procédure est un simple wrapper autour de la création d'une nouvelle conversation et de l'envoi du message au ProcessingService
.
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
En utilisant la procédure stockée wrapper, nous pouvons maintenant envoyer un message de test au ProcessingService
pour valider que nous avons correctement configuré les services de courtage.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Traitement des messages
Bien que nous puissions traiter manuellement les messages de la ProcessingQueue
, nous voudrons probablement que les messages soient traités automatiquement lorsqu'ils sont envoyés au ProcessingService
. Pour ce faire, une procédure stockée d'activation doit être créée que nous testerons puis lierons ultérieurement à la file d'attente pour automatiser le traitement lors de l'activation de la file d'attente. Pour traiter un message, nous devons RECEIVE
le message de la file d'attente dans une transaction, ainsi que le type de message et le descripteur de conversation pour le message. Le type de message garantit que la logique appropriée est appliquée au message en cours de traitement, et le descripteur de conversation permet de renvoyer une réponse au service initiateur lorsque le message a été traité.
Le RECEIVE
La commande permet à un seul message ou à plusieurs messages au sein du même descripteur ou groupe de conversation d'être traités en une seule transaction. Pour traiter plusieurs messages, une variable de table doit être utilisée, ou pour effectuer un traitement de message unique, une variable locale peut être utilisée. La procédure d'activation ci-dessous récupère un seul message de la file d'attente, vérifie le type de message pour déterminer s'il s'agit d'un AsyncRequest
message, puis exécute le processus de longue durée en fonction des informations de message reçues. S'il ne reçoit pas de message dans la boucle, il attendra jusqu'à 5 000 ms, ou 5 secondes, qu'un autre message entre dans la file d'attente avant de quitter la boucle et de terminer son exécution. Après avoir traité un message, il construit un AsyncResult
message et le renvoie à l'initiateur sur le même descripteur de conversation que celui à partir duquel le message a été reçu. La procédure vérifie également le type de message pour déterminer si un EndDialog
ou Error
message a été reçu pour nettoyer la conversation en y mettant fin.
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
La RequestQueue
devra également traiter les messages qui lui sont envoyés, donc une procédure supplémentaire pour traiter le AsyncResult
les messages renvoyés par la procédure ProcessingQueueActivation doivent être créés. Puisque nous savons que le message AsnycResult signifie que tout le travail de traitement est terminé, la conversation peut être terminée une fois que nous avons traité ce message, qui enverra un message EndDialog au ProcessingService, qui sera ensuite traité par sa procédure d'activation pour mettre fin au conversation en nettoyant tout et en évitant le feu et oubliez les problèmes qui surviennent lorsque les conversations se terminent correctement.
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Tester les procédures
Avant d'automatiser le traitement de la file d'attente pour nos services, il est important de tester les procédures d'activation pour s'assurer qu'elles traitent les messages de manière appropriée et pour éviter qu'une file d'attente ne soit désactivée en cas d'erreur non gérée correctement. Puisqu'il y a déjà un message sur la ProcessingQueue
le ProcessingQueueActivation
procédure peut être exécutée pour traiter ce message. Gardez à l'esprit que le WAITFOR
entraînera la fin de la procédure en 5 secondes, même si le message est traité immédiatement à partir de la file d'attente. Après traitement du message, nous pouvons vérifier que la procédure a bien fonctionné en interrogeant la RequestQueue
pour voir si un AsyncResult
message existe, puis nous pouvons vérifier que le RequestQueueActivation
la procédure fonctionne correctement en l'exécutant.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Automatisation du traitement
À ce stade, tous les composants sont complets pour automatiser entièrement notre traitement. La seule chose qui reste est de lier les procédures d'activation à leurs files d'attente appropriées, puis d'envoyer un autre message de test pour valider qu'il est traité et qu'il ne reste rien dans les files d'attente par la suite.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Résumé
Les composants de base du traitement asynchrone automatisé dans SQL Server Service Broker peuvent être configurés dans une configuration de base de données unique pour permettre le traitement découplé des tâches de longue durée. Cela peut être un outil puissant pour améliorer les performances de l'application, à partir de l'expérience de l'utilisateur final, en dissociant le traitement des interactions de l'utilisateur final avec l'application.