MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

Streaming de données en temps réel avec MongoDB Change Streams

Récemment, MongoDB a publié une nouvelle fonctionnalité à partir de la version 3.6, Change Streams. Cela vous donne un accès instantané à vos données qui vous aide à rester à jour avec vos modifications de données. Dans le monde d'aujourd'hui, tout le monde veut des notifications instantanées plutôt que de les recevoir après quelques heures ou quelques minutes. Pour certaines applications, il est essentiel d'envoyer des notifications en temps réel à tous les utilisateurs abonnés pour chaque mise à jour. MongoDB a rendu ce processus vraiment facile en introduisant cette fonctionnalité. Dans cet article, nous découvrirons le flux de modifications MongoDB et ses applications avec quelques exemples.

Définir les flux de changement

Les flux de modifications ne sont rien d'autre que le flux en temps réel de toutes les modifications qui se produisent dans la base de données ou la collection ou même dans les déploiements. Par exemple, chaque fois qu'une mise à jour (Insert, Update ou Delete) se produit dans une collection spécifique, MongoDB déclenche un événement de changement avec toutes les données qui ont été modifiées.

Vous pouvez définir des flux de modifications sur n'importe quelle collection comme n'importe quel autre opérateur d'agrégation normal à l'aide de l'opérateur $changeStream et de la méthode watch(). Vous pouvez également définir le flux de modifications à l'aide de la méthode MongoCollection.watch().

Exemple

db.myCollection.watch()

Modifier les fonctionnalités des flux

  • Filtrage des modifications

    Vous pouvez filtrer les modifications pour obtenir des notifications d'événements pour certaines données ciblées uniquement.

    Exemple :

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Ce code garantira que vous n'obtenez des mises à jour que pour les enregistrements dont le nom est égal à Bob. De cette façon, vous pouvez écrire n'importe quel pipeline pour filtrer les flux de modifications.

  • Reprise des flux de modification

    Cette fonctionnalité garantit qu'il n'y a pas de perte de données en cas de panne. Chaque réponse du flux contient le jeton de reprise qui peut être utilisé pour redémarrer le flux à partir d'un point spécifique. Pour certaines pannes de réseau fréquentes, le pilote mongodb tentera de rétablir la connexion avec les abonnés à l'aide du jeton de reprise le plus récent. Cependant, en cas d'échec complet de l'application, le jeton de reprise doit être conservé par les clients pour reprendre le flux.

  • Streams de modification ordonnés

    MongoDB utilise une horloge logique globale pour ordonner tous les événements de flux de modifications sur tous les réplicas et fragments de n'importe quel cluster. Ainsi, le récepteur recevra toujours les notifications dans le même ordre que les commandes ont été appliquées sur la base de données.

  • Événements avec documents complets

    MongoDB renvoie la partie des documents correspondants par défaut. Cependant, vous pouvez modifier la configuration du flux de modifications pour recevoir un document complet. Pour ce faire, passez { fullDocument :"updateLookup"} à la méthode watch.
    Exemple :

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Durabilité

    Les flux de modifications ne notifient que les données qui sont validées pour la majorité des répliques. Cela garantira que les événements sont générés par des données de persistance majoritaires garantissant la durabilité du message.

  • Sécurité/Contrôle d'accès

    Les flux de changement sont très sécurisés. Les utilisateurs peuvent créer des flux de modifications uniquement sur les collections sur lesquelles ils disposent d'autorisations de lecture. Vous pouvez créer des flux de modifications en fonction des rôles des utilisateurs.

Plusieursnines Devenez un administrateur de base de données MongoDB – Amener MongoDB en productionDécouvrez ce que vous devez savoir pour déployer, surveiller, gérer et faire évoluer MongoDBDélécharger gratuitement

Exemple de flux de changement

Dans cet exemple, nous allons créer des flux de modifications sur la collection Stock pour être averti lorsque le cours d'une action dépasse un seuil.

  • Configurer le cluster

    Pour utiliser les flux de modifications, nous devons d'abord créer un jeu de répliques. Exécutez la commande suivante pour créer un jeu de répliques à nœud unique.

    mongod --dbpath ./data --replSet “rs”
  • Insérer quelques enregistrements dans la collection Stocks

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Configurer l'environnement du nœud et installer les dépendances

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Abonnez-vous pour les changements

    Créez un fichier index.js et insérez-y le code suivant.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Exécutez maintenant ce fichier :

    node index.js
  • Insérez un nouvel enregistrement dans la base de données pour recevoir une mise à jour

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Vérifiez maintenant votre console, vous recevrez une mise à jour de MongoDB.
    Exemple de réponse :

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Ici, vous pouvez modifier la valeur du paramètre operationType avec les opérations suivantes pour écouter différents types de modifications dans une collection :

  • Insérer
  • Remplacer (sauf identifiant unique)
  • Mettre à jour
  • Supprimer
  • Invalider (Chaque fois que Mongo renvoie un curseur invalide)

Autres modes de flux de modifications

Vous pouvez démarrer des flux de modifications sur une base de données et le déploiement de la même manière que sur une collection. Cette fonctionnalité a été publiée à partir de la version 4.0 de MongoDB. Voici les commandes pour ouvrir un flux de modifications sur la base de données et les déploiements.

Against DB: db.watch()
Against deployment: Mongo.watch()

Conclusion

MongoDB Change Streams simplifie l'intégration entre le frontend et le backend en temps réel et de manière transparente. Cette fonctionnalité peut vous aider à utiliser MongoDB pour le modèle pubsub afin que vous n'ayez plus besoin de gérer les déploiements Kafka ou RabbitMQ. Si votre application nécessite des informations en temps réel, vous devez vérifier cette fonctionnalité de MongoDB. J'espère que cet article vous permettra de démarrer avec les flux de modifications MongoDB.