Using SQLAlchemy and Alembic with FastAPI

Creating a fullstack application with FastAPI, Nuxt and Postgres - part 2

published on January 16, 2021

In the last post we set up VSCode and Docker Compose so that we have a pleasant experience developing our FastAPI application. What's missing is our datastore, a Postgres database, which we will add as part of this article.

We will use SQLAlchemy's ORM to talk to the database and let Alembic handle our database migrations.

Then we will implement our API endpoints and see how Pydantic is used for data validation and generating HTTP responses directly from our SQLAlchemy models.

For this article we assume the reader already knows SQLAlchemy and how Pydantic is used as part of FastAPI. If not, we recommend reading FastAPI's SQL Tutorial first.

A note on async

We are using sqlalchemy<1.4 with psycopg2 here, so querying the database will block the current worker. Maybe we will do another blog post to have a look at FastAPI + SQLAlchemy 2.0 once it's stable.
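The effect of a blocking call can be illustrated without any database. The following sketch is not part of the application: blocking_query is a hypothetical stand-in for a synchronous psycopg2 query, simulated with time.sleep. It counts how often a background coroutine gets to run while the "query" is in flight, once when it is called directly and once when it is handed to a thread.

```python
import asyncio
import time
from typing import List


def blocking_query() -> str:
    """Hypothetical stand-in for a synchronous database call."""
    time.sleep(0.2)
    return "rows"


async def ticker(ticks: List[float]) -> None:
    """Record a timestamp every 10 ms for as long as the event loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def ticks_during_query(use_thread: bool) -> int:
    ticks: List[float] = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = len(ticks)
    if use_thread:
        # Run the blocking call in the default thread pool executor
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_query)
    else:
        # Call it directly: nothing else can run until it returns
        blocking_query()
    after = len(ticks)
    task.cancel()
    return after - before


blocked = asyncio.run(ticks_during_query(use_thread=False))
threaded = asyncio.run(ticks_during_query(use_thread=True))
print(blocked, threaded)
```

The first number is the tick count while the call blocks the event loop, the second the count while the call runs in a thread. The exact value of the second number depends on the machine.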

Application Scope

Before we go through the individual steps, let's first talk about what exactly we are going to build.

Users will be able to browse all kinds of products, similar to Amazon. Those products are offered by different stores and can have different prices. A user can then order an arbitrary number of products from an arbitrary number of stores.

The application we build should serve as a project skeleton, not a production-ready app, so we will keep it very simple. There will be only CRUD (create, read, update, delete) functionality, with the following limitations:

  • no user management (maybe we will add it later, along with a blog post)
  • every user can create/update/delete stores and products (i.e. this is not restricted to a store owner)
  • orders won't have a status (e.g. in preparation, fulfilled) and cannot be canceled or updated
  • products have infinite stock

Database Design

For our database schema we will create four tables:

  • stores
  • products
  • orders
  • order_items

In our products table we will store the different items that can be ordered. Those always belong to a store. When an order is created, we create a new entry in the orders table which will only have an ID and a date for when the order was made.

To have a list of products that were ordered, we create an additional table, order_items, in which we will have the link to products and the order. Our database schema will then look like this:

Orders API Database Schema
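To make the relationships between the four tables concrete, here is a rough sketch that uses SQLite from the standard library as a stand-in for Postgres. The column types are simplified and most store columns are left out, so treat it as an illustration of the link table rather than the actual schema.

```python
import sqlite3

# SQLite stands in for Postgres here; column types are simplified.
SCHEMA = """
CREATE TABLE stores (
    store_id TEXT PRIMARY KEY,
    name     TEXT NOT NULL
);
CREATE TABLE products (
    product_id TEXT PRIMARY KEY,
    store_id   TEXT NOT NULL REFERENCES stores (store_id),
    name       TEXT NOT NULL,
    price      NUMERIC NOT NULL,
    UNIQUE (name, store_id)
);
CREATE TABLE orders (
    order_id TEXT PRIMARY KEY,
    "date"   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE order_items (
    order_id   TEXT NOT NULL REFERENCES orders (order_id),
    product_id TEXT NOT NULL REFERENCES products (product_id),
    quantity   INTEGER NOT NULL,
    PRIMARY KEY (order_id, product_id)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO stores VALUES ('s1', 'TechStuff Online')")
conn.execute("INSERT INTO products VALUES ('p1', 's1', 'Keyboard', 49.99)")
conn.execute("INSERT INTO orders (order_id) VALUES ('o1')")
conn.execute("INSERT INTO order_items VALUES ('o1', 'p1', 2)")

# Resolve an order to its products through the order_items link table
rows = conn.execute(
    """
    SELECT p.name, oi.quantity
    FROM order_items AS oi
    JOIN products AS p ON p.product_id = oi.product_id
    WHERE oi.order_id = ?
    """,
    ("o1",),
).fetchall()
print(rows)
```

The composite primary key on order_items means a product appears at most once per order, with the amount carried in quantity.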

Adding Postgres to the Docker Compose Setup

With Docker Compose already in place, adding Postgres is quite simple:

docker-compose.yml
version: "3.8"

services:
  api:
    image: orders_api:latest
    ports:
      - "8000:8000"
    command: uvicorn --reload --host 0.0.0.0 --port 8000 orders_api.main:app
    volumes:
      - .:/workspace:z
    depends_on:
      - db
    environment:
      DATABASE_URL: "postgresql://postgres:mypassword@db/orders_api_db"

  db:
    image: postgres:13
    ports:
      - "2345:5432"
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "mypassword"
      POSTGRES_DB: "orders_api_db"

The connection info, the database URL, is passed as an environment variable to our api service. There we can use it to connect to Postgres via SQLAlchemy.

Application Configuration

To configure our application we will use environment variables, as we already did for the database connection info. Configuring the application this way follows The Twelve-Factor App, and we will also be able to use it once we deploy our app to Heroku.

Reading Environment Variables in Python

Reading environment variables in Python can be done with os.getenv from the standard library

import os

database_url = os.getenv("DATABASE_URL")

but a nicer way here is using Pydantic's settings management.

Reading Settings with Pydantic

Pydantic is a library for data validation and settings management and is already part of our dependencies since it is used by FastAPI.

To read the settings with Pydantic, we have to create a class that inherits from Pydantic's BaseSettings:

src/orders_api/config.py
from functools import lru_cache

from pydantic import BaseSettings, PostgresDsn

class Settings(BaseSettings):
    database_url: PostgresDsn

@lru_cache
def get_settings() -> Settings:
    settings = Settings()
    return settings

This will not only read the DATABASE_URL environment variable, but also validate that its value is a valid Postgres URL.
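To get a feeling for what the Settings class spares us, here is a rough standard-library equivalent. It is not how Pydantic works internally and it checks far less. read_database_url is a hypothetical helper that only exists for this illustration.

```python
import os
from functools import lru_cache
from urllib.parse import urlsplit


@lru_cache
def read_database_url() -> str:
    """Read DATABASE_URL once and do a rough sanity check on it."""
    url = os.environ["DATABASE_URL"]  # raises KeyError if the variable is unset
    parts = urlsplit(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"not a Postgres URL: {url!r}")
    if not parts.hostname:
        raise ValueError("database URL has no host")
    return url


os.environ["DATABASE_URL"] = "postgresql://postgres:mypassword@db/orders_api_db"
first = read_database_url()

# Because of lru_cache, later changes to the environment are not picked up
os.environ["DATABASE_URL"] = "mysql://somewhere/else"
second = read_database_url()
print(first == second)
```

The same caching applies to get_settings: the environment is read once per process, so code that needs to re-read it (tests, for example) has to call cache_clear() on the cached function first.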

Setting up SQLAlchemy

With the connection info available, we can now implement how we want to connect to our database. We will use SQLAlchemy's scoped_session for this, as described in its documentation, and create a dependency. This dependency will take care of creating a session at the beginning of a web request and closing it at the end.

Creating SQLAlchemy Sessions

We will create database related code in a new subpackage, db, and configure SQLAlchemy in a session.py module:

src/orders_api/db/session.py
from functools import lru_cache
from typing import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

from orders_api.config import get_settings

engine = create_engine(get_settings().database_url, pool_pre_ping=True)

@lru_cache
def create_session() -> scoped_session:
    Session = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=engine)
    )
    return Session

def get_session() -> Generator[scoped_session, None, None]:
    Session = create_session()
    try:
        yield Session
    finally:
        Session.remove()

Note that we don't have to explicitly create a session but can directly return the scoped_session.
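The idea behind scoped_session is a registry that hands out one session per thread and forgets it again on remove(). The following toy version only illustrates that idea. It is not SQLAlchemy's implementation (the real scoped_session also proxies Session methods such as query()), and ThreadLocalRegistry and FakeSession are made-up names.

```python
import threading
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class ThreadLocalRegistry(Generic[T]):
    """Toy version of the idea behind scoped_session: one object per thread."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()

    def __call__(self) -> T:
        # Create the object lazily on first use in the current thread
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value

    def remove(self) -> None:
        # Forget the current thread's object; the next call creates a new one
        if hasattr(self._local, "value"):
            del self._local.value


class FakeSession:
    """Placeholder for a real database session."""


registry = ThreadLocalRegistry(FakeSession)

# Within one thread, repeated calls return the same object
same_thread = registry() is registry()

# A different thread gets its own object
main_session = registry()
seen_in_worker: List[FakeSession] = []
worker = threading.Thread(target=lambda: seen_in_worker.append(registry()))
worker.start()
worker.join()
other_thread_differs = seen_in_worker[0] is not main_session

# After remove(), the next call starts with a fresh object
registry.remove()
fresh_after_remove = registry() is not main_session
print(same_thread, other_thread_differs, fresh_after_remove)
```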

Next we will create our first database tables.

Defining the Models

In a new models.py module we define our models by creating classes that inherit from the declarative Base class.

src/orders_api/db/models.py
import uuid
from typing import Any

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql.schema import ForeignKey, UniqueConstraint

Base: Any = declarative_base()

class Store(Base):
    __tablename__ = "stores"

    store_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    city = sa.Column(sa.Text, nullable=False)
    country = sa.Column(sa.Text, nullable=False)
    currency = sa.Column(sa.String(3), nullable=False)
    domain = sa.Column(sa.Text)
    name = sa.Column(sa.Text, nullable=False)
    phone = sa.Column(sa.Text)
    street = sa.Column(sa.Text, nullable=False)
    zipcode = sa.Column(sa.Text, nullable=False)

class Product(Base):
    __tablename__ = "products"

    product_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    store_id = sa.Column(ForeignKey("stores.store_id"), nullable=False)
    store = relationship("Store", backref="products")
    name = sa.Column(sa.Text, nullable=False)
    price = sa.Column(sa.Numeric(12, 2), nullable=False)

    __table_args__ = (UniqueConstraint("name", "store_id", name="uix_products"),)

class Order(Base):
    __tablename__ = "orders"

    order_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    date = sa.Column(sa.DateTime, server_default=sa.func.now(), nullable=False)
    items = relationship("OrderItem", backref="order")

class OrderItem(Base):
    __tablename__ = "order_items"

    order_id = sa.Column(ForeignKey("orders.order_id"), primary_key=True)
    product_id = sa.Column(ForeignKey("products.product_id"), primary_key=True)
    product = relationship("Product", uselist=False)
    quantity = sa.Column(sa.Integer, nullable=False)

In addition to the table columns, we also added relationship() attributes for nicer access to related rows from child/parent tables.

products = store.products

# instead of
products = db_session.query(Product).filter_by(store_id=store.store_id).all()

Those relationships can also be used with our Pydantic schemas, via Pydantic's orm_mode:

from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field, HttpUrl
from pydantic.types import UUID4, condecimal, constr

def to_camel(string: str) -> str:
    if "_" not in string:
        return string
    words = string.split("_")
    words = [words[0]] + [word.capitalize() for word in words[1:]]
    return "".join(words)

class ProductBase(BaseModel):
    class Config:
        alias_generator = to_camel
        allow_population_by_field_name = True

class ProductUpdate(ProductBase):
    name: Optional[str]
    price: Optional[condecimal(decimal_places=2)]  # type: ignore

class ProductCreate(ProductBase):
    store_id: UUID4
    name: str
    price: condecimal(decimal_places=2)  # type: ignore

class Product(ProductCreate):
    product_id: UUID4

    class Config:
        orm_mode = True

class OrderItemCreate(BaseModel):
    product_id: UUID4
    quantity: int = Field(..., gt=0)

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True

class OrderCreate(BaseModel):
    items: List[OrderItemCreate]

class Order(BaseModel):
    order_id: UUID4
    date: datetime

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True

class OrderItem(BaseModel):
    quantity: int = Field(..., gt=0)
    product: Product

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True

class OrderDetail(Order):
    order_id: UUID4
    date: datetime
    items: List[OrderItem]

Here we have the schemas for Product and Order and how we want to have them represented in our API.

We have a to_camel function that we use as an alias_generator so that fields like store_id are represented as storeId. Once aliases are in place, Pydantic by default only accepts the alias (storeId) when populating a schema. Setting allow_population_by_field_name = True additionally allows the field name store_id, which is what lets our schemas be populated from SQLAlchemy models, whose attributes are named store_id.
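The alias generator is plain Python, so it is easy to try out on its own. In this small sketch, to_camel has the same logic as above, while camelize_keys is a hypothetical helper that only shows how the keys of a payload would be renamed.

```python
from typing import Any, Dict


def to_camel(string: str) -> str:
    # Same logic as the alias generator above
    if "_" not in string:
        return string
    words = string.split("_")
    return "".join([words[0]] + [word.capitalize() for word in words[1:]])


def camelize_keys(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: rename keys the way the aliases are generated."""
    return {to_camel(key): value for key, value in payload.items()}


print(to_camel("store_id"))    # storeId
print(to_camel("product_id"))  # productId
print(to_camel("name"))        # name
print(camelize_keys({"store_id": "457f", "price": "1.00"}))
```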

With orm_mode enabled we can return instances of our SQLAlchemy models directly from our view functions without having to serialize anything manually. As mentioned, this also takes care of relationships. For example, for our OrderDetail:

>>> from orders_api.db.session import *
>>> from orders_api.db.models import *
>>> from orders_api.db import schemas
>>> db_session = create_session()
>>> order = db_session.query(Order).first()
>>> order.items
[<orders_api.db.models.OrderItem object at 0x7f24c0edce20>, <orders_api.db.models.OrderItem object at 0x7f24c0edcee0>]
>>> order.items[0].product
<orders_api.db.models.Product object at 0x7f24c0e67760>
>>> order_detail = schemas.OrderDetail.from_orm(order)
>>> order_detail
OrderDetail(
    order_id=UUID('7fe3625d-9f67-441e-a6e4-93ff1c89ab4e'),
    date=datetime.datetime(2021, 1, 13, 10, 15, 29, 789684),
    items=[
        OrderItem(quantity=2, product=Product(store_id=UUID('457ff44a-6be5-4585-8a4f-6707332265ca'), name='SomeProduct', price=Decimal('1.00'), product_id=UUID('d1882432-5acc-42c7-85a6-f5e1cd419fb1'))),
        OrderItem(quantity=1, product=Product(store_id=UUID('776dc442-d256-44e8-b1af-a0a758db5370'), name='PS 6', price=Decimal('356.44'), product_id=UUID('6821eb84-8870-4c68-9e70-9a099bb80814')))
    ]
)

Reusable CRUD Service

By now we have all the pieces to implement our API endpoints. For example, for a /stores endpoint that returns a list of all available stores, we could write:

from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from orders_api.db import models, schemas, session

router = APIRouter(prefix="/stores")

@router.get("/", response_model=List[schemas.Store])
async def list_stores(
    db_session: Session = Depends(session.get_session),
) -> List[models.Store]:
    return db_session.query(models.Store).all()

With the get_session dependency we get our SQLAlchemy session, which we then use to get a list of models.Store instances for all stores from the database. This list is returned and FastAPI takes care of generating the desired response format using our Store schema.

For listing all Products, the implementation would look exactly the same (besides using the Product model and schema). This would also be the case for the other operations, like create. So we can abstract away that functionality and create a generic base class, with the possibility to initialize it with different models and schemas.

src/orders_api/services/base.py
from typing import Any, Generic, List, Optional, Type, TypeVar

import sqlalchemy
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException

from orders_api.db.models import Base

ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)

class BaseService(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    def __init__(self, model: Type[ModelType], db_session: Session):
        self.model = model
        self.db_session = db_session

    def get(self, id: Any) -> Optional[ModelType]:
        obj: Optional[ModelType] = self.db_session.query(self.model).get(id)
        if obj is None:
            raise HTTPException(status_code=404, detail="Not Found")
        return obj

    def list(self) -> List[ModelType]:
        objs: List[ModelType] = self.db_session.query(self.model).all()
        return objs

    def create(self, obj: CreateSchemaType) -> ModelType:
        db_obj: ModelType = self.model(**obj.dict())
        self.db_session.add(db_obj)
        try:
            self.db_session.commit()
        except sqlalchemy.exc.IntegrityError as e:
            self.db_session.rollback()
            if "duplicate key" in str(e):
                raise HTTPException(status_code=409, detail="Conflict Error")
            else:
                raise e
        return db_obj

    def update(self, id: Any, obj: UpdateSchemaType) -> Optional[ModelType]:
        db_obj = self.get(id)
        for column, value in obj.dict(exclude_unset=True).items():
            setattr(db_obj, column, value)
        self.db_session.commit()
        return db_obj

    def delete(self, id: Any) -> None:
        db_obj = self.get(id)  # raises a 404 if the object does not exist
        self.db_session.delete(db_obj)
        self.db_session.commit()

Here we have a base class that expects a SQLAlchemy session and a model class, and that carries extra type information about the necessary Pydantic schemas. Based on this generic information it implements the CRUD functions and even includes some error handling that we would otherwise have to implement in each view function separately.

With this base class we can create the specific services:

src/orders_api/services/stores.py
from fastapi import Depends
from sqlalchemy.orm import Session

from orders_api.db.models import Store
from orders_api.db.schemas import StoreCreate, StoreUpdate
from orders_api.db.session import get_session

from .base import BaseService

class StoresService(BaseService[Store, StoreCreate, StoreUpdate]):
    def __init__(self, db_session: Session):
        super(StoresService, self).__init__(Store, db_session)

def get_stores_service(db_session: Session = Depends(get_session)) -> StoresService:
    return StoresService(db_session)

Our list_stores function can now be written as:

@router.get("/", response_model=List[schemas.Store])
async def list_stores(
    store_service: StoresService = Depends(get_stores_service),
) -> List[models.Store]:
    return store_service.list()

Besides the possibility to reuse the base implementation and have error handling at one place instead of in each view function, this has some more nice properties:

  • the services/database code can be tested separately from the view functions
  • in case you want to migrate to a totally different kind of datastore, for example Parquet files, you would only have to extend your service implementation:
    • having an abstract base class defining the main interface
    • one specialized class for each service, for example a SQLAlchemyStoresService and ParquetStoresService
    • based on the desired storage backend the get_stores_service dependency returns the correct implementation
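The backend swap described in the list could look roughly like the following sketch. It is only an illustration of the pattern: an in-memory list stands in for the SQLAlchemy implementation and a JSON file stands in for Parquet (so that nothing outside the standard library is needed), stores are plain dicts, and all class names as well as the backend and data_dir parameters are assumptions made for this example.

```python
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List

StoreDict = Dict[str, Any]


class AbstractStoresService(ABC):
    """The interface the view functions depend on."""

    @abstractmethod
    def create(self, store: StoreDict) -> StoreDict: ...

    @abstractmethod
    def list(self) -> List[StoreDict]: ...


class InMemoryStoresService(AbstractStoresService):
    """Stand-in for a database-backed implementation."""

    def __init__(self) -> None:
        self._stores: List[StoreDict] = []

    def create(self, store: StoreDict) -> StoreDict:
        self._stores.append(store)
        return store

    def list(self) -> List[StoreDict]:
        return self._stores.copy()


class JsonFileStoresService(AbstractStoresService):
    """Stand-in for a file-backed implementation such as Parquet."""

    def __init__(self, path: Path) -> None:
        self._path = path

    def _read(self) -> List[StoreDict]:
        if not self._path.exists():
            return []
        return json.loads(self._path.read_text())

    def create(self, store: StoreDict) -> StoreDict:
        stores = self._read()
        stores.append(store)
        self._path.write_text(json.dumps(stores))
        return store

    def list(self) -> List[StoreDict]:
        return self._read()


def get_stores_service(backend: str, data_dir: Path) -> AbstractStoresService:
    # The only place that knows which implementation is in use
    if backend == "memory":
        return InMemoryStoresService()
    if backend == "json":
        return JsonFileStoresService(data_dir / "stores.json")
    raise ValueError(f"unknown backend: {backend!r}")


def add_and_list(service: AbstractStoresService) -> List[str]:
    # Calling code only uses the abstract interface
    service.create({"name": "TechStuff Online", "city": "Karlsruhe"})
    return [store["name"] for store in service.list()]


with tempfile.TemporaryDirectory() as tmp:
    results = {
        backend: add_and_list(get_stores_service(backend, Path(tmp)))
        for backend in ("memory", "json")
    }
print(results)
```

Because add_and_list only talks to the abstract interface, it behaves the same for both backends. The view functions would be in the same position.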

In case a method of the base class is not sufficient, as is the case for our OrdersService, we can simply override it:

class OrdersService(BaseService[Order, OrderCreate, Any]):
    def __init__(self, db_session: Session):
        super(OrdersService, self).__init__(Order, db_session)

    def create(self, obj: OrderCreate) -> Order:
        order = Order()
        self.db_session.add(order)
        self.db_session.flush()
        order_items = [
            OrderItem(**item.dict(), order_id=order.order_id) for item in obj.items
        ]
        self.db_session.add_all(order_items)
        self.db_session.commit()
        return order

Now we have our endpoints and all the code to interact with the database, but we still have to create the tables in our database.

Database Migrations with Alembic

To create our database tables and do migrations in case there are any changes/additions of the database schema, we use Alembic. We can initialize it with

docker-compose run --rm api alembic init src/orders_api/migrations

This creates a config file alembic.ini in our workspace and a folder src/orders_api/migrations.

In alembic.ini we remove the line

sqlalchemy.url = driver://user:pass@localhost/dbname

and we change src/orders_api/migrations/env.py to:

src/orders_api/migrations/env.py
from logging.config import fileConfig
from os import getenv

from alembic import context

from orders_api.db.models import Base

config = context.config
fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline():
    context.configure(
        url=getenv("DATABASE_URL"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    from orders_api.db.session import engine

    connectable = engine
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()

...

We set the target_metadata to the metadata of our Base class and use the engine from our session.py module. With that, Alembic will use the correct connection info and also has the info about our models that are inheriting from Base.

That lets Alembic generate migration scripts directly from the changes we apply to our models:

docker-compose run --rm api alembic revision --autogenerate -m "create initial tables"

Autogenerate works pretty well, but make sure to always check the created scripts.

The only thing left to do is applying the migrations:

docker-compose run --rm api alembic upgrade head

Writing Tests

We wouldn't be done without testing our code. For our tests we want to

  • have a separate database (not the development database)
  • run each test in isolation

So we have to create our test database before running any test that uses it, and make sure that each test starts from a clean database state. To achieve that we will write two pytest fixtures: one that creates the database with the help of sqlalchemy-utils, and one that cleans up the tables. The former runs only once before all tests (session scope), the latter runs before each test (function scope).

tests/conftest.py
from decimal import Decimal
from typing import Any, Generator

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from sqlalchemy_utils import create_database, database_exists

from orders_api.app import create_app
from orders_api.db.models import Base, Order, OrderItem, Product, Store
from orders_api.db.session import create_session

@pytest.fixture(scope="session")
def db() -> Session:
    db_session = create_session()
    engine = db_session.bind
    if not database_exists(engine.url):
        create_database(engine.url)
    Base.metadata.bind = engine
    Base.metadata.drop_all()
    Base.metadata.create_all()
    return db_session

@pytest.fixture()
def cleanup_db(db: Session) -> None:
    for table in reversed(Base.metadata.sorted_tables):
        db.execute(table.delete())

@pytest.fixture()
def app_client(cleanup_db: Any) -> Generator[TestClient, None, None]:
    app = create_app()
    yield TestClient(app)

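The db fixture ensures that the test database exists and also creates all tables. The cleanup_db fixture deletes all rows from every table, going through the tables in reverse dependency order so that rows in child tables are removed before the rows they reference.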
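Why the order of the deletes matters can be shown with a small stand-alone sketch. It uses SQLite from the standard library instead of Postgres, a reduced two-table schema and a hand-written sorted_tables list in place of Base.metadata.sorted_tables, so it only illustrates the principle.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite does not enforce them by default
conn.executescript(
    """
    CREATE TABLE stores (store_id INTEGER PRIMARY KEY);
    CREATE TABLE products (
        product_id INTEGER PRIMARY KEY,
        store_id INTEGER NOT NULL REFERENCES stores (store_id)
    );
    INSERT INTO stores VALUES (1);
    INSERT INTO products VALUES (1, 1);
    """
)

# Tables in dependency order, parents first
sorted_tables = ["stores", "products"]

try:
    # Parent first: fails because a product still references the store
    conn.execute("DELETE FROM stores")
    parent_first_ok = True
except sqlite3.IntegrityError:
    parent_first_ok = False

# Children first: every delete succeeds
for table in reversed(sorted_tables):
    conn.execute(f"DELETE FROM {table}")

remaining = conn.execute(
    "SELECT (SELECT COUNT(*) FROM stores) + (SELECT COUNT(*) FROM products)"
).fetchone()[0]
print(parent_first_ok, remaining)
```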

To easily configure a different database when running the tests, we extend our settings code:

from os import getenv

@lru_cache
def get_settings() -> Settings:
    settings = Settings()
    if (db := getenv("POSTGRES_DB")) is not None:
        settings.database_url = PostgresDsn.build(
            scheme="postgresql",
            user=settings.database_url.user,
            password=settings.database_url.password,
            host=settings.database_url.host,
            port=settings.database_url.port,
            path=f"/{db}",
        )
    return settings

With that, the name of the test database can be passed as an environment variable:

docker-compose run --rm -e POSTGRES_DB=orders_api_testdb api py.test -sv tests
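What the extended get_settings does to the URL can be tried in isolation. This sketch swaps the database name with urllib.parse from the standard library instead of PostgresDsn.build. with_database is a hypothetical helper and not part of the application.

```python
from urllib.parse import urlsplit


def with_database(url: str, database: str) -> str:
    """Return url with its path (the database name) replaced."""
    return urlsplit(url)._replace(path=f"/{database}").geturl()


dev_url = "postgresql://postgres:mypassword@db/orders_api_db"
test_url = with_database(dev_url, "orders_api_testdb")
print(test_url)  # postgresql://postgres:mypassword@db/orders_api_testdb
```

User, password and host stay untouched, so the tests talk to the same Postgres server but to a different database.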

Now finally the test including setting up fixture data:

@pytest.fixture()
def create_store(db: Session) -> Generator[Store, None, None]:
    store = Store(
        name="TechStuff Online",
        city="Karlsruhe",
        country="Germany",
        currency="EUR",
        zipcode="76131",
        street="Kaiserstr. 42",
    )
    db.add(store)
    db.flush()
    yield store
    db.rollback()

def test_get(app_client: TestClient, create_store: Store) -> None:
    rv = app_client.get(f"/stores/{create_store.store_id}")
    stores = rv.json()
    assert rv.status_code == 200
    assert stores["name"] == "TechStuff Online"

The complete code can be found on GitHub. Remember that this article was meant as a reference for how we usually set up the different pieces when we start developing a new FastAPI service using SQLAlchemy. For detailed tutorials, head over to the FastAPI documentation, which even has a section with additional resources.

In the next post, we will start implementing the UI with Nuxt and Vuetify.