Implementing Async REST APIs in FastAPI with PostgreSQL CRUD

In this tutorial we will implement a Python based FastAPI with PostgreSQL CRUD. We will focus on implementing Asynchronous REST Endpoints with the help of Python based module databases that gives simple asyncio support for a range of databases including PostgreSQL.

FastAPI CRUD PostgreSQL Async RESTAPIs SQLAlchemy – TutLinks
FastAPI CRUD PostgreSQL Async RESTAPIs SQLAlchemy – TutLinks

Table of Contents

Prerequisites

Development Requirements

Full Source Code of this tutorial is available on fastapi-postgresql-azure-deploy branch of this repository.

Create a Workspace

Run the following commands to create our development workspace and launch VS Code that opens our workspace as active directory.

mkdir fastapi-postgres-tutorial
cd fastapi-postgres-tutorial
code .

Create & Activate Python Virtual Environment

In VS Code navigate to View and click Terminal to launch command prompt. Run the below scripts in Terminal to create Python virtual environment.

Windows Users

If you are a Windows based OS Users, run the following command from terminal to create Python virtual environment.

python -m venv env
env\Scripts\activate

Linux based OS Users

If you are a Linux based OS Users, run the following command from terminal.

python3 -m venv env
source ./env/bin/activate

Run the following command in terminal to install FastAPI, Uvicorn, Gunicorn and databases packages

pip install fastapi uvicorn gunicorn databases[postgresql]

Note that we are using databases package as it uses asyncpg as an interface to talk to PostgreSQL database. And as per official documentation of asyncpgasyncpg is 3x faster than other Python based PostgreSQL drivers psycopg2 and aipog.

Run the following command to freeze dependencies to requirements.txt file in terminal.

pip freeze > requirements.txt

If you want to use sqlite database for development purpose, you need to install sqlite module support for databases package. Run the following command to install sqlite extension for databases module.

pip install databases[sqlite]

Developing Async CRUD APIs using FastAPI

To speed up things, I’ll be using the code from official documentation of FastAPI using Async SQL Databases. So following are the enhancements that my implementation has, in addition to what is available from code base from official reference.

Let’s Code FastAPI with PostgreSQL CRUD from the scratch

Add a file main.py in the root directory of the workspace.

Import references

To start with, in the main.py file add the following references.

from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import urllib

Configure Database FastAPI PostgreSQL Connection String

For SQLite

Add the following line to main.py to define connection string for our application to talk to sqlite database.

DATABASE_URL = "sqlite:///./test.db"

For PostgreSQL server

In case you have a PostgreSQL server then add the following lines to main.py and configure the values accordingly for environment variables db_username, db_password, host_server, db_server_port, database_name, ssl_mode.

host_server = os.environ.get('host_server', 'localhost')
db_server_port = urllib.parse.quote_plus(str(os.environ.get('db_server_port', '5432')))
database_name = os.environ.get('database_name', 'fastapi')
db_username = urllib.parse.quote_plus(str(os.environ.get('db_username', 'postgres')))
db_password = urllib.parse.quote_plus(str(os.environ.get('db_password', 'secret')))
ssl_mode = urllib.parse.quote_plus(str(os.environ.get('ssl_mode','prefer')))
DATABASE_URL = 'postgresql://{}:{}@{}:{}/{}?sslmode={}'.format(db_username, db_password, host_server, db_server_port, database_name, ssl_mode)

If your database server required SSL then replace prefer with required in the DATABASE_URL connection string for PostgreSQL.

Simply put, the FastAPI PostgreSQL Connection string follows the following format.

'postgresql://db_username:db_password@host_server:db_server_port/database_name?sslmode=prefer'

Create database instance

Once we have DATABASE_URL url built, create instance of database by adding the following line to main.py

database = databases.Database(DATABASE_URL)

Create SQL Alchemy model

We will create a table named notes. The purpose is to store detail of note in textcolumn and its status in completed column. We will use sqlalchemy to define the notes table that resembles the relational database schema in the form of Python code. Add the following lines to define schema for notes table.

metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)

Create Engine

For Sqlite DB

If you are using sqlite database, then add the following lines to main.py

engine = sqlalchemy.create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
metadata.create_all(engine)

For PostgreSQL Server

If you are having DATABASE_URL built to point to PostgreSQL server database, then add the following lines to main.py

engine = sqlalchemy.create_engine(
    DATABASE_URL, pool_size=3, max_overflow=0
)
metadata.create_all(engine)

Create Models using Pydantic

