PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

Comment configurer un tunnel SSH dans Google Cloud Dataflow vers un serveur de base de données externe ?

Problème résolu ! Je n'arrive pas à croire que j'ai passé deux jours complets là-dessus... Je cherchais complètement dans la mauvaise direction.

Le problème n'était pas lié à une configuration de réseau Dataflow ou GCP, et pour autant que je sache...

est vrai.

Le problème était bien sûr dans mon code :seul le problème n'a été révélé que dans un environnement distribué. J'ai fait l'erreur d'ouvrir le tunnel depuis le processeur principal du pipeline, au lieu des ouvriers. Le tunnel SSH était donc opérationnel mais pas entre les workers et le serveur cible, uniquement entre le pipeline principal et la cible !

Pour résoudre ce problème, j'ai dû modifier mon DoFn demandeur pour envelopper l'exécution de la requête avec le tunnel :

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

comme vous pouvez le voir, j'ai dû remplacer certains morceaux de la bibliothèque pysql_beam.

Enfin, chaque travailleur ouvre son propre tunnel pour chaque requête. Il est probablement possible d'optimiser ce comportement mais c'est suffisant pour mes besoins.