PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

INSÉRER ou METTRE À JOUR des données en masse de dataframe/CSV vers la base de données PostgreSQL

Dans ce cas particulier, il est préférable de descendre au niveau DB-API, car vous avez besoin de certains outils qui ne sont même pas exposés directement par SQLAlchemy Core, tels que copy_expert() . Cela peut être fait en utilisant raw_connection() . Si vos données source sont un fichier CSV, vous n'avez pas du tout besoin de pandas dans ce cas. Commencez par créer une table intermédiaire temporaire, copiez les données dans la table temporaire et insérez-les dans la table de destination avec gestion des conflits :

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Si au contraire votre source de données est le DataFrame , vous pouvez toujours utiliser COPY en en passant une fonction comme méthode method= à to_sql() . La fonction pourrait même masquer toute la logique ci-dessus :

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

Vous insérez alors le nouveau DataFrame en utilisant

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

En utilisant cette méthode, la mise à jour d'environ 1 000 000 lignes a pris environ 16 secondes sur cette machine avec une base de données locale.