Redis
 sql >> Base de données >  >> NoSQL >> Redis

Comment implémenter un flux de futures pour un appel bloquant en utilisant futures.rs et Redis PubSub ?

Mise en garde importante Je n'ai jamais utilisé cette bibliothèque auparavant, et ma connaissance de bas niveau de certains concepts est un peu... insuffisante. La plupart du temps, je lis le tutoriel. Je suis presque sûr que quiconque a fait du travail asynchrone lira ceci et rira, mais cela peut être un point de départ utile pour d'autres personnes. Caveat emptor !

Commençons par quelque chose d'un peu plus simple, démontrant comment un Stream œuvres. Nous pouvons convertir un itérateur de Result s dans un flux :

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Cela nous montre une façon de consommer le flux. Nous utilisons and_then pour faire quelque chose à chaque payload (ici juste en l'imprimant) puis for_each pour convertir le Stream retour dans un Future . Nous pouvons alors exécuter le futur en appelant l'étrangement nommé forget méthode.

Ensuite, il faut lier la bibliothèque Redis au mélange, en ne traitant qu'un seul message. Depuis le get_message() est bloquante, nous devons introduire quelques threads dans le mix. Ce n'est pas une bonne idée d'effectuer une grande quantité de travail dans ce type de système asynchrone car tout le reste sera bloqué. Par exemple :

Sauf disposition contraire, il convient de s'assurer que les implémentations de cette fonction se terminent très rapidement .

Dans un monde idéal, la caisse redis serait construite au sommet d'une bibliothèque comme les futures et exposerait tout cela de manière native.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Ma compréhension devient plus floue ici. Dans un fil séparé, nous bloquons le message et le poussons dans le canal lorsque nous le recevons. Ce que je ne comprends pas, c'est pourquoi nous devons tenir la poignée du fil. Je m'attendrais à ce que foo.forget se bloquerait, attendant que le flux soit vide.

Dans une connexion telnet au serveur Redis, envoyez ceci :

publish rust awesome

Et vous verrez que cela fonctionne. L'ajout d'instructions d'impression montre que (pour moi) le foo.forget l'instruction est exécutée avant que le thread ne soit généré.

Plusieurs messages sont plus délicats. L'Sender se consomme pour éviter que le côté générateur ne prenne trop d'avance sur le côté consommateur. Ceci est accompli en renvoyant un autre futur de send ! Nous devons le faire sortir de là pour le réutiliser pour la prochaine itération de la boucle :

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Je suis sûr qu'il y aura plus d'écosystème pour ce type d'interopération au fil du temps. Par exemple, la caisse futures-cpupool pourrait probablement être étendu pour prendre en charge un cas d'utilisation similaire à celui-ci.