Python实现轻量级数据库引擎(续)——新增“事务功能”

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

前期基础教程:

「Python3.11.0」手把手教你安装最新版Python运行环境

讲讲Python环境使用Pip命令快速下载各类库的方法

Python启航:30天编程速成之旅(第2天)-IDE安装

【Python教程】JupyterLab 开发环境安装

接上期教程:

Python实现轻量级数据库引擎——用200行代码复刻SQLite3核心功能

上期教程带着大家实现了简单的数据库基本功能,今天继续增加新的功能:事务处理功能,确保数据操作的原子性

一、事务功能介绍

事务用于将多个数据库操作作为单个逻辑单元执行,保证:

原子性:所有操作成功或全部回滚一致性:保持数据完整性隔离性:事务中间状态不可见持久性:提交后修改永久保存

二、代码修改步骤

1. 添加事务状态管理

import json import copy # 新增导入 class SimpleDatabase: def __init__(self, db_file): self.db_file = db_file self.in_transaction = False # 事务状态标志 self.checkpoint = None # 数据快照 # …其他原有初始化代码…

2. 新增事务控制方法

def begin_transaction(self): “””开启事务””” if self.in_transaction: print(“事务已在进行中”) return self.checkpoint = copy.deepcopy(self.data) # 创建数据快照 self.in_transaction = True def commit(self): “””提交事务””” if not self.in_transaction: print(“没有活跃事务可提交”) return try: self.save() # 持久化数据 finally: self.in_transaction = False self.checkpoint = None def rollback(self): “””回滚事务””” if not self.in_transaction: print(“没有活跃事务可回滚”) return self.data = copy.deepcopy(self.checkpoint) # 恢复数据 self.in_transaction = False self.checkpoint = None

3. 修改原有保存逻辑

在所有数据修改方法中增加事务状态判断:

# 以insert方法为例 def insert(self, table_name, values): if table_name in self.data: table = self.data[table_name] if len(values) == len(table[“columns”]): table[“rows”].append(values) if not self.in_transaction: # 非事务中立即保存 self.save() # …其余原有代码…

三、使用示例

# 初始化数据库 db = SimpleDatabase(test_db.json) db.create_table(accounts, [id, balance]) # 正常事务流程 try: db.begin_transaction() db.insert(accounts, [1, 1000]) db.insert(accounts, [2, 2000]) db.commit() except: db.rollback() # 异常回滚流程 try: db.begin_transaction() db.update(accounts, [None, 1500], (0, 1)) # 账户1余额更新 raise Exception(“模拟故障”) db.commit() except: db.rollback() # 所有修改撤销

四、完整修改后代码

import json import copy class SimpleDatabase: def __init__(self, db_file): self.db_file = db_file self.in_transaction = False self.checkpoint = None try: with open(self.db_file, r) as f: self.data = json.load(f) except FileNotFoundError: self.data = {} def save(self): with open(self.db_file, w) as f: json.dump(self.data, f, indent=4) def begin_transaction(self): if self.in_transaction: print(“Transaction already in progress.”) return self.checkpoint = copy.deepcopy(self.data) self.in_transaction = True def commit(self): if not self.in_transaction: print(“No active transaction to commit.”) return try: self.save() finally: self.in_transaction = False self.checkpoint = None def rollback(self): if not self.in_transaction: print(“No active transaction to rollback.”) return self.data = copy.deepcopy(self.checkpoint) self.in_transaction = False self.checkpoint = None # 原有方法(create_table/insert/select/update/delete)保持结构不变 # 仅在save()调用处添加事务状态判断,例如: def insert(self, table_name, values): if table_name in self.data: table = self.data[table_name] if len(values) == len(table[“columns”]): table[“rows”].append(values) if not self.in_transaction: self.save() else: print(“Number of values does not match number of columns.”) else: print(f”Table {table_name} does not exist.”) def create_table(self, table_name, columns): “”” 创建一个新的表 :param table_name: 表名 :param columns: 列名列表 “”” if table_name not in self.data: self.data[table_name] = { “columns”: columns, “rows”: [] } if not self.in_transaction: self.save() else: print(f”Table {table_name} already exists.”) def select(self, table_name, where=None): “”” 从指定表中查询数据 :param table_name: 表名 :param where: 查询条件,格式为 (column_index, value) :return: 查询结果列表 “”” if table_name in self.data: table = self.data[table_name] if where is None: return table[“rows”] else: column_index, value = where return [row for row in table[“rows”] if row[column_index] == value] else: print(f”Table {table_name} does not exist.”) return [] def update(self, table_name, set_values, where): “”” 更新指定表中的数据 :param table_name: 表名 :param set_values: 要更新的值列表 :param where: 更新条件,格式为 (column_index, value) “”” if table_name in self.data: table = self.data[table_name] column_index, value = where for row in table[“rows”]: if row[column_index] == value: for i, val in enumerate(set_values): if val is not None: row[i] = val if not self.in_transaction: self.save() else: print(f”Table {table_name} does not exist.”) def delete(self, table_name, where): “”” 从指定表中删除数据 :param table_name: 表名 :param where: 删除条件,格式为 (column_index, value) “”” if table_name in self.data: table = self.data[table_name] column_index, value = where table[“rows”] = [row for row in table[“rows”] if row[column_index] != value] if not self.in_transaction: self.save() else: print(f”Table {table_name} does not exist.”) # 使用示例 if __name__ == “__main__”: # 初始化数据库 db = SimpleDatabase(test_db.json) db.create_table(accounts, [id, balance]) # 正常事务流程 try: db.begin_transaction() db.insert(accounts, [1, 1000]) db.insert(accounts, [2, 2000]) db.commit() except: db.rollback() # 异常回滚流程 try: db.begin_transaction() db.update(accounts, [None, 1500], (0, 1)) # 账户1余额更新 raise Exception(“模拟故障”) db.commit() except: db.rollback() # 所有修改撤销

增加事务回滚,账户1未发生改变:

可以将raise Exception(“模拟故障”)这行代码注释掉,再运行。

运行结果,账户1已经更新至1500:

五、注意事项

嵌套事务:当前实现不支持嵌套事务,需在前一个事务提交/回滚后再开启新事务数据持久化:提交前修改仅存在于内存,系统崩溃可能导致数据丢失性能影响:长时间事务会占用更多内存(保持数据快照)错误处理:建议使用try-exatch块包裹事务操作

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部