Sqlserver
 sql >> Base de données >  >> RDS >> Sqlserver

Implémentation de la charge incrémentielle à l'aide de Change Data Capture dans SQL Server

Cet article sera intéressant pour ceux qui doivent souvent gérer l'intégration de données.

Présentation

Supposons qu'il existe une base de données où les utilisateurs modifient toujours les données (mise à jour ou suppression). Peut-être que cette base de données est utilisée par une grosse application qui ne permet pas de modifier la structure de la table. La tâche consiste à charger de temps en temps des données de cette base de données vers une autre base de données sur un serveur différent. Le moyen le plus simple de résoudre le problème consiste à charger les nouvelles données d'une base de données source vers une base de données cible avec un nettoyage préalable de la base de données cible. Vous pouvez utiliser cette méthode tant que le temps de chargement des données est acceptable et ne dépasse pas les délais prédéfinis. Que faire si le chargement des données prend plusieurs jours ? De plus, des canaux de communication instables conduisent à la situation où le chargement des données s'arrête et redémarre. Si vous rencontrez ces obstacles, je vous suggère d'envisager l'un des algorithmes de "rechargement des données". Cela signifie que seules les modifications de données survenues depuis le dernier chargement sont chargées.

CDC

Dans SQL Server 2008, Microsoft a introduit un mécanisme de suivi des données appelé Change Data Capture (CDC). D'une manière générale, le but de ce mécanisme est que l'activation de CDC pour n'importe quelle table de base de données créera une table système dans la même base de données avec un nom similaire à celui de la table d'origine (le schéma sera le suivant :'cdc' comme préfixe plus le ancien nom de schéma plus "_" et la fin "_CT". Par exemple, la table d'origine est dbo.Example, alors la table système s'appellera cdc.dbo_Example_CT). Il stockera toutes les données qui ont été modifiées.

En fait, pour creuser plus profondément dans CDC, considérons l'exemple. Mais d'abord, assurez-vous que l'agent SQL qui utilise CDC fonctionne sur l'instance de test SQL Server.

De plus, nous allons envisager un script qui crée une base de données et une table de test, remplit cette table avec des données et active CDC pour cette table.

Pour comprendre et simplifier la tâche, nous allons utiliser une instance SQL Server sans distribuer les bases de données source et cible sur différents serveurs.

utiliser mastergo-- créer une base de données source s'il n'existe pas (sélectionner * dans sys.databases où nom ='db_src_cdc') créer une base de données db_src_cdcutiliser db_src_cdcgo-- activer CDC s'il est désactivé s'il n'existe pas (sélectionner * dans sys.databases où nom =db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- créer un rôle pour les tables avec CDCif not exists(select * from sys.sysusers where name ='CDC_Reader' and issqlrole=1) create role CDC_Readergo-- create a tableif object_id('dbo.Example','U') est null créer une table dbo.Example ( ID int contrainte d'identité PK_Example clé primaire, Titre varchar(200) not null )go-- remplir la tableinsert dbo.Example (Title) values( 'Un'),('Deux'),('Trois'),('Quatre'),('Cinq');go-- activer CDC pour la table si elle n'existe pas (sélectionnez * dans sys.tables où is_tracked_by_cdc =1 and name ='Example') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Example', @role_name ='CDC_Reader'go-- remplissez la table avec des données. Nous allons modifier ou supprimer quelque chose de mise à jour dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Exemple (ID, Titre) values(1,'One'),(6,'Six');set identity_insert dbo.Example off;go

Maintenant, regardons ce que nous avons après avoir exécuté ce script dans les tables dbo.Example et cdc.dbo_Example_CT (il convient de noter que CDC est asynchrone. Les données sont renseignées dans les tables où le suivi des modifications est stocké après un certain laps de temps ).

select * from dbo.Example;
ID Title---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, *from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Titre------ --------------------- - ----------- ---------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A00000000560006 NULL 0x0000003A000000560002 1 0X03 1 ONE 1 0x0000003A000000560006 NULL 0x0000000000000000000000000060006 NULL 0x00000000000000000000000060006 NULL 0x00000000000000000000006. 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Quatre 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2pre 

Examinez en détail la structure de la table dans laquelle le suivi des modifications est stocké. Les champs __ $start_lsn et __ $seqval sont respectivement LSN (numéro de séquence du journal dans la base de données) et le numéro de transaction dans la transaction. Il y a une propriété importante dans ces champs, à savoir, nous pouvons être sûrs que l'enregistrement avec un LSN plus élevé sera effectué plus tard. Grâce à cette propriété, nous pouvons facilement obtenir le dernier état de chaque enregistrement dans la requête, en filtrant notre sélection par la condition - où __ $ rn =1.

Le champ __$operation contient le code de transaction :

  • 1 - l'enregistrement est supprimé
  • 2 - l'enregistrement est inséré
  • 3, 4 – l'enregistrement est mis à jour. Les anciennes données avant la mise à jour sont 3, les nouvelles données sont 4.