Add the following models to main.py. These are models built with Pydantic’s BaseModelPydantic models help you define request payload models and response models in Python Class object notation. FastAPI also uses these models in its auto generated OpenAPI Specs (Swagger) to indicate response and request payload models.

class NoteIn(BaseModel):
    text: str
    completed: bool

class Note(BaseModel):
    id: int
    text: str
    completed: bool

NoteIn is the model in its JSON form used as payload to Create or Update note endpoints. Note is the model in its JSON form will be used as response to retrieve notes collection or a single note given its id.

The JSON notation of these models will look something similar as mentioned below.

JSON for NoteIn model.

{
    "text": "fix tap in kitchen sink",
    "completed": false
}

JSON for Note model.

{
    "id": 7,
    "text": "fix tap in kitchen sink",
    "completed": false
}

Add CORS to FastAPI

In order for our REST API endpoints to be consumed in client applications such as Vue, React, Angular or any other Web applications that are running on other domains, we should tell our FastAPI to allow requests from the external callers to the endpoints of this FastAPI application. We can enable CORS (Cross Origin Resource Sharing) either at application level or at specific endpoint level. But in this situation we will add the following lines to main.py to enable CORS at the application level by allowing requests from all origins specified by allow_origins=[*].

app = FastAPI(title="REST API using FastAPI PostgreSQL Async EndPoints")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

allow_origins=[*] is not recommended for Production purposes. It is recommended to have specified list of origins such as mentioned below.

allow_origins=['client-facing-example-app.com', 'localhost:5000']

For more details, refer the official documentation on How to configure CORS for FastAPI

Application Startup & Shutdown Events

FastAPI can be run on multiple worker process with the help of Gunicorn server with the help of uvicorn.workers.UvicornWorker worker class. Every worker process starts its instance of FastAPI application on its own Process Id. In order to ensure every instance of application communicates to the database, we will connect and disconnect to the database instance in the FastAPI events   startup  and  shutdown  respectively. So add the following code to main.py to do that.

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

Create a Note using HTTP Verb POST

We will use POST HTTP Verb available as post method of FastAPI’s instance variable app to create/ insert a new note in our notes table.

The status code on successful creation of note will be 201. This can be seen as an argument status_code passed to post method which accepts integer value held by status.HTTP_201_CREATED. Add the following code to main.py to add a note to the table.

@app.post("/notes/", response_model=Note, status_code = status.HTTP_201_CREATED)
async def create_note(note: NoteIn):
    query = notes.insert().values(text=note.text, completed=note.completed)
    last_record_id = await database.execute(query)
    return {**note.dict(), "id": last_record_id}

Update Note using HTTP Verb PUT

We will use PUT HTTP Verb available as put method of FastAPI’s instance variable appto update / modify an existing note in our notes table. Add the following code to main.py to modify a note from the notes table.

@app.put("/notes/{note_id}/", response_model=Note, status_code = status.HTTP_200_OK)
async def update_note(note_id: int, payload: NoteIn):
    query = notes.update().where(notes.c.id == note_id).values(text=payload.text, completed=payload.completed)
    await database.execute(query)
    return {**payload.dict(), "id": note_id}

Get Paginated List of Notes using HTTP Verb GET

We will use GET HTTP Verb available as get method of FastAPI’s instance variable app to retrieve paginated 🗐 collection of notes available in our notes table. Add the following code to main.py to get list of notes from the table.

@app.get("/notes/", response_model=List[Note], status_code = status.HTTP_200_OK)
async def read_notes(skip: int = 0, take: int = 20):
    query = notes.select().offset(skip).limit(take)
    return await database.fetch_all(query)

Here the skip and take arguments will define how may notes to be skipped and how many notes to be returned in the collection respectively. If you have a total of 13 notes in your database and if you provide skip a value of 10 and take a value of 20, then only 3 notes will be returned. skip will ignore the value based on the identity of the collection starting from old to new.

Get single Note Given its Id using HTTP Verb GET

We will again use the GET HTTP Verb available as get method of FastAPI’s instance variable app to retrieve a single note identified by provided id in the request as a note_id query parameter. Add the following code to main.py to get a note given its id.

@app.get("/notes/{note_id}/", response_model=Note, status_code = status.HTTP_200_OK)
async def read_notes(note_id: int):
    query = notes.select().where(notes.c.id == note_id)
    return await database.fetch_one(query)

Delete single Note Given its Id using HTTP Verb DELETE

