Les opérations d'agrégation dans MongoDB vous permettent de traiter des enregistrements de données, de les regrouper et de renvoyer leurs résultats calculés. MongoDB prend en charge trois types d'opérations d'agrégation :
- Commandes d'agrégation à usage unique
- Carte-Réduire
- Pipeline d'agrégation
Vous pouvez utiliser ce document de comparaison MongoDB pour voir lequel correspond à vos besoins.
Pipeline d'agrégation
Le pipeline d'agrégation est un framework MongoDB qui permet l'agrégation de données via un pipeline de traitement de données. Autrement dit, les documents sont envoyés via un pipeline en plusieurs étapes, filtrant, regroupant et transformant autrement les documents à chaque étape. Il fournit SQL « GROUPE PAR… ». type de constructions pour MongoDB qui s'exécutent sur la base de données elle-même. La documentation sur l'agrégation fournit des exemples utiles de création de tels pipelines.
Pourquoi exécuter des agrégations sur le secondaire ?
Les pipelines d'agrégation sont des opérations gourmandes en ressources. Il est logique de décharger les tâches d'agrégation sur les secondaires d'un ensemble de répliques MongoDB lorsqu'il est possible d'opérer sur des données légèrement obsolètes. Cela est généralement vrai pour les opérations « par lots », car elles ne s'attendent pas à s'exécuter sur les dernières données. Si la sortie doit être écrite dans une collection, les tâches d'agrégation ne s'exécutent que sur le primaire puisque seul le primaire est accessible en écriture dans MongoDB.
Dans cet article, nous vous montrerons comment vous assurer que les pipelines d'agrégation sont exécutés sur le secondaire à la fois à partir du shell mongo et de Java.
Exécuter des pipelines d'agrégation sur le secondaire à partir de Mongo Shell et Java dans MongoDBClick To TweetRemarque :Nous utilisons l'exemple d'ensemble de données fourni par MongoDB dans leur exemple d'agrégation de codes postaux pour présenter nos exemples. Vous pouvez le télécharger comme indiqué dans l'exemple.
Pipeline d'agrégation sur les ensembles de répliques
Shell MongoDB
Définition de la préférence de lecture sur secondaire fait l'affaire lors de l'exécution d'une tâche d'agrégation à partir du shell mongo. Essayons de récupérer tous les états avec une population supérieure à 10 millions (1ère agrégation dans l'exemple des codes postaux). Le shell et le serveur exécutent MongoDB version 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }
Un examen des journaux MongoDB (avec la journalisation activée pour les commandes) sur le secondaire montre que l'agrégation a bien fonctionné sur le secondaire :
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Java
À partir du pilote Java MongoDB, définir à nouveau la préférence de lecture fait l'affaire. Voici un exemple utilisant la version 3.2.2 du pilote :
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
Se connecte au secondaire :
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
Aucune opération n'a été enregistrée sur le primaire.
Pipeline d'agrégation sur des clusters partagés
Les pipelines d'agrégation sont pris en charge sur les clusters partitionnés. Le comportement détaillé est expliqué dans la documentation. Du point de vue de la mise en œuvre, il y a peu de différence entre le jeu de réplicas et le cluster fragmenté lors de l'utilisation d'un pipeline d'agrégation.
Comment configurer un pipeline d'agrégation sur des clusters partagés dans MongoDBClick To TweetShell MongoDB
Avant d'importer des données dans le cluster partitionné, activez le partitionnement sur la collection.
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
Après cela, les opérations sont identiques à celles du jeu de réplicas :
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
Journaux de l'un des secondaires :
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Java
Le même code que celui applicable dans le jeu de répliques fonctionne correctement avec un cluster fragmenté. Remplacez simplement la chaîne de connexion du jeu de répliques par celle du cluster partitionné. Les journaux d'un secondaire indiquent que la tâche a bien été exécutée sur les secondaires :
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
Ce contenu vous a-t-il été utile ? Faites-le nous savoir en nous tweetant @scaledgridio et comme toujours, si vous avez des questions, faites-le nous savoir dans les commentaires ci-dessous. Oh et! N'oubliez pas de consulter nos produits d'hébergement MongoDB qui peuvent économiser jusqu'à 40 % sur les coûts d'hébergement MongoDB® à long terme.