# mysql_generator_python
**Repository Path**: ayowin/mysql_generator_python
## Basic Information
- **Project Name**: mysql_generator_python
- **Description**: A python code generator project for mysql.
- **Primary Language**: Python
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-09-26
- **Last Updated**: 2025-09-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# MySQL 代码生成器
一个基于 Python 的 MySQL 数据库表结构到 Python 代码的自动生成工具,可以将数据库表自动转换为 Python 模型类和数据库操作 Mapper 类。
## 功能特点
- 自动连接 MySQL 数据库并获取表结构
- 自动生成 Python 模型类(Model)
- 自动生成数据库操作类(Mapper)
- 支持类型提示和字段注释
- 使用 Jinja2 模板引擎,可自定义模板
- 支持表名前缀过滤
- 生成完整的代码结构,包括 `__init__.py` 文件
## 项目结构
```
mysql_generator/
├── BaseModel.py # 基础模型类
├── Config.py # 配置文件
├── FieldInfo.py # 字段信息类
├── MySQLGenerator.py # 核心生成器类
├── TableInfo.py # 表信息类
├── main.py # 主入口
├── requirements.txt # 依赖包
├── templates/ # 模板目录
│ ├── MapperTemplate.py # Mapper 类模板
│ └── ModelTemplate.py # 模型类模板
└── test/ # 测试目录
```
## 安装依赖
```bash
pip install -r requirements.txt
```
## 配置
在使用前,请修改 `Config.py` 文件中的数据库配置和代码生成配置:
```python
# 数据库配置
DB_CONFIG = {
'host': 'localhost', # 数据库主机
'port': 3306, # 数据库端口
'user': 'root', # 数据库用户名
'password': '123456', # 数据库密码
'database': 'test_db', # 要连接的数据库名
'charset': 'utf8mb4' # 数据库字符集
}
# 代码生成配置
GENERATE_CONFIG = {
'output_dir': 'generated_classes', # 生成的类文件存放目录
'table_prefix': '', # 表名前缀,生成类名时会去掉
'use_type_hints': True, # 是否使用类型提示
'base_class': 'BaseModel' # 所有映射类的基类
}
```
## 使用方法
1. 修改 `Config.py` 中的数据库配置,确保连接信息正确
2. 运行 `main.py`:
```bash
python main.py
```
程序将自动连接到 MySQL 数据库,读取所有表的结构信息,并生成对应的 Python 模型类和 Mapper 类。
## 生成的代码结构
生成的代码将保存在 `Config.py` 中配置的 `output_dir` 目录下,结构如下:
```
generated_classes/
├── __init__.py # 包初始化文件,包含数据库连接创建函数
├── models/ # 模型类目录
│ ├── __init__.py # 模型包初始化文件
│ ├── User.py # User 模型类
│ ├── Product.py # Product 模型类
│ └── ... # 其他模型类
└── mappers/ # Mapper 类目录
├── __init__.py # Mapper 包初始化文件
├── UserMapper.py # User 数据操作类
├── ProductMapper.py # Product 数据操作类
└── ... # 其他 Mapper 类
```
## 模型类特性
生成的模型类具有以下特性:
- 继承自 `BaseModel` 基类
- 包含所有表字段的属性和 setter/getter 方法
- 支持类型提示
- 包含字段注释
- 实现了 `to_dict()` 和 `from_dict()` 方法,方便数据转换
- 实现了 `to_json()` 和 `from_json()` 方法,方便 JSON 序列化
- 实现了 `__str__()` 和 `__repr__()` 方法,方便调试
## Mapper 类特性
生成的 Mapper 类具有以下特性:
- 封装了基本的数据库操作方法
- 支持单条记录的增删改查
- 支持批量操作
- 支持条件查询和分页
- 支持记录统计
- 使用参数化查询,防止 SQL 注入
### Mapper 类主要方法
- `insert(entity)` - 插入一条记录
- `update(entity)` - 更新一条记录
- `delete(id)` - 删除一条记录
- `get_by_id(id)` - 根据ID查询记录
- `find_all(limit=None, offset=None)` - 查询所有记录
- `find_by_condition(conditions, limit=None, offset=None)` - 根据条件查询记录
- `count_by_condition(conditions)` - 根据条件统计记录数
- `count_all()` - 统计所有记录数
- `batch_insert(entities)` - 批量插入记录
- `batch_delete(ids)` - 批量删除记录
- `get_by_ids(ids)` - 根据ID列表查询记录
## 自定义模板
如果需要自定义生成的代码,可以修改 `templates` 目录下的模板文件:
- `ModelTemplate.py` - 模型类模板
- `MapperTemplate.py` - Mapper 类模板
模板使用 Jinja2 语法,可以通过修改这些文件来调整生成的代码结构和风格。
## 示例
假设数据库中有 `users` 表,结构如下:
```sql
CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL,
`email` varchar(100) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`created_at` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
运行生成器后,将生成以下文件:
### models/User.py
```python
from typing import Optional, Any
from datetime import datetime, date
import json
from BaseModel import BaseModel
class User(BaseModel):
"""
users 表的映射类
"""
@property
def id(self) -> int:
"""id"""
return self._id
@id.setter
def id(self, value: int):
self._id = value
@property
def username(self) -> str:
"""username"""
return self._username
@username.setter
def username(self, value: str):
self._username = value
@property
def email(self) -> Optional[str]:
"""email"""
return self._email
@email.setter
def email(self, value: Optional[str]):
self._email = value
@property
def age(self) -> Optional[int]:
"""age"""
return self._age
@age.setter
def age(self, value: Optional[int]):
self._age = value
@property
def created_at(self) -> datetime:
"""created_at"""
return self._created_at
@created_at.setter
def created_at(self, value: datetime):
self._created_at = value
def __init__(self, id: int = None, username: str = None, email: Optional[str] = None, age: Optional[int] = None, created_at: datetime = None):
"""初始化User实例"""
super().__init__()
self._id = id
self._username = username
self._email = email
self._age = age
self._created_at = created_at
def to_dict(self) -> dict:
"""将对象转换为字典"""
return {
"id": self.id,
"username": self.username,
"email": self.email,
"age": self.age,
"created_at": self.created_at
}
@classmethod
def from_dict(cls, data: dict) -> 'User':
"""从字典创建对象"""
return cls(
id=data.get("id"),
username=data.get("username"),
email=data.get("email"),
age=data.get("age"),
created_at=data.get("created_at")
)
def to_json(self) -> str:
"""将对象转换为JSON字符串"""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_json(cls, json_str: str) -> 'User':
"""从JSON字符串创建对象"""
data = json.loads(json_str)
return cls.from_dict(data)
def __str__(self) -> str:
# 手动构建字段字符串
field_strings = []
field_strings.append("id=" + str(self._id))
field_strings.append("username=" + str(self._username))
field_strings.append("email=" + str(self._email))
field_strings.append("age=" + str(self._age))
field_strings.append("created_at=" + str(self._created_at))
return "User(" + ", ".join(field_strings) + ")"
def __repr__(self) -> str:
return self.__str__()
```
### mappers/UserMapper.py
```python
from typing import List, Optional, Any, Union
from datetime import datetime, date
import pymysql
from ..models.User import User
class UserMapper:
"""
users 表的数据库操作类
"""
def __init__(self, connection: pymysql.connections.Connection):
"""
初始化UserMapper
Args:
connection: 数据库连接对象
"""
self.connection = connection
def insert(self, entity: User) -> int:
"""
插入一条记录
Args:
entity: 要插入的User对象
Returns:
插入成功的记录ID
"""
# 构建字段名和占位符
field_names = []
placeholders = []
params = []
field_names.append("`username`")
placeholders.append("%s")
params.append(entity.username)
field_names.append("`email`")
placeholders.append("%s")
params.append(entity.email)
field_names.append("`age`")
placeholders.append("%s")
params.append(entity.age)
field_names.append("`created_at`")
placeholders.append("%s")
params.append(entity.created_at)
sql = f"""
INSERT INTO users ({", ".join(field_names)})
VALUES ({", ".join(placeholders)})
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
return cursor.lastrowid
def update(self, entity: User) -> int:
"""
更新一条记录
Args:
entity: 要更新的User对象
Returns:
更新的记录数
"""
# 构建SET子句
set_clauses = []
params = []
set_clauses.append(f"`username` = %s")
params.append(entity.username)
set_clauses.append(f"`email` = %s")
params.append(entity.email)
set_clauses.append(f"`age` = %s")
params.append(entity.age)
set_clauses.append(f"`created_at` = %s")
params.append(entity.created_at)
# 构建WHERE子句
where_clauses = []
where_clauses.append(f"`id` = %s")
params.append(entity.id)
sql = f"""
UPDATE users SET
{", ".join(set_clauses)}
WHERE {" AND ".join(where_clauses)}
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
return cursor.rowcount
def delete(self, id: int) -> int:
"""
删除一条记录
Args:
id: 主键值
Returns:
删除的记录数
"""
# 构建WHERE子句
where_clauses = []
params = []
where_clauses.append(f"`id` = %s")
params.append(id)
sql = f"""
DELETE FROM users
WHERE {" AND ".join(where_clauses)}
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
return cursor.rowcount
def get_by_id(self, id: int) -> Optional[User]:
"""
根据ID查询记录
Args:
id: 主键值
Returns:
查询到的User对象,如果不存在则返回None
"""
# 构建WHERE子句
where_clauses = []
params = []
where_clauses.append(f"`id` = %s")
params.append(id)
sql = f"""
SELECT * FROM users
WHERE {" AND ".join(where_clauses)}
"""
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, params)
result = cursor.fetchone()
if result:
return User.from_dict(result)
return None
def find_all(self, limit: int = None, offset: int = None) -> List[User]:
"""
查询所有记录
Args:
limit: 限制返回的记录数
offset: 偏移量
Returns:
查询到的User对象列表
"""
sql = "SELECT * FROM users"
if limit is not None:
sql += f" LIMIT {limit}"
if offset is not None:
sql += f" OFFSET {offset}"
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql)
results = cursor.fetchall()
return [User.from_dict(result) for result in results]
def find_by_condition(self, conditions: dict, limit: int = None, offset: int = None) -> List[User]:
"""
根据条件查询记录
Args:
conditions: 查询条件字典
limit: 限制返回的记录数
offset: 偏移量
Returns:
查询到的User对象列表
"""
sql = "SELECT * FROM users WHERE "
where_clauses = []
params = []
for field, value in conditions.items():
where_clauses.append(f"`{field}` = %s")
params.append(value)
sql += " AND ".join(where_clauses)
if limit is not None:
sql += f" LIMIT {limit}"
if offset is not None:
sql += f" OFFSET {offset}"
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, params)
results = cursor.fetchall()
return [User.from_dict(result) for result in results]
def count_by_condition(self, conditions: dict) -> int:
"""
根据条件统计记录数
Args:
conditions: 查询条件字典
Returns:
记录数
"""
sql = "SELECT COUNT(*) as count FROM users WHERE "
where_clauses = []
params = []
for field, value in conditions.items():
where_clauses.append(f"`{field}` = %s")
params.append(value)
sql += " AND ".join(where_clauses)
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, params)
result = cursor.fetchone()
return result['count']
def count_all(self) -> int:
"""
统计所有记录数
Returns:
记录数
"""
sql = "SELECT COUNT(*) as count FROM users"
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql)
result = cursor.fetchone()
return result['count']
def batch_insert(self, entities: List[User]) -> List[int]:
"""
批量插入记录
Args:
entities: 要插入的User对象列表
Returns:
插入成功的记录ID列表
"""
if not entities:
return []
# 构建字段名和占位符
field_names = []
placeholders = []
field_names.append("`username`")
field_names.append("`email`")
field_names.append("`age`")
field_names.append("`created_at`")
# 为每个实体构建参数集
params = []
for entity in entities:
param_set = []
param_set.append(entity.username)
param_set.append(entity.email)
param_set.append(entity.age)
param_set.append(entity.created_at)
params.append(param_set)
sql = f"""
INSERT INTO users ({", ".join(field_names)})
VALUES ({", ".join(["%s"] * len(field_names))})
"""
ids = []
with self.connection.cursor() as cursor:
for param_set in params:
cursor.execute(sql, param_set)
ids.append(cursor.lastrowid)
self.connection.commit()
return ids
def batch_delete(self, ids: List[int]) -> int:
"""
批量删除记录
Args:
ids: 要删除的记录ID列表
Returns:
删除的记录数
"""
if not ids:
return 0
# 构建WHERE子句 - 假设只有一个主键
primary_key_name = "id"
sql = f"""
DELETE FROM users
WHERE `{primary_key_name}` IN ({", ".join(["%s"] * len(ids))})
"""
params = ids
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
self.connection.commit()
return cursor.rowcount
def get_by_ids(self, ids: List[int]) -> List[User]:
"""
根据ID列表查询记录
Args:
ids: 主键值列表
Returns:
查询到的User对象列表
"""
if not ids:
return []
# 构建WHERE子句
where_clauses = []
params = []
where_clauses.append(f"`id` IN ({", ".join(["%s"] * len(ids))})")
params.extend(ids)
sql = f"""
SELECT * FROM users
WHERE {" AND ".join(where_clauses)}
"""
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, params)
results = cursor.fetchall()
return [User.from_dict(result) for result in results]
```
## 注意事项
1. 确保已安装 MySQL 服务,并且配置正确
2. 确保已安装 Python 3.6+
3. 确保已安装项目依赖(PyMySQL 和 jinja2)
4. 在运行前,请检查并修改 `Config.py` 中的数据库配置
5. 生成的代码需要 Python 3.6+ 环境运行
---
***以下是关于测试示例的说明***
## 测试目录
项目包含一个完整的测试目录 `test/`,用于验证生成的代码是否正常工作。测试目录包含以下文件:
- `setup_database_for_test.py` - 测试数据库初始化脚本
- `test.py` - 测试套件
### 测试数据库初始化
在运行测试之前,需要先创建测试数据库和表。运行以下命令:
```bash
python test/setup_database_for_test.py
```
该脚本将:
1. 检查 MySQL 服务器是否运行
2. 创建名为 `test_db` 的数据库(如果不存在)
3. 创建测试所需的表:
- `users` - 用户信息表
- `departments` - 部门信息表
- `products` - 产品信息表
4. 插入一些测试数据
注意:请确保 `Config.py` 中的数据库配置与 `setup_database_for_test.py` 中的配置一致。
### 运行测试
初始化数据库后,可以运行测试套件:
```bash
python test/test.py
```
测试套件包含以下测试:
1. **模型类测试**:测试模型的序列化和反序列化功能
- `test_users_model()` - 测试用户模型
- `test_departments_model()` - 测试部门模型
- `test_products_model()` - 测试产品模型
2. **Mapper类CRUD测试**:测试Mapper类的增删改查操作
- `test_users_crud()` - 测试用户Mapper的CRUD操作
- `test_departments_crud()` - 测试部门Mapper的CRUD操作
- `test_products_crud()` - 测试产品Mapper的CRUD操作
3. **批量操作测试**:测试Mapper类的批量操作功能
- `test_users_batch_operations()` - 测试用户Mapper的批量操作
- `test_departments_batch_operations()` - 测试部门Mapper的批量操作
- `test_products_batch_operations()` - 测试产品Mapper的批量操作
### 测试覆盖的功能
测试套件验证了以下功能:
- 模型类的序列化和反序列化(to_dict/from_dict, to_json/from_json)
- 单条记录的增删改查(Create, Read, Update, Delete)
- 批量操作(批量插入、批量查询、批量删除)
- 条件查询和分页功能
- 记录统计功能
通过运行测试,可以确保生成的代码在真实数据库环境中正常工作。