We will use DELETE HTTP Verb available as delete method of FastAPI’s instance variable app to permanently delete an existing note in our notes table. Add the following code to main.py to wipe off the note permanently given note_id as query parameter.

@app.delete("/notes/{note_id}/", status_code = status.HTTP_200_OK)
async def update_note(note_id: int):
    query = notes.delete().where(notes.c.id == note_id)
    await database.execute(query)
    return {"message": "Note with id: {} deleted successfully!".format(note_id)}

Full Code

from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import urllib

# SQLAlchemy specific code, as with any other app
# DATABASE_URL = "sqlite:///./test.db"

host_server = os.environ.get('host_server', 'localhost')
db_server_port = urllib.parse.quote_plus(str(os.environ.get('db_server_port', '5432')))
database_name = os.environ.get('database_name', 'fastapi')
db_username = urllib.parse.quote_plus(str(os.environ.get('db_username', 'postgres')))
db_password = urllib.parse.quote_plus(str(os.environ.get('db_password', 'secret')))
ssl_mode = urllib.parse.quote_plus(str(os.environ.get('ssl_mode','prefer')))
DATABASE_URL = 'postgresql://{}:{}@{}:{}/{}?sslmode={}'.format(db_username,db_password, host_server, db_server_port, database_name, ssl_mode)

database = databases.Database(DATABASE_URL)

metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)


engine = sqlalchemy.create_engine(
    DATABASE_URL, pool_size=3, max_overflow=0
)
metadata.create_all(engine)

class NoteIn(BaseModel):
    text: str
    completed: bool

class Note(BaseModel):
    id: int
    text: str
    completed: bool

app = FastAPI(title = "REST API using FastAPI PostgreSQL Async EndPoints")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.get("/notes/", response_model=List[Note], status_code = status.HTTP_200_OK)
async def read_notes(skip: int = 0, take: int = 20):
    query = notes.select().offset(skip).limit(take)
    return await database.fetch_all(query)

@app.get("/notes/{note_id}/", response_model=Note, status_code = status.HTTP_200_OK)
async def read_notes(note_id: int):
    query = notes.select().where(notes.c.id == note_id)
    return await database.fetch_one(query)

@app.post("/notes/", response_model=Note, status_code = status.HTTP_201_CREATED)
async def create_note(note: NoteIn):
    query = notes.insert().values(text=note.text, completed=note.completed)
    last_record_id = await database.execute(query)
    return {**note.dict(), "id": last_record_id}

@app.put("/notes/{note_id}/", response_model=Note, status_code = status.HTTP_200_OK)
async def update_note(note_id: int, payload: NoteIn):
    query = notes.update().where(notes.c.id == note_id).values(text=payload.text, completed=payload.completed)
    await database.execute(query)
    return {**payload.dict(), "id": note_id}

@app.delete("/notes/{note_id}/", status_code = status.HTTP_200_OK)
async def delete_note(note_id: int):
    query = notes.delete().where(notes.c.id == note_id)
    await database.execute(query)
    return {"message": "Note with id: {} deleted successfully!".format(note_id)}

Run the FastAPI app

Save changes to main.py and run the following command in the terminal to spin up the FastAPI app.

uvicorn --port 8000 --host 127.0.0.1 main:app --reload

With the above command, we are invoking the call to the Uvicorn ASGI server with the following infrastructure settings.

  • host 127.0.0.1 means we are configuring uvicorn to run our application on the localhost of the PC. The other possible values for host parameter is 0.0.0.0 or localhost. 0.0.0.0 is recommended when deploying the FastAPI to production environments.
  • port 8000 is the port on which we want our application to run. If you have any other application or service already running on this port, the above command will fail to execute. In such situation, try to change it to any other four digit number of your choice that is found to be freely available for the application to consume.
  • reload It is recommended to set this flag for the development purposes only. Enabling this flag will automatically restart the uvicorn server with any changes you make to your code while in development. It is obvious that, in case there are any run time failures, you will quickly identify those changes from the error trace that caused the failure of uvicorn server to restart.
  • main:app This follows a pattern as detailed below.
    • main is the module where the FastAPI is initialized. In our case, all we have is main.py at the root level. If you are initializing the FastAPI variable somewhere under the other directories, you need to add an __init__.py file and expose that module so that uvicorn can properly identify the configuration. In our case main becomes the module.
    • app is the name of the variable which is assigned with the instance of FastAPI. You are free to change these names but reflect the same syntax that follows module-name:fastapi-initialization-variable pattern.

