Dans un cas aussi extrême, vous feriez mieux de penser d'abord à la solution SQL recommandée, puis de l'implémenter dans SQLAlchemy - même en utilisant du SQL brut, si nécessaire. Une de ces solutions consiste à créer une table temporaire pour key_set
données et de les remplir.
Afin de tester quelque chose comme votre configuration, j'ai créé le modèle suivant
class Table(Base):
__tablename__ = 'mytable'
my_key = Column(Integer, primary_key=True)
et remplissez-le avec 20 000 000 lignes :
In [1]: engine.execute("""
...: insert into mytable
...: select generate_series(1, 20000001)
...: """)
J'ai également créé des assistants pour tester différentes combinaisons de tables temporaires, de remplissage et de requêtes. Notez que les requêtes utilisent la table Core, afin de contourner l'ORM et sa machinerie - la contribution aux délais serait de toute façon constante :
# testdb is just your usual SQLAlchemy imports, and some
# preconfigured engine options.
from testdb import *
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Executable, ClauseElement
from io import StringIO
from itertools import product
class Table(Base):
__tablename__ = "mytable"
my_key = Column(Integer, primary_key=True)
def with_session(f):
def wrapper(*a, **kw):
session = Session(bind=engine)
try:
return f(session, *a, **kw)
finally:
session.close()
return wrapper
def all(_, query):
return query.all()
def explain(analyze=False):
def cont(session, query):
results = session.execute(Explain(query.statement, analyze))
return [l for l, in results]
return cont
class Explain(Executable, ClauseElement):
def __init__(self, stmt, analyze=False):
self.stmt = stmt
self.analyze = analyze
@compiles(Explain)
def visit_explain(element, compiler, **kw):
stmt = "EXPLAIN "
if element.analyze:
stmt += "ANALYZE "
stmt += compiler.process(element.stmt, **kw)
return stmt
def create_tmp_tbl_w_insert(session, key_set, unique=False):
session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
x = table("x", column("k"))
session.execute(x.insert().values([(k,) for k in key_set]))
if unique:
session.execute("CREATE UNIQUE INDEX ON x (k)")
session.execute("ANALYZE x")
return x
def create_tmp_tbl_w_copy(session, key_set, unique=False):
session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
# This assumes that the string representation of the Python values
# is a valid representation for Postgresql as well. If this is not
# the case, `cur.mogrify()` should be used.
file = StringIO("".join([f"{k}\n" for k in key_set]))
# HACK ALERT, get the DB-API connection object
with session.connection().connection.connection.cursor() as cur:
cur.copy_from(file, "x")
if unique:
session.execute("CREATE UNIQUE INDEX ON x (k)")
session.execute("ANALYZE x")
return table("x", column("k"))
tmp_tbl_factories = {
"insert": create_tmp_tbl_w_insert,
"insert (uniq)": lambda session, key_set: create_tmp_tbl_w_insert(session, key_set, unique=True),
"copy": create_tmp_tbl_w_copy,
"copy (uniq)": lambda session, key_set: create_tmp_tbl_w_copy(session, key_set, unique=True),
}
query_factories = {
"in": lambda session, _, x: session.query(Table.__table__).
filter(Table.my_key.in_(x.select().as_scalar())),
"exists": lambda session, _, x: session.query(Table.__table__).
filter(exists().where(x.c.k == Table.my_key)),
"join": lambda session, _, x: session.query(Table.__table__).
join(x, x.c.k == Table.my_key)
}
tests = {
"test in": (
lambda _s, _ks: None,
lambda session, key_set, _: session.query(Table.__table__).
filter(Table.my_key.in_(key_set))
),
"test in expanding": (
lambda _s, _kw: None,
lambda session, key_set, _: session.query(Table.__table__).
filter(Table.my_key.in_(bindparam('key_set', key_set, expanding=True)))
),
**{
f"test {ql} w/ {tl}": (tf, qf)
for (tl, tf), (ql, qf)
in product(tmp_tbl_factories.items(), query_factories.items())
}
}
@with_session
def run_test(session, key_set, tmp_tbl_factory, query_factory, *, cont=all):
x = tmp_tbl_factory(session, key_set)
return cont(session, query_factory(session, key_set, x))
Pour les petits ensembles de clés, le simple IN
la requête que vous avez est à peu près aussi rapide que les autres, mais en utilisant un key_set
sur 100 000, les solutions les plus impliquées commencent à gagner :
In [10]: for test, steps in tests.items():
...: print(f"{test:<28}", end=" ")
...: %timeit -r2 -n2 run_test(range(100000), *steps)
...:
test in 2.21 s ± 7.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in expanding 630 ms ± 929 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert 1.83 s ± 3.73 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert 1.83 s ± 3.99 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert 1.86 s ± 3.76 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert (uniq) 1.87 s ± 6.67 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert (uniq) 1.84 s ± 125 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert (uniq) 1.85 s ± 2.8 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy 246 ms ± 1.18 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy 243 ms ± 2.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy 258 ms ± 3.05 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy (uniq) 261 ms ± 1.39 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy (uniq) 267 ms ± 8.24 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy (uniq) 264 ms ± 1.16 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
Élever le key_set
à 1 000 000 :
In [11]: for test, steps in tests.items():
...: print(f"{test:<28}", end=" ")
...: %timeit -r2 -n1 run_test(range(1000000), *steps)
...:
test in 23.8 s ± 158 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in expanding 6.96 s ± 3.02 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert 19.6 s ± 79.3 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert 20.1 s ± 114 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert 19.5 s ± 7.93 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert (uniq) 19.5 s ± 45.4 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert (uniq) 19.6 s ± 73.6 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert (uniq) 20 s ± 57.5 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy 2.53 s ± 49.9 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy 2.56 s ± 1.96 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy 2.61 s ± 26.8 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy (uniq) 2.63 s ± 3.79 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy (uniq) 2.61 s ± 916 µs per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy (uniq) 2.6 s ± 5.31 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
Jeu de clés de 10 000 000, COPY
solutions uniquement, puisque les autres ont mangé toute ma RAM et passaient par swap avant d'être tués, laissant entendre qu'ils ne finiraient jamais sur cette machine :
In [12]: for test, steps in tests.items():
...: if "copy" in test:
...: print(f"{test:<28}", end=" ")
...: %timeit -r1 -n1 run_test(range(10000000), *steps)
...:
test in w/ copy 28.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy 29.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy 29.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test in w/ copy (uniq) 28.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy (uniq) 27.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy (uniq) 28.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Ainsi, pour les petits ensembles de clés (~ 100 000 ou moins), peu importe ce que vous utilisez, bien que vous utilisiez bindparam
en expansion est clairement gagnant dans le temps par rapport à la facilité d'utilisation, mais pour des ensembles beaucoup plus volumineux, vous voudrez peut-être envisager d'utiliser une table temporaire et COPY
.
Il est à noter que pour les grands ensembles, les plans de requête sont identiques, si vous utilisez l'index unique :
In [13]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["in"],
...: cont=explain()), sep="\n")
Merge Join (cost=45.44..760102.11 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
In [14]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["exists"],
...: cont=explain()), sep="\n")
Merge Join (cost=44.29..760123.36 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
In [15]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["join"],
...: cont=explain()), sep="\n")
Merge Join (cost=39.06..760113.29 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
Étant donné que les tables de test sont en quelque sorte artificielles, il est possible d'utiliser uniquement des parcours d'index.
Enfin, voici les délais pour la méthode "piéton", pour une comparaison approximative :
In [3]: for ksl in [100000, 1000000]:
...: %time [session.query(Table).get(k) for k in range(ksl)]
...: session.rollback()
...:
CPU times: user 1min, sys: 1.76 s, total: 1min 1s
Wall time: 1min 13s
CPU times: user 9min 48s, sys: 17.3 s, total: 10min 5s
Wall time: 12min 1s
Le problème est que l'utilisation de Query.get()
inclut nécessairement l'ORM, contrairement aux comparaisons originales. Néanmoins, il devrait être quelque peu évident que les allers-retours séparés vers la base de données coûtent cher, même lors de l'utilisation d'une base de données locale.