Using SQLAlchemy and Alembic with FastAPI
In the last post we set up VSCode and Docker Compose so that we have a pleasant experience developing our FastAPI application. What’s missing is our datastore, a Postgres database, which we will add as part of this article.
We will use SQLAlchemy’s ORM to talk to the database and let Alembic handle our database migrations.
Then we will implement our API endpoints and see how Pydantic is used for data validation and generating HTTP responses directly from our SQLAlchemy models.
For this article we assume the reader already knows SQLAlchemy and how Pydantic is used as part of FastAPI. If not, we recommend reading FastAPI’s SQL Tutorial first.
A note on async
We are using sqlalchemy<1.4 with psycopg2 here, so querying the database will block the current worker. Maybe we will do another blog post to have a look at FastAPI + SQLAlchemy 2.0 once it’s stable.
Application Scope
Before we have a look at the different steps we have to do, let’s first talk about what exactly we will build.
Users will be able to browse all kinds of products, similar to Amazon. Those products are offered by different stores and can have different prices. A user can then order an arbitrary number of products from an arbitrary number of stores.
The application we build should serve as a project skeleton, not a production-ready app, so we will keep it very simple. There will only be CRUD (create, read, update, delete) functionality, with the following limitations:
- no user management (maybe we will add one, along with a blog post, later on)
- every user can create/update/delete stores and products (i.e. it’s not restricted to a store owner)
- orders won’t have a status (e.g. in preparation, fulfilled) and cannot be canceled or updated
- products have infinite stock
Database Design
For our database schema we will create four tables:
- stores
- products
- orders
- order_items
In our products table we will store the different items that can be ordered. Those always belong to a store. When an order is created, we create a new entry in the orders table, which will only have an ID and a date for when the order was made. To have a list of products that were ordered, we create an additional table, order_items, in which we will have the link to products and the order. Our database schema will then look like this:
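In rough outline, the relationships between those tables look like this:

stores (store_id) 1──n products (product_id, store_id → stores)
orders (order_id, date) 1──n order_items (order_id → orders, product_id → products, quantity)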
Adding Postgres to the Docker Compose Setup
With Docker Compose already in place, adding Postgres is quite simple:
version: "3.8"

services:
  api:
    image: orders_api:latest
    ports:
      - "8000:8000"
    command: uvicorn --reload --host 0.0.0.0 --port 8000 orders_api.main:app
    volumes:
      - .:/workspace:z
    depends_on:
      - db
    environment:
      DATABASE_URL: "postgresql://postgres:mypassword@db/orders_api_db"

  db:
    image: postgres:13
    ports:
      - "2345:5432"
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "mypassword"
      POSTGRES_DB: "orders_api_db"
The connection info, the database URL, is passed as an environment variable to our api service. There we can use it to connect to Postgres via SQLAlchemy.
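With both services defined, the whole stack can be started as usual, for example with:

docker-compose up --build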
Application Configuration
To configure our application we will use environment variables, as already done for the database connection info. Using environment variables for application configuration follows The Twelve-Factor App, and we will also be able to use this approach once we deploy our app to Heroku.
Reading Environment Variables in Python
Reading environment variables in Python can be done with os.getenv from the standard library

database_url = os.getenv("DATABASE_URL")

but a nicer way here is using Pydantic’s settings management.
Reading Settings with Pydantic
Pydantic is a library for data validation and settings management and is already part of our dependencies since it is used by FastAPI.
To read the settings with Pydantic, we have to create a class that inherits from Pydantic’s BaseSettings:
from functools import lru_cache

from pydantic import BaseSettings, PostgresDsn


class Settings(BaseSettings):
    database_url: PostgresDsn


@lru_cache
def get_settings() -> Settings:
    settings = Settings()
    return settings
This will not only read the DATABASE_URL environment variable, but also validate that its value is a valid Postgres URL.
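A quick way to see the validation in action; the invalid value below is only for illustration:

import os

os.environ["DATABASE_URL"] = "not-a-postgres-url"
Settings()  # raises a pydantic.ValidationError because the value is not a valid Postgres DSN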
Setting up SQLAlchemy
With the connection info available, we can now implement how we want to connect to our database. We will use SQLAlchemy’s scoped_session for this, as described in its documentation, and create a dependency. This dependency will take care of creating a session at the beginning of a web request and closing it at the end.
Creating SQLAlchemy Sessions
We will create database related code in a new subpackage, db, and configure SQLAlchemy in a session.py module:
from functools import lru_cache
from typing import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

from orders_api.config import get_settings

engine = create_engine(get_settings().database_url, pool_pre_ping=True)


@lru_cache
def create_session() -> scoped_session:
    Session = scoped_session(
        sessionmaker(autocommit=False, autoflush=False, bind=engine)
    )
    return Session


def get_session() -> Generator[scoped_session, None, None]:
    Session = create_session()
    try:
        yield Session
    finally:
        Session.remove()
Note that we don’t have to explicitly create a session but can directly return the scoped_session.
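This works because scoped_session acts as a proxy to a thread-local session, so it can be used just like a regular Session object. A minimal example of how it behaves:

Session = create_session()
Session.execute("SELECT 1")  # proxied to the underlying thread-local session
Session.remove()             # discard that session again, as done in get_session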
Next we will create our first database tables.
Defining the Models
In a new models.py module we define our models by creating classes that inherit from the declarative Base class.
import uuid
from typing import Any

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql.schema import ForeignKey, UniqueConstraint

Base: Any = declarative_base()


class Store(Base):
    __tablename__ = "stores"

    store_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    city = sa.Column(sa.Text, nullable=False)
    country = sa.Column(sa.Text, nullable=False)
    currency = sa.Column(sa.String(3), nullable=False)
    domain = sa.Column(sa.Text)
    name = sa.Column(sa.Text, nullable=False)
    phone = sa.Column(sa.Text)
    street = sa.Column(sa.Text, nullable=False)
    zipcode = sa.Column(sa.Text, nullable=False)


class Product(Base):
    __tablename__ = "products"

    product_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    store_id = sa.Column(ForeignKey("stores.store_id"), nullable=False)
    store = relationship("Store", backref="products")
    name = sa.Column(sa.Text, nullable=False)
    price = sa.Column(sa.Numeric(12, 2), nullable=False)

    __table_args__ = (UniqueConstraint("name", "store_id", name="uix_products"),)


class Order(Base):
    __tablename__ = "orders"

    order_id = sa.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    date = sa.Column(sa.DateTime, server_default=sa.func.now(), nullable=False)
    items = relationship("OrderItem", backref="order")


class OrderItem(Base):
    __tablename__ = "order_items"

    order_id = sa.Column(ForeignKey("orders.order_id"), primary_key=True)
    product_id = sa.Column(ForeignKey("products.product_id"), primary_key=True)
    product = relationship("Product", uselist=False)
    quantity = sa.Column(sa.Integer, nullable=False)
In addition to the table columns, we also added relationship() attributes for nicer access to related rows from child/parent tables:
products = store.products
# instead of
products = db_session.query(Product).filter_by(store_id=store.store_id).all()
Those relationships can also be used with our Pydantic schemas, via Pydantic's orm_mode:
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field, HttpUrl
from pydantic.types import UUID4, condecimal, constr


def to_camel(string: str) -> str:
    if "_" not in string:
        return string
    words = string.split("_")
    words = [words[0]] + [word.capitalize() for word in words[1:]]
    return "".join(words)


class ProductBase(BaseModel):
    class Config:
        alias_generator = to_camel
        allow_population_by_field_name = True


class ProductUpdate(ProductBase):
    name: Optional[str]
    price: Optional[condecimal(decimal_places=2)]  # type: ignore


class ProductCreate(ProductBase):
    store_id: UUID4
    name: str
    price: condecimal(decimal_places=2)  # type: ignore


class Product(ProductCreate):
    product_id: UUID4

    class Config:
        orm_mode = True


class OrderItemCreate(BaseModel):
    product_id: UUID4
    quantity: int = Field(..., gt=0)

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True


class OrderCreate(BaseModel):
    items: List[OrderItemCreate]


class Order(BaseModel):
    order_id: UUID4
    date: datetime

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True


class OrderItem(BaseModel):
    quantity: int = Field(..., gt=0)
    product: Product

    class Config:
        orm_mode = True
        alias_generator = to_camel
        allow_population_by_field_name = True


class OrderDetail(Order):
    order_id: UUID4
    date: datetime
    items: List[OrderItem]
Here we have the schemas for Product and Order and how we want them to be represented in our API.
We have a to_camel function that we use as an alias_generator, so that fields like store_id are represented as storeId. With the alias in place, request payloads using storeId are handled out of the box; setting allow_population_by_field_name = True additionally allows populating the schemas via the snake_case field names, which is what we need when instances are built from our SQLAlchemy models (whose attributes are named store_id and so on). Otherwise, only storeId would be recognized.
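To illustrate how the aliases behave, here is a small interactive example (the values are made up):

>>> from orders_api.db import schemas
>>> product = schemas.ProductCreate(
...     storeId="457ff44a-6be5-4585-8a4f-6707332265ca", name="Mouse", price="9.99"
... )
>>> product.store_id
UUID('457ff44a-6be5-4585-8a4f-6707332265ca')
>>> product.dict(by_alias=True)
{'storeId': UUID('457ff44a-6be5-4585-8a4f-6707332265ca'), 'name': 'Mouse', 'price': Decimal('9.99')}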
The orm_mode setting enables us to return instances of our SQLAlchemy models directly from our view functions without having to serialize anything manually. As already mentioned, this also takes care of relationships, for example for our OrderDetail:
>>> from orders_api.db.session import *
>>> from orders_api.db.models import *
>>> from orders_api.db import schemas
>>> db_session = create_session()
>>> order = db_session.query(Order).first()
>>> order.items
[<orders_api.db.models.OrderItem object at 0x7f24c0edce20>, <orders_api.db.models.OrderItem object at 0x7f24c0edcee0>]
>>> order.items[0].product
<orders_api.db.models.Product object at 0x7f24c0e67760>
>>> order_detail = schemas.OrderDetail.from_orm(order)
>>> order_detail
OrderDetail(
order_id=UUID('7fe3625d-9f67-441e-a6e4-93ff1c89ab4e'),
date=datetime.datetime(2021, 1, 13, 10, 15, 29, 789684),
items=[
OrderItem(quantity=2, product=Product(store_id=UUID('457ff44a-6be5-4585-8a4f-6707332265ca'), name='SomeProduct', price=Decimal('1.00'), product_id=UUID('d1882432-5acc-42c7-85a6-f5e1cd419fb1'))),
OrderItem(quantity=1, product=Product(store_id=UUID('776dc442-d256-44e8-b1af-a0a758db5370'), name='PS 6', price=Decimal('356.44'), product_id=UUID('6821eb84-8870-4c68-9e70-9a099bb80814')))
]
)
Reusable CRUD Service
By now we have all the pieces to implement our API endpoints. For example, for a /stores endpoint that returns a list of all available Stores, we could write:
from typing import List

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from orders_api.db import models, schemas, session

router = APIRouter(prefix="/stores")


@router.get("/", response_model=List[schemas.Store])
async def list_stores(
    db_session: Session = Depends(session.get_session),
) -> List[models.Store]:
    return db_session.query(models.Store).all()
With the get_session dependency we get our SQLAlchemy session, which we then use to fetch models.Store instances for all stores in the database. This list is returned, and FastAPI takes care of generating the desired response format using our Store schema.
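For completeness, here is a rough sketch of how such a router could be wired into the application. The create_app factory does exist in the project (it is used in the tests further below), but the module path for the router shown here is an assumption:

from fastapi import FastAPI

from orders_api.views.stores import router as stores_router  # hypothetical module path


def create_app() -> FastAPI:
    app = FastAPI()
    app.include_router(stores_router)
    return app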
For listing all Products, the implementation would look exactly the same (besides using the Product model and schema). This would also be the case for the other operations, like create. So we can abstract that functionality away and create a generic base class that can be initialized with different models and schemas.
from typing import Any, Generic, List, Optional, Type, TypeVar

import sqlalchemy
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette.exceptions import HTTPException

from orders_api.db.models import Base

ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)


class BaseService(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    def __init__(self, model: Type[ModelType], db_session: Session):
        self.model = model
        self.db_session = db_session

    def get(self, id: Any) -> Optional[ModelType]:
        obj: Optional[ModelType] = self.db_session.query(self.model).get(id)
        if obj is None:
            raise HTTPException(status_code=404, detail="Not Found")
        return obj

    def list(self) -> List[ModelType]:
        objs: List[ModelType] = self.db_session.query(self.model).all()
        return objs

    def create(self, obj: CreateSchemaType) -> ModelType:
        db_obj: ModelType = self.model(**obj.dict())
        self.db_session.add(db_obj)
        try:
            self.db_session.commit()
        except sqlalchemy.exc.IntegrityError as e:
            self.db_session.rollback()
            if "duplicate key" in str(e):
                raise HTTPException(status_code=409, detail="Conflict Error")
            else:
                raise e
        return db_obj

    def update(self, id: Any, obj: UpdateSchemaType) -> Optional[ModelType]:
        db_obj = self.get(id)
        for column, value in obj.dict(exclude_unset=True).items():
            setattr(db_obj, column, value)
        self.db_session.commit()
        return db_obj

    def delete(self, id: Any) -> None:
        db_obj = self.db_session.query(self.model).get(id)
        self.db_session.delete(db_obj)
        self.db_session.commit()
Here we have a base class that expects a SQLAlchemy session and a model class, and that also carries type information about the corresponding Pydantic schemas. Based on this generic information it implements the CRUD functions and even includes some error handling we would otherwise have to implement in each view function separately.
With this base class we can create the specific services:
from fastapi import Depends
from sqlalchemy.orm import Session

from orders_api.db.models import Store
from orders_api.db.schemas import StoreCreate, StoreUpdate
from orders_api.db.session import get_session

from .base import BaseService


class StoresService(BaseService[Store, StoreCreate, StoreUpdate]):
    def __init__(self, db_session: Session):
        super(StoresService, self).__init__(Store, db_session)


def get_stores_service(db_session: Session = Depends(get_session)) -> StoresService:
    return StoresService(db_session)
Our list_stores function can now be written as:
@router.get("/", response_model=List[Store])
async def list_stores(
    store_service: StoresService = Depends(get_stores_service),
) -> List[models.Store]:
    return store_service.list()
Besides the possibility to reuse the base implementation and have error handling in one place instead of in each view function, this has some more nice properties:
- the services/database code can be tested separately from the view functions
- in case you want to migrate to a totally different kind of datastore, for example Parquet files, you would only have to extend your service implementation (a rough sketch follows below):
  - have an abstract base class defining the main interface
  - add one specialized class for each service, for example a SQLAlchemyStoresService and a ParquetStoresService
  - based on the desired storage backend, the get_stores_service dependency returns the correct implementation
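A minimal sketch of that idea, assuming a hypothetical storage_backend setting and keeping the interface in terms of the Pydantic schemas so it stays backend-agnostic:

from abc import ABC, abstractmethod
from typing import List

from fastapi import Depends
from sqlalchemy.orm import Session

from orders_api.config import get_settings
from orders_api.db import models, schemas
from orders_api.db.session import get_session


class AbstractStoresService(ABC):
    @abstractmethod
    def list(self) -> List[schemas.Store]:
        ...


class SQLAlchemyStoresService(AbstractStoresService):
    def __init__(self, db_session: Session):
        self.db_session = db_session

    def list(self) -> List[schemas.Store]:
        # read from Postgres via SQLAlchemy
        return [
            schemas.Store.from_orm(store)
            for store in self.db_session.query(models.Store).all()
        ]


class ParquetStoresService(AbstractStoresService):
    def list(self) -> List[schemas.Store]:
        ...  # read the stores from Parquet files instead


def get_stores_service(
    db_session: Session = Depends(get_session),
) -> AbstractStoresService:
    # "storage_backend" is a hypothetical setting, not part of the Settings class above
    if getattr(get_settings(), "storage_backend", "sqlalchemy") == "parquet":
        return ParquetStoresService()
    return SQLAlchemyStoresService(db_session)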
In case a method of the base class is not sufficient, as is the case for our OrdersService, we can simply override it:
class OrdersService(BaseService[Order, OrderCreate, Any]):
    def __init__(self, db_session: Session):
        super(OrdersService, self).__init__(Order, db_session)

    def create(self, obj: OrderCreate) -> Order:
        order = Order()
        self.db_session.add(order)
        self.db_session.flush()
        order_items = [
            OrderItem(**item.dict(), order_id=order.order_id) for item in obj.items
        ]
        self.db_session.add_all(order_items)
        self.db_session.commit()
        return order
Now we have our endpoints and all the code to interact with the database, but we still have to create the tables in our database.
Database Migrations with Alembic
To create our database tables and run migrations in case there are changes or additions to the database schema, we use Alembic. We can initialize it with:
docker-compose run --rm api alembic init src/orders_api/migrations
This creates a config file alembic.ini in our workspace and a folder src/orders_api/migrations.
In alembic.ini we remove the line

sqlalchemy.url = driver://user:pass@localhost/dbname

and change src/orders_api/migrations/env.py to:
from logging.config import fileConfig
from os import getenv

from alembic import context

from orders_api.db.models import Base

config = context.config
fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline():
    context.configure(
        url=getenv("DATABASE_URL"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    from orders_api.db.session import engine

    connectable = engine

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()
...
We set the target_metadata to the metadata of our Base class and use the engine from our session.py module. With that, Alembic will use the correct connection info and also knows about our models that inherit from Base.
That lets Alembic generate migration scripts directly from the changes we apply to our models:
docker-compose run --rm api alembic revision --autogenerate -m "create initial tables"
Autogenerate works pretty well, but make sure to always check the created scripts.
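To give an idea of what to look for, an autogenerated revision roughly has this shape (abridged to a single table here; the revision identifiers and column list are placeholders, so compare against the actual output):

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# placeholder revision identifiers
revision = "0001"
down_revision = None


def upgrade():
    op.create_table(
        "stores",
        sa.Column("store_id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("name", sa.Text(), nullable=False),
        sa.PrimaryKeyConstraint("store_id"),
    )


def downgrade():
    op.drop_table("stores")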
The only thing left to do is to apply the migrations:
docker-compose run --rm api alembic upgrade head
Writing Tests
We wouldn’t be done if we didn’t test our code. For our tests we want to:
- have a separate database (not the development database)
- run each test in isolation
So we have to create our test database before running any of the tests that use the database, and before each test make sure that we have a clean database state. To achieve that, we will write one Pytest fixture that creates the database with the help of sqlalchemy-utils and one that cleans up the tables. The former will run only once before all tests (session scope), the latter will run before each test (function scope).
from decimal import Decimal
from typing import Any, Generator

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from sqlalchemy_utils import create_database, database_exists

from orders_api.app import create_app
from orders_api.db.models import Base, Order, OrderItem, Product, Store
from orders_api.db.session import create_session


@pytest.fixture(scope="session")
def db() -> Session:
    db_session = create_session()
    engine = db_session.bind
    if not database_exists(engine.url):
        create_database(engine.url)
    Base.metadata.bind = engine
    Base.metadata.drop_all()
    Base.metadata.create_all()
    return db_session


@pytest.fixture()
def cleanup_db(db: Session) -> None:
    for table in reversed(Base.metadata.sorted_tables):
        db.execute(table.delete())


@pytest.fixture()
def app_client(cleanup_db: Any) -> Generator[TestClient, None, None]:
    app = create_app()
    yield TestClient(app)
The db fixture ensures that the test database exists and also creates all tables. The cleanup_db fixture cleans up by deleting all rows from all tables.
To easily configure a different database when running the tests, we extend our settings code:
from os import getenv


@lru_cache
def get_settings() -> Settings:
    settings = Settings()
    if (db := getenv("POSTGRES_DB")) is not None:
        settings.database_url = PostgresDsn.build(
            scheme="postgresql",
            user=settings.database_url.user,
            password=settings.database_url.password,
            host=settings.database_url.host,
            port=settings.database_url.port,
            path=f"/{db}",
        )
    return settings
With that, the name of the test database can be passed as an environment variable:
docker-compose run --rm -e POSTGRES_DB=orders_api_testdb api py.test -sv tests
Finally, here is the test itself, including a fixture that sets up test data:
@pytest.fixture()
def create_store(db: Session) -> Generator[Store, None, None]:
    store = Store(
        name="TechStuff Online",
        city="Karlsruhe",
        country="Germany",
        currency="EUR",
        zipcode="76131",
        street="Kaiserstr. 42",
    )
    db.add(store)
    db.flush()
    yield store
    db.rollback()


def test_get(app_client: TestClient, create_store: Store) -> None:
    rv = app_client.get(f"/stores/{create_store.store_id}")
    stores = rv.json()
    assert rv.status_code == 200
    assert stores["name"] == "TechStuff Online"
The complete code can be found on GitHub. Remember that this article is meant as a reference for how we usually set up the different pieces when we start developing a new FastAPI service using SQLAlchemy. For detailed tutorials, head over to the FastAPI documentation, which even has a section with additional resources.
In the next post, we will start implementing the UI with Nuxt and Vuetify.