1 Star 2 Fork 3

myPorject/FastapiMysqlDemo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

FastApi+mysql 快速搭建web后端接口框架

项目地址:

https://github.com/zhongzhongBaby/FastapiMysqlDemo

项目描述:

最近从java转来学习FastApi,想要快速搭建一个FastApi+mysql的后端接口框架,但是网上没有找到demo,所以自己简单写了一个,做了一些简单的封装,希望对和我一样的新学者有所帮助。

功能涵盖:

1.使用数据源访问mysql
2.实现中文api文档
3.请求参数数据验证
4.使用jwt实现Token登录验证
5.不同环境(生产、测试),配置环境参数

1.数据源访问工具类

import pymysql
from DBUtils.PooledDB import PooledDB


class MySQLHelper(object):
    def __init__(self, host, port, dbuser, password, database):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            maxshared=3,
            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host=host,
            port=int(port),
            user=dbuser,
            password=password,
            database=database,
            charset='utf8'
        )

    def create_conn_cursor(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor

    def fetch_one(self, sql):
        conn, cursor = self.create_conn_cursor()
        cursor.execute(sql)
        result = cursor.fetchone()
        cursor.close()
        conn.close()
        return result

    def fetch_all(self, sql, args):
        conn, cursor = self.create_conn_cursor()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result

    def fetch_all2(self, sql):
        conn, cursor = self.create_conn_cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result

    def insert_one(self, sql, args):
        conn, cursor = self.create_conn_cursor()
        res = cursor.execute(sql, args)
        conn.commit()
        print(res)
        conn.close()
        return res

    def update(self, sql, args):
        conn, cursor = self.create_conn_cursor()
        res = cursor.execute(sql, args)
        conn.commit()
        print(res)
        conn.close()
        return res


sql_helper = MySQLHelper("127.0.0.1", 3306, "root", "123456", "demo")


2.api文档实现效果

image

3.请求参数数据验证

//使用pydantic.Field方法数据验证
class FindCourseListRequest(FindRequestBase):
    course_id: str = Field(None, title="课程id", max_length=300)
    name: str = Field(None, title="名称", max_length=300)
    subject: str = Field(..., title="科目", max_length=300) 

以上是对参数的基本校验,如:参数类型,是否必传,参数长度等。 如果需要进行更多自定义的校验可以使用pydantic.@validator。

4.使用jwt实现Token登录验证

from datetime import datetime, timedelta
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt import *
from passlib.context import CryptContext
from pydantic import BaseModel
from utils.mysql_utils import sql_helper
from fastapi import APIRouter
from pydantic import Field

login_router = APIRouter()

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24  # 会话超时时间 默认1天

# 系统默认的登录用户,如果不去数据库进行验证用户信息,可在这儿配置
fake_users_db = {
    "pxxAdmin": {
        "username": "pxxAdmin",
        "hashed_password": "$2b$12$Y26vyX0FkZHBq3T57GzdwOd4WxJDoHV0PckspBfKbZ4LkDPOc1A4y",
        "disabled": False,
    }
}


class Token(BaseModel):
    user_name: str = Field(None, description="token 类型")
    access_token: str = Field(..., description="访问token密文,其他接口访问时在header上加参数(Authorization:密文) ")
    token_type: str = Field(..., description="token 类型")


class TokenData(BaseModel):
    username: str = None


class User(BaseModel):
    username: str
    disabled: bool = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    user_dict = get_user_from_db(username)
    if user_dict is not None:
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(*, data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@login_router.post("/login", response_model=Token, summary="登录接口")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    '''
    默认账号  密码  pxxAdmin   123456

    '''
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"user_name": user.username, "access_token": "Bearer " + str(access_token, encoding="utf8"),
            "token_type": "bearer2"}


# 从数据库获取用户信息
def get_user_from_db(username):
    sql = "SELECT username,hashed_password, disabled FROM  sys_user where username = '%s' " % username
    user = sql_helper.fetch_one(sql)
    return user

访问login接口,登录成功,可以返回token密文

{
    "user_name": "Admin",
    "access_token": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJBZG1pbiIsImV4cCI6MTU4NzgwNTcxMH0.qwahQ22GaMGOpoE_xHr_Dg-i8VjO1bwEl2uJSz1ry6U",
    "token_type": "bearer2"
}

之后在访问其他需要登录验证的接口的时候,只需要在接口请求header上添加token密文参数

Authorization :Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJBZG1pbiIsImV4cCI6MTU4NzgwNTcxMH0.qwahQ22GaMGOpoE_xHr_Dg-i8VjO1bwEl2uJSz1ry6U

设置接口登录验证只需要在接口方法上添加参数:
current_user: User = Depends(get_current_active_user)

async def find_course_selective(current_user: User = Depends(get_current_active_user),
                                request: FindCourseListRequest = Body(None, title="课程筛选条件")):

5.不同环境(生产、测试),配置环境参数

新建几个环境参数文件:prod.env,test.env

from pydantic import BaseSettings
from functools import lru_cache


@lru_cache()
def get_settings():
    return Settings(_env_file='prod.env')


class Settings(BaseSettings):
    app_name: str = "FastApiMysqlDemo"
    admin_email: str = "844383583@qq.com"
    database: dict = {
        "url": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "12346",
        "database_name": "demo"
    }

    class Config:
        env_file = "prod.env"

@lru_cache()目的:因为环境参数要从文件中读取,这样存于内存可以避免IO次数。 使用:

import config
database = config.get_settings().database

sql_helper = MySQLHelper(database["url"], database["port"],
                         database["user"], database["password"], database["database_name"])

项目启动

- 项目启动

        uvicorn main:app --reload
        # 指定端口和主机
        uvicorn main:app --reload  --port 8000 --host 192.168.1.24

空文件

简介

FastApi+mysql 快速搭建web后端接口框架,希望对和我一样的新学者有所帮助。 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/my-porject/FastapiMysqlDemo.git
git@gitee.com:my-porject/FastapiMysqlDemo.git
my-porject
FastapiMysqlDemo
FastapiMysqlDemo
master

搜索帮助