How to Write a Task Queue in SQL

A task queue is a mechanism for managing and executing tasks in an ordered and efficient way. In this blog post, we will explore how to write a task queue in SQL. We will use PostgreSQL as our database management system.

1. Design the Task Queue Table

To begin, we need to design a table to store our tasks. The table should have the following fields:

  • id: A unique identifier for the task.
  • task_name: The name of the task.
  • task_params: The parameters required for the task.
  • status: The status of the task (e.g. pending, processing, completed).
  • created_at: The timestamp when the task was created.
  • updated_at: The timestamp when the task was last updated.

Here’s the SQL code to create the task queue table:

CREATE TABLE task_queue (
  id SERIAL PRIMARY KEY,
  task_name TEXT NOT NULL,
  task_params JSONB,
  status TEXT NOT NULL DEFAULT 'pending',
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
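
As an optional addition, the sketch below constrains the status column to the four values used in this post and adds a partial index to support the fetch query in step 3. The constraint and index names are made up for illustration, and whether the index pays off depends on how large the table grows.

```sql
-- Reject status values other than the four used in this post.
ALTER TABLE task_queue
  ADD CONSTRAINT task_queue_status_check
  CHECK (status IN ('pending', 'processing', 'completed', 'failed'));

-- Partial index: only pending rows are indexed, ordered by creation time,
-- which matches the WHERE and ORDER BY of the fetch query.
CREATE INDEX task_queue_pending_idx
  ON task_queue (created_at)
  WHERE status = 'pending';
```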

2. Add Tasks to the Queue

To add a task to the queue, we insert a new row into the task_queue table. A new task should start with the status “pending” so that workers can pick it up.

Here’s an example SQL statement that inserts a task into the queue:

INSERT INTO task_queue (task_name, task_params) VALUES ('send_email', '{"to": "john@example.com", "subject": "Hello", "body": "World"}');
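
A slightly more explicit variant, shown here as a sketch: it sets the status in the statement itself and uses RETURNING so that the caller learns the generated id, for example to log it or to look the task up later.

```sql
-- Enqueue a task with an explicit status and get its generated id back.
INSERT INTO task_queue (task_name, task_params, status)
VALUES (
  'send_email',
  '{"to": "john@example.com", "subject": "Hello", "body": "World"}',
  'pending'
)
RETURNING id;
```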

3. Fetch Pending Tasks

To fetch the next pending task from the queue, we use the following query:

SELECT * FROM task_queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED;

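This query selects the oldest pending task and locks its row so that other processes cannot select the same task at the same time. The SKIP LOCKED clause (available since PostgreSQL 9.5) makes the query skip rows that are already locked by other processes instead of waiting for them. The row lock lasts only until the enclosing transaction ends, so the query must run inside a transaction that stays open until the task’s status has been changed.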
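
To make the lock’s lifetime concrete, here is a minimal sketch of the fetch query inside an explicit transaction. The extra ORDER BY on id is an assumption added here to break ties between tasks created at the same instant.

```sql
BEGIN;

-- Lock the oldest pending row; rows locked by other workers are skipped.
SELECT id, task_name, task_params
FROM task_queue
WHERE status = 'pending'
ORDER BY created_at ASC, id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- The row lock is held at this point, so a second worker running the
-- same SELECT gets the next pending row instead of this one.

COMMIT;  -- the lock is released when the transaction ends
```

If the worker commits without changing the status, the task is still pending and unlocked, and any worker can select it again.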

4. Update Task Status

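Once a worker has fetched a task, it should set the task’s status to “processing” in the same transaction as the fetch, before committing. After the commit the row lock is gone, and the new status is what keeps other workers from picking the task up again. The updated_at field should be set at the same time to record when the status changed.

Here’s an example SQL statement that updates the status of a task (the id 1 stands in for the id returned by the fetch query):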

UPDATE task_queue SET status = 'processing', updated_at = NOW() WHERE id = 1;
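
Steps 3 and 4 can also be combined into one statement, so that no task id has to be passed between two queries. The following is a sketch of that common pattern rather than the only way to do it; the tie-breaking ORDER BY on id is an addition.

```sql
-- Claim the oldest pending task and mark it as processing in one statement.
-- Returns zero rows when no unlocked pending task exists.
UPDATE task_queue
SET status = 'processing',
    updated_at = NOW()
WHERE id = (
  SELECT id
  FROM task_queue
  WHERE status = 'pending'
  ORDER BY created_at ASC, id ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
RETURNING id, task_name, task_params;
```

Because a single statement runs in its own transaction by default, the claim is committed as soon as the statement returns.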

5. Mark Task as Completed

Once a task has finished successfully, we set its status to “completed” and refresh the updated_at field so that it records the completion time.

Here’s an example SQL statement that marks a task as completed:

UPDATE task_queue SET status = 'completed', updated_at = NOW() WHERE id = 1;
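
A more defensive variant, sketched below, only completes a task that is currently being processed. The extra status condition and the RETURNING clause are additions for illustration: if the statement returns no row, the task was not in the “processing” state and the worker can log that instead of silently overwriting it.

```sql
-- Complete the task only if it is currently marked as processing.
UPDATE task_queue
SET status = 'completed',
    updated_at = NOW()
WHERE id = 1
  AND status = 'processing'
RETURNING id;
```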

6. Error Handling

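If a task encounters an error, we need to update its status to “failed” and store the error message in the task_params field. The message should be merged into the existing JSON with the || operator rather than assigned as a new value, because a plain assignment would overwrite the task’s original parameters.

Here’s an example SQL statement that updates the status of a task that encountered an error:

UPDATE task_queue SET status = 'failed', task_params = COALESCE(task_params, '{}'::jsonb) || '{"error": "Failed to send email."}'::jsonb, updated_at = NOW() WHERE id = 1;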
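
An alternative worth considering is to keep error details out of task_params altogether. The sketch below assumes a hypothetical last_error column that is not part of the table defined in step 1.

```sql
-- Hypothetical column for error details, separate from the task's input.
ALTER TABLE task_queue ADD COLUMN last_error TEXT;

-- Record the failure without touching task_params.
UPDATE task_queue
SET status = 'failed',
    last_error = 'Failed to send email.',
    updated_at = NOW()
WHERE id = 1;
```

This keeps the task’s input and its outcome in separate fields, which makes it easier to retry a failed task with its original parameters.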

7. Scaling the Task Queue

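To scale the task queue, we can run multiple workers that fetch and process tasks from the queue simultaneously. Each worker should have a separate database connection and fetch one task at a time. Because FOR UPDATE SKIP LOCKED prevents two workers from locking the same row, the workers need no coordination beyond the database itself, and tasks are processed in parallel.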
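
When deciding how many workers to run, it helps to look at how much work is waiting. The query below is a minimal sketch of such a check; the column aliases are illustrative.

```sql
-- Number of tasks in each status, plus the age of the oldest one.
SELECT status,
       COUNT(*) AS task_count,
       MIN(created_at) AS oldest_created_at
FROM task_queue
GROUP BY status
ORDER BY status;
```

A growing count of pending tasks, or an oldest pending task that keeps getting older, suggests that the current workers are not keeping up.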