PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

Django ORM perd des connexions lors de l'utilisation de ThreadPoolExecutor

Je suppose que le ThreadPoolExecutor n'est pas ce qui crée la connexion à la base de données, mais les tâches filetées sont celles qui maintiennent la connexion. J'ai déjà dû gérer ça.

J'ai fini par construire ce wrapper, pour m'assurer que les threads sont fermés manuellement chaque fois que des travaux sont effectués dans un ThreadPoolExecutor. Cela devrait être utile pour s'assurer que les connexions ne fuient pas, jusqu'à présent, je n'ai vu aucune fuite lors de l'utilisation de ce code.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Ensuite, vous pouvez simplement utiliser ceci :

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...et soyez sûr que les connexions se fermeront.