MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

Importer CSV à l'aide du schéma Mongoose

Vous pouvez le faire avec fast-csv en obtenant les headers à partir de la définition de schéma qui renverra les lignes analysées en tant qu'"objets". Vous avez en fait quelques décalages, je les ai donc marqués avec des corrections :

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Tant que le schéma s'aligne réellement sur le CSV fourni, tout va bien. Ce sont les corrections que je peux voir, mais si vous avez besoin que les noms de champs réels soient alignés différemment, vous devez les ajuster. Mais il y avait essentiellement un Number à la position où il y a une String et essentiellement un champ supplémentaire, qui je suppose est le champ vide dans le CSV.

Les choses générales sont d'obtenir le tableau de noms de champs à partir du schéma et de le transmettre aux options lors de la création de l'instance d'analyseur csv :

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Une fois que vous faites cela, vous récupérez un "Objet" au lieu d'un tableau :

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Ne vous inquiétez pas des "types" car Mongoose convertira les valeurs en fonction du schéma.

Le reste se passe dans le gestionnaire pour les data un événement. Pour une efficacité maximale, nous utilisons insertMany() pour n'écrire dans la base de données qu'une fois toutes les 10 000 lignes. La façon dont cela va réellement au serveur et aux processus dépend de la version de MongoDB, mais 10 000 devrait être assez raisonnable en fonction du nombre moyen de champs que vous importeriez pour une seule collection en termes de "compromis" pour l'utilisation de la mémoire et l'écriture d'un demande de réseau raisonnable. Réduisez le nombre si nécessaire.

Les parties importantes sont de marquer ces appels comme async fonctions et await le résultat de la insertMany() avant de continuer. Nous devons également pause() le flux et resume() sur chaque élément sinon nous courons le risque d'écraser le buffer de documents à insérer avant leur envoi effectif. La pause() et resume() sont nécessaires pour mettre une "contre-pression" sur le tuyau, sinon les éléments continuent de "sortir" et de tirer les data événement.

Naturellement, le contrôle des 10 000 entrées nécessite de vérifier cela à la fois à chaque itération et à la fin du flux afin de vider le tampon et d'envoyer les documents restants au serveur.

C'est vraiment ce que vous voulez faire, car vous ne voulez certainement pas envoyer une requête asynchrone au serveur à la fois à "chaque" itération via les data événement ou essentiellement sans attendre la fin de chaque demande. Vous vous en tirerez en ne vérifiant pas cela pour les "très petits fichiers", mais pour toute charge réelle, vous êtes certain de dépasser la pile d'appels en raison d'appels asynchrones "en vol" qui ne sont pas encore terminés.

Pour info - un package.json utilisé. Le mz est facultatif car il ne s'agit que d'une Promise modernisée bibliothèque activée de bibliothèques "intégrées" de nœud standard que j'ai simplement l'habitude d'utiliser. Le code est bien sûr complètement interchangeable avec le fs module.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

En fait, avec Node v8.9.x et supérieur, nous pouvons même rendre cela beaucoup plus simple avec une implémentation de AsyncIterator via le stream-to-iterator module. C'est toujours dans Iterator<Promise<T>> mode, mais cela devrait suffire jusqu'à ce que Node v10.x devienne stable LTS :

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Fondamentalement, toute la gestion des "événements" du flux, la mise en pause et la reprise sont remplacées par un simple for boucle :

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Facile! Cela est nettoyé dans l'implémentation ultérieure du nœud avec for..await..of quand il devient plus stable. Mais ce qui précède fonctionne bien sur la version spécifiée et supérieure.