# 基本介绍

# 特点

# 快速使用

  • 安装
pip install fastapi
pip install uvicorn
  • 相关代码
# FastAPI 是一个为你的 API 提供了所有功能的 Python 类
from fastapi import FastAPI  

# 这个实例将是创建你所有 API 的主要交互对象。这个 app 同样在如下命令中被 uvicorn 所引用
app = FastAPI()  

@app.get("/")
async def root():
    return {"message": "Hello yuan"}
  • 通过以下命令运行服务器
uvicorn main:app --port=8090 --reload
  • 也可以直接运行
if __name__ == '__main__':
    import uvicorn
    # main:app" 对应 main.py 文件,app 是 FastAPI 应用实例名
    uvicorn.run("main:app", host="127.0.0.1", port=8080, debug=True, reload=True)
  • 自动生成的交互式 API 文档 http://127.0.0.1:8080/docs

# 路径操作

# 路径操作装饰器

  • fastapi支持各种请求方式
@app.get()
@app.post()
@app.put()
@app.patch()
@app.delete()
@app.options()
@app.head()
@app.trace()
  • 路径操作装饰器参数
@app.get("/get/{id}",
         status_code=200,
         tags=["get方法"],
         deprecated=False,
         summary="get方法",
         description="get方法描述",
         response_model=str, # 响应模型
         response_description="get方法响应描述",
         )

# include_router 的使用

# quick.py
from fastapi import FastAPI

from app1 import app01
from app2 import app02

app = FastAPI()

app.include_router(app01, prefix="/app01", tags=["第一章节:商城接口",])
app.include_router(app02, prefix="/app02", tags=["第二章节:用户中心接口",])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("quick:app", host="127.0.0.1", port=8030, reload=True)
# app1.__init__.py
from .app1 import app01

# app2.__init__.py
from .app2 import app02
from fastapi import APIRouter
app01 = APIRouter()

@app01.get("/shop/food")
def shop_food():
    return {"shop": "food"}


@app01.get("/shop/bed")
def shop_food():
    return {"shop": "bed"}
from fastapi import APIRouter
app02 = APIRouter()

@app02.post("/user/login")
def user_login():
    return {"user": "login"}


@app02.post("/user/reg")
def user_reg():
    return {"user": "reg"}

# 请求数据

# 路径参数

  • 基本用法
@app01.get("/shop/food/{food_id}")
def shop_food(food_id):
    return {"shop": food_id}
  • 有类型的路径参数
@app01.get("/shop/food/{food_id}")
def shop_food(food_id:int):
    return {"shop": f"{type(food_id)}"}
  • 注意顺序
# 要确保路径 /user/me 声明在路径 /user/{username}之前
@app.get("/user/me")
async def read_user_me():
    return {"username": "the current user"}

@app.get("/user/{username}")
async def read_user(username: str):
    return {"username": username}

# 查询参数(请求参数)

路径函数中声明不属于路径参数的其他函数参数时,它们将被自动解释为"查询字符串"参数,就是 url? 之后用&分割的 key-value 键值对

@app01.get("/shop/food/{food_id}")
def shop_food(food_id: int, food_name: Union[str, None] = None):
    if food_name and food_price:
        return {"food_id": food_id, "food_name": food_name}
    else:
        return {"food_id": food_id}

# 请求体数据

FastAPI 基于 Pydantic,Pydantic 主要用来做类型强制检查(校验数据)。不符合类型要求就会抛出异常

# 安装 pydantic
pip install pydantic
from datetime import date
from typing import Optional, Union, List

from fastapi import FastAPI
from pydantic import BaseModel, Field, field_validator, ValidationError


class Address(BaseModel):
    city: str
    state: str

class User(BaseModel):
    name: str = "sylone"
	# 当一个模型属性具有默认值时,它不是必需的。否则它是一个必需属性
    age: int = Field(default=0, ge=18, le=60)
    # 表明 birthday 属性的类型可以是 date 对象,也能为 None
    birthday: Optional[date] = None
	# Mutable default '[]' is not allowed. Use 'default_factory'
    friends: List[int] = Field(default_factory=[]) # []
	# 将默认值设为 None 可使其成为可选属性
    decription: Union[str, None] = None

    address: Union[Address, None] = None

    @field_validator("name")
    def name_must_alpha(cls, v):
        assert v.isalpha(), "name must be alpha"
        return v

class Item(BaseModel):
    users: List[User]

app = FastAPI()

