
Task queues with Flask | Learning Flask Ep. 21

An introduction to task queues with Flask and RQ


Julian Nash · 8 months ago in Flask

Sometimes we need to run tasks in the background, outside of the HTTP request/response cycle. Long-running tasks such as image, video or audio processing can take anywhere from minutes to hours, depending on the task at hand.

Picture this scenario.

You have a web application that allows users to upload an image, from which you're going to produce 5 copies, all with different sizes to match the various screen sizes used on modern devices, including a thumbnail.

Even if this task only takes a few seconds to process, a few seconds of waiting on the web can feel like an eternity! And we want to keep our users using the application.

This is where task queues come into play!

Task queues allow us to offload jobs to another worker process, meaning we can return something to the user immediately whilst the job gets placed in a queue and processed at a later time (depending on how many tasks are currently in the queue, it could start immediately).

There are many use cases for task queues. A few examples include:

  • Image/video/audio processing
  • Web scraping
  • Analysis/complex calculation
  • Sending emails

And much more!

Task queues

There are a handful of task queues available for Python. For this introduction, we're going to use RQ, a simple yet powerful task queue that uses Redis as a message broker.

Tasks are handled by regular Python functions, which we place in a queue along with any arguments they need.

For example, a very simple function can be used to handle a task:

def task_handler(args):
    # this function takes around 2 minutes to complete
    return True

When we want to add a task to a queue, we do something like the following:

q.enqueue(task_handler, args)

Where:

  • q is a reference to the queue itself
  • enqueue adds a new task to the queue
  • task_handler is the function we want to call (passed by name, without calling it)
  • args is any arguments to pass to the function

Any tasks in the queue will be executed sequentially until all tasks are complete.
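To make the first-in, first-out behaviour concrete, here is a minimal in-process sketch using only the standard library. It is an analogy rather than how RQ works internally (RQ stores jobs in Redis and runs them in a separate worker process). Note that Queue here is the standard library's queue.Queue, not rq.Queue, and run_all is a made-up helper for illustration.

```python
from queue import Queue


def task_handler(text):
    """A stand-in task: return the length of the given text."""
    return len(text)


def run_all(q):
    """Run every queued (function, args) pair in the order it was added."""
    results = []
    while not q.empty():
        func, args = q.get()
        results.append(func(*args))
    return results


q = Queue()
q.put((task_handler, ("flask",)))  # first in
q.put((task_handler, ("rq",)))     # second in

results = run_all(q)
print(results)  # [5, 2]
```

The results come back in the same order the tasks were added, which is the behaviour described above.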

The message broker is the middleman between our application and our workers, delivering messages when we want to schedule a task in the queue.

Redis is a fast, in-memory database which we'll use as our broker, so you'll need to install it.

Installing Redis

We're not going to cover installing Redis in this article. However, some points to note:

  • If you're on Windows 10, consider using the Windows Subsystem for Linux or Docker
  • If you're on Mac, install Redis using Homebrew
  • If you're on Linux, build Redis from source or install it using your distro package manager

We're using the Windows Subsystem for Linux running Ubuntu 18.04, where Redis is easily installed with:

sudo apt install redis-server

Here's a great link to an article on Digital Ocean for setting up Redis on Ubuntu 18.04 if you'd like it to run as a service:

https://www.digitalocean.com/community/tutorials/how-to-install-and-secure-redis-on-ubuntu-18-04

Otherwise just start Redis in a new terminal with the following command:

redis-server

Note - We haven't set up any authentication on our installation of Redis as it's only for local development. If you're running in production, you'll want to enable password auth.
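If you do enable a password, one common approach is to keep the connection details out of the source code. The following is a rough sketch, assuming hypothetical environment variable names (REDIS_HOST, REDIS_PORT, REDIS_DB and REDIS_PASSWORD are not defined anywhere in this article); it only builds a redis:// URL and doesn't connect to anything.

```python
import os
from urllib.parse import quote


def build_redis_url(env=None):
    """Build a redis:// URL from environment-style settings.

    The variable names used here are hypothetical; use whatever
    your own deployment provides.
    """
    env = os.environ if env is None else env
    host = env.get("REDIS_HOST", "localhost")
    port = env.get("REDIS_PORT", "6379")
    db = env.get("REDIS_DB", "0")
    password = env.get("REDIS_PASSWORD")

    # Percent-encode the password so characters like "@" stay valid in a URL
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


print(build_redis_url({}))  # redis://localhost:6379/0
print(build_redis_url({"REDIS_PASSWORD": "p@ss"}))  # redis://:p%40ss@localhost:6379/0
```

A URL in this form can be handed to redis.Redis.from_url() from the redis package to create the connection.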

The Flask app

In this example, we're going to build a simple application, allowing a user to submit a URL via a form.

We'll create a task handler function which we'll use to fetch the HTML of the URL and count the occurrences of every word on the page.

But before we do that, let's create a simple example first.

To get things going, we're going to create a new virtual environment and install the dependencies:

python -m venv env && source env/bin/activate

We're going to install flask, redis, rq, beautifulsoup4 and lxml (the parser we'll use with BeautifulSoup later on) with pip:

pip install flask redis rq beautifulsoup4 lxml

We're going to create a simple, single file application in a file called run.py.

First up, we need a few imports:

run.py

from flask import Flask, request
import redis
from rq import Queue

import time

We're importing time to simulate some delay in our background task.

Next, we'll create the Flask app variable and set up our Redis connection and task queue object:

app = Flask(__name__)

r = redis.Redis()
q = Queue(connection=r)

Let's create a very simple function that will handle a task. It takes an argument (n) and returns its length after a simulated delay.

As you can see, it's just a normal Python function!

def background_task(n):

    """ Function that returns len(n) and simulates a delay """

    delay = 2

    print("Task running")
    print(f"Simulating a {delay} second delay")

    time.sleep(delay)

    print(len(n))
    print("Task complete")

    return len(n)

Finally, we'll create a route which looks for a query string with n as the parameter.

@app.route("/task")
def index():

    if request.args.get("n"):

        job = q.enqueue(background_task, request.args.get("n"))

        return f"Task ({job.id}) added to queue at {job.enqueued_at}"

    return "No value for n provided"


if __name__ == "__main__":
    app.run()

If n is provided in the URL, it will add a task to the queue, with the value for n as the function argument.

We add a task to a queue with:

q.enqueue(function_name, args)

In this case, we've stored this in a variable called job.

We now have access to the job object. You'll notice we use job.id and job.enqueued_at, which give us a unique job ID and the date and time the job was enqueued.

Some other interesting attributes of the job object include:

  • job.status
  • job.func_name
  • job.args
  • job.kwargs
  • job.result
  • job.enqueued_at
  • job.started_at
  • job.ended_at
  • job.exc_info
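As a small illustration, the attributes above can be gathered into a plain dictionary for logging or debugging. This is only a sketch: summarise_job is a made-up helper, and the stand-in object below is not a real RQ job. The status is left out because newer RQ releases read it through job.get_status().

```python
from types import SimpleNamespace

FIELDS = ("id", "func_name", "args", "kwargs", "result",
          "enqueued_at", "started_at", "ended_at", "exc_info")


def summarise_job(job):
    """Collect the listed attributes into a plain dictionary.

    getattr with a default keeps this working if a field is
    missing on the object that is passed in.
    """
    return {name: getattr(job, name, None) for name in FIELDS}


# A stand-in object used only for this demonstration; in the app you
# would pass the job returned by q.enqueue(...)
fake_job = SimpleNamespace(id="abc123", func_name="run.background_task", args=("100",))

summary = summarise_job(fake_job)
print(summary["func_name"], summary["args"], summary["result"])  # run.background_task ('100',) None
```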

Before you run the app, you need to start the rq worker.

Open a new terminal (in the same directory as run.py) and start the worker with:

rq worker

You'll see something like:

20:55:23 RQ worker 'rq:worker:jnwt.4968' started, version 0.13.0
20:55:23 *** Listening on default...
20:55:23 Cleaning registries for queue: default

Start the Flask app with:

export FLASK_APP=run.py
export FLASK_ENV=development
flask run

Go to /task?n=100 in your browser and keep an eye on your terminal running the rq worker process.

You should see something similar to this in your browser:

Task (7dc516bb-7720-446a-acce-cbb272d7f598) added to queue at 2019-03-08 22:03:15.936306

In your terminal, you should see:

22:03:18 default: Job OK (86a8479a-e02d-4aca-8466-6df47fa69efe)
22:03:18 Result is kept for 500 seconds
22:03:18 default: run.background_task('100') (7dc516bb-7720-446a-acce-cbb272d7f598)
Task running
Simulating a 2 second delay
3
Task complete
22:03:20 default: Job OK (7dc516bb-7720-446a-acce-cbb272d7f598)
22:03:20 Result is kept for 500 seconds

You'll notice the app returns a response immediately while the task runs in the background.

Go ahead and refresh the URL a few times to add multiple tasks to the queue!
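The "respond now, work later" pattern can also be seen in a single script. The sketch below is an in-process analogy using a thread pool from the standard library, not RQ itself (RQ hands the job to a separate worker process via Redis). The shortened delay is an assumption made to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def background_task(n, delay=0.2):
    """Return len(n) after a short simulated delay."""
    time.sleep(delay)
    return len(n)


with ThreadPoolExecutor(max_workers=1) as pool:
    start = time.perf_counter()
    future = pool.submit(background_task, "100")  # returns straight away
    submit_time = time.perf_counter() - start

    result = future.result()  # blocks until the task has finished
    total_time = time.perf_counter() - start

print(f"submit took {submit_time:.4f}s, task finished after {total_time:.2f}s")
print(result)  # 3
```

Submitting the task takes a fraction of the time the task itself needs, which is why the browser gets its response before the worker has finished.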

Example 2

Let's move on to a more practical example of using the RQ task queue.

We're going to rearrange our application structure to the following:

app
├── app
│   ├── __init__.py
│   ├── tasks.py
│   ├── templates
│   │   └── add_task.html
│   └── views.py
└── run.py

Rather than writing our task functions in the same file as our views, we're going to separate them out into their own file called tasks.py.

run.py is the entry point to our application:

run.py

from app import app

if __name__ == "__main__":
    app.run()

__init__.py is going to bring our app together as a package and initialize some key components:

__init__.py

from flask import Flask
import redis
from rq import Queue

app = Flask(__name__)

r = redis.Redis()

q = Queue(connection=r)

from app import views
from app import tasks

We start out by importing the required packages and creating the app variable.

  • r = redis.Redis() creates our Redis connection
  • q = Queue(connection=r) creates our task queue

There are lots of things we can do with queues, but for now we're just going to keep it simple and create a single queue.

Lastly, we import views and tasks from app, where views.py contains our application routes and tasks.py contains our tasks. These imports come after app, r and q are created because views.py imports those objects from this package.

views.py - For now, we're just going to import a few objects from our app along with a couple of imports from flask and render a template. We'll come back to it shortly:

views.py

from app import app
from app import r
from app import q

from flask import render_template, request


@app.route("/add-task", methods=["GET", "POST"])
def add_task():
    return render_template("add_task.html")

tasks.py is going to contain all of our task handler functions. We're going to create a function that counts the occurrences of each word at a given URL:

tasks.py

from urllib import request
from bs4 import BeautifulSoup

import time

def count_words(url):

    print(f"Counting words at {url}")

    start = time.time()

    # Fetch the page and parse it with the lxml parser (the lxml package must be installed)
    r = request.urlopen(url)

    soup = BeautifulSoup(r.read().decode(), "lxml")

    # Join the text of every <p> element into one string
    paragraphs = " ".join([p.text for p in soup.find_all("p")])

    word_count = dict()

    # Tally how many times each whitespace-separated word appears
    for word in paragraphs.split():
        if word not in word_count:
            word_count[word] = 1
        else:
            word_count[word] += 1

    end = time.time()

    # time.time() returns seconds, so convert the difference to milliseconds
    time_elapsed = (end - start) * 1000

    print(word_count)
    print(f"Total words: {len(word_count)}")
    print(f"Time elapsed: {time_elapsed} ms")

    return len(word_count)
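The counting step can be tried on its own, without fetching a page. Here's a minimal sketch using collections.Counter from the standard library as an alternative to the manual dictionary; count_words_in_text is a made-up name and isn't part of the app.

```python
from collections import Counter


def count_words_in_text(text):
    """Return a Counter mapping each whitespace-separated word to its frequency."""
    return Counter(text.split())


counts = count_words_in_text("the cat and the hat")
print(counts["the"])         # 2
print(len(counts))           # 4 distinct words
print(sum(counts.values()))  # 5 words in total
```

Note the difference between the last two lines: taking len() of the counts gives the number of distinct words, which is also what count_words returns.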

add_task.html is the HTML template rendered by the add_task view.

The first container holds a single form element, allowing us to submit a URL. The second container displays some information about the current tasks.

We're also using Bootstrap 4 for our CSS.

add_task.html

<!doctype html>
<html lang="en">

<head>
  <!-- Required meta tags -->
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">

  <!-- Bootstrap CSS -->
  <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">

  <title>Job queue</title>
</head>

<body>

  <div class="container">
    <div class="row">
      <div class="col">
        <h5 class="mt-3">Word counter</h5>
        <div class="card mt-3">
          <div class="card-body">
            <form action="/add-task">
              <div class="form-group">
                <label>Word count</label>
                <input type="text" class="form-control" name="url" placeholder="Enter URL" required>
                {% if message %}
                <small id="emailHelp" class="form-text text-muted">{{ message }}</small>
                {% endif %}
              </div>
              <button type="submit" class="btn btn-primary">Submit</button>
            </form>
          </div>
        </div>
      </div>
    </div>
  </div>

  <div class="container">
    <div class="row">
      <div class="col">
        <h5 class="mt-3 mb-3">Job queue</h5>
        {% if jobs %}
        {% for job in jobs %}
        <div class="card mb-3">
          <div class="card-body">
            <h6>{{ job.func_name }}</h6>
            <p>Args: {{ job.args }}</p>
            <small class="text-muted d-block">Job ID: {{ job.id }}</small>
            <small class="text-muted d-block">Status: {{ job.status }}</small>
            <small class="text-muted d-block">Created at: {{ job.created_at.strftime('%a, %d %b %Y %H:%M:%S') }}</small>
            <small class="text-muted d-block">Enqueued at: {{ job.enqueued_at.strftime('%a, %d %b %Y %H:%M:%S') }}</small>
          </div>
        </div>
        {% endfor %}
        {% else %}
        <p>No jobs in the queue</p>
        {% endif %}
      </div>
    </div>
  </div>

</body>

</html>

Adding tasks

Before we run our app, we need to import the count_words function. We don't need to import anything to format dates, as datetime objects have their own strftime method:

views.py

from app.tasks import count_words
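The format string used in the view and the template can be tried out in isolation. This is a small sketch with a made-up timestamp standing in for job.enqueued_at; it calls the strftime method that every datetime object provides.

```python
from datetime import datetime

# A made-up timestamp standing in for job.enqueued_at
enqueued_at = datetime(2019, 3, 8, 22, 3, 15)

# %a = short weekday, %d = day, %b = short month, %Y = year, %H:%M:%S = time
formatted = enqueued_at.strftime("%a, %d %b %Y %H:%M:%S")
print(formatted)  # Fri, 08 Mar 2019 22:03:15
```

The weekday and month names follow the current locale, so they may differ if your program changes it.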

We'll now finish off the route:

views.py

@app.route("/add-task", methods=["GET", "POST"])
def add_task():

    jobs = q.jobs  # Get a list of jobs in the queue
    message = None

    if request.args:  # Only run if a query string is sent in the request

        url = request.args.get("url")  # Gets the URL coming in as a query string

        task = q.enqueue(count_words, url)  # Send a job to the task queue

        jobs = q.jobs  # Get a list of jobs in the queue

        q_len = len(q)  # Get the queue length

        message = f"Task queued at {task.enqueued_at.strftime('%a, %d %b %Y %H:%M:%S')}. {q_len} jobs queued"

    return render_template("add_task.html", message=message, jobs=jobs)

The form is submitted as a GET request, so the data comes in as a query string. We use request.args.get("url") to create a variable with the form value.

  • q.jobs returns a list of any current jobs in the task queue
  • task = q.enqueue(count_words, url) adds the job to the queue, with count_words as the function and url as the argument
  • len(q) returns the number of jobs in the queue
  • message is just a formatted string containing some information about the task and length of the queue
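The view above enqueues whatever arrives in the query string, so a value such as pythonise.com (with no scheme) would only fail later, inside the worker, when urlopen is called. One option is to check the value before enqueueing. The helper below is a hypothetical sketch using the standard library and isn't part of the original app.

```python
from urllib.parse import urlparse


def is_fetchable_url(url):
    """Return True if url looks like an absolute http(s) URL."""
    if not url:
        return False
    parts = urlparse(url.strip())
    return parts.scheme in ("http", "https") and bool(parts.netloc)


print(is_fetchable_url("https://pythonise.com"))  # True
print(is_fetchable_url("pythonise.com"))          # False (no scheme)
print(is_fetchable_url(None))                     # False
```

In the view, the call to q.enqueue could be wrapped in a check like this, with message set to an error when it fails.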

Running the app

Before we run the app itself, we need to start the RQ worker.

The worker is a process that runs independently from our application and will communicate with the message broker.

When a new task is added to the queue, the worker will carry out the task!

To start the rq worker, open a new terminal and from the same directory as run.py, run the following:

rq worker

You should see something like:

20:55:23 RQ worker 'rq:worker:jnwt.4968' started, version 0.13.0
20:55:23 *** Listening on default...
20:55:23 Cleaning registries for queue: default

We're now ready to run our app. Start it as you normally would any other Flask app:

export FLASK_APP=run.py
export FLASK_ENV=development
flask run

Head over to /add-task and submit a URL, keeping an eye on the terminal window running the worker.

Depending on the URL, you should see something like:

20:19:44 default: app.tasks.count_words('https://pythonise.com/feed/flask/the-flask-request-object') (b29f3903-bd7d-4a10-a76f-f7586bb5bc37)
Counting words at https://pythonise.com/feed/flask/the-flask-request-object

This should be followed by a dictionary containing each word and its number of occurrences, finishing up with:

Total words: 455
Time elapsed: 0.8473403453826904 ms

Flask RQ task queue example

Wrapping up

This was a gentle introduction to task queues using RQ and Redis. There are lots of extra goodies that we haven't covered!

Head over to the rq docs to learn more.

Last modified · 14 Mar 2019
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License