当我第一次听说要在FastAPI中使用异步SQLAlchemy时,我的第一反应是:“又一个流行词,能有多复杂?”结果发现,这可不是简单地加个async就能搞定的。以下就是我在困惑、误区和最终豁然开朗之间跌跌撞撞的心路历程。
初印象:Async、Await与SQLAlchemy
我一直写同步代码,对async总觉得像是给代码撒了点魔法粉,让它“更好”。而且我用TypeScript的async模式很熟,觉得Python的async应该也差不多。很简单,对吧?
错了。
我最初的想法是:只要在FastAPI路由上加个async def,对SQLAlchemy查询加点await,API就能飞快了。
剧透: 根本不是这么回事。
心智模型的问题
我一开始把async当成简单的性能增强器——就像给汽车加了氮气加速。但这个心智模型是错的。Python里的async更像是把单车道公路换成了多车道高速公路,还配有智能交通管理。
在TypeScript/JavaScript里,事件循环是语言底层自带的。而在Python里,async是一种架构选择,不同场景有不同模式。
深入异步SQLAlchemy:第一道困惑
最初,我常用的同步SQLAlchemy查询是这样的:
def get_user(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
很直观。但当我试图加上async def,并随意加点await:
async def get_user(db: Session, email: str):
return await db.query(User).filter(User.email == email).first() # 这是错的!
我很快发现哪里不对劲。Python立刻报错:
AttributeError: 'Query' object has no attribute '__await__'
啊,经典错误。原来我犯了第一个致命错误:
错误假设: SQLAlchemy经典的
session.query()模式会自动支持async。
并不会。这就像试图把普通电器插到USB口——系统根本不兼容!
“阻塞”带来的顿悟
这让我更深入地理解了“阻塞”到底是什么意思。想象一下,餐厅里只有一个服务员(事件循环)。当服务员去厨房(数据库)取餐时,其他客人只能等着。
同步系统里,服务员进了厨房就出不来,直到餐做好。异步系统里,服务员可以一边让厨房做菜,一边继续接新单。
问题在于,我的SQLAlchemy session就是那个死守厨房、拒绝多任务的服务员:
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: Session = Depends(get_db)):
# 即使在async def函数里,这依然是阻塞调用!
user = db.query(User).filter(User.id == user_id).first()
return user
这段代码是最糟糕的混合体——用了async语法,但数据库调用还是阻塞的!
神秘的.execute()和select()登场
疯狂翻阅Stack Overflow后(大家都懂的),我发现异步SQLAlchemy用的是完全不同的语法:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def get_user(db: AsyncSession, email: str):
result = await db.execute(select(User).where(User.email == email))
return result.scalars().first()
“等等,”我盯着.scalars().first()发呆,“查一条记录怎么突然变成两步了?”
感觉太复杂了。老的.query().first()多好啊!
数据库依赖:被遗忘的一环
我后来意识到,FastAPI的依赖也要变。同步的session依赖:
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
要变成:
async def get_async_db():
async with async_session() as session:
yield session
这不仅是语法变化,更是session管理方式的根本转变!
拆解我的误区:.scalars()到底是什么?
我一开始以为.scalars()是个异步收集器,等流结束后自动吐出对象。后来才发现,.execute()返回的是一个Result对象,本质上是个表结构,就算你只查一列也是如此。
让我用一个比喻来解释,终于让我恍然大悟:
把SQLAlchemy查询比作点餐:
同步SQLAlchemy(老方式):
pizza = restaurant.order(Pizza).with_topping('pepperoni').make()
很简单,直接拿到披萨。
异步SQLAlchemy(新方式):
order_slip = await restaurant.execute(order(Pizza).with_topping('pepperoni'))
pizza = order_slip.items().first()
为啥多了几步?想象服务员给你端来一个托盘(Result),上面有多个盒子(Row),每个盒子里装着你点的东西(如User对象)。哪怕你只点了一份披萨,也会被装在托盘的盒子里:
execute()= 服务员端来托盘.scalars()= 打开所有盒子,把食物直接放到托盘上.first()= 从托盘上拿第一个食物
也就是:
# execute() 得到: [Row(User(id=1),), Row(User(id=2),)]
result = await db.execute(select(User).where(...))
# scalars() 得到: [User(id=1), User(id=2)]
users = result.scalars()
# first() 得到: User(id=1)
first_user = users.first()
CRUD语法对照表
理解了这个模式后,我需要把常用的CRUD操作翻译成新语法:
| 旧同步语法 | 新异步语法 |
|---|---|
db.query(User).first() | (await db.execute(select(User))).scalars().first() |
db.query(User).all() | (await db.execute(select(User))).scalars().all() |
db.query(User).filter(User.name == name).one() | (await db.execute(select(User).where(User.name == name))).scalars().one() |
db.add(user); db.commit() | db.add(user); await db.commit() |
db.query(User).filter(User.id == id).update({User.name: new_name}) | await db.execute(update(User).where(User.id == id).values(name=new_name)) |
这张表成了我迁移代码的“罗塞塔石碑”。
性能焦虑:我真的需要异步吗?
下一个问题:折腾这么多,真的有用吗?
我做了个简单基准测试,用典型API接口做数据库调用。1000个并发请求下结果如下:
- 同步FastAPI + 同步SQLAlchemy:约600请求/秒
- 异步FastAPI + 同步SQLAlchemy:约550请求/秒(反而更慢!)
- 异步FastAPI + 异步SQLAlchemy:约1400请求/秒
中间那个最差,因为有async的开销却没有async的好处!就像雇了个多任务服务员,却只让他一次只服务一桌。
如果你要做高并发API(每秒上千请求),async的优势才明显。同步DB调用会阻塞整个API,异步则能让Python在等数据库时处理更多请求。
Python 3.13的剧情反转
后来我又听说Python 3.13要解锁GIL(全局解释器锁),实现真正多线程。那是不是可以用线程代替async?我一开始也这么想——但又错了。
我以为Python 3.13的GIL改进能让多线程通吃一切,但其实我混淆了两类问题:
- CPU密集型任务:解锁GIL后多线程才有用
- I/O密集型任务(如数据库查询):无论GIL如何,async依然更优
解锁GIL对CPU密集型有帮助,但对于网络I/O密集型,async依然是王道——这正是大多数API的场景。所以,即使多线程更强,async依然很重要。
混合方案:务实的过渡
我的项目有几百个路由,没法一次性全部重写。我找到了一个渐进迁移的办法:
from fastapi.concurrency import run_in_threadpool
@app.get("/legacy-but-important")
async def read_complex_report(db: Session = Depends(get_db)):
# 把阻塞代码放到线程池,避免阻塞事件循环
result = await run_in_threadpool(
lambda: db.query(ComplexReport).all()
)
return result
这样可以先迁移最常用的接口,逐步推进。
特殊数据库扩展,比如pgvector怎么办?
我最后的顾虑是:像pgvector这样的特殊扩展能用async吗?查了下资料,async和pgvector配合得很好。用asyncpg作为PostgreSQL驱动,异步集成这些扩展毫无障碍。
下面是用pgvector异步查询的例子:
stmt = select(Document).order_by(
l2_distance(Document.embedding, query_vector)
).limit(5)
result = await db.execute(stmt)
docs = result.scalars().all()
我很欣慰地发现,未来要加向量检索也不会被async迁移卡住。
辅助函数:让生活更简单
迁移了几个路由后,我发现新语法太啰嗦了,于是写了辅助函数:
async def db_get(db: AsyncSession, model, **kwargs):
stmt = select(model).filter_by(**kwargs)
result = await db.execute(stmt)
return result.scalars().first()
# 用法
user = await db_get(db, User, email="[email protected]")
这些辅助函数让新语法恢复了同步时代的简洁,同时保留了async的优势。
总结我的Async之旅
从一开始觉得async只是个噱头,到被语法折磨、误解.scalars(),再到最终欣然接受它的优点——这一路既痛苦又收获满满。异步SQLAlchemy其实没那么可怕,但确实需要转变思维和拥抱新模式。
如果你要从我的经历里记住三点:
- 心智模型很重要:async关注的是并发,不是并行
- 要么全异步,要么全同步:async路由配同步DB调用是最差组合
- 抽象很有用:写辅助函数简化冗长语法
所以,如果你正盯着db.query()发愁要不要迁移到异步SQLAlchemy——别怕!刚开始确实有点懵,但坚持下去,明朗和高性能就在前方。
