阅读量:3
SQLite3封装类教程
SQLite是一种轻量级的数据库,它不需要一个独立的服务器进程。SQLite数据库存储在一个单一的磁盘文件中,这使得它非常适合小型到中型的应用程序,例如移动应用、桌面应用和小型的Web应用。以下是使用Python封装SQLite3数据库操作的一个简单教程。
1. 环境准备
首先,确保你的Python环境中已经安装了sqlite3
模块。sqlite3
模块是Python的标准库的一部分,因此通常不需要额外安装。
2. 理解SQLite3模块
sqlite3
模块提供了一个轻量级的磁盘基的数据库,不需要配置,不需要运行时的进程。它支持SQL语句,可以执行CRUD(创建、读取、更新、删除)操作。
3. 创建SQLiteDB封装类
在Python中,我们可以通过定义一个类来封装对SQLite数据库的操作。这不仅使得代码更加模块化,而且提高了代码的可重用性。以下是一个简单的SQLiteDB
类示例:
import sqlite3 from collections import namedtuple from math import ceil class SQLiteDB: def __init__(self, db_file): """ 初始化SQLite数据库连接和游标 :param db_file: 数据库文件路径 """ self.conn = sqlite3.connect(db_file) self.cursor = self.conn.cursor() # 这里可以添加更多的方法来执行SQL操作,例如创建表、插入数据、查询数据等。
4. 使用SQLiteDB类
创建了SQLiteDB
类后,你可以在你的应用程序中使用这个类来执行数据库操作。以下是一个使用示例:
if __name__ == '__main__': db = SQLiteDB('example.db') # 接下来可以调用db对象的方法来执行数据库操作 # 例如,创建一个表: db.cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER ) ''') db.conn.commit() # 插入数据: db.cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 30)) db.conn.commit() # 查询数据: for user in db.cursor.execute('SELECT * FROM users'): print(user) # 不要忘记在最后关闭数据库连接 db.conn.close()
5. 扩展SQLiteDB类
随着应用程序的发展,你可能需要添加更多的功能到SQLiteDB
类中。例如,你可以添加方法来处理事务、执行复杂的查询、或者实现自动的数据库迁移。
6. 错误处理
在实际的应用程序中,处理数据库操作时的错误是非常重要的。你应该在SQLiteDB
类中添加适当的异常处理逻辑,以确保应用程序的健壮性。
7. 性能优化
对于更复杂的应用程序,你可能需要考虑数据库操作的性能。这可能包括使用索引、优化查询语句、或者使用批处理来减少数据库的I/O操作。
结语
通过封装SQLite3操作,你可以创建一个强大且灵活的数据库交互层,这将有助于简化你的应用程序的数据库逻辑,并提高代码的可维护性。希望这篇教程能帮助你开始使用Python和SQLite3来构建你的应用程序。
完整代码
import sqlite3 from collections import namedtuple from math import ceil class SQLiteDB: def __init__(self, db_file): """ 初始化SQLite数据库连接和游标 :param db_file: 数据库文件路径 """ self.conn = sqlite3.connect(db_file) self.cursor = self.conn.cursor() def create_table(self, table_name, columns): """ 创建表格 :param table_name: 表格名称 :param columns: 列定义,格式为 "列名1 数据类型1, 列名2 数据类型2, ..." """ self.cursor.execute(f''' CREATE TABLE IF NOT EXISTS {table_name} ( {columns} ) ''') self.conn.commit() def insert(self, table_name, data): """ 插入数据 :param table_name: 表格名称 :param data: 要插入的数据,格式为 {"列名1": 值1, "列名2": 值2, ...} :return: 插入的数据的所有行 """ columns = ', '.join(data.keys()) placeholders = ', '.join(['?'] * len(data)) values = tuple(data.values()) self.cursor.execute(f''' INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) ''', values) self.conn.commit() return self.cursor.fetchall() def update(self, table_name, data, condition): """ 更新数据 :param table_name: 表格名称 :param data: 要更新的数据,格式为 {"列名1": 值1, "列名2": 值2, ...} :param condition: 更新条件 """ columns = ', '.join([f'{column}=?' for column in data.keys()]) values = tuple(data.values()) self.cursor.execute(f''' UPDATE {table_name} SET {columns} WHERE {condition} ''', values) self.conn.commit() def delete(self, table_name, condition): """ 删除数据 :param table_name: 表格名称 :param condition: 删除条件 """ self.cursor.execute(f''' DELETE FROM {table_name} WHERE {condition} ''') self.conn.commit() def query_all(self, table_name): """ 查询表格中的所有数据 :param table_name: 表格名称 :return: 所有数据的所有行 """ self.cursor.execute(f''' SELECT * FROM {table_name} ''') return self.cursor.fetchall() def query_page(self, table_name, page_size, page_number, condition=None): """ 分页查询表格中的数据 :param table_name: 表格名称 :param page_size: 每页数据数量 :param page_number: 页码 :param condition: 查询条件 ”age > 10“ :return: 分页数据的所有行、总数和总页数 """ if condition: query = f''' SELECT * FROM {table_name} WHERE {condition} LIMIT {page_size} OFFSET {page_size * (page_number - 1)} ''' else: query = f''' SELECT * FROM {table_name} LIMIT {page_size} OFFSET {page_size * (page_number - 1)} ''' self.cursor.execute(query) results = self.cursor.fetchall() column_names = [desc[0] for desc in self.cursor.description] Row = namedtuple('Row', column_names) rows = [Row(*row) for row in results] # 查询总数 count_query = f'SELECT COUNT(*) FROM {table_name}' if condition: count_query += f' WHERE {condition}' self.cursor.execute(count_query) total_count = self.cursor.fetchone()[0] # 计算总页数 total_pages = ceil(total_count / page_size) return rows, total_count, total_pages def execute_sql(self, sql, params=None): """ 执行自定义的SQL语句 :param sql: SQL语句 :param params: SQL语句中的参数,可选 :return: 执行结果的所有行 """ if params is None: self.cursor.execute(sql) else: self.cursor.execute(sql, params) # self.conn.commit() return self.cursor.fetchall() def query_join(self, table1_name, table2_name, on_condition): """ 执行表格的连接查询 :param table1_name: 第一个表格名称 :param table2_name: 第二个表格名称 :param on_condition: 连接条件 :return: 连接查询结果的所有行 """ self.cursor.execute(f''' SELECT * FROM {table1_name} INNER JOIN {table2_name} ON {on_condition} ''') return self.cursor.fetchall() def close(self): """ 关闭数据库连接 """ self.conn.close() # 使用示例: if __name__ == '__main__': db = SQLiteDB('example.db') # 创建表格 db.create_table('students', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER, gender TEXT') db.create_table('courses', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, teacher TEXT') # 插入数据 db.insert('students', {'name': 'Alice', 'age': 20, 'gender': 'Female'}) db.insert('students', {'name': 'Bob', 'age': 22, 'gender': 'Male'}) db.insert('courses', {'name': 'Math', 'teacher': 'Mr. Smith'}) db.insert('courses', {'name': 'English', 'teacher': 'Ms. Johnson'}) # 查询所有数据 print('All Students:') print(db.query_all('students')) # 两表联查 print('Join Students and Courses:') print(db.query_join('students', 'courses', 'students.id = courses.id')) # 分页查询 rows, total_count, total_pages = db.query_page('users', 10, 1) for row in rows: print(row.name, row.age, row.email) print(total_count) print(total_pages) # 修改数据 db.update('students', {'age': 21}, 'name = "Alice"') # 新增数据 db.insert('students', {'name': 'Charlie', 'age': 19, 'gender': 'Male'}) # 删除数据 db.delete('students', 'name = "Bob"') # 自定义SQL执行 db.execute_sql('DELETE FROM students WHERE name = "Charlie"') db.close()