news 2026/4/30 13:03:58

如何使用 SQLAlchemy 异步进行数据库请求

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
如何使用 SQLAlchemy 异步进行数据库请求

原文:towardsdatascience.com/how-to-use-sqlalchemy-to-make-database-requests-asynchronously-e90a4c8c11b1

数据库请求是一个典型的 I/O 密集型任务,因为它大部分时间都在等待数据库服务器的响应。因此,如果你的应用程序进行了大量的数据库请求,那么通过并发执行它们,性能可以得到显著提升,这是 SQLAlchemy(一个多功能的 Python SQL 工具包和对象关系映射器)所支持的。

此外,异步编程在 Python 中越来越受欢迎,尤其是在使用 FastAPI 进行 Web 开发时,我们经常需要在协程中执行数据库请求,即在用async def语句定义的函数中。不幸的是,我们不能使用经典的同步版本的 SQLAlchemy,而需要创建引擎、连接和会话的异步版本。

在本文中,我们将介绍如何在不同的场景下使用 SQLAlchemy 异步,即使用简单的 SQL 查询、Core 和 ORM。重要的是,我们将介绍如何在多个异步任务中并发使用它,如果使用得当,可以显著提高 I/O 密集型应用程序的效率。


准备工作

我们将使用 Docker 在本地启动一个 MySQL 服务器,在其中我们将创建用于演示的数据库和表:

# Create a volume to persist the data.$ docker volume create mysql8-data# Create the container for MySQL.$ docker run--name mysql8-d-e MYSQL_ROOT_PASSWORD=root-p13306:3306-v mysql8-data:/var/lib/mysql mysql:8# Connect to the local MySQL server in Docker.$ dockerexec-it mysql8 mysql-u root-proot mysql>SELECT VERSION();+-----------+|VERSION()|+-----------+|8.3.0|+-----------+1rowinset(0.00sec)
CREATE DATABASE sales;CREATE TABLE `sales`.`customers`(`id` SMALLINT NOT NULL AUTO_INCREMENT,`name` VARCHAR(50)NOT NULL,`job` VARCHAR(50)DEFAULT'',PRIMARY KEY(`id`),UNIQUE `UQ_name`(`name`));INSERT INTO sales.customers(name,job)VALUES('Lynn','Backend Developer');

然后让我们创建一个 虚拟环境,这样我们就可以尝试最新的 Python 和库版本:

conda create-n sql python=3.12conda activate sql pip install-U"sqlalchemy[asyncio]>=2.0,<2.1"pip install-U"aiomysql>=0.2,<0.3"pip install-U"cryptography>=42.0,<42.1"
  • sqlalchemy[asyncio]– SQLAlchemy 与greenlet依赖项一起安装,这是一个 SQLAlchemy 用于异步工作的库。

  • aiomysql– 一个从 asyncio 框架访问 MySQL 数据库的驱动程序,它背后使用 PyMySQL。

  • cryptography– 由 SQLAlchemy 用于身份验证。


异步执行简单的 SQL 查询

要使用 SQLAlchemy 异步运行 SQL 查询,我们首先需要使用create_async_engine()创建一个异步引擎。然后,在创建连接、执行查询和处置引擎时,我们需要使用await

importasynciofromsqlalchemyimporttextfromsqlalchemy.ext.asyncioimportcreate_async_engineasyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithasync_engine.begin()asconn:insert_query=text(""" INSERT INTO sales.customers (name, job) VALUES (:name, :job) """)awaitconn.execute(insert_query,{"name":"Hans","job":"Data Engineer"})# Check the data afer it's inserted.asyncwithasync_engine.connect()asconn:select_query=text(""" SELECT * FROM sales.customers WHERE name = :name """)result=awaitconn.execute(select_query,{"name":"Hans"})print(result.fetchall())# Close and clean-up pooled connections.awaitasync_engine.dispose()asyncio.run(main())

注意,当异步执行如上所示的简单 SQL 查询时,我们需要使用字典传递变量,而不是像同步版本那样使用关键字参数。

当运行上面的代码时,你将看到以下结果打印出来:

[(2,'Hans','Data Engineer'))]

当你想快速开始使用 SQLAlchemy 而又不了解 Core 和 ORM 功能时,使用简单的 SQL 查询是一个不错的选择。然而,如你所见,它并不太符合 Python 风格,因为它使用了自由风格的简单 SQL 查询。当你对 SQLAlchemy 有更多经验时,你可能想使用 Core 或 ORM 功能。


