Bienvenue dans la diffusion en continu. Ce que vous voulez vraiment, c'est un "flux événementiel" qui traite votre entrée "un morceau à la fois", et bien sûr idéalement par un délimiteur commun tel que le caractère "newline" que vous utilisez actuellement.
Pour des choses vraiment efficaces, vous pouvez ajouter l'utilisation de MongoDB "Bulk API" inserts pour rendre votre chargement aussi rapide que possible sans consommer toute la mémoire de la machine ou les cycles du processeur.
Ne préconisez pas car il existe différentes solutions disponibles, mais voici une liste qui utilise le line- package de flux d'entrée pour simplifier la partie "terminateur de ligne".
Définitions de schéma par "exemple" uniquement :
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
Donc, généralement, l'interface "stream" "décompose l'entrée" afin de traiter "une ligne à la fois". Cela vous empêche de tout charger en même temps.
Les parties principales sont l'"Bulk Operations API" de MongoDB. Cela vous permet de "mettre en file d'attente" de nombreuses opérations à la fois avant de les envoyer réellement au serveur. Donc dans ce cas avec l'utilisation d'un "modulo", les écritures ne sont envoyées que pour 1000 entrées traitées. Vous pouvez vraiment tout faire jusqu'à la limite de 16 Mo BSON, mais gardez-le gérable.
En plus des opérations traitées en bloc, il y a un "limiteur" supplémentaire en place à partir du async bibliothèque. Ce n'est pas vraiment nécessaire, mais cela garantit que pratiquement pas plus que la "limite modulo" de documents ne sont en cours de traitement à tout moment. Les "inserts" par lots généraux n'ont aucun coût d'E/S autre que la mémoire, mais les appels "d'exécution" signifient que l'E/S est en cours de traitement. Nous attendons donc plutôt que de faire la queue pour plus de choses.
Il existe sûrement de meilleures solutions que vous pouvez trouver pour le "traitement de flux" des données de type CSV, ce que cela semble être. Mais en général, cela vous donne les concepts sur la façon de le faire d'une manière efficace en mémoire sans consommer de cycles CPU également.