PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

Accrocher le script Python à l'aide de SQLAlchemy et du multitraitement

Je crois que le TypeError vient du multiprocessing est get .

J'ai supprimé tout le code DB de votre script. Jetez un oeil à ceci :

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Utiliser r.wait renvoie le résultat attendu, mais en utilisant r.get lève TypeError . Comme décrit dans la documentation de Python , utilisez r.wait après un map_async .

Modifier :Je dois modifier ma réponse précédente. Je crois maintenant que le TypeError vient de SQLAlchemy. J'ai modifié mon script pour reproduire l'erreur.

Modifier 2 :Il semble que le problème est que multiprocessing.pool ne fonctionne pas bien si un travailleur lève une exception dont le constructeur nécessite un paramètre (voir aussi ici ).

J'ai modifié mon script pour mettre cela en évidence.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Dans votre cas, étant donné que votre code lève une exception SQLAlchemy, la seule solution à laquelle je peux penser est d'attraper toutes les exceptions dans le do fonction et relancer une Exception normale Au lieu. Quelque chose comme ça :

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Modifier 3  :donc, cela semble être un bogue avec Python , mais des exceptions appropriées dans SQLAlchemy permettraient de contourner ce problème :par conséquent, j'ai soulevé le problème avec SQLAlchemy , aussi.

Pour contourner le problème, je pense que la solution à la fin de Edit 2 ferait (envelopper les rappels dans try-except et re-raise).