使用 SQLAlchemy Core 异步执行

在 SQLAlchemy 2.0 中,核心功能,通常意味着直接与Table对象交互,现在非常强大。它实际上与 ORM 功能混合到了非常高的程度。例如,select操作符可以用于核心和 ORM。

对于核心使用,我们还需要创建一个异步引擎,然后使用它来异步创建连接。基本工作流程与普通查询相同,不同之处在于语句是通过核心操作符如insertselect构建的。

importasynciofromsqlalchemyimportColumn,Integer,insertfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefmain():# Create an asynchronous engine.async_engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Insert new data with a transation.asyncwithengine.begin()asconn:stmt=insert(table).values(name="Jack",job="Frontend Developer")awaitconn.execute(stmt)# Check the data afer it's inserted.asyncwithengine.connect()asconn:result=awaitconn.execute(select(table).where(table.c.name=="Jack"))print(result.fetchall())# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,将显示以下结果:

[(3,'Jack','Frontend Developer')]

使用 SQLAlchemy ORM 异步

使用 SQLAlchemy ORM 的 ORM 功能要复杂一些,尤其是在 2.0 版本中,ORM 类的创建语法发生了显著变化。特别是,Mapped[]用于指定类型,mapped_column()构造其他列属性。

fromsqlalchemyimportStringfromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")

要异步处理 ORM,我们需要使用async_sessionmaker()创建一个异步会话工厂,然后使用with来创建异步会话实例:

# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:...

处理 ORM 的完整异步代码如下:

importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)# Create an async session instance.asyncwithasync_session()assession:# Insert new data with a transation.asyncwithsession.begin():session.add(Customer(name="Stephen",job="Manager"))# Check the data afer it's inserted.asyncwithasync_session()assession:result=awaitsession.execute(select(Customer).where(Customer.name=="Stephen"))customer=result.scalars().one()print(f"name ={customer.name}, job ={customer.job}")# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,将显示以下结果:

name=Stephen,job=Manager

使用 SQLAlchemy Core 在多个异步任务中

在多个异步任务中并发使用 SQLAlchemy Core 简单,因为连接对象可以直接在多个异步任务中传递和使用:

importasynciofrompprintimportpprintfromsqlalchemyimportColumn,IntegerfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_data=MetaData()table=Table("customers",meta_data,Column("id",Integer,primary_key=True),Column("name",String(50),nullable=False),Column("job",String(50),default=""),)asyncdefget_customer(name,conn):result=awaitconn.execute(select(table).where(table.c.name==name))returnresult.fetchone()asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithengine.connect()asconn:fornameinnames:tasks.append(get_customer(name,conn))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,你会看到以下结果打印出来:

[(1,'Lynn','Backend Developer'),(2,'Hans','Data Engineer'),(3,'Jack','Frontend Developer'),(4,'Stephen','Manager')]

在多个异步任务中使用 SQLAlchemy ORM

另一方面,在多个异步任务中使用 SQLAlchemy ORM 要复杂一些,因为不能直接在并发任务中使用相同的AsyncSession实例。

让我们直接尝试使用它并看看会发生什么:

importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name,session):result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session=async_sessionmaker(engine,expire_on_commit=False)names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.asyncwithasync_session()assession:fornameinnames:tasks.append(get_customer(name,session))results=awaitasyncio.gather(*tasks)print(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当运行上述代码时,你会看到以下错误:

sqlalchemy.exc.InvalidRequestError:This sessionisprovisioning a new connection;concurrent operations arenotpermitted

这个错误意味着单个AsyncSession实例不能在多个并发任务(例如使用asyncio.gather()之类的函数)之间共享。如果你想深入了解这个话题,可以查看这个参考。

解决这个问题的简单可行方案是在每个任务中创建一个AsyncSession实例。我们将重构代码以全局创建engineasync_session_factory,然后在每个任务中调用async_session_factory()来创建一个独立的会话:

importasynciofrompprintimportpprintfromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_column# Create an asynchronous engine.engine=create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")# Create an asynchronous session.async_session_factory=async_sessionmaker(engine,expire_on_commit=False)classBase(DeclarativeBase):passclassCustomer(Base):__tablename__="customers"id:Mapped[int]=mapped_column(primary_key=True)name:Mapped[str]=mapped_column(String(50),nullable=False,unique=True)job:Mapped[str|None]=mapped_column(String(50),default="")asyncdefget_customer(name):# Create an async session instance.asyncwithasync_session_factory()assession:result=awaitsession.execute(select(Customer).where(Customer.name==name))customer=result.scalars().one()return{"name":customer.name,"job":customer.job}asyncdefmain():names=["Lynn","Hans","Jack","Stephen"]tasks=[]# Check the data afer it's inserted.fornameinnames:tasks.append(get_customer(name))results=awaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())

当代码运行时,你会看到以下结果打印出来:

[{'job':'Backend Developer','name':'Lynn'},{'job':'Data Engineer','name':'Hans'},{'job':'Frontend Developer','name':'Jack'},{'job':'Manager','name':'Stephen'}]

就像 HTTP 请求一样,数据库请求也是 I/O 密集型任务,因为它们大部分时间都在等待数据库服务器的响应。因此,我们可以通过并发而不是顺序地执行数据库请求来显著提高应用程序的效率。

另一方面,异步地执行数据库请求也越来越重要,因为异步编程在 Python 中变得越来越流行,尤其是在使用 FastAPI 进行 Web 开发时,这也突出了学习这个主题的必要性。

在这篇文章中,我们介绍了如何在不同的场景下使用 SQLAlchemy 进行异步操作,即使用纯 SQL 查询、Core 和 ORM。你可以简单地调整代码以适应你的特定使用。我们特别介绍了如何在多个异步任务中并发使用 SQLAlchemy,如果应用程序需要并发执行大量数据库请求,这将提高应用程序的效率。


相关文章

  • 学习基础知识并开始使用 SQLAlchemy ORM

  • 如何在 Python 中使用 SQLAlchemy 执行纯 SQL 查询

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 6:49:35

三脚电感温升特性:选型时必须考虑的因素

三脚电感温升特性&#xff1a;选型时必须考虑的因素从一个烧毁的电感说起某工程师在调试一款48V转12V、输出功率达60W的Buck电源时&#xff0c;发现满载运行不到两小时&#xff0c;主功率电感就出现冒烟现象。示波器显示开关波形正常&#xff0c;控制环路稳定&#xff0c;电感量…

作者头像 李华
网站建设 2026/4/30 10:58:09

为PyTorch项目添加Type Hint提升可维护性

为 PyTorch 项目添加 Type Hint 提升可维护性 在现代深度学习开发中&#xff0c;一个常见的场景是&#xff1a;你接手了一个几个月前由同事训练的模型代码&#xff0c;准备做些微调并重新部署。打开脚本后却发现&#xff0c;某个函数接收一个叫 data 的参数——它到底是个张量&…

作者头像 李华
网站建设 2026/5/1 7:51:39

Markdown数学公式书写:表达PyTorch算法结构

Markdown数学公式书写&#xff1a;表达PyTorch算法结构 在深度学习项目开发中&#xff0c;一个常见的痛点是&#xff1a;模型代码写完了&#xff0c;却难以向同事或评审者清晰地解释其背后的数学逻辑。你可能在 Jupyter Notebook 里跑通了训练流程&#xff0c;但别人打开你的 .…

作者头像 李华
网站建设 2026/5/1 6:57:52

我发现Scikit-learn OneHotEncoder漏sparse,补sparse=True才稳住医疗分类

&#x1f4dd; 博客主页&#xff1a;jaxzheng的CSDN主页 当数据会说话&#xff1a;我的医疗数据科学小故事目录当数据会说话&#xff1a;我的医疗数据科学小故事 一、数据科学&#xff1f;不&#xff0c;是“老中医”的数字版 二、真实故事&#xff1a;从“乱码”到“救命符” …

作者头像 李华
网站建设 2026/5/1 7:51:22

Betaflight高级滤波技巧:适用于高阶飞行场景

Betaflight滤波进阶实战&#xff1a;如何让穿越机“又快又稳”&#xff1f;你有没有遇到过这种情况——刚调好一套高KV电机和轻量化机架&#xff0c;满心期待地起飞&#xff0c;结果一推油门&#xff0c;画面就开始“雪花抖动”&#xff1b;或者在高速穿门时突然机身一震&#…

作者头像 李华
网站建设 2026/5/1 9:07:06

PyTorch模型导出ONNX格式并在其他平台部署

PyTorch模型导出ONNX格式并在其他平台部署 在当今AI产品快速迭代的背景下&#xff0c;一个常见的挑战浮出水面&#xff1a;如何将实验室里训练得很好的PyTorch模型&#xff0c;高效、稳定地部署到从边缘设备到云端服务器的各类硬件平台上&#xff1f;毕竟&#xff0c;不是每个目…

作者头像 李华