# 测试
# {
#   "users": [
#     {
# 	  "name": "rain",
# 	  "age": 32,
# 	  "birthday": "2022-09-29",
# 	  "friends": [1],
# 	  "description": "最帅的讲fastapi的老师",
# 	   "address":{
# 		 "city":"111",
# 		 "state":"1111"
# 	   }
# 	}
#   ]
# }
# FastAPI 会自动将定义的模型类转化为JSON
@app.post("/data")
async def create_data(data: Item):
    return data


if __name__ == "__main__":
    try:
        User(name="zhangsan", age=50, friends=[1, 2, 3], decription="123", 
		     address=Address(city="1", state="2"))
			 
    except ValidationError as e:
        print(e.json())

# form表单数据

FastAPI 可以使用Form组件来接收表单数据,需要先安装 python-multipart

python install python-multipart
@app.post("/reg")
# ... 是 Python 的 Ellipsis 对象
# 它表示一个占位符,用于表示一个参数或变量的值是必须的,但没有具体的值
# 在 FastAPI 里,Ellipsis 对象常被用于 Form、Query、Path 等函数,表明某个参数是必需的
def user_reg(username:str = Form(..., min_length=2, max_length=10, regex="^[a-zA-Z]"),
             password:str = Form(..., min_length=6, max_length=10, regex="^[0-9_]")):
    print(f"username:{username}, password:{password}")
    return {"user": "reg"}

# 验证不通过的返回结果
# {
#     "detail": [
#         {
#             "type": "string_pattern_mismatch",
#             "loc": [
#                 "body",
#                 "username"
#             ],
#             "msg": "String should match pattern '^[a-zA-Z]'",
#             "input": "1212",
#             "ctx": {
#                 "pattern": "^[a-zA-Z]"
#             }
#         }
#     ]
# }

# 文件上传

  • 适合小文件上传
@app.post("/file")
async def file_upload(file: bytes = File()):
    print("file", file) # <class 'bytes'>
    return {"file_size": len(file)}

@app.post("/multiFile")
async def multi_file_upload(files: list[bytes] = File()):
    return {"file_size": [len(file) for file in files]}
  • 适合大文件上传
@app.post("/files")
async def files_upload(file: UploadFile = File()):
    with open(file.filename, "wb") as f:
        f.write(file.file.read())
    return {"file_name": file.filename}

@app.post("/multiFiles")
async def multi_files_upload(files: list[UploadFile] = File()):
    return {"file_size": [file.filename for file in files]}

# Reqeust对象

在函数中声明Request类型的参数,FastAPI 就会自动传递 Request 对象给这个参数

@app.get("/test")
async def test(request: Request):
    return {
        "query_params": request.query_params,
        "headers": request.headers,
        "agent": request.headers.get("user-agent"),
        "cookies": request.cookies,
        "method": request.method,
        "url": request.url, # "http://127.0.0.1:8030/test"
        "path": request.url.path, # "/test"
        "IP": request.client.host, # "127.0.0.1"
    }

# 请求静态文件

请求如 css/js 和图片文件等

from fastapi import FastAPI, Request, File, UploadFile
from starlette.staticfiles import StaticFiles

app = FastAPI()
app.mount("/imgs", StaticFiles(directory="imgs"))

# 响应数据

# response_model

FastAPI将使用response_model进行以下操作:

# 将输出数据转换为response_model中声明的数据类型。
# 验证数据结构和类型
# 将输出数据限制为该model定义的
# 添加到OpenAPI中
# 在自动文档系统中使用
class UserIn(BaseModel):
    username: str
    password: str
    email: EmailStr
    full_name: Union[str, None] = None
    
class UserOut(BaseModel):
    username: str
    email: EmailStr
    full_name: Union[str, None] = None

@app.post("/user/", response_model=UserOut)
async def create_user(user: UserIn):
    return user

# 数据过滤

  • response_model_exclude_unset:仅返回显式设定的值
  • response_model_exclude_defaults:不返回是默认值的字段
  • response_model_exclude_none:不返回是None的字段
class Item(BaseModel):
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: list[str] = []

Items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The Bar fighters", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}
@app.get("/items/{item_id}", response_model=Item, response_model_exclude_unset=True)
async def read_item(item_id: str):
    return Items[item_id] 
	 
# {
#     "name": "Foo",
#     "price": 50.2
# }

@app.get("/items/{item_id}/name", response_model=Item, 
                                  response_model_include=["name", "description"])
async def read_item_name(item_id: str):
    return Items[item_id]
# {
#     "name": "Foo",
#     "description": null
# }

