Mysql
 sql >> Base de données >  >> RDS >> Mysql

Google Dataflow (Apache beam) Insertion en bloc JdbcIO dans la base de données mysql

MODIF 2018-01-27 :

Il s'avère que ce problème est lié au DirectRunner. Si vous exécutez le même pipeline à l'aide de DataflowRunner, vous devriez obtenir des lots contenant jusqu'à 1 000 enregistrements. Le DirectRunner crée toujours des bundles de taille 1 après une opération de regroupement.

Réponse originale :

J'ai rencontré le même problème lors de l'écriture dans des bases de données cloud à l'aide de JdbcIO d'Apache Beam. Le problème est que même si JdbcIO prend en charge l'écriture jusqu'à 1 000 enregistrements en un seul lot, je ne l'ai jamais vu écrire plus d'une ligne à la fois (je dois admettre que cela utilisait toujours DirectRunner dans un environnement de développement).

J'ai donc ajouté une fonctionnalité à JdbcIO où vous pouvez contrôler vous-même la taille des lots en regroupant vos données et en écrivant chaque groupe comme un seul lot. Vous trouverez ci-dessous un exemple d'utilisation de cette fonctionnalité basée sur l'exemple original WordCount d'Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

La différence avec la méthode d'écriture normale de JdbcIO est la nouvelle méthode writeIterable() qui prend un PCollection<Iterable<RowT>> comme entrée au lieu de PCollection<RowT> . Chaque itérable est écrit en un seul lot dans la base de données.

La version de JdbcIO avec cet ajout peut être trouvée ici :https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

L'intégralité du projet d'exemple contenant l'exemple ci-dessus peut être trouvée ici :https://github.com/ olavloite/spanner-beam-example

(Il y a aussi une pull request en attente sur Apache Beam pour l'inclure dans le projet)