# mysql_generator_python **Repository Path**: ayowin/mysql_generator_python ## Basic Information - **Project Name**: mysql_generator_python - **Description**: A python code generator project for mysql. - **Primary Language**: Python - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-26 - **Last Updated**: 2025-09-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MySQL 代码生成器 一个基于 Python 的 MySQL 数据库表结构到 Python 代码的自动生成工具,可以将数据库表自动转换为 Python 模型类和数据库操作 Mapper 类。 ## 功能特点 - 自动连接 MySQL 数据库并获取表结构 - 自动生成 Python 模型类(Model) - 自动生成数据库操作类(Mapper) - 支持类型提示和字段注释 - 使用 Jinja2 模板引擎,可自定义模板 - 支持表名前缀过滤 - 生成完整的代码结构,包括 `__init__.py` 文件 ## 项目结构 ``` mysql_generator/ ├── BaseModel.py # 基础模型类 ├── Config.py # 配置文件 ├── FieldInfo.py # 字段信息类 ├── MySQLGenerator.py # 核心生成器类 ├── TableInfo.py # 表信息类 ├── main.py # 主入口 ├── requirements.txt # 依赖包 ├── templates/ # 模板目录 │ ├── MapperTemplate.py # Mapper 类模板 │ └── ModelTemplate.py # 模型类模板 └── test/ # 测试目录 ``` ## 安装依赖 ```bash pip install -r requirements.txt ``` ## 配置 在使用前,请修改 `Config.py` 文件中的数据库配置和代码生成配置: ```python # 数据库配置 DB_CONFIG = { 'host': 'localhost', # 数据库主机 'port': 3306, # 数据库端口 'user': 'root', # 数据库用户名 'password': '123456', # 数据库密码 'database': 'test_db', # 要连接的数据库名 'charset': 'utf8mb4' # 数据库字符集 } # 代码生成配置 GENERATE_CONFIG = { 'output_dir': 'generated_classes', # 生成的类文件存放目录 'table_prefix': '', # 表名前缀,生成类名时会去掉 'use_type_hints': True, # 是否使用类型提示 'base_class': 'BaseModel' # 所有映射类的基类 } ``` ## 使用方法 1. 修改 `Config.py` 中的数据库配置,确保连接信息正确 2. 运行 `main.py`: ```bash python main.py ``` 程序将自动连接到 MySQL 数据库,读取所有表的结构信息,并生成对应的 Python 模型类和 Mapper 类。 ## 生成的代码结构 生成的代码将保存在 `Config.py` 中配置的 `output_dir` 目录下,结构如下: ``` generated_classes/ ├── __init__.py # 包初始化文件,包含数据库连接创建函数 ├── models/ # 模型类目录 │ ├── __init__.py # 模型包初始化文件 │ ├── User.py # User 模型类 │ ├── Product.py # Product 模型类 │ └── ... # 其他模型类 └── mappers/ # Mapper 类目录 ├── __init__.py # Mapper 包初始化文件 ├── UserMapper.py # User 数据操作类 ├── ProductMapper.py # Product 数据操作类 └── ... # 其他 Mapper 类 ``` ## 模型类特性 生成的模型类具有以下特性: - 继承自 `BaseModel` 基类 - 包含所有表字段的属性和 setter/getter 方法 - 支持类型提示 - 包含字段注释 - 实现了 `to_dict()` 和 `from_dict()` 方法,方便数据转换 - 实现了 `to_json()` 和 `from_json()` 方法,方便 JSON 序列化 - 实现了 `__str__()` 和 `__repr__()` 方法,方便调试 ## Mapper 类特性 生成的 Mapper 类具有以下特性: - 封装了基本的数据库操作方法 - 支持单条记录的增删改查 - 支持批量操作 - 支持条件查询和分页 - 支持记录统计 - 使用参数化查询,防止 SQL 注入 ### Mapper 类主要方法 - `insert(entity)` - 插入一条记录 - `update(entity)` - 更新一条记录 - `delete(id)` - 删除一条记录 - `get_by_id(id)` - 根据ID查询记录 - `find_all(limit=None, offset=None)` - 查询所有记录 - `find_by_condition(conditions, limit=None, offset=None)` - 根据条件查询记录 - `count_by_condition(conditions)` - 根据条件统计记录数 - `count_all()` - 统计所有记录数 - `batch_insert(entities)` - 批量插入记录 - `batch_delete(ids)` - 批量删除记录 - `get_by_ids(ids)` - 根据ID列表查询记录 ## 自定义模板 如果需要自定义生成的代码,可以修改 `templates` 目录下的模板文件: - `ModelTemplate.py` - 模型类模板 - `MapperTemplate.py` - Mapper 类模板 模板使用 Jinja2 语法,可以通过修改这些文件来调整生成的代码结构和风格。 ## 示例 假设数据库中有 `users` 表,结构如下: ```sql CREATE TABLE `users` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50) NOT NULL, `email` varchar(100) DEFAULT NULL, `age` int(11) DEFAULT NULL, `created_at` datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `idx_username` (`username`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` 运行生成器后,将生成以下文件: ### models/User.py ```python from typing import Optional, Any from datetime import datetime, date import json from BaseModel import BaseModel class User(BaseModel): """ users 表的映射类 """ @property def id(self) -> int: """id""" return self._id @id.setter def id(self, value: int): self._id = value @property def username(self) -> str: """username""" return self._username @username.setter def username(self, value: str): self._username = value @property def email(self) -> Optional[str]: """email""" return self._email @email.setter def email(self, value: Optional[str]): self._email = value @property def age(self) -> Optional[int]: """age""" return self._age @age.setter def age(self, value: Optional[int]): self._age = value @property def created_at(self) -> datetime: """created_at""" return self._created_at @created_at.setter def created_at(self, value: datetime): self._created_at = value def __init__(self, id: int = None, username: str = None, email: Optional[str] = None, age: Optional[int] = None, created_at: datetime = None): """初始化User实例""" super().__init__() self._id = id self._username = username self._email = email self._age = age self._created_at = created_at def to_dict(self) -> dict: """将对象转换为字典""" return { "id": self.id, "username": self.username, "email": self.email, "age": self.age, "created_at": self.created_at } @classmethod def from_dict(cls, data: dict) -> 'User': """从字典创建对象""" return cls( id=data.get("id"), username=data.get("username"), email=data.get("email"), age=data.get("age"), created_at=data.get("created_at") ) def to_json(self) -> str: """将对象转换为JSON字符串""" return json.dumps(self.to_dict(), default=str) @classmethod def from_json(cls, json_str: str) -> 'User': """从JSON字符串创建对象""" data = json.loads(json_str) return cls.from_dict(data) def __str__(self) -> str: # 手动构建字段字符串 field_strings = [] field_strings.append("id=" + str(self._id)) field_strings.append("username=" + str(self._username)) field_strings.append("email=" + str(self._email)) field_strings.append("age=" + str(self._age)) field_strings.append("created_at=" + str(self._created_at)) return "User(" + ", ".join(field_strings) + ")" def __repr__(self) -> str: return self.__str__() ``` ### mappers/UserMapper.py ```python from typing import List, Optional, Any, Union from datetime import datetime, date import pymysql from ..models.User import User class UserMapper: """ users 表的数据库操作类 """ def __init__(self, connection: pymysql.connections.Connection): """ 初始化UserMapper Args: connection: 数据库连接对象 """ self.connection = connection def insert(self, entity: User) -> int: """ 插入一条记录 Args: entity: 要插入的User对象 Returns: 插入成功的记录ID """ # 构建字段名和占位符 field_names = [] placeholders = [] params = [] field_names.append("`username`") placeholders.append("%s") params.append(entity.username) field_names.append("`email`") placeholders.append("%s") params.append(entity.email) field_names.append("`age`") placeholders.append("%s") params.append(entity.age) field_names.append("`created_at`") placeholders.append("%s") params.append(entity.created_at) sql = f""" INSERT INTO users ({", ".join(field_names)}) VALUES ({", ".join(placeholders)}) """ with self.connection.cursor() as cursor: cursor.execute(sql, params) self.connection.commit() return cursor.lastrowid def update(self, entity: User) -> int: """ 更新一条记录 Args: entity: 要更新的User对象 Returns: 更新的记录数 """ # 构建SET子句 set_clauses = [] params = [] set_clauses.append(f"`username` = %s") params.append(entity.username) set_clauses.append(f"`email` = %s") params.append(entity.email) set_clauses.append(f"`age` = %s") params.append(entity.age) set_clauses.append(f"`created_at` = %s") params.append(entity.created_at) # 构建WHERE子句 where_clauses = [] where_clauses.append(f"`id` = %s") params.append(entity.id) sql = f""" UPDATE users SET {", ".join(set_clauses)} WHERE {" AND ".join(where_clauses)} """ with self.connection.cursor() as cursor: cursor.execute(sql, params) self.connection.commit() return cursor.rowcount def delete(self, id: int) -> int: """ 删除一条记录 Args: id: 主键值 Returns: 删除的记录数 """ # 构建WHERE子句 where_clauses = [] params = [] where_clauses.append(f"`id` = %s") params.append(id) sql = f""" DELETE FROM users WHERE {" AND ".join(where_clauses)} """ with self.connection.cursor() as cursor: cursor.execute(sql, params) self.connection.commit() return cursor.rowcount def get_by_id(self, id: int) -> Optional[User]: """ 根据ID查询记录 Args: id: 主键值 Returns: 查询到的User对象,如果不存在则返回None """ # 构建WHERE子句 where_clauses = [] params = [] where_clauses.append(f"`id` = %s") params.append(id) sql = f""" SELECT * FROM users WHERE {" AND ".join(where_clauses)} """ with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) result = cursor.fetchone() if result: return User.from_dict(result) return None def find_all(self, limit: int = None, offset: int = None) -> List[User]: """ 查询所有记录 Args: limit: 限制返回的记录数 offset: 偏移量 Returns: 查询到的User对象列表 """ sql = "SELECT * FROM users" if limit is not None: sql += f" LIMIT {limit}" if offset is not None: sql += f" OFFSET {offset}" with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql) results = cursor.fetchall() return [User.from_dict(result) for result in results] def find_by_condition(self, conditions: dict, limit: int = None, offset: int = None) -> List[User]: """ 根据条件查询记录 Args: conditions: 查询条件字典 limit: 限制返回的记录数 offset: 偏移量 Returns: 查询到的User对象列表 """ sql = "SELECT * FROM users WHERE " where_clauses = [] params = [] for field, value in conditions.items(): where_clauses.append(f"`{field}` = %s") params.append(value) sql += " AND ".join(where_clauses) if limit is not None: sql += f" LIMIT {limit}" if offset is not None: sql += f" OFFSET {offset}" with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) results = cursor.fetchall() return [User.from_dict(result) for result in results] def count_by_condition(self, conditions: dict) -> int: """ 根据条件统计记录数 Args: conditions: 查询条件字典 Returns: 记录数 """ sql = "SELECT COUNT(*) as count FROM users WHERE " where_clauses = [] params = [] for field, value in conditions.items(): where_clauses.append(f"`{field}` = %s") params.append(value) sql += " AND ".join(where_clauses) with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) result = cursor.fetchone() return result['count'] def count_all(self) -> int: """ 统计所有记录数 Returns: 记录数 """ sql = "SELECT COUNT(*) as count FROM users" with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql) result = cursor.fetchone() return result['count'] def batch_insert(self, entities: List[User]) -> List[int]: """ 批量插入记录 Args: entities: 要插入的User对象列表 Returns: 插入成功的记录ID列表 """ if not entities: return [] # 构建字段名和占位符 field_names = [] placeholders = [] field_names.append("`username`") field_names.append("`email`") field_names.append("`age`") field_names.append("`created_at`") # 为每个实体构建参数集 params = [] for entity in entities: param_set = [] param_set.append(entity.username) param_set.append(entity.email) param_set.append(entity.age) param_set.append(entity.created_at) params.append(param_set) sql = f""" INSERT INTO users ({", ".join(field_names)}) VALUES ({", ".join(["%s"] * len(field_names))}) """ ids = [] with self.connection.cursor() as cursor: for param_set in params: cursor.execute(sql, param_set) ids.append(cursor.lastrowid) self.connection.commit() return ids def batch_delete(self, ids: List[int]) -> int: """ 批量删除记录 Args: ids: 要删除的记录ID列表 Returns: 删除的记录数 """ if not ids: return 0 # 构建WHERE子句 - 假设只有一个主键 primary_key_name = "id" sql = f""" DELETE FROM users WHERE `{primary_key_name}` IN ({", ".join(["%s"] * len(ids))}) """ params = ids with self.connection.cursor() as cursor: cursor.execute(sql, params) self.connection.commit() return cursor.rowcount def get_by_ids(self, ids: List[int]) -> List[User]: """ 根据ID列表查询记录 Args: ids: 主键值列表 Returns: 查询到的User对象列表 """ if not ids: return [] # 构建WHERE子句 where_clauses = [] params = [] where_clauses.append(f"`id` IN ({", ".join(["%s"] * len(ids))})") params.extend(ids) sql = f""" SELECT * FROM users WHERE {" AND ".join(where_clauses)} """ with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(sql, params) results = cursor.fetchall() return [User.from_dict(result) for result in results] ``` ## 注意事项 1. 确保已安装 MySQL 服务,并且配置正确 2. 确保已安装 Python 3.6+ 3. 确保已安装项目依赖(PyMySQL 和 jinja2) 4. 在运行前,请检查并修改 `Config.py` 中的数据库配置 5. 生成的代码需要 Python 3.6+ 环境运行
---
***以下是关于测试示例的说明*** ## 测试目录 项目包含一个完整的测试目录 `test/`,用于验证生成的代码是否正常工作。测试目录包含以下文件: - `setup_database_for_test.py` - 测试数据库初始化脚本 - `test.py` - 测试套件 ### 测试数据库初始化 在运行测试之前,需要先创建测试数据库和表。运行以下命令: ```bash python test/setup_database_for_test.py ``` 该脚本将: 1. 检查 MySQL 服务器是否运行 2. 创建名为 `test_db` 的数据库(如果不存在) 3. 创建测试所需的表: - `users` - 用户信息表 - `departments` - 部门信息表 - `products` - 产品信息表 4. 插入一些测试数据 注意:请确保 `Config.py` 中的数据库配置与 `setup_database_for_test.py` 中的配置一致。 ### 运行测试 初始化数据库后,可以运行测试套件: ```bash python test/test.py ``` 测试套件包含以下测试: 1. **模型类测试**:测试模型的序列化和反序列化功能 - `test_users_model()` - 测试用户模型 - `test_departments_model()` - 测试部门模型 - `test_products_model()` - 测试产品模型 2. **Mapper类CRUD测试**:测试Mapper类的增删改查操作 - `test_users_crud()` - 测试用户Mapper的CRUD操作 - `test_departments_crud()` - 测试部门Mapper的CRUD操作 - `test_products_crud()` - 测试产品Mapper的CRUD操作 3. **批量操作测试**:测试Mapper类的批量操作功能 - `test_users_batch_operations()` - 测试用户Mapper的批量操作 - `test_departments_batch_operations()` - 测试部门Mapper的批量操作 - `test_products_batch_operations()` - 测试产品Mapper的批量操作 ### 测试覆盖的功能 测试套件验证了以下功能: - 模型类的序列化和反序列化(to_dict/from_dict, to_json/from_json) - 单条记录的增删改查(Create, Read, Update, Delete) - 批量操作(批量插入、批量查询、批量删除) - 条件查询和分页功能 - 记录统计功能 通过运行测试,可以确保生成的代码在真实数据库环境中正常工作。