@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude=["tax"])
async def read_item_public_data(item_id: str):
    return Items[item_id]

# {
#     "name": "Foo",
#     "price": 50.2,
#     "description": null,
#     "tags": []
# }

# jinja2模板

jinja2是Flask作者开发的一个模板系统,起初是仿django模板的一个模板引擎,为Flask提供模板支持,由于其灵活,快速和安全等优点被广泛使用。

# jinja2 的变量

from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
import uvicorn

app = FastAPI()

# 实例化Jinja2对象,并将文件夹路径设置为以templates命令的文件夹
templates = Jinja2Templates(directory="templates")


@app.get('/')
def hello(request: Request):
    return templates.TemplateResponse('index.html',
        {
			# 注意,返回模板响应时,必须有request键值对,且值为Request请求对象
            'request': request,  
            'user': 'yuan',
            "books": ["水浒传", "西游记", "三国演义", "红楼梦"],
            "booksDict": {
                "水浒传": {"price": 100, "author": "施耐庵"},
                "西游记": {"price": 200, "author": "吴承恩"},
            }
        }
    )
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>{{ user}}</h1>

<p>{{ books.0 }}</p>
<p>{{ books.1 }}</p>
<p>{{ books.2 }}</p>
<p>{{ books.3 }}</p>

<p>{{ booksDict.水浒传.price }}</p>
</body>
</html>

# jinja2 的过滤器

变量可以通过“过滤器”进⾏修改,过滤器是jinja2里面的内置函数和字符串处理函数。常用的过滤器有:

过滤器名称 说明
capitialize 把值的首字母转换成大写,其他字母转换为小写
lower 把值转换成小写形式
title 把值中每个单词的首字母都转换成大写
trim 把值的首尾空格去掉
striptags 渲染之前把值中所有的HTML标签都删掉
join 拼接多个值为字符串
round 默认对数字进⾏四舍五⼊,也可以用参数进⾏控制
safe 渲染时值不转义

需要在变量后面使用管道 | 分割,多个过滤器可以链式调用

{{ 'abc'| captialize  }}  # Abc
{{ 'abc'| upper  }} # ABC
{{ 'hello world'| title  }} # Hello World
{{ "hello world"| replace('world','yuan') | upper }} # HELLO YUAN
{{ 18.18 | round | int }} # 18

# jinja2 的控制结构

  • 条件语句不需要使用冒号结尾,而结束控制语句,需要使用endif关键字
{% if age > 18 %}
    <p>成年区</p>
{% else %}
    <p>未成年区</p>
{% endif %}
  • for循环用于迭代Python的数据类型,包括列表,元组和字典。在jinja2中不存在while循环
{% for book in books %}
    <p>{{ book }}</p>
{% endfor %}

# ORM操作

fastapi是一个很优秀的框架,但是缺少一个合适的orm,官方代码里面使用的是sqlalchemy
Tortoise ORM (opens new window) 是受 Django 启发的易于使用的异步 ORM

pip install tortoise-orm

# Tortoise ORM 支持的数据库 settings.py

  • PostgreSQL >= 9.4(使用asyncpg)
from tortoise import Tortoise

# 配置 PostgreSQL 数据库
config = {
    'db_url': 'postgres://user:password@localhost/dbname',
    'modules': {
        'tortoise.backends.postgres': {
            'sslmode': 'disable'  # 可选的,根据你的 PostgreSQL 配置调整
        }
    },
    'generate_schemas': True  # 自动创建表结构
}

# 初始化 Tortoise ORM
Tortoise.init(**config)

# 创建表
Tortoise.generate_schemas()
  • SQLite(使用aiosqlite)
from tortoise import Tortoise

# 配置 SQLite 数据库
config = {
	# 使用内存中的 SQLite 数据库,也可以指定文件路径
    'db_url': 'sqlite://:memory:',  
    'modules': {
        'tortoise.backends.sqlite': {
            '_fk': True,  # 开启外键支持
        }
    },
    'generate_schemas': True  # 自动创建表结构
}

# 初始化 Tortoise ORM
Tortoise.init(**config)

# 创建表
Tortoise.generate_schemas()
  • MySQL/MariaDB(使用aiomysql或使用asyncmy)
from tortoise import Tortoise

