Redis
 sql >> Base de données >  >> NoSQL >> Redis

Flask par exemple - Implémentation d'une file d'attente de tâches Redis

Cette partie du didacticiel explique comment implémenter une file d'attente de tâches Redis pour gérer le traitement de texte.

Mises à jour :

  • 12/02/2020 :mis à jour vers Python version 3.8.1 ainsi que vers les dernières versions de Redis, Python Redis et RQ. Voir ci-dessous pour plus de détails. Mentionnez un bogue dans la dernière version de RQ et fournissez une solution. Résolution du bogue http avant https.
  • 22/03/2016 :mis à jour vers Python version 3.5.1 ainsi que vers les dernières versions de Redis, Python Redis et RQ. Voir ci-dessous pour plus de détails.
  • 22/02/2015 :Ajout de la prise en charge de Python 3.

Bonus gratuit : Cliquez ici pour accéder à un didacticiel vidéo Flask + Python gratuit qui vous montre comment créer une application Web Flask, étape par étape.

N'oubliez pas :voici ce que nous construisons :une application Flask qui calcule les paires mot-fréquence en fonction du texte d'une URL donnée.

  1. Première partie :Configurez un environnement de développement local, puis déployez un environnement de préproduction et un environnement de production sur Heroku.
  2. Deuxième partie :Configurer une base de données PostgreSQL avec SQLAlchemy et Alembic pour gérer les migrations.
  3. Troisième partie :ajouter la logique back-end pour récupérer puis traiter le nombre de mots d'une page Web à l'aide des requêtes, de BeautifulSoup et des bibliothèques Natural Language Toolkit (NLTK).
  4. Quatrième partie :Mettre en œuvre une file d'attente de tâches Redis pour gérer le traitement du texte. (actuel )
  5. Cinquième partie :Configurer Angular sur le front-end pour interroger en continu le back-end pour voir si la demande est traitée.
  6. Sixième partie :Transférez vers le serveur de préproduction sur Heroku :configurez Redis et expliquez comment exécuter deux processus (Web et Worker) sur un seul Dyno.
  7. Septième partie :Mettez à jour l'interface pour la rendre plus conviviale.
  8. Huitième partie :Créer une directive angulaire personnalisée pour afficher un tableau de distribution des fréquences à l'aide de JavaScript et de D3.

Besoin du code ? Récupérez-le dans le dépôt.


Configuration requise pour l'installation

Outils utilisés :

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - une bibliothèque simple pour créer une file d'attente de tâches

Commencez par télécharger et installer Redis depuis le site officiel ou via Homebrew (brew install redis ). Une fois installé, démarrez le serveur Redis :

$ redis-server

Installez ensuite Python Redis et RQ dans une nouvelle fenêtre de terminal :

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Configurer le travailleur

Commençons par créer un processus de travail pour écouter les tâches en file d'attente. Créez un nouveau fichier worker.py , et ajoutez ce code :

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Ici, nous avons écouté une file d'attente appelée default et établi une connexion au serveur Redis sur localhost:6379 .

Lancez ceci dans une autre fenêtre de terminal :

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Nous devons maintenant mettre à jour notre app.py pour envoyer des travaux à la file d'attente…



Mettre à jour app.py

Ajoutez les importations suivantes à app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Mettez ensuite à jour la section de configuration :

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) configurer une connexion Redis et initialiser une file d'attente basée sur cette connexion.

Déplacez la fonctionnalité de traitement de texte hors de notre route d'index vers une nouvelle fonction appelée count_and_save_words() . Cette fonction accepte un argument, une URL, que nous lui transmettons lorsque nous l'appelons depuis notre route d'index.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Prenez note du code suivant :

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Remarque : Nous devons importer les count_and_save_words fonction dans notre fonction index car le paquet RQ a actuellement un bogue, où il ne trouvera pas de fonctions dans le même module.

Ici, nous avons utilisé la file d'attente que nous avons initialisée plus tôt et appelée enqueue_call() une fonction. Cela a ajouté un nouveau travail à la file d'attente et ce travail a exécuté le count_and_save_words() fonction avec l'URL comme argument. Le result_ttl=5000 L'argument de ligne indique à RQ combien de temps conserver le résultat de la tâche pendant - 5 000 secondes, dans ce cas. Ensuite, nous avons sorti l'identifiant du travail sur le terminal. Cet identifiant est nécessaire pour voir si le travail est terminé.

Configurons un nouvel itinéraire pour cela…



Obtenir des résultats

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Testons cela.

Lancez le serveur, accédez à http://localhost:5000/, utilisez l'URL https://realpython.com et récupérez l'identifiant de la tâche sur le terminal. Utilisez ensuite cet identifiant dans le point de terminaison "/results/", c'est-à-dire http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Tant que moins de 5 000 secondes se sont écoulées avant que vous ne vérifiiez l'état, vous devriez voir un numéro d'identification, qui est généré lorsque nous ajoutons les résultats à la base de données :

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Maintenant, refactorisons légèrement la route pour renvoyer les résultats réels de la base de données au format JSON :

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Assurez-vous d'ajouter l'importation :

from flask import jsonify

Testez cela à nouveau. Si tout s'est bien passé, vous devriez voir quelque chose de similaire dans votre navigateur :

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Quelle est la prochaine ?

Bonus gratuit : Cliquez ici pour accéder à un didacticiel vidéo Flask + Python gratuit qui vous montre comment créer une application Web Flask, étape par étape.

Dans la partie 5, nous réunirons le client et le serveur en ajoutant Angular au mélange pour créer un poller, qui enverra une requête toutes les cinq secondes au /results/<job_key> endpoint demandant des mises à jour. Une fois les données disponibles, nous les ajouterons au DOM.

Santé !

Ceci est une collaboration entre Cam Linke, co-fondateur de Startup Edmonton, et les gens de Real Python