Curl Commands to perform CRUD

There are three ways to perform CRUD for FastAPI REST Endpoints.

  • Postman, a REST Client (in fact a lot more than a REST Client) to perform calls to REST APIs
  • OpenAPI User Interface accessible via /docs (Swagger UI) to perform CRUD operations by clicking Try it out button available for every end point
  • cURL commands via a command terminal.

If you want to explore the hardcore programmer in you, I recommend trying out cURL.

Let’s execute the following cURL commands to perform CRUD on our FastAPI Async REST Endpoints to Create, Read, Update and Delete data into PostgreSQL database.

Add a single Note via cURL using POST

Run the following command in the command terminal to Add a single Note.

Request

curl -X POST "http://localhost:8000/notes/" ^
 -H "accept: application/json" ^
 -H "Content-Type: application/json" ^
 -d "{\"text\":\"Get Groceries from the store\",\"completed\":false}"

In the above command are commanding cURL with following args:

  • -X indicates the presence of one of the HTTP Verbs or HTTP Request Methods, followed by URL. The values include POST, GET, PUT, DELETE, PATCH.
  • -H indicates header. Headers are optional unless required as specified in API specifications. Some times though they are optional, specifying wrong headers may not guarantee the expected result processed by cURL.
  • -d is mostly used for non GET HTTP verbs and indicates data or payload that is required by the request.
  • ^ is a line break added just for readability. It works only on Windows OS command terminals. If you are on any Mac or Linux distributions, replace ^ with \ (backward slash). Alternatively, you can have the cURL sent in single line without having any ^ line breaks.

Response body

{
  "id": 1,
  "text": "Get Groceries from the store",
  "completed": false
}

Perform POST Curl command to add a bunch of notes to play around by changing text in the payload. You should also notice the response status code 201 Created if you are consuming in any program or sending query via Postman client

Get Paginated list of Notes via cURL using GET

Run the following command in the command terminal to retrieve a collection of Notes.

Request

curl -X GET "http://localhost:8000/notes/?skip=0&take=20"

Response body

[
  {
    "id": 1,
    "text": "Get Groceries from the store",
    "completed": false
  },
  {
    "id": 2,
    "text": "Plan a lavish dinner at Restaurant",
    "completed": false
  },
  {
    "id": 3,
    "text": "Finish Physics Assignment",
    "completed": false
  }
]

Get a single Note given its Id via cURL using GET

Run the following command in the command terminal to retrieve a Note given the Note id.

Request

curl -X GET "http://localhost:8000/notes/2/"

Response body

{
    "id": 2,
    "text": "Plan a lavish dinner at Restaurant",
    "completed": false
}

Update a single Note given its Id via cURL using PUT

Run the following command in the command terminal to modify a Note given the Note id and updated properties with updated values in Payload.

Request

curl -X PUT "http://localhost:8000/notes/2/" ^
 -H "accept: application/json" ^
 -H "Content-Type: application/json" ^
 -d "{\"text\":\"Plan a lavish dinner at Restaurant\",\"completed\":true}"

Response body

{
  "id": 2,
  "text": "Plan a lavish dinner at Restaurant",
  "completed": true
}

Here, I have successfully completed (marked it true) planning a lavish dinner at Restaurant and can’t wait to enjoy the food.

Delete a single Note given its Id via cURL using DELETE

Run the following command in the command terminal to permanently delete a Note given the its id.

Request

curl -X DELETE "http://localhost:8000/notes/2/" -H "accept: application/json"

Response body

{
  "message": "Note with id: 2 deleted successfully!"
}

At any point you find something thats not working as expected, you can debug the FasAPI application in Visual Studio Code IDE and see root cause of the problem.

Video

Implementing Async REST APIs in Python using FastAPI with PostgreSQL CRUD – TutLinks

Navule Pavan Kumar Rao

I am a Full Stack Software Engineer with the Product Development experience in Banking, Finance, Corporate Tax and Automobile domains. I use SOLID Programming Principles and Design Patterns and Architect Software Solutions that scale using C#, .NET, Python, PHP and TDD. I am an expert in deployment of the Software Applications to Cloud Platforms such as Azure, GCP and non cloud On-Premise Infrastructures using shell scripts that become a part of CI/CD. I pursued Executive M.Tech in Data Science from IIT, Hyderabad (Indian Institute of Technology, Hyderabad) and hold B.Tech in Electonics and Communications Engineering from Vaagdevi Institute of Technology & Science.

This Post Has 4 Comments

Leave a Reply