Mysql
 sql >> Base de données >  >> RDS >> Mysql

Utilisation de Python et MySQL dans le processus ETL :Utilisation de Python et SQLAlchemy

Dans les deux articles précédents de cette série, nous avons expliqué comment utiliser Python et SQLAlchemy pour effectuer le processus ETL. Aujourd'hui, nous ferons la même chose, mais cette fois en utilisant Python et SQL Alchemy sans commandes SQL sous forme textuelle. Cela nous permettra d'utiliser SQLAlchemy quel que soit le moteur de base de données auquel nous sommes connectés. Alors, commençons.

Aujourd'hui, nous allons discuter de la façon d'effectuer le processus ETL en utilisant Python et SQLAlchemy. Nous allons créer un script pour extraire les données quotidiennes de notre base de données opérationnelle, les transformer, puis les charger dans notre entrepôt de données.

Ceci est le troisième article de la série. Si vous n'avez pas lu les deux premiers articles (Utiliser Python et MySQL dans le processus ETL et SQLAlchemy), je vous encourage fortement à le faire avant de continuer.

Toute cette série est une continuation de notre série d'entrepôts de données :

  • Création d'un DWH, première partie :un modèle de données d'entreprise d'abonnement
  • Création d'un DWH, deuxième partie :un modèle de données d'entreprise d'abonnement
  • Création d'un entrepôt de données, partie 3 :un modèle de données d'entreprise d'abonnement

Bon, maintenant commençons sur le sujet d'aujourd'hui. Examinons d'abord les modèles de données.

Les modèles de données



Modèle de données de base de données opérationnelle (en direct)




Modèle de données DWH


Ce sont les deux modèles de données que nous utiliserons. Pour plus d'informations sur les entrepôts de données (DWH), consultez ces articles :

  • Le schéma en étoile
  • Le schéma du flocon de neige
  • Schéma en étoile contre schéma en flocon

Pourquoi SQLAlchemy ?

L'idée derrière SQLAlchemy est qu'après avoir importé des bases de données, nous n'avons pas besoin de code SQL spécifique au moteur de base de données associé. Au lieu de cela, nous pouvons importer des objets dans SQLAlchemy et utiliser la syntaxe SQLAlchemy pour les instructions. Cela nous permettra d'utiliser le même langage quel que soit le moteur de base de données auquel nous sommes connectés. Le principal avantage ici est qu'un développeur n'a pas besoin de s'occuper des différences entre les différents moteurs de base de données. Votre programme SQLAlchemy fonctionnera exactement de la même manière (avec des modifications mineures) si vous migrez vers un autre moteur de base de données.

J'ai décidé d'utiliser uniquement les commandes SQLAlchemy et les listes Python pour communiquer avec le stockage temporaire et entre différentes bases de données. Les principales raisons de cette décision sont que 1) les listes Python sont bien connues et 2) le code serait lisible pour ceux qui n'ont pas de compétences en Python.

Cela ne veut pas dire que SQLAlchemy est parfait. Il a certaines limites, dont nous parlerons plus tard. Pour l'instant, regardons simplement le code ci-dessous :

Exécuter le script et le résultat

Il s'agit de la commande Python utilisée pour appeler notre script. Le script vérifie les données dans la base de données opérationnelle, compare les valeurs avec le DWH et importe les nouvelles valeurs. Dans cet exemple, nous mettons à jour des valeurs dans deux tables de dimension et une table de faits ; le script renvoie la sortie appropriée. L'ensemble du script est écrit pour que vous puissiez l'exécuter plusieurs fois par jour. Il supprimera les "anciennes" données de ce jour et les remplacera par de nouvelles.

Analysons l'ensemble du script, en commençant par le haut.

Importer SQLAlchemy

La première chose que nous devons faire est d'importer les modules que nous utiliserons dans le script. Habituellement, vous importerez vos modules au fur et à mesure que vous écrivez le script. Dans la plupart des cas, vous ne saurez pas exactement de quels modules vous aurez besoin au départ.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Nous avons importé le datetime de Python module, qui nous fournit des classes qui fonctionnent avec des dates.

Ensuite, nous avons le sqlalchemy module. Nous n'importerons pas le module entier, juste les choses dont nous avons besoin - certaines spécifiques à SQLAlchemy (create_engine , MetaData , Table ), certaines parties de l'instruction SQL (select , and_ , case ) et func , ce qui nous permet d'utiliser des fonctions comme count() et somme() .

