SQLite3封装类教程

avatar
作者
猴君
阅读量:3

SQLite3封装类教程

SQLite是一种轻量级的数据库,它不需要一个独立的服务器进程。SQLite数据库存储在一个单一的磁盘文件中,这使得它非常适合小型到中型的应用程序,例如移动应用、桌面应用和小型的Web应用。以下是使用Python封装SQLite3数据库操作的一个简单教程。

1. 环境准备

首先,确保你的Python环境中已经安装了sqlite3模块。sqlite3模块是Python的标准库的一部分,因此通常不需要额外安装。

2. 理解SQLite3模块

sqlite3模块提供了一个轻量级的磁盘基的数据库,不需要配置,不需要运行时的进程。它支持SQL语句,可以执行CRUD(创建、读取、更新、删除)操作。

3. 创建SQLiteDB封装类

在Python中,我们可以通过定义一个类来封装对SQLite数据库的操作。这不仅使得代码更加模块化,而且提高了代码的可重用性。以下是一个简单的SQLiteDB类示例:

import sqlite3 from collections import namedtuple from math import ceil  class SQLiteDB:     def __init__(self, db_file):         """         初始化SQLite数据库连接和游标         :param db_file: 数据库文件路径         """         self.conn = sqlite3.connect(db_file)         self.cursor = self.conn.cursor()      # 这里可以添加更多的方法来执行SQL操作,例如创建表、插入数据、查询数据等。 

4. 使用SQLiteDB类

创建了SQLiteDB类后,你可以在你的应用程序中使用这个类来执行数据库操作。以下是一个使用示例:

if __name__ == '__main__':     db = SQLiteDB('example.db')      # 接下来可以调用db对象的方法来执行数据库操作     # 例如,创建一个表:     db.cursor.execute('''         CREATE TABLE IF NOT EXISTS users (             id INTEGER PRIMARY KEY,             name TEXT NOT NULL,             age INTEGER         )     ''')     db.conn.commit()      # 插入数据:     db.cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 30))     db.conn.commit()      # 查询数据:     for user in db.cursor.execute('SELECT * FROM users'):         print(user)      # 不要忘记在最后关闭数据库连接     db.conn.close() 

5. 扩展SQLiteDB类

随着应用程序的发展,你可能需要添加更多的功能到SQLiteDB类中。例如,你可以添加方法来处理事务、执行复杂的查询、或者实现自动的数据库迁移。

6. 错误处理

在实际的应用程序中,处理数据库操作时的错误是非常重要的。你应该在SQLiteDB类中添加适当的异常处理逻辑,以确保应用程序的健壮性。

7. 性能优化

对于更复杂的应用程序,你可能需要考虑数据库操作的性能。这可能包括使用索引、优化查询语句、或者使用批处理来减少数据库的I/O操作。

结语

通过封装SQLite3操作,你可以创建一个强大且灵活的数据库交互层,这将有助于简化你的应用程序的数据库逻辑,并提高代码的可维护性。希望这篇教程能帮助你开始使用Python和SQLite3来构建你的应用程序。

完整代码

