我大约五个月前写了一篇关于使用 FastAPI 调整存储库模式的文章,我得到了很多阅读(谢谢)。我来写一个有效的方法来处理会话处理,仍然使用存储库模式。
在我直接进入它之前,我注意到在生产中,每当我的 API 尝试进行涉及读取或写入数据库的事务时,我都会收到以下错误之一:
此错误表明有一个未提交的事务正在进行,需要在继续任何其他数据库操作之前回滚。
此错误的最常见原因是在数据库事务期间发生未处理的异常,这会阻止事务被正确提交或回滚。
此错误表示您对数据库所做的操作或请求无效或不受支持。此错误可能有多种原因,包括:
我确信您对如何解决这些错误有想法,但是,我想声明的是,即使我确定了问题所在并进行了修复,但问题仍然存在。
如果您对我如何排查和解决它们感到好奇,您可以考虑按照以下步骤操作:
让我们直接了解我是如何制定一个已证明对我有用的永久解决方案的。我将继续使用我在演示如何使用存储库模式时从事的项目。
我们有一个模块,我们使用以下代码存储我们的 orm 基础会话 mixin:
# SQLAlchemy Imports from sqlalchemy.orm import Session # Own Imports from config.database import SessionLocal from core.settings import ledger_settings class ORMSessionMixin: """Base orm session mixin for interacting with the database.""" def __init__(self): """ Get the next database session from the database pool. """ self.orm: Session = self.get_db().__next__() def get_db(self): """ This method creates a database session, yields it, rollback the transaction if there's an exception and then finally closes the session. Yields: db: scoped database session """ db = SessionLocal() try: yield db except Exception: db.rollback() finally: db.close()
这个解决方案的问题是,如果在交易过程中发生异常(这可能是任何事情:创建用户、为你的钱包注资等)——异常没有得到正确处理,并且传输中的数据库会话没有得到回滚。
经过三个月的调试和修补以及大量研究,我终于能够构建一种有效的会话处理方式。
# SQLAlchemy Imports import sqlalchemy from sqlalchemy.orm import Session # Own Imports from config.database.connection import SessionLocal class DatabaseSessionMixin: """Database session mixin.""" def __enter__(self) -> Session: self.db = SessionLocal() return self.db def __exit__(self, exc_type, exc_val, exc_tb): try: if exc_type is not None: self.db.rollback() except sqlalchemy.exc.SQLAlchemyError: pass finally: self.db.close() SessionLocal.remove() def use_database_session(): return DatabaseSessionMixin()
在这段代码中:
DatabaseSession
是一个上下文管理器类,它处理会话并确保在出现错误时正确关闭和回滚。
__enter__
方法初始化会话并返回它。
__exit__
方法检查异常并在发生异常时回滚会话。然后它关闭会话并将其从作用域会话中删除。
use_database_session
是一个实用函数,可用作装饰器或上下文管理器以简化会话使用。
以下是如何使用 use_database_session 实用程序函数的示例:
with use_database_session() as db: # perform logic that uses the session # ... # After exiting the context, the session will be automatically closed and removed from the scoped session.
上述方法提供了一种更干净、更有效的方式来处理会话,并确保在出现错误时正确回滚或关闭它们。让我们继续讨论如何在 ORM 中使用数据库会话时实现存储库模式。
# SQLAlchemy Imports import sqlalchemy from sqlalchemy.orm import Session class BaseRepository: def __init__(self, session: Session): self.db = session class UserRepository(BaseRepository): """Operations to interact with the `users` table in the database.""" def get(self, user_id: int) -> User: """This method gets a user from the database.""" user = ( self.db.query(User) .filter(User.id == user_id) .first() ) return user def create(self, name: str, email: str, password: str) -> User: """This method creates a user.""" user = User(name=name, email=email, password=password) self.db.add(user) self.db.commit() self.db.refresh(user) return user def update_user(self, user_id: int, updated_data: dict): """This method updates a user.""" user = self.get(user_id) if user: for key, value in updated_data.items(): setattr(user, key, value) self.db.commit() return user return None def delete_user(self, user_id): """This method deletes a user.""" user = self.get_user(user_id) if user: self.db.delete(user) self.db.commit() return True return False
接下来是将上述存储库集成到应用程序的服务层中。假设您有一个创建用户帐户的服务功能;这是使用我们的新方法的方法:
# Apps Imports from apps.users.models import User from apps.users.repo import UserRepository from apps.users.schemas.auth import UserCreate # Config Imports from config.security.hashers import password from config.database.session_mixin import use_database_session async def create_user(user: UserCreate) -> User: """ This function creates a new user in the database. :param user: schemas.UserCreate :type user: schemas.UserCreate :return: The user object """ with use_database_session() as db: users_repo = UserRepository(db) user = users_repo.create( user.name, user.email, password.hash(user.password) ) return user
上述模式将允许您在利用继承的数据库会话的同时将数据库操作封装在存储库类中。它还在您的 ORM 模型和存储库逻辑之间提供了清晰的分离。
总之,在构建后端系统时,有效地处理会话很重要。
数据库事务中出现的sqlalchemy.exc.PendingRollbackError
、 sqlalchemy.exc.InvalidRequestError
等错误如果处理不当会导致数据不一致和应用程序失败。
识别并解决这些错误对于维护系统的完整性和可靠性非常重要。
要解决与会话处理相关的问题,必须实施稳健的策略。一种方法是使用上下文管理器,例如我们在文章中演示的DatabaseSessionMixin
。
该上下文管理器可确保在出现异常时正确打开、关闭和回滚会话。通过将会话逻辑封装在上下文管理器中,您可以简化会话管理并改进错误处理。
此外,将存储库模式集成到应用程序的服务层中可以进一步提高会话处理的效率。
通过将数据库操作分离到存储库类中并利用从上下文管理器继承的会话,您可以实现更清晰的代码组织并在 ORM 模型和存储库逻辑之间保持清晰的分离。
总的来说,高效的会话处理对于保持数据一致性、防止错误和确保后端系统的稳定性至关重要。
通过遵循最佳实践,例如使用上下文管理器和采用存储库模式,开发人员可以构建健壮且可靠的系统,以有效地管理会话并处理数据库事务期间的错误。
我愿意编写演出并积极寻找涉及使用 Python(Django、FastAPI 等)构建的合同角色。