Connexion aux bases de données

Nous devrons nous connecter à deux bases de données sur notre serveur. Nous pourrions nous connecter à plus de bases de données (MySQL, SQL Server ou toute autre) à partir de différents serveurs si nécessaire. Dans ce cas, les deux bases de données sont des bases de données MySQL et sont stockées sur ma machine locale.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Nous avons créé deux moteurs et deux connexions. Je n'entrerai pas dans les détails ici car nous avons déjà expliqué ce point dans l'article précédent.

Mettre à jour le dim_time Dimension

Objectif :Insérer la date d'hier si elle n'est pas déjà insérée dans le tableau.

Dans notre script, nous mettrons à jour deux tables de dimension avec de nouvelles valeurs. Les autres suivent le même schéma, nous n'y reviendrons donc qu'une seule fois; nous n'avons pas besoin d'écrire un code presque identique plusieurs fois de plus.

L'idée est très simple. Nous exécuterons toujours le script pour insérer de nouvelles données pour hier. Par conséquent, nous devons vérifier si cette date a été insérée dans la table de dimension. S'il est déjà là, nous ne ferons rien; si ce n'est pas le cas, nous l'ajouterons. Examinons le code pour mettre à jour le dim_time tableau.

Premièrement, nous allons vérifier si la date existe. S'il n'existe pas, nous l'ajouterons. Nous commençons par stocker la date d'hier dans une variable. En Python, vous procédez ainsi :

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

La première ligne prend une date actuelle, la convertit en une valeur numérique, soustrait 1 de cette valeur et reconvertit cette valeur numérique en une date (hier =aujourd'hui – 1 ). La deuxième ligne stocke la date dans un format textuel.

Ensuite, nous testerons si la date est déjà dans la base de données :

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Après avoir chargé la table, nous exécuterons une requête qui devrait renvoyer toutes les lignes de la table de dimension où la valeur heure/date est égale à hier. Le résultat peut avoir 0 (aucune date de ce type dans le tableau) ou 1 ligne (la date est déjà dans le tableau).

Si la date n'est pas déjà dans le tableau, nous utiliserons la commande insert() pour l'ajouter :

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Une nouvelle chose que je voudrais souligner ici est l'utilisation de. .year , .month , .isocalendar()[1] , et .weekday pour obtenir des parties de date.

Mise à jour de dim_city Dimension

Objectif :Insérer de nouvelles villes s'il y en a (c'est-à-dire comparer la liste des villes dans la base de données en direct à la liste des villes dans le DWH et ajouter celles qui manquent).

Mise à jour du dim_time dimension était assez simple. Nous avons simplement testé si une date était dans le tableau et l'avons insérée si elle n'y était pas déjà. Pour tester une valeur dans la base de données DWH, nous avons utilisé une variable Python (hier ). Nous utiliserons à nouveau ce processus, mais cette fois avec des listes.

Puisqu'il n'existe pas de moyen simple de combiner des tables de différentes bases de données dans une seule requête SQLAlchemy, nous ne pouvons pas utiliser l'approche décrite dans la partie 1 de cette série. Par conséquent, nous aurons besoin d'un objet pour stocker les valeurs nécessaires à la communication entre ces deux bases de données. J'ai décidé d'utiliser des listes, car elles sont courantes et font le travail.

Tout d'abord, nous allons charger le country et city tables d'une base de données en direct dans les objets pertinents.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Ensuite, nous allons charger le dim_city table du DWH dans une liste :

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Ensuite, nous ferons de même pour les valeurs de la base de données en direct. Nous rejoindrons les tables country et city nous avons donc toutes les données nécessaires dans cette liste :

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Nous allons maintenant parcourir la liste contenant les données de la base de données en direct. Pour chaque enregistrement, nous comparerons les valeurs (city_name , postal_code , et country_name ). Si nous ne trouvons pas de telles valeurs, nous ajouterons un nouvel enregistrement dans le dim_city tableau.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Pour déterminer si la valeur est déjà dans le DWH, nous avons testé une combinaison d'attributs qui doivent être uniques. (La clé primaire de la base de données en direct ne nous aide pas beaucoup ici.) Nous pouvons utiliser un code similaire pour mettre à jour d'autres dictionnaires. Ce n'est pas la solution la plus agréable, mais c'est quand même assez élégant. Et il fera exactement ce dont nous avons besoin.

Mettre à jour le fact_customer_subscribed Tableau