import sqlite3 from collections import namedtuple from math import ceil  class SQLiteDB:     def __init__(self, db_file):         """         初始化SQLite数据库连接和游标         :param db_file: 数据库文件路径         """         self.conn = sqlite3.connect(db_file)         self.cursor = self.conn.cursor()      def create_table(self, table_name, columns):         """         创建表格         :param table_name: 表格名称         :param columns: 列定义,格式为 "列名1 数据类型1, 列名2 数据类型2, ..."         """         self.cursor.execute(f'''             CREATE TABLE IF NOT EXISTS {table_name} (                 {columns}             )         ''')         self.conn.commit()      def insert(self, table_name, data):         """         插入数据         :param table_name: 表格名称         :param data: 要插入的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}         :return: 插入的数据的所有行         """         columns = ', '.join(data.keys())         placeholders = ', '.join(['?'] * len(data))         values = tuple(data.values())          self.cursor.execute(f'''             INSERT INTO {table_name} ({columns}) VALUES ({placeholders})         ''', values)         self.conn.commit()         return self.cursor.fetchall()      def update(self, table_name, data, condition):         """         更新数据         :param table_name: 表格名称         :param data: 要更新的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}         :param condition: 更新条件         """         columns = ', '.join([f'{column}=?' for column in data.keys()])         values = tuple(data.values())          self.cursor.execute(f'''             UPDATE {table_name} SET {columns} WHERE {condition}         ''', values)         self.conn.commit()      def delete(self, table_name, condition):         """         删除数据         :param table_name: 表格名称         :param condition: 删除条件         """         self.cursor.execute(f'''             DELETE FROM {table_name} WHERE {condition}         ''')         self.conn.commit()      def query_all(self, table_name):         """         查询表格中的所有数据         :param table_name: 表格名称         :return: 所有数据的所有行         """         self.cursor.execute(f'''             SELECT * FROM {table_name}         ''')         return self.cursor.fetchall()      def query_page(self, table_name, page_size, page_number, condition=None):         """         分页查询表格中的数据         :param table_name: 表格名称         :param page_size: 每页数据数量         :param page_number: 页码         :param condition: 查询条件 ”age > 10“         :return: 分页数据的所有行、总数和总页数         """         if condition:             query = f'''                 SELECT * FROM {table_name}                 WHERE {condition}                 LIMIT {page_size} OFFSET {page_size * (page_number - 1)}             '''         else:             query = f'''                 SELECT * FROM {table_name}                 LIMIT {page_size} OFFSET {page_size * (page_number - 1)}             '''          self.cursor.execute(query)         results = self.cursor.fetchall()         column_names = [desc[0] for desc in self.cursor.description]          Row = namedtuple('Row', column_names)         rows = [Row(*row) for row in results]          # 查询总数         count_query = f'SELECT COUNT(*) FROM {table_name}'         if condition:             count_query += f' WHERE {condition}'         self.cursor.execute(count_query)         total_count = self.cursor.fetchone()[0]          # 计算总页数         total_pages = ceil(total_count / page_size)          return rows, total_count, total_pages       def execute_sql(self, sql, params=None):         """         执行自定义的SQL语句         :param sql: SQL语句         :param params: SQL语句中的参数,可选         :return: 执行结果的所有行         """         if params is None:             self.cursor.execute(sql)         else:             self.cursor.execute(sql, params)         # self.conn.commit()         return self.cursor.fetchall()      def query_join(self, table1_name, table2_name, on_condition):         """         执行表格的连接查询         :param table1_name: 第一个表格名称         :param table2_name: 第二个表格名称         :param on_condition: 连接条件         :return: 连接查询结果的所有行         """         self.cursor.execute(f'''                SELECT * FROM {table1_name}                 INNER JOIN {table2_name} ON {on_condition}            ''')         return self.cursor.fetchall()      def close(self):         """         关闭数据库连接         """         self.conn.close()   # 使用示例: if __name__ == '__main__':     db = SQLiteDB('example.db')      # 创建表格     db.create_table('students', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER, gender TEXT')     db.create_table('courses', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, teacher TEXT')      # 插入数据     db.insert('students', {'name': 'Alice', 'age': 20, 'gender': 'Female'})     db.insert('students', {'name': 'Bob', 'age': 22, 'gender': 'Male'})     db.insert('courses', {'name': 'Math', 'teacher': 'Mr. Smith'})     db.insert('courses', {'name': 'English', 'teacher': 'Ms. Johnson'})      # 查询所有数据     print('All Students:')     print(db.query_all('students'))      # 两表联查     print('Join Students and Courses:')     print(db.query_join('students', 'courses', 'students.id = courses.id'))      # 分页查询     rows, total_count, total_pages = db.query_page('users', 10, 1)     for row in rows:         print(row.name, row.age, row.email)     print(total_count)     print(total_pages)      # 修改数据     db.update('students', {'age': 21}, 'name = "Alice"')      # 新增数据     db.insert('students', {'name': 'Charlie', 'age': 19, 'gender': 'Male'})      # 删除数据     db.delete('students', 'name = "Bob"')      # 自定义SQL执行     db.execute_sql('DELETE FROM students WHERE name = "Charlie"')      db.close() 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!