En plus des champs de service avec le préfixe « __$ », les champs de la table d'origine sont entièrement dupliqués. Ces informations nous suffisent pour procéder au chargement incrémental.

Configuration d'une base de données pour le chargement des données

Créez une table dans notre base de données cible de test, dans laquelle les données seront chargées, ainsi qu'une table supplémentaire pour stocker des données sur le journal de chargement.

utiliser mastergo-- créer une base de données cible si elle n'existe pas (sélectionner * dans sys.databases où nom ='db_dst_cdc') créer une base de données db_dst_cdcgouse db_dst_cdcgo-- créer une table si object_id('dbo.Example','U') est null créer une table dbo.Example ( ID int contrainte PK_Example clé primaire, titre varchar(200) non null )go-- créer une table pour stocker le journal de chargement si object_id('dbo.log_cdc','U') est null créer une table dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), contrainte pk_log_cdc clé primaire (table_name,dt desc) )go

J'attire votre attention sur les champs de la table LOG_CDC :

  • TABLE_NAME stocke des informations sur la table qui a été chargée (il est possible de charger plusieurs tables à l'avenir, à partir de différentes bases de données ou même de différents serveurs ; le format de la table est "SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME"
  • DT est un champ de la date et de l'heure de chargement, qui est facultatif pour le chargement incrémentiel. Cependant, il sera utile pour auditer le chargement.
  • LSN :après le chargement d'une table, nous devons stocker des informations sur l'endroit où commencer le prochain chargement, si nécessaire. En conséquence, après chaque chargement, nous ajoutons le dernier (maximum) __ $ start_lsn dans cette colonne.

Algorithme de chargement des données

Comme décrit ci-dessus, en utilisant la requête, nous pouvons obtenir le dernier état de la table à l'aide de fonctions de fenêtre. Si nous connaissons le LSN du dernier chargement, lors du prochain chargement, nous pourrons filtrer à partir de la source toutes les données dont les modifications sont supérieures au LSN stocké, s'il y a eu au moins un chargement précédent complet :

avec incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$ start_lsn> @lsn)select * from incr_Example

Ensuite, nous pouvons obtenir tous les enregistrements pour le chargement complet, si le LSN du chargement n'est pas stocké :

avec incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$ start_lsn> @lsn), full_Example as( select * from db_src_cdc.dbo.Example where @lsn is null)select ID, Title, __$operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example 

Ainsi, selon la valeur @LSN, cette requête affichera soit toutes les dernières modifications (en contournant les intermédiaires) avec le statut Supprimé ou non, soit toutes les données de la table d'origine, en ajoutant le statut 2 (nouvel enregistrement) - ce champ est utilisé uniquement pour unifier deux sélections. Avec cette requête, nous pouvons facilement implémenter le chargement complet ou le rechargement à l'aide de la commande MERGE (à partir de la version SQL 2008).

Pour éviter les goulots d'étranglement qui peuvent créer des processus alternatifs et pour charger des données correspondantes à partir de différentes tables (à l'avenir, nous chargerons plusieurs tables et, éventuellement, il peut y avoir des relations relationnelles entre elles), je suggère d'utiliser un instantané de base de données sur la base de données source ( une autre fonctionnalité SQL 2008).

Le texte complet du chargement est le suivant :

[expand title=”Code”]

/* Algorithme de chargement des données*/-- créer un instantané de base de données s'il existe (sélectionnez * à partir de sys.databases où name ='db_src_cdc_ss' ) supprimez la base de données db_src_cdc_ss; déclarez @query nvarchar(max); sélectionnez @query =N' créer la base de données db_src_cdc_ss on ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) en tant qu'instantané de db_src_cdc'from db_src_cdc.sys.sysfiles where groupid =1; exec ( @query );-- lire le LSN du précédent loaddeclare @lsn binary(10) =(select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name ='localhost.db_src_cdc.dbo.Example');-- clear une table avant le chargement complet si @lsn est null tronquer la table db_dst_cdc.dbo.Example;-- charger le processus avec incr_Example as( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn , * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn> @lsn), full_Example as( select * from db_src_cdc_ss.dbo.Example where @lsn is null), cte_Example as( select ID, Titre, __$operation from incr_Example where __$rn =1 union all select ID, Title, 2 as __$operation from full_Example)merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID when matched and __$operation =1 then deletewhen matched and __$operation <> 1 then update set trg.Title =src.Titlewhen not matched by target and __$operation <> 1 then insert (ID, Title) values ​​(src.ID, src .Title);-- marque la fin du processus de chargement et les dernières valeurs LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn) ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))-- supprimez l'instantané de la base de données s'il existe (sélectionnez * dans sys.databases où nom ='db_src_cdc_ss' ) supprimez la base de données db_src_cdc_ss

[/expand]