Objectif :Si nous avons d'anciennes données pour la date d'hier, supprimez-les d'abord. Ajoutez les données d'hier dans le DWH, que nous ayons supprimé ou non quelque chose à l'étape précédente.

Après avoir mis à jour toutes les tables de dimension, nous devons mettre à jour les tables de faits. Dans notre script, nous ne mettrons à jour qu'une seule table de faits. Le raisonnement est le même que dans la section précédente :la mise à jour d'autres tables suivrait le même schéma, nous répéterions donc principalement le code.

Avant d'insérer des valeurs dans la table de faits, nous devons connaître les valeurs des clés associées à partir des tables de dimension. Pour ce faire, nous allons à nouveau charger les dimensions dans des listes et les comparer avec les valeurs de la base de données en direct.

La première chose que nous allons faire est de charger le client et fact_customer_subscribed tables en objets :

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Maintenant, nous devons trouver des clés pour la dimension temporelle associée. Puisque nous insérons toujours des données pour hier, nous allons rechercher cette date dans le dim_time table et utilisez son ID. La requête renvoie 1 ligne et l'ID est en première position (l'index commence à 0, donc c'est result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Pendant ce temps, nous supprimerons tous les enregistrements associés de la table de faits :

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

D'accord, nous avons maintenant l'ID de la dimension temporelle stocké dans le dim_time_id variable. C'était facile car nous ne pouvons avoir qu'une seule valeur de dimension temporelle. L'histoire sera différente pour la dimension ville. Tout d'abord, nous allons charger tous les valeurs dont nous avons besoin - des valeurs qui décrivent de manière unique la ville (pas l'ID) et des valeurs agrégées :

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Il y a quelques points sur lesquels je voudrais insister sur la requête ci-dessus :

  • func.sum(...) est SOMME(...) du "SQL standard".
  • Le case(...) la syntaxe utilise and_ avant les conditions, pas entre elles.
  • .label(...) fonctionne comme un alias SQL AS.
  • Nous utilisons \ pour passer à la ligne suivante et augmenter la lisibilité de la requête. (Croyez-moi, c'est quasiment illisible sans le slash - j'ai essayé :) )
  • .group_by(...) joue le rôle de GROUP BY de SQL.

Ensuite, nous allons parcourir chaque enregistrement renvoyé à l'aide de la requête précédente. Pour chaque enregistrement, nous comparerons les valeurs qui définissent de manière unique une ville (city_name , postal_code , country_name ) avec les valeurs stockées dans la liste créée à partir du DWH dim_city table. Si les trois valeurs correspondent, nous stockons l'ID de la liste et l'utilisons lors de l'insertion de nouvelles données. Ainsi, pour chaque enregistrement, nous aurons des identifiants pour les deux dimensions :

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

Et c'est tout. Nous avons mis à jour notre DWH. Le script serait beaucoup plus long si nous mettions à jour toutes les tables de dimensions et de faits. La complexité serait également plus grande lorsqu'une table de faits est liée à plusieurs tables de dimension. Dans ce cas, nous aurions besoin d'un for boucle pour chaque table de dimension.

Ça ne marche pas !

J'ai été très déçu quand j'ai écrit ce script et j'ai ensuite découvert que quelque chose comme ça ne fonctionnerait pas :

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

Dans cet exemple, j'essaie d'utiliser des tables de deux bases de données différentes. Si nous établissons deux connexions distinctes, la première connexion ne "verra" pas les tables d'une autre connexion. Si nous nous connectons directement au serveur, et non à une base de données, nous ne pourrons pas charger les tables.

Jusqu'à ce que cela change (espérons-le bientôt), vous devrez utiliser une sorte de structure (par exemple, ce que nous avons fait aujourd'hui) pour communiquer entre les deux bases de données. Cela complique le code, car vous devez remplacer une seule requête par deux listes et imbriqué for boucles.

Partagez vos réflexions sur SQLAlchemy et Python

Ceci était le dernier article de cette série. Mais qui sait? Nous essaierons peut-être une autre approche dans les prochains articles, alors restez à l'écoute. En attendant, partagez vos réflexions sur SQLAlchemy et Python en combinaison avec des bases de données. Que pensez-vous qu'il nous manque dans cet article ? Qu'ajouteriez-vous ? Dites-le nous dans les commentaires ci-dessous.

Vous pouvez télécharger le script complet que nous avons utilisé dans cet article ici.

Et un merci spécial à Dirk J Bosman (@dirkjobosman), qui a recommandé cette série d'articles.