Problème résolu ! Je n'arrive pas à croire que j'ai passé deux jours complets là-dessus... Je cherchais complètement dans la mauvaise direction.
Le problème n'était pas lié à une configuration de réseau Dataflow ou GCP, et pour autant que je sache...
est vrai.
Le problème était bien sûr dans mon code :seul le problème n'a été révélé que dans un environnement distribué. J'ai fait l'erreur d'ouvrir le tunnel depuis le processeur principal du pipeline, au lieu des ouvriers. Le tunnel SSH était donc opérationnel mais pas entre les workers et le serveur cible, uniquement entre le pipeline principal et la cible !
Pour résoudre ce problème, j'ai dû modifier mon DoFn demandeur pour envelopper l'exécution de la requête avec le tunnel :
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
comme vous pouvez le voir, j'ai dû remplacer certains morceaux de la bibliothèque pysql_beam.
Enfin, chaque travailleur ouvre son propre tunnel pour chaque requête. Il est probablement possible d'optimiser ce comportement mais c'est suffisant pour mes besoins.