Mysql
 sql >> Base de données >  >> RDS >> Mysql

Automatisez le chargement en masse des données de s3 vers l'instance Aurora MySQL RDS

L'approche est comme indiqué ci-dessus, avoir un déclencheur d'événement S3 et une tâche lambda à l'écoute sur l'emplacement du compartiment/objet s3. Dès qu'un fichier est chargé à l'emplacement s3, la tâche lambda s'exécute et, dans la lambda, vous pouvez configurer pour appeler une tâche AWS Glue. C'est exactement ce que nous avons fait et a été mis en ligne avec succès. Lambda a une durée de vie de 15 minutes, et cela devrait prendre moins d'une minute pour déclencher/démarrer une tâche Glue.

Veuillez trouver ci-joint un exemple de source pour référence.

from __future__ import print_function
import json
import boto3
import time
import urllib

print('Loading function')

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="your-glue-job-name here"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_bucket, source_bucket))
    raise e

Pour créer une fonction Lambda, accédez à AWS Lambdra->Créer une nouvelle fonction à partir de zéro->Sélectionner S3 pour l'événement, puis configurez les emplacements de compartiment S3, préfixes selon les besoins. Ensuite, copiez-collez l'exemple de code ci-dessus, la zone de code en ligne et configurez le nom du travail de colle selon vos besoins. Veuillez vous assurer que vous disposez de tous les rôles/configuration d'accès IAM requis.

Le travail de colle doit avoir la possibilité de se connecter à votre Aurora, puis vous pouvez utiliser la commande "LOAD FROM S3...." fournie par Aurora. Assurez-vous que tous les paramètres/configurations des groupes de paramètres sont effectués selon les besoins.

Faites-moi savoir si vous rencontrez des problèmes.

MISE À JOUR :EXEMPLE d'extrait de code pour LOAD FROM S3 :

conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, [email protected];"
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()