# 配置 MySQL 数据库
TORTOISE_ORM = {
    "connections": {
        "default": {
            # "engine": "tortoise.backends.asyncpg",   # 数据库引擎 PostgresQL
            "engine": "tortoise.backends.mysql",  # 数据库引擎 Mysql or Mariadb
            "credentials": {
                "host": "127.0.0.1",    # 地址
                "port": "3306",         # 端口
                "user": "root",         # 用户名
                "password": "root",     # 密码
                "database": "fastapi",  # 数据库名称(需要提前创建数据库)
                "minsize": 1,           # 最少连接
                "maxsize": 5,           # 最大连接
                "charset": "utf8mb4",   # 编码
                "echo": True            # 是否反馈SQL语句
            }
        }
    },
    "apps": {
        "models": {
			# Aerich 需要在 TORTOISE_ORM 配置里注册其模型,否则无法找到默认连接
            "models": ["models", "aerich.models"],
            "default_connection": "default"
        }
    },
    "use_tz": False,
    "timezone": "Asia/Shanghai"
}

# 以下的可以在项目启动中配置

# 初始化 Tortoise ORM
# Tortoise.init(**TORTOISE_ORM)

# 创建表
# Tortoise.generate_schemas()
  • Oracle
from tortoise import Tortoise

config = {
    'db_url': 'oracle://username:password@host:port/service_name',
    'modules': {
        'tortoise.backends.oracle': {
            # 这里可以配置其他 Oracle 特定的设置,如使用钱包等
        }
    },
    'generate_schemas': True  # 如果需要自动创建表结构,设置为 True
}

Tortoise.init(**config)

# 创建模型文件 models.py

from tortoise import fields, Model


class Classes(Model):
    name = fields.CharField(max_length=255, description="班级名称")

class Teacher(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255, description="老师名称")
    tno = fields.IntField(description="帐号")
    pwd = fields.CharField(max_length=255, description="密码")

class Course(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255, description="课程名称")
    # 多对一
    teacher = fields.ForeignKeyField("models.Teacher", related_name="courses")

class Student(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255, description="学生名称")
    sno = fields.IntField(description="帐号")
    pwd = fields.CharField(max_length=255, description="密码")
    # 一对多
    classes = fields.ForeignKeyField("models.Classes", related_name="students")
    # 多对多
    courses = fields.ManyToManyField("models.Course", related_name="students")

# aerich 迁移工具

aerich是一种ORM迁移工具,需要结合tortoise异步orm框架使用

pip install aerich
  • 初始化配置,只需要使用一次
aerich init -t settings.TORTOISE_ORM # TORTOISE_ORM 配置的位置

# 初始化完会在当前目录生成一个文件:pyproject.toml和一个文件夹:migrations
# pyproject.toml:保存配置文件路径,低版本可能是aerich.ini
# migrations:存放迁移文件
  • 初始化数据库,一般情况下只用一次
aerich init-db

# 此时数据库中就有相应的表格
# 如果TORTOISE_ORM配置文件中的models改了名,则执行这条命令时需要增加--app参数,来指定你修改的名字
  • 更新模型并进行迁移
# 修改model类,重新生成迁移文件,比如添加一个字段

aerich migrate [--name] (标记修改操作) #  aerich migrate
# 注意,此时sql并没有执行,数据库中表中没有更新
  • 重新执行迁移,写入数据库
aerich upgrade
  • 回到上一个版本
aerich downgrade
  • 查看历史迁移记录
aerich history

# 项目启动

import uvicorn
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise

import orm
from settings import TORTOISE_ORM


app = FastAPI()
app.include_router(orm.api_student, prefix='/orm', tags=['orm'])

# 该方法会在fastapi启动时触发,内部通过传递进去的app对象,监听服务启动和终止事件
# 当检测到启动事件时,会初始化Tortoise对象,如果generate_schemas为True则还会进行数据库迁移
# 当检测到终止事件时,会关闭连接
register_tortoise(
    app,
    config=TORTOISE_ORM,
    # generate_schemas=True,  # 如果数据库为空,则自动生成对应表单,生产环境不要开
    # add_exception_handlers=True,  # 生产环境不要开,会泄露调试信息
)

if __name__ == '__main__':
    uvicorn.run('quick:app', host='127.0.0.1', port=8020, reload=True, workers=1)

# 增删改查

from typing import List
from fastapi import APIRouter
from pydantic import BaseModel
from models import Student, Course

api_student = APIRouter()

class StudentModel(BaseModel):
    name: str
    sno: int
    pwd: str
    phone: str
    classes_id: int
    courses: List[int]


@api_student.get("/students")
async def getAllStudent():
    students = await Student.all().values("name", "classes_id")
    for student in students:
        print(student)
    return students

