Map-reduce est peut-être la plus polyvalente des opérations d'agrégation prises en charge par MongoDB.
Map-Reduce est un modèle de programmation populaire créé par Google pour le traitement et l'agrégation de gros volumes de données en parallèle. Une discussion détaillée sur Map-Reduce sort du cadre de cet article, mais il s'agit essentiellement d'un processus d'agrégation en plusieurs étapes. Les deux étapes les plus importantes sont l'étape de la carte (traite chaque document et émet les résultats) et l'étape de réduction (regroupe les résultats émis lors de l'étape de la carte).
MongoDB prend en charge trois types d'opérations d'agrégation :Map-Reduce, le pipeline d'agrégation et les commandes d'agrégation à usage unique. Vous pouvez utiliser ce document de comparaison MongoDB pour voir lequel correspond à vos besoins.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
Dans mon dernier article, nous avons vu, avec des exemples, comment exécuter des pipelines d'agrégation sur des secondaires. Dans cet article, nous allons parcourir l'exécution des tâches Map-Reduce sur les réplicas secondaires MongoDB.
Réduction de carte MongoDB
MongoDB prend en charge l'exécution de tâches Map-Reduce sur les serveurs de base de données. Cela offre la flexibilité d'écrire des tâches d'agrégation complexes qui ne sont pas aussi faciles à réaliser via des pipelines d'agrégation. MongoDB vous permet d'écrire une carte personnalisée et de réduire les fonctions en Javascript qui peuvent être transmises à la base de données via le shell Mongo ou tout autre client. Sur des ensembles de données volumineux et en croissance constante, on peut même envisager d'exécuter des tâches Map-Reduce incrémentielles pour éviter de traiter à chaque fois des données plus anciennes.
Historiquement, les méthodes map et reduce étaient exécutées dans un contexte monothread. Cependant, cette limitation a été supprimée dans la version 2.4.
Pourquoi exécuter des tâches Map-Reduce sur le secondaire ?
Comme les autres tâches d'agrégation, Map-Reduce est également une tâche "par lot" gourmande en ressources, elle convient donc parfaitement à l'exécution sur des réplicas en lecture seule. Les mises en garde sont les suivantes :
1) Il devrait être acceptable d'utiliser des données légèrement obsolètes. Ou vous pouvez modifier le problème d'écriture pour vous assurer que les répliques sont toujours synchronisées avec le primaire. Cette deuxième option suppose qu'il est acceptable de prendre un coup sur les performances d'écriture.
2) La sortie de la tâche Map-Reduce ne doit pas être écrite dans une autre collection de la base de données, mais plutôt renvoyée à l'application (c'est-à-dire sans écriture dans la base de données).
Voyons comment procéder via des exemples, à la fois du shell mongo et du pilote Java.
Map-Reduce sur les ensembles de répliques
Ensemble de données
À titre d'illustration, nous utiliserons un ensemble de données assez simple :un vidage quotidien des enregistrements de transactions d'un détaillant. Un exemple d'entrée ressemble à :
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
Dans nos exemples, nous allons calculer la dépense totale d'un client donné ce jour-là. Ainsi, étant donné notre schéma, les méthodes map et reduce ressembleront à :
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
Une fois notre schéma établi, regardons Map-Reduce en action.
Interface MongoDB
Afin de s'assurer qu'une tâche Map-Reduce est exécutée sur le secondaire, la préférence de lecture doit être définie sur secondaire . Comme nous l'avons dit plus haut, pour qu'un Map-Reduce s'exécute sur un secondaire, la sortie du résultat doit être inline (En fait, c'est la seule valeur de sortie autorisée sur les secondaires). Voyons comment cela fonctionne.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
Un coup d'œil aux journaux sur le secondaire confirme que le travail a bien été exécuté sur le secondaire.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
Essayons maintenant d'exécuter une tâche Map-Reduce sur les réplicas en lecture à partir d'une application Java. Sur le pilote Java MongoDB, la définition de la préférence de lecture fait l'affaire. La sortie est en ligne par défaut, donc aucun paramètre supplémentaire n'a besoin d'être passé. Voici un exemple utilisant la version 3.2.2 du pilote :
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Comme le montrent les journaux, la tâche s'est exécutée sur le secondaire :
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce sur les clusters partagés
MongoDB prend en charge Map-Reduce sur les clusters fragmentés, à la fois lorsqu'une collection fragmentée est l'entrée et lorsqu'elle est la sortie d'une tâche Map-Reduce. Cependant, MongoDB ne prend actuellement pas en charge l'exécution de tâches de réduction de carte sur les secondaires d'un cluster fragmenté. Ainsi, même si l'option out est défini sur inline , les tâches Map-Reduce s'exécutent toujours sur les primaires d'un cluster partitionné. Ce problème est suivi via ce bogue JIRA.
La syntaxe d'exécution d'une tâche Map-Reduce sur un cluster partitionné est la même que celle d'un jeu de réplicas. Ainsi, les exemples fournis dans la section ci-dessus sont valables. Si l'exemple Java ci-dessus est exécuté sur un cluster partitionné, des messages de journal apparaissent sur les primaires indiquant que la commande s'y est exécutée.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Veuillez visiter notre page produit MongoDB pour en savoir sur notre liste complète de fonctionnalités.