Основы FastAPI
Чем займемся?
В данном посте постараемся разобраться с основами FastAPI, поговорим про API вообще и про REST API в частности, затронем тему асинхронной разработки и тестирования. Конечная цель — иметь репозиторий с небольшим проектом-заготовкой написанным по всем канонам, с использованием FastAPI, асинхронной работой с БД и покрытым тестами с использованием pytest. В сети множества такого рода репозиториев можно найти, но в этом посте я постараюсь не просто описывать как делать в плане кода, но и пояснять почему. Поехали.
Асинхронное программирование
Асинхронная разработка — одна из самых сложных тем на мой взгляд, но существует множество библиотек, реализующих асинхронную работу и делающую жизнь разработчика довольно простой. Наиболее популярной библиотекой на сегодняшний момент является asyncio, которая входит в состав стандартной библиотеки Python.
Самым простым способом реализовать асинхронную работу — внедрить потоки. Потоки позволяют выполнять целое множество задач в параллели. Подробнее можно почитать в моей статьей про Thread Pool Executor. Но многопоточные программы сразу поднимают сложность разработки на уровень, а то и два, потому что мы должны заботиться с потоками о таких вещах как состояния гонки, блокировки, исчерпание ресурсов и пр.
Для того, чтобы потоки и процессы могли получать совместный доступ к ресурсам, процессор все время с некоторой определенной периодичностью сохраняет контекст (информацию) потока/процесса и переключается на другой поток/процесс (привет GIL). Так вот, важно понимать, что в асинхронном программировании приложение, а не процессор управляет переключением контекста и задач.
Во многих статьях описывается хороший пример асинхронности — это секретарь в крупной фирме. Секретарь выполняет одновременно сразу множество дел. Предположим, в течение дня он ответственен за бронирование авиарейсов начальству, за заполнение текущей документации компании, за прием входящих звонков и прием документов в бумажном формате. Звонки входящие и документы возникают периодически в течение дня, а основное время секретарь тратит на постоянные переговоры с авиакомпаниями и на заполнение документации. Когда поступает входящий звонок, секретарь просит подождать оператора авиакомпании и переключается на входящий звонок, аналогично секретарь прерывается в заполнении документов, чтобы ответить на звонок, либо принять бумажный документ от коллег. Секретарь — сам решает, когда надо сменить задачу и “переключить контекст. Секретарь — асинхронен.
Как уже упоминал выше, существует множество библиотек для асинхронной работы. Многие из них, например, Tornado — реализует асинхронную работу через callback-и. Callback — это функция, которая должна выполнить после окончания того или иного процесса. Т.е. в том же Tornado если неблокирующие методы, которые выполняют то, что хочет разработчик, например, получают какую-то информацию вызывая API или просто парсят страничку, чтобы обработать ответ полученный от асинхронного метода, в него передается специальный callable объект (callback функция), которая будет вызвана библиотекой по завершению процесса получения данных и уже сделает с ответом полученным то, что хочет разработчик.
Callback-и являлись главным инструментом реализации асинхронной работы до Python 3.3. Для того, чтобы заменить callback-и на что-то более оптимальное были нужны специальные инструменты внутри самого языка, которые бы могли начать выполнять вызов, затем останавливаться, сохранять контекст, возвращать результат. В общем, как уже можно понять — речь про генераторы, которые уже были частью Python. Все, что надо было сделать — написать библиотеку, которая бы реализовывала цикл событий, в котором запускаются генераторы. Так появилась Asyncio.
Asyncio оказалась настолько успешной, что Python включил её в состав языка и ввел два дополнительных слова: async и await. Async говорит о том, что текущий метод является асинхронным, а await говорит, что необходимо дождаться выполнения корутины (сопрограммы).
Сейчас будет немного кода, чтобы лучше понять проблематику. Решаем следующую задачу: необходимо найти сумму элементов списка; есть два списка, так что надо будет вызвать два раза выполнение; в процессе выполнения задачи нам надо что-то сделать длительное по времени (в реальной жизни это могут быть всякие обращения по API к сторонним сервисам и т.п.) — в нашем случае мы будем задержку симулировать функцией sleep. Итак, первое приближение — полностью синхронный код:
import time
def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
sleep()
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()
tasks = [
sum("A", [1, 2]),
sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')
Мы засекаем начало времени, формируем список с задачами, которые необходимо выполнить (суммировать сначала один список, потом второй), в процессе выполнения выводим на экран, что именно сейчас суммируем, в рамках какой задачи и сколько времени занимает, по итогу — выводим общее время выполнения программы. Результат выполнения нашего кода будет следующий:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3Task B: Computing 0+1
Time: 2.00
Task B: Computing 1+2
Time: 3.00
Task B: Computing 3+3
Time: 4.00
Task B: Sum = 6Time: 5.01 sec
Видим, что код выполняется, как и задуманно, полностью синхронно, суммируя элемент за элементом одной задачи, потом переходя на вторую, ну и по итогу работает скрипт 5 секунд. Теперь напишем асинхронный код решающий эту же задачу на asyncio.
import asyncio
import time
async def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
async def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await sleep()
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()end = time.time()
print(f'Time: {end-start:.2f} sec')
Первым делом мы добавляем ключевое слово async к нашим функциям говоря, что они теперь асинхронные. Перед выполнением длительной операции мы ставим ключевое слово await, которое даст понять Python-у, что мы хотим дождаться выполнения корутины. Основной код меняется — мы обращаемся к циклу событий, создаем все те же задачи на суммирование и с помощью специального метода run_until_complete отправляем их на исполнение. Фиксация времени происходит аналогично. Смотрим на то, что нам выведет консоль:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6Time: 5.01 sec
Неожиданно, не правда ли? Наш асинхронный код работает точь в точь как и синхронный, последовательно выполняя суммирование и отрабатывая столько же времени как и раньше. Почему? А все довольно просто. Все потому, что несмотря на то, что мы говорим Python-у, что наши методы асинхронные, мы используем синхронный блокирующий метод sleep, который нарушает всю нашу концепцию асинхронного кода. Чтобы избежать этого, необходимо использовать неблокирующие специальные методы. Вот код, который сделаем все как надо.
import asyncio
import time
async def sleep():
print(f'Time: {time.time() - start:.2f}')
await asyncio.sleep(1)
async def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await sleep()
total += number
print(f'Task {name}: Sum = {total}\n')
start = time.time()loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()end = time.time()
print(f'Time: {end-start:.2f} sec')
Все, что изменилось в коде — функция sleep, вместо вызова блокирующего sleep() мы вызываем asyncio.sleep(), которая является неблокирующей операцией, поэтому получаем такой прекрасный результат в консоли:
Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6Time: 3.00 sec
Как видим, задачи выполняются параллельно, а затраченное время на выполнение обеих задач — 3 секунды.
API и REST API
API — Application Programming Interface — обеспечивает взаимодействие между двумя системами. Web-сервис — это приложение, предоставляющее ресурсы в формате, используемом другими пользователями. Web-сервисы, в основном, это запросы и ответы между клиентами и серверами. Мы можем сказать, что API, использующие протокол HTTP для передачи данных и являются web-сервисами. При этом, не имеет значения, какой язык программирования используется, какие платформы, важно лишь одно — запрос сообщения и ответ сделаны через общий веб-протокол HTTP.
Что такое REST API вообще? REST — Representational State Transfer. Общая модель REST API может быть проиллюстрирована следующей схемой:
Как видно, клиент и сервер написаны на разных языках, но обмен информации происходит всегда по HTTP протоколу. REST по своей сути — не стандарт, а архитектурный стиль, поэтому REST API часто называют RESTful API, потому что REST — это просто стиль, которому следует API.
REST API могут использовать любой формат сообщений: XML, JSON, Atom, RSS, CSV, HTML и пр. Но большинство REST API используют JSON в качестве формата по умолчанию, как наиболее легкий, гибкий и простой формат.
Отличительной особенностью REST является то, что API фокусируется на ресурсах (вещах, а не действиях) и способах доступа к ресурсам. Под ресурсами понимается разные типы информации, доступ к которым можно получить через специальный заранее определенный URL адрес. Все URL-адреса сопровождаются методами (как их часто называют — глаголами), которые как раз и указывают как взаимодействовать с ресурсом. Обычно методы и действия соответствуют следующим сопоставлениям: GET — чтение, POST — создание, PUT — обновление, DELETE — удаление. Конечная точка (endpoint) также может включать некоторые параметры запроса, которые определяют более подробную информацию о представлении ресурса: например ограничения на количество выбираемых записей, формат и пр (https://service.com/something?limit=5&format=xml).
Отношения между ресурсами и методами, как упоминалось ранее, часто описывается в терминах “существительное” и “глагол”. Ресурс — это существительное, потому что это объект. Глагол — это то, что хотим делать с ресурсом (объектом). Таким образом, можно сказать, что объединение существительных и глаголов и формирует REST.
Наконец, финальной чертой REST является то, что REST API не сохраняют свои состояния и могут кэшироваться. Под отсутствием состояния понимается тот факт, что каждый раз, при обращении к ресурсу через соответствующий endpoint, API возвращает один и тот же ответ, т.е. не происходит запоминания предыдущих запросов, они никак не влияют на последующие ответы.
FastAPI
FastAPI- современный, высокопроизводительный Python фреймворк, который идеален для реализации RESTful API. Для начала, бегло о плюсах и минусах фреймворках.
Плюсы https://www.techempower.com/benchmarks/ — действительно быстрый, потому что FastAPI основывается на Starlette и Pydantic. В списке фреймворк находится на 247 месте, aiohttp на 321, django на 359, flask на 370. Поддержка асинхронной работы “из коробки” с использованием async/await. Легко и удобно тестировать с помощью встроенное TestClient-а (https://fastapi.tiangolo.com/tutorial/testing), что позволяет легко разрабатывать API в аспекте TDD. Бесшовная удобная обработка ошибок с помощью @app.exception_handler. Поддержка OpenAPI (Swagger) из коробки; встроенный Redoc. На выходе имеет полностью автоматически сгенерированную документацию, основывающуюся на коде. Не зависит ни от базы, ни от ORM; совместим с чем угодно. Поддержка GraphQL из коробки. Глубокая валидация данных (даже из json объектов с большой вложенностью), что позволяет реализовывать более отказоустойчивые API. Поддержка Dependency injections, что позволяет: переиспользовать логику; расшаривать ресурсы и пр.
Минусы Неудобная валидация запросов. Валидация работает через Pydantic и приходится иметь дело с захардкоженными исключениями приходящими из него, если что пошло не так. Нет возможности кастомизировать их, если только не писать свой собственный валидатор с нуля. Т.к. FastAPI относительно молод, то комьюнити небольшое, помимо официальной документации (хоть она исчерпывающая) — сложно найти доп. источники информации. Многое, что другие фреймворки предоставляют из коробки (тот же DRF) — придется писать руками, либо искать сторонние решения.
Реализация REST API сервиса списка покупок
Наконец, мы имеем фундаментальные знания связанные с асинхронностью и с сутью REST API, можно начать разработку. Для практики мы реализуем REST API сервиса списка покупок. Сервис будет позволять проходить процедуру авторизации и регистрации пользователей и давать возможность добавлять покупки в список. Список для простоты у нас будет один на пользователь.
Первым делом, наши зависимости из requirements.txt:
asyncpg==0.22.0
psycopg2-binary==2.8.6
databases[postgresql]==0.4.1
SQLAlchemy==1.3.23
SQLAlchemy-Utils==0.36.8
alembic==1.5.6
fastapi==0.63.0
uvicorn==0.13.4
pydantic==1.8.1
email-validator==1.1.2
python-multipart==0.0.5
pytest==6.2.2
pytest-freezegun==0.4.2
requests==2.25.1
В нашем сервисе мы используем асинхронный драйвер для работы с PostgreSQL — asyncpg. Asyncpg был разработан специально для PostgreSQL и работы с Python/asyncio. По своей сути, asyncpg — эффективная, чистая и быстрая имплементация серверного протокола работы с PostgreSQL, заточенного для использования с asyncio фреймворком. Поверх asyncpg мы будем использовать databases — пакет, позволяющий легко работать в асинхронном ключе используя мощный SQLAlchemy Core Expression язык. Без psycopg2 нам не обойтись так же, потому что синхронные операции с базой должны выполняться синхронным драйвером (например, изначальное создание табличек и пр.). Alembic нам потребуется, потому что мы хотим работать с базой красиво, используя миграции. Uvicorn — пожалуй, лучшая, легкая и самая быстрая имплементация ASGI сервера, который и будет запускать наш сервис. Тут сделаем небольшой экскурс в понятия.
Раньше абсолютным стандартом был WSGI — Web Server Gateway Interface. Этот протокол описывал то, как веб-сервер мог передавать HTTP запросы на обработку в Python приложения и получать оттуда ответы. Великое множество фреймворков и библиотек написано с использование WSGI стандарта. Django и Flask поддерживают WSGI до сих пор. Но проблемы с WSGI начались примерно тогда, когда Asyncio был добавлен в Python, потому что со всеми мощными асинхронными наворотами, корутинами и всем таким — WSGI перестал справляться, т.к. В годы его создания про это речи не было вообще и, соответственно, под асинхронную разработку стандарт не заточен вообще. Тут то на сцену выходит новый стандарт ASGI — Asynchronous Server Gateway Interface, поддерживающий корутины, все крутые фишки асинхронности и все прочее.
Вернемся к коду. Мы хотим, чтобы наш проект был докеризован. Базу мы не будет включать в докер, потому что сразу делаем проект будто хотим его выкатывать на прод, поэтому база у нас будет развернута локально. В корне проекта создаем Dockerfile со следующим содержимым:
FROM python:3.8.1-alpineWORKDIR /usr/src/appENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1COPY ./requirements.txt /usr/src/app/requirements.txtRUN set -eux \
&& apk add --no-cache --virtual .build-deps build-base \
libressl-dev libffi-dev gcc musl-dev python3-dev \
postgresql-dev \
&& pip install --upgrade pip setuptools wheel \
&& pip install -r /usr/src/app/requirements.txt \
&& rm -rf /root/.cache/pipCOPY . /usr/src/app/
Тут мы устанавливаем все необходимые пакеты, зависимости и копируем проект в контейнер.
Добавим рядом docker-compose.yml:
version: '3.7'services:
web:
build: .
command: uvicorn app.main:app --reload --workers 1 --host 0.0.0.0 --port 8000
volumes:
- ./:/usr/src/app/
ports:
- 8002:8000
environment:
DB_USER: ${DB_USER}
DB_PASS: ${DB_PASS}
DB_HOST: ${DB_HOST}
DB_NAME: ${DB_NAME}
Тут мы говорим, что наш web контейнер будет запускаться через uvicorn, а так же пробрасываем порты, чтобы иметь доступ к сервису с нашей клиентской машины по порту 8002. В environment заносим 4 переменных окружения, хранящих данные для доступа к нашей БД, которую, напомню, мы создаем локально. После чего добавляем переменные окружения на машине, где разворачиваем решение. Если это Windows, то пишем
set DB_USER=MY_USER_1
Если никсы, то пишем
export DB_USER=MY_USER_1
Повторяем для всех оставшихся переменных для работы с БД.
Создаем рядом в корне две папки: app и tests. В первой будет весь код нашего сервиса, а во второй — тесты на pytest. Внутри папки app создаем следующие папки: models (тут будут модели нашей БД), routers (тут мы будем описывать FastAPI роутеры для всех наших сущностей API), schemas (тут будут Pydantic схемы данных для валидации), utils (тут будут разные функции-helper-ы, к которым будем обращаться из других мест).
Начнем с самого базового функционала и реализуем метод ping. Для этого в папке /app/routers создадим файл ping.py:
# -*- coding: utf-8 -*-from fastapi import APIRouterrouter = APIRouter()
@router.get('/ping')
async def pong():
return {'ping': 'pong!'}
Все, что мы делаем тут — создаем экземлпяр APIRouter-а, используя соответствующий декоратор определяем запрос по URL /ping с методом GET, который в ответ вернет JSON объект.
Теперь нам надо подключить наш роутер в главный файл приложения. Создаем внутри /app main.py:
# -*- coding: utf-8 -*-from fastapi import FastAPIfrom app.routers import ping# Initialize the app
app = FastAPI()app.include_router(ping.router)
Тут мы создаем экземпляр FastAPI(). Подключаем только что созданный роутер и все, готово! Мы написали первый метод используя FastAPI. Идем в консоль и создаем контейнер: docker-compose up -d — build
После билда можем открыть в браузере URL http://localhost:8002/ping и убедиться, что там выводиться наш JSON. Кроме этого откроем http://localhost:8002/docs — Swagger документация любезно созданная автоматически FastAPI, основываясь на нашем коде.
Теперь пора позаботиться о тестировании. Добавим в папку /tests файл conftest.py, где на данный момент создадим простую фикстуру, возвращающую тестовый клиент, к которому будем обращаться с запросами из нашей функции тестирования:
# -*- coding: utf-8 -*-from starlette.testclient import TestClient
@pytest.fixture()
def client():
with TestClient(app) as test_client:
yield test_client
Делаем docker-compose down и вновь docker-compose up -d — build. После успешного билда мы можем прогнать наше первое тестирование командой
docker-compose exec web pytest .
Переходим к более сложным вещам — работа с базой. Займемся регистрацией и авторизацией пользователей.
Добавляем внутри app файл db.py и конфигурируем подключение к БД:
# -*- coding: utf-8 -*-from databases import Database
from os import environDB_USER = environ.get('DB_USER', 'user')
DB_PASSWORD = environ.get('DB_PASS', 'password')
DB_HOST = environ.get('DB_HOST', 'localhost')
DB_NAME = environ.get('DB_NAME', 'localhost')
DATABASE_URL = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}'# databases query builder
database = Database(DATABASE_URL)
Все доступы получаем из окружения. Формируем URL для подключения. Используя databases подключаемся к базе.
Пора создать модели, которые необходимы для нашей текущей задачи. В папке /app/models создаем users.py файл и определяем модели пользователя и токена пользователя, используя SQLAlchemy Expression язык:
# -*- coding: utf-8 -*-import sqlalchemy
from sqlalchemy.dialects.postgresql import UUIDmetadata = sqlalchemy.MetaData()
users_table = sqlalchemy.Table(
'users',
metadata,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('email', sqlalchemy.String(40), unique=True, index=True),
sqlalchemy.Column('name', sqlalchemy.String(100)),
sqlalchemy.Column('hashed_password', sqlalchemy.String()),
sqlalchemy.Column(
'is_active',
sqlalchemy.Boolean(),
server_default=sqlalchemy.sql.expression.true(),
nullable=False,
),
)
tokens_table = sqlalchemy.Table(
'tokens',
metadata,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
'token',
UUID(as_uuid=False),
server_default=sqlalchemy.text('uuid_generate_v4()'),
unique=True,
nullable=False,
index=True,
),
sqlalchemy.Column('expires', sqlalchemy.DateTime()),
sqlalchemy.Column('user_id', sqlalchemy.ForeignKey('users.id')),
)
Тут, в общем, интуитивно понятный код. Единственная сложность заключается в том, что в модели tokens мы используем для самого токена UUID4 формат, по умолчанию соответствующее расширение может не присутствовать в PostgreSQL, поэтому не забудьте, подключившись к базе через psql прописать следующую команду:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
Теперь пора инициализировать наш alembic, чтобы работать с миграциями. Напишем в консоли:
alembic init migrations
У нас в корне проекта появятся файлы alembic.ini и каталог migrations в котором будут: versions — папка с миграциями, env.py — скрипт, который запускается при вызове alembic, script.py.mako — содержащий шаблон для новых миграций. В alembic.ini нам необходимо прописать URL подключения к базе:
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s
Написав URL в таком виде мы даем понять, что данные будут браться из соответствующих переменных, которые мы переопределим в файле /migrations/env.py:
from os import environfrom app.models import usersconfig = context.configsection = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))fileConfig(config.config_file_name)target_metadata = [users.metadata, ]
Как видно, кроме переопределения переменных из окружения, мы еще указали метаданные в соответствущем параметре target_metadata — указывать тут надо все модели с которыми мы работаем в проекте, иначе Alemblic не будет знать откуда считывать изменения.
Мы все сделали. Генерируем миграции и обновляем нашу БД:
alembic revision --autogenerate -m "Added required tables"
alembic upgrade head
Возвращаемся в /app/main.py и дописываем код, чтобы при старте сервиса мы подключались к БД, при завершении — отключались:
# -*- coding: utf-8 -*-from fastapi import FastAPIfrom app.routers import ping, users
from app.db import database
# Initialize the app
app = FastAPI()
@app.on_event('startup')
async def startup():
await database.connect()
@app.on_event('shutdown')
async def shutdown():
await database.disconnect()
app.include_router(ping.router)
app.include_router(users.router)
Заодно мы подключаем новый роутер для работы с методами пользователей, его мы создадим чуть позже. Первым делом, добавим Pydantic схемы для валидации данных приходящих на сервер от клиента, а также для схемы для ответов от сервера клиенту. Идем в /app/schemas/ и создаем файл users.py:
# -*- coding: utf-8 -*-from datetime import datetime
from typing import Optionalfrom pydantic import BaseModel, EmailStr, Field, UUID4, validator
class UserCreate(BaseModel):
"""Sign up request schema.""" email: EmailStr
name: str
password: str
class UserBase(BaseModel):
"""Response schema with user details.""" id: int
email: EmailStr
name: str
class TokenBase(BaseModel):
token: UUID4 = Field(..., alias='access_token')
expires: datetime
token_type: Optional[str] = 'bearer' class Config:
allow_population_by_field_name = True @validator('token')
def hexlify_token(cls, value):
"""UUID to hex string converter.""" return value.hex
class User(UserBase):
"""Response body with user details."""
token: TokenBase = {}
Первым делом замечаем, что типы полей определяются с помощью tipe hinting-а. Итак, у нас есть класс User, который определяет тело ответа с данными пользователя и наследуется от класса UserBase. UserBase наследуется от BaseModel и содержит схему ответа с данными пользователя: id, email и имя. В User классе у нас еще добавляется поле token, а тип у него — TokenBase, описанный выше. TokenBase наследуется от BaseModel и описывает схему возвращаемую для токенов, в которой должны быть поля: непосредственно сам токен, дата его истечения, тип токена, так же описываем метод преобразования UUID в hex. В TokenBase активно используются конструкции Pydantic. Например, первый параметр Field в виде троеточия говорит о том, что поле обязательное, а после мы даем псевдоним полю, и далее в Config-е указываем, что разрешаем обращаться к полю, используя заданный псевдоним. Наконец, UserCreate схема определяет поля, которые сервер ждет от клиента для регистрации нового пользователя. Внутри себя pydantic содержим множество типов комплексных, позволяющих делать валидацию нужных данных без лишних телодвижений — EmailStr тому в пример.
Со схемами для авторизации и регистрации мы закончили. Теперь пойдем в /app/utils и создадим там файл users.py, в котором опишем необходимые нам функции-helper-ы для функционала пользователей:
# -*- coding: utf-8 -*-"""Different helper-functions to work with users."""
import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_from app.db import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema
def get_random_string(length=12):
"""Return generated random string (salt)."""
return "".join(random.choice(string.ascii_letters) for _ in range(length))
def hash_password(password: str, salt: str = None):
"""Hash password with salt."""
if salt is None:
salt = get_random_string()
enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return enc.hex()
def validate_password(password: str, hashed_password: str):
"""Validate password hash with db hash."""
salt, hashed = hashed_password.split("$")
return hash_password(password, salt) == hashed
async def get_user_by_email(email: str):
"""Return user info by email."""
query = users_table.select().where(users_table.c.email == email)
return await database.fetch_one(query)
async def get_user_by_token(token: str):
"""Return user info by token."""
query = tokens_table.join(users_table).select().where(
and_(
tokens_table.c.token == token,
tokens_table.c.expires > datetime.now()
)
)
return await database.fetch_one(query)
async def create_user_token(user_id: int):
"""Create token for user with user_id."""
query = (
tokens_table.insert()
.values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
.returning(tokens_table.c.token, tokens_table.c.expires)
) return await database.fetch_one(query)
async def create_user(user: user_schema.UserCreate):
"""Create new user."""
salt = get_random_string()
hashed_password = hash_password(user.password, salt)
query = users_table.insert().values(
email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
)
user_id = await database.execute(query)
token = await create_user_token(user_id)
token_dict = {"token": token["token"], "expires": token["expires"]} return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}
Пойдем по-порядку. Во-первых, у нас есть функция get_random_string, возвращающая рандомную строчку, чтобы использовать ее в качество соли при хешировании пароля в функции hash_password, получающей на вход пользовательский пароль и соль (опционально), а на выходе мы имеем хешированный пароль готовый для хранения в БД. Далее у нас идет функция validate_password, ее мы будем вызывать при авторизации пользователя, когда надо будет проверить пароль, пришедший от клиента, с хешированным паролем, лежащим в БД. Get_user_by_email и get_user_by_token — геттеры, извлекающие информацию о пользователя из БД по email-у и токену соответственно. Используем async и await для асинхронной работы с базой через databases. Create_user_token получает id пользователя в БД и создает для него токен. Обратите внимание, сам токен тут не генерируется в коде, он создается на уровне БД, так, как это мы определили в модели. Наконец, create_user функция будет вызываться нами при регистрации нового пользователя, ожидает в качестве параметра объект user, соответствующий описанной нами ранее Pydantic схеме UserCreate, после регистрации возвращает все данные пользователя вместе с токеном доступа.
У нас все готово, чтобы написать пользовательский роутер для всех необходимых нам методов. Создаем в /app/routers файл users.py:
# -*- coding: utf-8 -*-from fastapi import APIRouter, Depends, status
from fastapi.exceptions import HTTPException
from fastapi.security import OAuth2PasswordRequestFormfrom app.schemas import users
from app.utils import users as users_utils
from app.utils.dependecies import get_current_userrouter = APIRouter()
@router.post('/sign-up', response_model=users.User)
async def create_user(user: users.UserCreate):
db_user = await users_utils.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Email already registered.')
return await users_utils.create_user(user=user)
@router.post('/auth', response_model=users.TokenBase)
async def auth(format_data: OAuth2PasswordRequestForm = Depends()):
user = await users_utils.get_user_by_email(email=format_data.username)
if not user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Wrong e-mail or password')
if not users_utils.validate_password(password=format_data.password, hashed_password=user['hashed_password']):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Wrong e-mail or password') return await users_utils.create_user_token(user_id=user['id'])
@router.get('/users/me', response_model=users.UserBase)
async def read_user(current_user: users.User = Depends(get_current_user)):
return current_user
Разберемся, что тут происходит. Первым делом — создаем новый экземпляр роутера. С помощью декоратора указываем, что вызывая url /sign-up и используя метод POST у нас будет выполняться функция create_user. В декораторе мы также указываем вторым параметром модель ответа, которую берем из описанной ранее Pydantic схемы. Сама же функция ожидает, что к нему от клиента придут данные соответствующие схеме UserCreate. Первым делом мы пытаемся узнать, нету ли уже пользователя с таким же емейлом в базе. Если таковой есть — поднимаем исключение с 400 статусом и ошибкой. В противном случае — вызываем нашу функцию создания пользователя и возвращаем ее ответ.
Второй роут — авторизация и он интереснее первого, потому что там мы пользуемся техникой Dependency Injection. С помощью мы декоратора мы говорим, что обращение к /auth методом POST будет вызывать работу функции auth(), а модель ответа должна соответствовать описанной схеме TokenBase. А вот функция auth ожидает объект format_data типа OAuth2PasswordRequestForm — мы такую схему не описывали, справедливо заметите вы. Все так, мы пользуемся плюшками FastAPI, предоставляющего нам целый набор dependency классов, среди которых есть OAuth2PasswordRequestForm — он ожидает, что от клиента на сервер придут поля username и password.
Ну и еще более интересный метод — номер три, который выводит информацию о текущем пользователе, который обратился к нему. Чтобы его сделать красиво — опишем свой собственный dependecy-метод, благодаря которому мы сможем использовать данные пользователя во всех прочих методах, которые должны быть закрыты авторизацией. Создадим в /app/utils dependcies файл и опишем наш метод:
# -*- coding: utf-8 -*-"""Here goes different dependencies to be used in project."""from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBeareroauth2_scheme = OAuth2PasswordBearer(tokenUrl='/auth')
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await users_utils.get_user_by_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Wrong credentials',
headers={'WWW-Authenticate': 'Bearer'},
)
if not user['is_active']:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Inactive user') return user
Наш метод в данном случае сам использует еще один dependency-класс fastapi — OAuth2PasswordBearer, описывающий схему oauth2 авторизации. Благодаря этой зависимости FastAPI как раз понимает, что роут закрыт от “гостевых” заходов, доступ разрешен только авторизованным пользователям. Внутри метода мы пытаемся получить данные пользователя по токену, пришедшему в метод из схемы OAuth2PasswordBearer. Если пользователя по токену найти не удалось, то поднимаем исключение со статусом 401. Если же пользователь нашелся, но он деактивирован администратором, то поднимаем исключением со статусом 400. Если все ок — возвращаем объект пользователя.
На этом все! Если мы зайдем по адресу http://localhost:8002/docs — то должны увидеть описания всех наших методов с возможностью их протестировать. Либо, это мы можем сделать непосредственно из Postman-а, например.
Теперь обойдем весь написанный функционал тестами. Идем в /tests/conftest.py и вносим коррективы в файл:
# -*- coding: utf-8 -*-import os
import pytestfrom starlette.testclient import TestClient
from sqlalchemy import create_engine# rewriting db name before including database from db.py, to use test db in our tests
os.environ['DB_NAME'] = 'fastapi_test'from alembic import command
from alembic.config import Config
from app import db
from app.main import app
from sqlalchemy_utils import create_database, drop_database
@pytest.fixture(scope='module')
def setup_db():
"""Fixture. Here we create DB for test purposes, yield test client for requests and finally - cleaning up."""
try:
create_database(db.DATABASE_URL) engine = create_engine(db.DATABASE_URL)
engine.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";') base_dir = os.path.dirname(os.path.dirname(__file__))
alembic_cfg = Config(os.path.join(base_dir, 'alembic.ini')) # loading alembic config
command.upgrade(alembic_cfg, 'head') # perform migrations yield db.DATABASE_URL
finally:
drop_database(db.DATABASE_URL)
@pytest.fixture()
def client(setup_db):
with TestClient(app) as test_client:
yield test_client
Тут произошло довольно много изменений. Первым делом, перед подключением /app/db, где формируется database url из environment переменных, мы переопределяем название базы данных, потому что для тестирования мы будем использовать тестовую БД. После чего, мы создаем фикстуру, которая будет при запуске тестов поднимать базу, выполнять необходимые миграции, возвращать URL базы, а по завершению тестов — удалять базу. Обратите внимание, что в фикстуре мы указываем scope=’module’ в параметре декоратора — это сделано для того, чтобы для тестов, описанных в одном модуле, использовалась одна и та же база данных. Если мы опустим scope, то для каждого теста будет происходить процесс развертки базы, а нам это не надо, мы ходим, чтобы база разворачивалась единожды для каждого модуля с тестами. Наконец, корректируем нашу вторую фикстуру, она теперь наследуется от фикстуры создания базы. После этого можем приступить к написанию тестов в файле /tests/users.py:
# -*- coding: utf-8 -*-import asyncio
import pytestfrom fastapi.testclient import TestClientfrom app.schemas.users import UserCreate
from app.utils.users import create_user_token, create_user
def test_sign_up(client: TestClient): # temp_db fixture from conftest module
request_data = {
'email': 'neo@matrix.com',
'name': 'Mr. Anderson',
'password': 'red_pill',
} response = client.post('/sign-up', json=request_data)
assert response.status_code == 200
assert response.json()['id'] == 1
assert response.json()['email'] == 'neo@matrix.com'
assert response.json()['name'] == 'Mr. Anderson'
assert response.json()['token']['expires'] is not None
assert response.json()['token']['access_token'] is not None
def test_login(client: TestClient):
request_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
} response = client.post('/auth', data=request_data)
assert response.status_code == 200
assert response.json()['token_type'] == 'bearer'
assert response.json()['expires'] is not None
assert response.json()['access_token'] is not None
def test_invalid_login(client: TestClient):
request_data = {
'username': 'neo@matrix.com',
'password': 'blue_pill',
} response = client.post('/auth', data=request_data)
assert response.status_code == 400
assert response.json()['detail'] == 'Wrong e-mail or password'
def test_user_detail(client: TestClient):
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.get('/users/me', headers={'Authorization': f'Bearer {token["token"]}'})
assert response.status_code == 200
assert response.json()['id'] == 1
assert response.json()['email'] == 'neo@matrix.com'
assert response.json()['name'] == 'Mr. Anderson'
def test_user_detail_forbidden_without_token(client: TestClient):
response = client.get('/users/me')
assert response.status_code == 401
@pytest.mark.freeze_time('2010-01-01')
def test_user_detail_forbidden_with_expired_token(client: TestClient, freezer):
user = UserCreate(
email='smith@agent.net',
name='Mr. Smith',
password='virus',
)
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
freezer.move_to("'2010-02-01'")
response = client.get('/users/me', headers={'Authorization': f'Bearer {user_db["token"]["token"]}'})
assert response.status_code == 401
В общем, тут описывать нечего, весь код должен быть интуитивно понятен.
Заходим в консоль, пишем
docker-compose exec web pytest .
Наблюдаем, что все тесты должны быть успешно пройдены.
Всё! Функционал для работы с пользователями полностью реализован, покрыт тестами и мы готовы перейти к финальной части — непосредственно функционал работы со списком покупок.
Все, что нам осталось в рамках нашего сервиса “скелета”-заготовки — реализовать функционал работы непосредственно с продуктами. Все, что мы сейчас хотим — это разрешить авторизованному пользователю добавлять продукты в свой список, получать продукты в списке, изменять продукты в списке. Реализуем это.
Первым делом, в /app/models создаем products.py и описываем там модель продуктов:
# -*- coding: utf-8 -*-import sqlalchemyfrom .users import users_tablemetadata = sqlalchemy.MetaData()
products_table = sqlalchemy.Table(
'products',
metadata,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('user_id', sqlalchemy.ForeignKey(users_table.c.id)),
sqlalchemy.Column('title', sqlalchemy.String(100)),
sqlalchemy.Column('quantity', sqlalchemy.Float),
sqlalchemy.Column('created_date', sqlalchemy.DateTime, default=sqlalchemy.func.now(), nullable=False),
)
Привязываемся к пользователю через ForeignKey. Теперь идем в /app/schemas и создаем схемы валидации запросов и ответов для продуктов в файле products.py:
# -*- coding: utf-8 -*-from datetime import datetime
from typing import Optionalfrom pydantic import BaseModel
class Product(BaseModel):
"""Validate request data.""" title: str
quantity: float
class ProductDetails(Product):
"""Response schema.""" id: int
user_id: int
user_name: str
created_date: datetime
Product-схема описывает то, что мы ожидаем получить от пользователя — наименование продукта и количество. ProductDetails — то, что мы будем отдавать пользователю, там у нас более детальная информация по продукту.
Теперь пора создать helper-функции, которые помогут нам реализовать нужные API. Создаем в /app/utils products.py:
# -*- coding: utf-8 -*-"""Helper functions to work with products functionality."""from datetime import datetimefrom app.db import database
from app.models.products import products_table
from app.models.users import users_table
from app.schemas import products as product_schema
from sqlalchemy import desc, func, select, and_
async def create_product(product: product_schema.Product, user):
query = (
products_table.insert()
.values(
title=product.title,
quantity=product.quantity,
user_id=user['id'],
)
.returning(
products_table.c.id,
products_table.c.user_id,
products_table.c.title,
products_table.c.quantity,
products_table.c.created_date,
)
)
product = await database.fetch_one(query) # Here product is Record object product = dict(zip(product, product.values())) # Record to dict goes here
product['user_name'] = user['name'] return product
async def get_product_by_id(product_id: int):
query = (
select(
[
products_table.c.id,
products_table.c.created_date,
products_table.c.title,
products_table.c.quantity,
products_table.c.user_id,
users_table.c.name.label('user_name'),
]
)
.select_from(products_table.join(users_table))
.where(products_table.c.id == product_id)
) return await database.fetch_one(query)
async def get_products(page: int, user):
max_per_page = 10
current_offset = (page-1) * max_per_page
query = (
select(
[
products_table.c.id,
products_table.c.created_date,
products_table.c.title,
products_table.c.quantity,
products_table.c.user_id,
users_table.c.name.label('user_name'),
]
)
.select_from(products_table.join(users_table))
.where(products_table.c.user_id == user['id'])
.order_by(desc(products_table.c.created_date))
.limit(max_per_page)
.offset(current_offset)
) return await database.fetch_all(query)
async def get_products_num(user):
query = (
select(
[
func.count()
]
)
.select_from(products_table)
.where(products_table.c.user_id == user['id'])
) return await database.fetch_one(query)
async def update_product(product_id: int, product: product_schema.Product):
query = (
products_table.update()
.where(products_table.c.id == product_id)
.values(title=product.title, quantity=product.quantity)
) return await database.fetch_one(query)
Мы создали функции создания нового продукта, получения информации о продукте по его id, получения всех продуктов из списка конкретного пользователя, подсчета количества добавленных пользователем продуктов и, наконец, обновление продукта. Код говорит сам за себя, а насчет построения запросов можно почитать в официальной документации по языку запросов SQLAlchemy Core. Сейчас все, что нам осталось — создать роутер в /app/routers products.py:
# -*- coding: utf-8 -*-from fastapi import APIRouter, Depends, HTTPException, statusfrom app.schemas.products import Product, ProductDetails
from app.schemas.users import User
from app.utils import products as product_utils
from app.utils.dependecies import get_current_userrouter = APIRouter()
@router.post('/products', response_model=ProductDetails, status_code=201)
async def create_product(product: Product, current_user: User = Depends(get_current_user)):
product = await product_utils.create_product(product, current_user)
return product
@router.get('/products')
async def get_products(page: int = 1, current_user: User = Depends(get_current_user)):
products_num = await product_utils.get_products_num(current_user)
products = await product_utils.get_products(page, current_user)
return {'total': products_num, 'results': products}
@router.get('/products/{product_id}', response_model=ProductDetails)
async def get_product(product_id: int, current_user: User = Depends(get_current_user)):
product = await product_utils.get_product_by_id(product_id)
if not product:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if product['user_id'] != current_user['id']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='You don\'t have access',
) return product
@router.put('/products/{product_id}', response_model=ProductDetails)
async def update_product(product_id: int, product_data: Product, current_user: User = Depends(get_current_user)):
product = await product_utils.get_product_by_id(product_id)
if not product:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if product['user_id'] != current_user['id']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='You don\'t have access',
) await product_utils.update_product(product_id, product_data) return await product_utils.get_product_by_id(product_id)
Что тут происходит? Первым делом мы описываем POST запрос на endpoint /products, создающий новый продукт. В нем указываем модель ответа ProductDetails из схемы, описанной ранее и возвращаемый в случае успеха статус — 201. Обращу внимание, что схема ответа, которую мы указываем — автоматически фильтрует данные. Т.е. если в функции, отвечающей за эндпойнт, вернется больше данных — не страшно, схема выведет наружу для пользователя только то, что в ней прописано. Сама функция ожидает, что ей будет передана информация по создаваемому продукту, соответствующая ранее созданной схеме Product, кроме всего прочего, мы пользуемся ранее созданной зависимостью get_current_user, чтобы ограничить доступ не авторизованным пользователям и получить информацию о них. Сам код функции — тривиален.
После описываем GET запрос /products, который вернет список всех продуктов пользователя, функция поддерживает пагинацию в виде параметра URL адреса. Следом описываем GET запрос /products/{product_id} — получение детальной информации по продукту списка со схемой ответа ProductDetails. В рамках функции мы проверяем, во-первых, есть ли такой продукт в базе, во-вторых, принадлежит ли он пользователю, который его запрашивает — поднимаем соответствующие исключения. Финальный роут — обновление продукта — PUT запрос /products/{product_id}, возвращающий ответ по схеме ProductDetails. Проверки внутри функции аналогичны сделанным при запросе детальной информации по продукту.
Финальный штрих — добавляем в /tests/ test_products.py, где покрываем тестами наши продуктовые эндпойнты:
# -*- coding: utf-8 -*-from fastapi.testclient import TestClient
def get_user_token(client: TestClient, user_data):
response = client.post('/auth', data=user_data)
token = response.json()['access_token'] return token
def test_create_product(client: TestClient):
request_data = {
'email': 'neo@matrix.com',
'name': 'Mr. Anderson',
'password': 'red_pill',
}
client.post('/sign-up', json=request_data) user_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
}
token = get_user_token(client, user_data) request_data = {
'title': 'Bananas',
'quantity': 3,
}
response = client.post('/products', json=request_data, headers={'Authorization': f'Bearer {token}'}) assert response.status_code == 201
assert response.json()['title'] == 'Bananas'
assert response.json()['quantity'] == 3
assert response.json()['user_name'] == 'Mr. Anderson'
def test_get_products(client: TestClient):
user_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
}
token = get_user_token(client, user_data) response = client.get('/products', headers={'Authorization': f'Bearer {token}'}) assert response.status_code == 200
assert response.json()['total']['count_1'] == 1
assert response.json()['results'][0]['id'] == 1
assert response.json()['results'][0]['title'] == 'Bananas'
def test_get_product(client: TestClient):
user_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
}
token = get_user_token(client, user_data) response = client.get('/products/1', headers={'Authorization': f'Bearer {token}'}) assert response.status_code == 200
assert response.json()['id'] == 1
assert response.json()['title'] == 'Bananas'
assert response.json()['quantity'] == 3
def test_get_not_existed_product(client: TestClient):
user_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
}
token = get_user_token(client, user_data) response = client.get('/products/2', headers={'Authorization': f'Bearer {token}'}) assert response.status_code == 404
def test_get_not_owned_product(client: TestClient):
request_data = {
'email': 'smith@matrix.com',
'name': 'Agent Smith',
'password': 'virus',
}
client.post('/sign-up', json=request_data) user_data = {
'username': 'smith@matrix.com',
'password': 'virus',
}
token = get_user_token(client, user_data) response = client.get('/products/1', headers={'Authorization': f'Bearer {token}'}) assert response.status_code == 403
def test_update_product(client: TestClient):
user_data = {
'username': 'neo@matrix.com',
'password': 'red_pill',
}
token = get_user_token(client, user_data) request_data = {
'title': 'Milk',
'quantity': 1,
}
response = client.put('/products/1', headers={'Authorization': f'Bearer {token}'}, json=request_data) assert response.status_code == 200
assert response.json()['id'] == 1
assert response.json()['title'] == 'Milk'
assert response.json()['quantity'] == 1
def test_update_not_owned_product(client: TestClient):
user_data = {
'username': 'smith@matrix.com',
'password': 'virus',
}
token = get_user_token(client, user_data) request_data = {
'title': 'Milk',
'quantity': 1,
}
response = client.put('/products/1', headers={'Authorization': f'Bearer {token}'}, json=request_data) assert response.status_code == 403
Отладка FastAPI + uvicorn
В процессе разработки может возникнуть ситуация, когда понадобится подебажить код, а у нас все упаковано в докер. На самом деле, добавив буквально пару строк кода мы сможем спокойно дебажить проект прямиком из PyCharm. Для этого нам надо сделать два шага.
Первый шаг — внесем коррективку в main.py, добавив import uvicorn , а затем, в самом конце файла напишем следующий код:
if __name__ == "__main__": # for dev. debugging purposes
uvicorn.run(app, host="0.0.0.0", port=8075)
Т.е., когда мы будем запускать наш main.py из дебагера (а эту цель мы сейчас преследуем), мы говорим, чтобы Python стартовал uvicorn.
Второй шаг — добавляем конфигурацию для исполнения в PyCharm:
Не забываем в Environment variables прописать все, что нужно для нашего проекта (само собой, заменив на свои данные):
PYTHONUNBUFFERED=1;DB_NAME=fastapi_dev;DB_HOST=localhost;DB_USER=fastapi;DB_PASS=fastapi
Готово. Жмем на отладку, выполняем запросы и корректируем наш код.
Ссылки
Github: https://github.com/UNREALre/FastAPIShoppingList
Postman коллекция сервиса: https://www.getpostman.com/collections/cb94cf28a94e04ce807b