@api_student.get("/student")
async  def getStudent():
    student = await Student.filter(name__icontains='三').values("name", "classes_id")
    # student = await Student.get(name__icontains='一') # 不存在会报错
    student = await Student.get_or_none(name__icontains='一') # 不存在返回None
    return student

@api_student.post("/student")
async def addStudent(stu: StudentModel):
    # 方式一
    # student = Student(
    #     name=stu.name,
    #     sno=stu.sno,
    #     pwd=stu.pwd,
    #     phone=stu.phone,
    #     classes_id=stu.classes_id
    # )
    # await student.save()

    # 方式二
    student = await Student.create(
        name=stu.name,
        sno=stu.sno,
        pwd=stu.pwd,
        phone=stu.phone,
        classes_id=stu.classes_id
    )

    # 添加多对多关系
    courses = await Course.filter(id__in=stu.courses).all()
    await student.courses.add(*courses)

    return student # 返回的是新增的对象

@api_student.put("/student/{id}")
async def updateStudent(id: int, stu: StudentModel):
    # 方式一
    # student = await Student.get(id=id)
    # student.name = stu.name
    # student.sno = stu.sno
    # student.pwd = stu.pwd
    # student.phone = stu.phone
    # student.classes_id = stu.classes_id
    # await student.save()

    # 方式二
    student = await Student.filter(id=id).update(
        name=stu.name,
        sno=stu.sno,
        pwd=stu.pwd,
        phone=stu.phone,
        classes_id=stu.classes_id
    )

    # 更新多对多关系
    courses = await Course.filter(id__in=stu.courses).all()
    await student.courses.clear()
    await student.courses.add(*courses)
    return student

@api_student.delete("/student/{id}")
async def deleteStudent(id: int):
    # 方式一
    # student = await Student.get(id=id)
    # await student.delete()
    # return student # 返回的是删除的对象

    # 方式二
    student = await Student.filter(id=id).delete()
    return student # 返回的是删除的行数

    # 这两种方式都可以删除多对多关系

# 过滤条件

  • 比较运算符
# 获取id等于1的数据
students = await Student.filter(id=1).all()

# 获取id不等于1的数据
students = await Student.filter(id__not=1).all()

# 获取id大于1的数据
students = await Student.filter(id__gt=1).all()

# 获取id大于等于1的数据
students = await Student.filter(id__gte=1).all()

# 获取id小于5的数据
students = await Student.filter(id__lt=1).all()

# 获取id小于等于5的数据
students = await Student.filter(id__lte=1).all()
  • 成员运算符
# 获取姓名 在 指定列表中的数据
names = ['赵德柱', '李铁柱']
students = await Student.filter(name__in=names).all()
for student in students:
    print(student.id, student.name)
 
# 获取姓名 不在 指定列表中的数据
names = ['赵德柱', '李铁柱']
students = await Student.filter(name__nin=names).all()
for student in students:
    print(student.id, student.name)
  • 模糊查询
# Tortoise ORM 不直接支持SQL中的LIKE模糊查询,
# 但可以使用`icontains`、`istartswith`、`iendswith`等操作符进行模糊查询。

# 学号包含200
student = await Student.filter(sno__icontains='200')

# 学号是200开头
student = await Student.filter(sno__istartswith='200')

# 学号是200结尾
student = await Student.filter(sno__iendswith='200')

# exclude() 方法用于排除满足条件的数据,返回不满足条件的数据集
# 获取名字不是'赵德柱'的所有数据
students = await Student.exclude(name='赵德柱').all()
  • 查询在指定范围之间
students = await Student.filter(sno__range=[2001, 2003]).all()
for student in students:
    print(student.id, student.name)
  • 是否为空
# 查询学生姓名为空的数据
students = await Student.filter(name__isnull=True).all()
  • 正则表达式匹配
# 查询名字匹配正则表达式的数据
pattern = r'^赵.*'  # 以 赵 开头的名字
students = await Student.filter(name__regex=pattern).all()

# 查询名字不区分大小写匹配正则表达式的用户
pattern = r'^a.*'  # 以 a(iregex 不区分大小写)开头的名字
students = await Student.filter(name__iregex=pattern).all()
  • 分页查询
# 按id升序获取所有数据
students = await Student.all().order_by("id")

# 按id降序获取所有数据
students = await Student.all().order_by("id")

# 获取前5个用户
first_five_sutdents = await Student.all().limit(5)

# 跳过前5个用户,再获取5个用户
next_five_students = await Student.all().offset(5).limit(5)

# 中间件的

  • "中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应之后工作
  • 要创建中间件你可以在函数的顶部使用装饰器 @app.middleware("http")
  • 中间件参数接收如下参数:
request
call_next # 它将接收request,作为参数
          # 这个函数将 request 传递给相应的 路径操作
          # 然后它将返回由相应的路径操作生成的 response
          # 然后你可以在返回 response 前进一步修改它
  • 函数中间件
import uvicorn
from fastapi import FastAPI

from fastapi import Request
import time

app = FastAPI()


@app.middleware("http")
async def m2(request: Request, call_next):
    # 请求代码块
    print("m2 request")
    response = await call_next(request)
    # 响应代码块
    response.headers["author"] = "yuan"
    print("m2 response")
    return response


@app.middleware("http")
async def m1(request: Request, call_next):
    # 请求代码块
    print("m1 request")
    # if request.client.host in ["127.0.0.1", ]:  # 黑名单
    #     return Response(content="visit forbidden")

    # if request.url.path in ["/user"]:
    #     return Response(content="visit forbidden")

    start = time.time()

    response = await call_next(request)
    # 响应代码块
    print("m1 response")
    end = time.time()
    response.headers["ProcessTimer"] = str(end - start)
    return response


@app.get("/user")
def get_user():
    time.sleep(3)
    print("get_user函数执行")
    return {
        "user": "current user"
    }

# 输出
# m1 request
# m2 request
# get_user函数执行
# m2 response
# m1 response

if __name__ == '__main__':
    uvicorn.run('main:app', host='127.0.0.1', port=8030, reload=True, workers=1)
  • 类中间件
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
import time
import uvicorn
import os


class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time

        # 记录请求和响应的详细信息
        log_message = (
            f"请求方法: {request.method}\n"
            f"请求路径: {request.url.path}\n"
            f"响应状态: {response.status_code}\n"
            f"处理时间: {process_time:.4f} 秒"
        )
		
		# 在生产环境中,可以将这些信息写入日志文件
        print(log_message)  

        return response

app = FastAPI()

# 添加中间件到应用中
app.add_middleware(LoggingMiddleware)

# 使用 PlainTextResponse 返回简单文本响应
@app.get("/", response_class=PlainTextResponse)
async def read_root():
    # 可以直接返回字符串,FastAPI 会自动封装为 PlainTextResponse
    return "Hello, World!"
  • SessionMiddleware
# 使用 SessionMiddleware 来管理用户会话,这对于需要追踪用户状态或者保持登录状态的应用尤为重要
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
import uvicorn
import os

app = FastAPI()

# 设置 SessionMiddleware,secret_key 应该是一个长随机字符串
# 使用 SessionMiddleware 来管理会话。secret_key 用于签名和/或加密会话 cookie,确保它的安全性
app.add_middleware(
    SessionMiddleware, secret_key="!se1cret2-ke3y-sh4ould-b5e-ve8ry-se8cure!"
)


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    # 访问会话中的数据
    count = request.session.get("count", 0)
    count += 1
    request.session["count"] = count  # 更新会话数据
    return f"<html><body><h1>Visit Count: {count}</h1></body></html>"


@app.get("/reset")
async def reset_count(request: Request):
    request.session.pop("count", None)  # 重置会话数据
    return {"status": "session reset"}


@app.get("/logout")
async def logout(request: Request):
    request.session.clear()  # 清空所有会话数据
    return {"status": "logged out"}


if __name__ == "__main__":
    uvicorn.run(
        f"{os.path.basename(__file__).split('.')[0]}:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
    )
  • TrustedHostMiddleware
# 使用 TrustedHostMiddleware 可以提高应用的安全性,限制哪些主机名可以访问应用
from fastapi import FastAPI
from starlette.middleware.trustedhost import TrustedHostMiddleware

app = FastAPI()

# 添加 TrustedHostMiddleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "www.example.com", "*.example.com"]
)

@app.get("/")
async def read_root():
    return {"message": "Hello, World!"}
  • CORSMiddleware
# 方式一
@app.middleware("http")
async def CORSMiddleware(request: Request, call_next):
    response = await call_next(request)
    print(response.headers)
    return response
# 方式二
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
origins = [
    "http://localhost:63342"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # *:代表所有客户端
    allow_credentials=True,
    allow_methods=["GET"],
    allow_headers=["*"],
)

@app.get("/")
def main():
    return {"message": "Hello World"}

if __name__ == '__main__':
    import uvicorn
    uvicorn.run("main:app", host="127.0.0.1", port=8080, debug=True, reload=True)