# 基本介绍
# 特点
- 依赖:Python 3.6 及更高版本
- Starlette (opens new window) 是一个轻量级的ASGI框架/工具包,用于构建Python异步网络服务
- Pydantic (opens new window) 负责数据部分(类型提示)
# 快速使用
- 安装
pip install fastapi
- 安装 ASGI 服务器,生产环境可以使用 Uvicorn (opens new window)
pip install uvicorn
- 相关代码
# FastAPI 是一个为你的 API 提供了所有功能的 Python 类
from fastapi import FastAPI
# 这个实例将是创建你所有 API 的主要交互对象。这个 app 同样在如下命令中被 uvicorn 所引用
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello yuan"}
- 通过以下命令运行服务器
uvicorn main:app --port=8090 --reload
- 也可以直接运行
if __name__ == '__main__':
import uvicorn
# main:app" 对应 main.py 文件,app 是 FastAPI 应用实例名
uvicorn.run("main:app", host="127.0.0.1", port=8080, debug=True, reload=True)
- 自动生成的交互式 API 文档 http://127.0.0.1:8080/docs
# 路径操作
# 路径操作装饰器
- fastapi支持各种请求方式
@app.get()
@app.post()
@app.put()
@app.patch()
@app.delete()
@app.options()
@app.head()
@app.trace()
- 路径操作装饰器参数
@app.get("/get/{id}",
status_code=200,
tags=["get方法"],
deprecated=False,
summary="get方法",
description="get方法描述",
response_model=str, # 响应模型
response_description="get方法响应描述",
)
# include_router 的使用
# quick.py
from fastapi import FastAPI
from app1 import app01
from app2 import app02
app = FastAPI()
app.include_router(app01, prefix="/app01", tags=["第一章节:商城接口",])
app.include_router(app02, prefix="/app02", tags=["第二章节:用户中心接口",])
if __name__ == "__main__":
import uvicorn
uvicorn.run("quick:app", host="127.0.0.1", port=8030, reload=True)
# app1.__init__.py
from .app1 import app01
# app2.__init__.py
from .app2 import app02
from fastapi import APIRouter
app01 = APIRouter()
@app01.get("/shop/food")
def shop_food():
return {"shop": "food"}
@app01.get("/shop/bed")
def shop_food():
return {"shop": "bed"}
from fastapi import APIRouter
app02 = APIRouter()
@app02.post("/user/login")
def user_login():
return {"user": "login"}
@app02.post("/user/reg")
def user_reg():
return {"user": "reg"}
# 请求数据
# 路径参数
- 基本用法
@app01.get("/shop/food/{food_id}")
def shop_food(food_id):
return {"shop": food_id}
- 有类型的路径参数
@app01.get("/shop/food/{food_id}")
def shop_food(food_id:int):
return {"shop": f"{type(food_id)}"}
- 注意顺序
# 要确保路径 /user/me 声明在路径 /user/{username}之前
@app.get("/user/me")
async def read_user_me():
return {"username": "the current user"}
@app.get("/user/{username}")
async def read_user(username: str):
return {"username": username}
# 查询参数(请求参数)
路径函数中声明不属于路径参数的其他函数参数时,它们将被自动解释为"查询字符串"参数,就是 url? 之后用&分割的 key-value 键值对
@app01.get("/shop/food/{food_id}")
def shop_food(food_id: int, food_name: Union[str, None] = None):
if food_name and food_price:
return {"food_id": food_id, "food_name": food_name}
else:
return {"food_id": food_id}
# 请求体数据
FastAPI 基于 Pydantic,Pydantic 主要用来做类型强制检查(校验数据)。不符合类型要求就会抛出异常
# 安装 pydantic
pip install pydantic
from datetime import date
from typing import Optional, Union, List
from fastapi import FastAPI
from pydantic import BaseModel, Field, field_validator, ValidationError
class Address(BaseModel):
city: str
state: str
class User(BaseModel):
name: str = "sylone"
# 当一个模型属性具有默认值时,它不是必需的。否则它是一个必需属性
age: int = Field(default=0, ge=18, le=60)
# 表明 birthday 属性的类型可以是 date 对象,也能为 None
birthday: Optional[date] = None
# Mutable default '[]' is not allowed. Use 'default_factory'
friends: List[int] = Field(default_factory=[]) # []
# 将默认值设为 None 可使其成为可选属性
decription: Union[str, None] = None
address: Union[Address, None] = None
@field_validator("name")
def name_must_alpha(cls, v):
assert v.isalpha(), "name must be alpha"
return v
class Item(BaseModel):
users: List[User]
app = FastAPI()
# 测试
# {
# "users": [
# {
# "name": "rain",
# "age": 32,
# "birthday": "2022-09-29",
# "friends": [1],
# "description": "最帅的讲fastapi的老师",
# "address":{
# "city":"111",
# "state":"1111"
# }
# }
# ]
# }
# FastAPI 会自动将定义的模型类转化为JSON
@app.post("/data")
async def create_data(data: Item):
return data
if __name__ == "__main__":
try:
User(name="zhangsan", age=50, friends=[1, 2, 3], decription="123",
address=Address(city="1", state="2"))
except ValidationError as e:
print(e.json())
# form表单数据
FastAPI 可以使用Form组件来接收表单数据,需要先安装 python-multipart
python install python-multipart
@app.post("/reg")
# ... 是 Python 的 Ellipsis 对象
# 它表示一个占位符,用于表示一个参数或变量的值是必须的,但没有具体的值
# 在 FastAPI 里,Ellipsis 对象常被用于 Form、Query、Path 等函数,表明某个参数是必需的
def user_reg(username:str = Form(..., min_length=2, max_length=10, regex="^[a-zA-Z]"),
password:str = Form(..., min_length=6, max_length=10, regex="^[0-9_]")):
print(f"username:{username}, password:{password}")
return {"user": "reg"}
# 验证不通过的返回结果
# {
# "detail": [
# {
# "type": "string_pattern_mismatch",
# "loc": [
# "body",
# "username"
# ],
# "msg": "String should match pattern '^[a-zA-Z]'",
# "input": "1212",
# "ctx": {
# "pattern": "^[a-zA-Z]"
# }
# }
# ]
# }
# 文件上传
- 适合小文件上传
@app.post("/file")
async def file_upload(file: bytes = File()):
print("file", file) # <class 'bytes'>
return {"file_size": len(file)}
@app.post("/multiFile")
async def multi_file_upload(files: list[bytes] = File()):
return {"file_size": [len(file) for file in files]}
- 适合大文件上传
@app.post("/files")
async def files_upload(file: UploadFile = File()):
with open(file.filename, "wb") as f:
f.write(file.file.read())
return {"file_name": file.filename}
@app.post("/multiFiles")
async def multi_files_upload(files: list[UploadFile] = File()):
return {"file_size": [file.filename for file in files]}
# Reqeust对象
在函数中声明Request类型的参数,FastAPI 就会自动传递 Request 对象给这个参数
@app.get("/test")
async def test(request: Request):
return {
"query_params": request.query_params,
"headers": request.headers,
"agent": request.headers.get("user-agent"),
"cookies": request.cookies,
"method": request.method,
"url": request.url, # "http://127.0.0.1:8030/test"
"path": request.url.path, # "/test"
"IP": request.client.host, # "127.0.0.1"
}
# 请求静态文件
请求如 css/js 和图片文件等
from fastapi import FastAPI, Request, File, UploadFile
from starlette.staticfiles import StaticFiles
app = FastAPI()
app.mount("/imgs", StaticFiles(directory="imgs"))
# 响应数据
# response_model
FastAPI将使用response_model进行以下操作:
# 将输出数据转换为response_model中声明的数据类型。
# 验证数据结构和类型
# 将输出数据限制为该model定义的
# 添加到OpenAPI中
# 在自动文档系统中使用
class UserIn(BaseModel):
username: str
password: str
email: EmailStr
full_name: Union[str, None] = None
class UserOut(BaseModel):
username: str
email: EmailStr
full_name: Union[str, None] = None
@app.post("/user/", response_model=UserOut)
async def create_user(user: UserIn):
return user
# 数据过滤
- response_model_exclude_unset:仅返回显式设定的值
- response_model_exclude_defaults:不返回是默认值的字段
- response_model_exclude_none:不返回是None的字段
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
tags: list[str] = []
Items = {
"foo": {"name": "Foo", "price": 50.2},
"bar": {"name": "Bar", "description": "The Bar fighters", "price": 62, "tax": 20.2},
"baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}
@app.get("/items/{item_id}", response_model=Item, response_model_exclude_unset=True)
async def read_item(item_id: str):
return Items[item_id]
# {
# "name": "Foo",
# "price": 50.2
# }
@app.get("/items/{item_id}/name", response_model=Item,
response_model_include=["name", "description"])
async def read_item_name(item_id: str):
return Items[item_id]
# {
# "name": "Foo",
# "description": null
# }
@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude=["tax"])
async def read_item_public_data(item_id: str):
return Items[item_id]
# {
# "name": "Foo",
# "price": 50.2,
# "description": null,
# "tags": []
# }
# jinja2模板
jinja2是Flask作者开发的一个模板系统,起初是仿django模板的一个模板引擎,为Flask提供模板支持,由于其灵活,快速和安全等优点被广泛使用。
# jinja2 的变量
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
import uvicorn
app = FastAPI()
# 实例化Jinja2对象,并将文件夹路径设置为以templates命令的文件夹
templates = Jinja2Templates(directory="templates")
@app.get('/')
def hello(request: Request):
return templates.TemplateResponse('index.html',
{
# 注意,返回模板响应时,必须有request键值对,且值为Request请求对象
'request': request,
'user': 'yuan',
"books": ["水浒传", "西游记", "三国演义", "红楼梦"],
"booksDict": {
"水浒传": {"price": 100, "author": "施耐庵"},
"西游记": {"price": 200, "author": "吴承恩"},
}
}
)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>{{ user}}</h1>
<p>{{ books.0 }}</p>
<p>{{ books.1 }}</p>
<p>{{ books.2 }}</p>
<p>{{ books.3 }}</p>
<p>{{ booksDict.水浒传.price }}</p>
</body>
</html>
# jinja2 的过滤器
变量可以通过“过滤器”进⾏修改,过滤器是jinja2里面的内置函数和字符串处理函数。常用的过滤器有:
过滤器名称 | 说明 |
---|---|
capitialize | 把值的首字母转换成大写,其他字母转换为小写 |
lower | 把值转换成小写形式 |
title | 把值中每个单词的首字母都转换成大写 |
trim | 把值的首尾空格去掉 |
striptags | 渲染之前把值中所有的HTML标签都删掉 |
join | 拼接多个值为字符串 |
round | 默认对数字进⾏四舍五⼊,也可以用参数进⾏控制 |
safe | 渲染时值不转义 |
需要在变量后面使用管道 | 分割,多个过滤器可以链式调用
{{ 'abc'| captialize }} # Abc
{{ 'abc'| upper }} # ABC
{{ 'hello world'| title }} # Hello World
{{ "hello world"| replace('world','yuan') | upper }} # HELLO YUAN
{{ 18.18 | round | int }} # 18
# jinja2 的控制结构
- 条件语句不需要使用冒号结尾,而结束控制语句,需要使用endif关键字
{% if age > 18 %}
<p>成年区</p>
{% else %}
<p>未成年区</p>
{% endif %}
- for循环用于迭代Python的数据类型,包括列表,元组和字典。在jinja2中不存在while循环
{% for book in books %}
<p>{{ book }}</p>
{% endfor %}
# ORM操作
fastapi是一个很优秀的框架,但是缺少一个合适的orm,官方代码里面使用的是sqlalchemy
Tortoise ORM (opens new window) 是受 Django 启发的易于使用的异步 ORM
pip install tortoise-orm
# Tortoise ORM 支持的数据库 settings.py
- PostgreSQL >= 9.4(使用asyncpg)
from tortoise import Tortoise
# 配置 PostgreSQL 数据库
config = {
'db_url': 'postgres://user:password@localhost/dbname',
'modules': {
'tortoise.backends.postgres': {
'sslmode': 'disable' # 可选的,根据你的 PostgreSQL 配置调整
}
},
'generate_schemas': True # 自动创建表结构
}
# 初始化 Tortoise ORM
Tortoise.init(**config)
# 创建表
Tortoise.generate_schemas()
- SQLite(使用aiosqlite)
from tortoise import Tortoise
# 配置 SQLite 数据库
config = {
# 使用内存中的 SQLite 数据库,也可以指定文件路径
'db_url': 'sqlite://:memory:',
'modules': {
'tortoise.backends.sqlite': {
'_fk': True, # 开启外键支持
}
},
'generate_schemas': True # 自动创建表结构
}
# 初始化 Tortoise ORM
Tortoise.init(**config)
# 创建表
Tortoise.generate_schemas()
- MySQL/MariaDB(使用aiomysql或使用asyncmy)
from tortoise import Tortoise
# 配置 MySQL 数据库
TORTOISE_ORM = {
"connections": {
"default": {
# "engine": "tortoise.backends.asyncpg", # 数据库引擎 PostgresQL
"engine": "tortoise.backends.mysql", # 数据库引擎 Mysql or Mariadb
"credentials": {
"host": "127.0.0.1", # 地址
"port": "3306", # 端口
"user": "root", # 用户名
"password": "root", # 密码
"database": "fastapi", # 数据库名称(需要提前创建数据库)
"minsize": 1, # 最少连接
"maxsize": 5, # 最大连接
"charset": "utf8mb4", # 编码
"echo": True # 是否反馈SQL语句
}
}
},
"apps": {
"models": {
# Aerich 需要在 TORTOISE_ORM 配置里注册其模型,否则无法找到默认连接
"models": ["models", "aerich.models"],
"default_connection": "default"
}
},
"use_tz": False,
"timezone": "Asia/Shanghai"
}
# 以下的可以在项目启动中配置
# 初始化 Tortoise ORM
# Tortoise.init(**TORTOISE_ORM)
# 创建表
# Tortoise.generate_schemas()
- Oracle
from tortoise import Tortoise
config = {
'db_url': 'oracle://username:password@host:port/service_name',
'modules': {
'tortoise.backends.oracle': {
# 这里可以配置其他 Oracle 特定的设置,如使用钱包等
}
},
'generate_schemas': True # 如果需要自动创建表结构,设置为 True
}
Tortoise.init(**config)
# 创建模型文件 models.py
from tortoise import fields, Model
class Classes(Model):
name = fields.CharField(max_length=255, description="班级名称")
class Teacher(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255, description="老师名称")
tno = fields.IntField(description="帐号")
pwd = fields.CharField(max_length=255, description="密码")
class Course(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255, description="课程名称")
# 多对一
teacher = fields.ForeignKeyField("models.Teacher", related_name="courses")
class Student(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255, description="学生名称")
sno = fields.IntField(description="帐号")
pwd = fields.CharField(max_length=255, description="密码")
# 一对多
classes = fields.ForeignKeyField("models.Classes", related_name="students")
# 多对多
courses = fields.ManyToManyField("models.Course", related_name="students")
# aerich 迁移工具
aerich是一种ORM迁移工具,需要结合tortoise异步orm框架使用
pip install aerich
- 初始化配置,只需要使用一次
aerich init -t settings.TORTOISE_ORM # TORTOISE_ORM 配置的位置
# 初始化完会在当前目录生成一个文件:pyproject.toml和一个文件夹:migrations
# pyproject.toml:保存配置文件路径,低版本可能是aerich.ini
# migrations:存放迁移文件
- 初始化数据库,一般情况下只用一次
aerich init-db
# 此时数据库中就有相应的表格
# 如果TORTOISE_ORM配置文件中的models改了名,则执行这条命令时需要增加--app参数,来指定你修改的名字
- 更新模型并进行迁移
# 修改model类,重新生成迁移文件,比如添加一个字段
aerich migrate [--name] (标记修改操作) # aerich migrate
# 注意,此时sql并没有执行,数据库中表中没有更新
- 重新执行迁移,写入数据库
aerich upgrade
- 回到上一个版本
aerich downgrade
- 查看历史迁移记录
aerich history
# 项目启动
import uvicorn
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
import orm
from settings import TORTOISE_ORM
app = FastAPI()
app.include_router(orm.api_student, prefix='/orm', tags=['orm'])
# 该方法会在fastapi启动时触发,内部通过传递进去的app对象,监听服务启动和终止事件
# 当检测到启动事件时,会初始化Tortoise对象,如果generate_schemas为True则还会进行数据库迁移
# 当检测到终止事件时,会关闭连接
register_tortoise(
app,
config=TORTOISE_ORM,
# generate_schemas=True, # 如果数据库为空,则自动生成对应表单,生产环境不要开
# add_exception_handlers=True, # 生产环境不要开,会泄露调试信息
)
if __name__ == '__main__':
uvicorn.run('quick:app', host='127.0.0.1', port=8020, reload=True, workers=1)
# 增删改查
from typing import List
from fastapi import APIRouter
from pydantic import BaseModel
from models import Student, Course
api_student = APIRouter()
class StudentModel(BaseModel):
name: str
sno: int
pwd: str
phone: str
classes_id: int
courses: List[int]
@api_student.get("/students")
async def getAllStudent():
students = await Student.all().values("name", "classes_id")
for student in students:
print(student)
return students
@api_student.get("/student")
async def getStudent():
student = await Student.filter(name__icontains='三').values("name", "classes_id")
# student = await Student.get(name__icontains='一') # 不存在会报错
student = await Student.get_or_none(name__icontains='一') # 不存在返回None
return student
@api_student.post("/student")
async def addStudent(stu: StudentModel):
# 方式一
# student = Student(
# name=stu.name,
# sno=stu.sno,
# pwd=stu.pwd,
# phone=stu.phone,
# classes_id=stu.classes_id
# )
# await student.save()
# 方式二
student = await Student.create(
name=stu.name,
sno=stu.sno,
pwd=stu.pwd,
phone=stu.phone,
classes_id=stu.classes_id
)
# 添加多对多关系
courses = await Course.filter(id__in=stu.courses).all()
await student.courses.add(*courses)
return student # 返回的是新增的对象
@api_student.put("/student/{id}")
async def updateStudent(id: int, stu: StudentModel):
# 方式一
# student = await Student.get(id=id)
# student.name = stu.name
# student.sno = stu.sno
# student.pwd = stu.pwd
# student.phone = stu.phone
# student.classes_id = stu.classes_id
# await student.save()
# 方式二
student = await Student.filter(id=id).update(
name=stu.name,
sno=stu.sno,
pwd=stu.pwd,
phone=stu.phone,
classes_id=stu.classes_id
)
# 更新多对多关系
courses = await Course.filter(id__in=stu.courses).all()
await student.courses.clear()
await student.courses.add(*courses)
return student
@api_student.delete("/student/{id}")
async def deleteStudent(id: int):
# 方式一
# student = await Student.get(id=id)
# await student.delete()
# return student # 返回的是删除的对象
# 方式二
student = await Student.filter(id=id).delete()
return student # 返回的是删除的行数
# 这两种方式都可以删除多对多关系
# 过滤条件
- 比较运算符
# 获取id等于1的数据
students = await Student.filter(id=1).all()
# 获取id不等于1的数据
students = await Student.filter(id__not=1).all()
# 获取id大于1的数据
students = await Student.filter(id__gt=1).all()
# 获取id大于等于1的数据
students = await Student.filter(id__gte=1).all()
# 获取id小于5的数据
students = await Student.filter(id__lt=1).all()
# 获取id小于等于5的数据
students = await Student.filter(id__lte=1).all()
- 成员运算符
# 获取姓名 在 指定列表中的数据
names = ['赵德柱', '李铁柱']
students = await Student.filter(name__in=names).all()
for student in students:
print(student.id, student.name)
# 获取姓名 不在 指定列表中的数据
names = ['赵德柱', '李铁柱']
students = await Student.filter(name__nin=names).all()
for student in students:
print(student.id, student.name)
- 模糊查询
# Tortoise ORM 不直接支持SQL中的LIKE模糊查询,
# 但可以使用`icontains`、`istartswith`、`iendswith`等操作符进行模糊查询。
# 学号包含200
student = await Student.filter(sno__icontains='200')
# 学号是200开头
student = await Student.filter(sno__istartswith='200')
# 学号是200结尾
student = await Student.filter(sno__iendswith='200')
# exclude() 方法用于排除满足条件的数据,返回不满足条件的数据集
# 获取名字不是'赵德柱'的所有数据
students = await Student.exclude(name='赵德柱').all()
- 查询在指定范围之间
students = await Student.filter(sno__range=[2001, 2003]).all()
for student in students:
print(student.id, student.name)
- 是否为空
# 查询学生姓名为空的数据
students = await Student.filter(name__isnull=True).all()
- 正则表达式匹配
# 查询名字匹配正则表达式的数据
pattern = r'^赵.*' # 以 赵 开头的名字
students = await Student.filter(name__regex=pattern).all()
# 查询名字不区分大小写匹配正则表达式的用户
pattern = r'^a.*' # 以 a(iregex 不区分大小写)开头的名字
students = await Student.filter(name__iregex=pattern).all()
- 分页查询
# 按id升序获取所有数据
students = await Student.all().order_by("id")
# 按id降序获取所有数据
students = await Student.all().order_by("id")
# 获取前5个用户
first_five_sutdents = await Student.all().limit(5)
# 跳过前5个用户,再获取5个用户
next_five_students = await Student.all().offset(5).limit(5)
# 中间件的
- "中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应之后工作
- 要创建中间件你可以在函数的顶部使用装饰器 @app.middleware("http")
- 中间件参数接收如下参数:
request
call_next # 它将接收request,作为参数
# 这个函数将 request 传递给相应的 路径操作
# 然后它将返回由相应的路径操作生成的 response
# 然后你可以在返回 response 前进一步修改它
- 函数中间件
import uvicorn
from fastapi import FastAPI
from fastapi import Request
import time
app = FastAPI()
@app.middleware("http")
async def m2(request: Request, call_next):
# 请求代码块
print("m2 request")
response = await call_next(request)
# 响应代码块
response.headers["author"] = "yuan"
print("m2 response")
return response
@app.middleware("http")
async def m1(request: Request, call_next):
# 请求代码块
print("m1 request")
# if request.client.host in ["127.0.0.1", ]: # 黑名单
# return Response(content="visit forbidden")
# if request.url.path in ["/user"]:
# return Response(content="visit forbidden")
start = time.time()
response = await call_next(request)
# 响应代码块
print("m1 response")
end = time.time()
response.headers["ProcessTimer"] = str(end - start)
return response
@app.get("/user")
def get_user():
time.sleep(3)
print("get_user函数执行")
return {
"user": "current user"
}
# 输出
# m1 request
# m2 request
# get_user函数执行
# m2 response
# m1 response
if __name__ == '__main__':
uvicorn.run('main:app', host='127.0.0.1', port=8030, reload=True, workers=1)
- 类中间件
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
import time
import uvicorn
import os
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
# 记录请求和响应的详细信息
log_message = (
f"请求方法: {request.method}\n"
f"请求路径: {request.url.path}\n"
f"响应状态: {response.status_code}\n"
f"处理时间: {process_time:.4f} 秒"
)
# 在生产环境中,可以将这些信息写入日志文件
print(log_message)
return response
app = FastAPI()
# 添加中间件到应用中
app.add_middleware(LoggingMiddleware)
# 使用 PlainTextResponse 返回简单文本响应
@app.get("/", response_class=PlainTextResponse)
async def read_root():
# 可以直接返回字符串,FastAPI 会自动封装为 PlainTextResponse
return "Hello, World!"
- SessionMiddleware
# 使用 SessionMiddleware 来管理用户会话,这对于需要追踪用户状态或者保持登录状态的应用尤为重要
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
import uvicorn
import os
app = FastAPI()
# 设置 SessionMiddleware,secret_key 应该是一个长随机字符串
# 使用 SessionMiddleware 来管理会话。secret_key 用于签名和/或加密会话 cookie,确保它的安全性
app.add_middleware(
SessionMiddleware, secret_key="!se1cret2-ke3y-sh4ould-b5e-ve8ry-se8cure!"
)
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
# 访问会话中的数据
count = request.session.get("count", 0)
count += 1
request.session["count"] = count # 更新会话数据
return f"<html><body><h1>Visit Count: {count}</h1></body></html>"
@app.get("/reset")
async def reset_count(request: Request):
request.session.pop("count", None) # 重置会话数据
return {"status": "session reset"}
@app.get("/logout")
async def logout(request: Request):
request.session.clear() # 清空所有会话数据
return {"status": "logged out"}
if __name__ == "__main__":
uvicorn.run(
f"{os.path.basename(__file__).split('.')[0]}:app",
host="127.0.0.1",
port=8000,
reload=True,
)
- TrustedHostMiddleware
# 使用 TrustedHostMiddleware 可以提高应用的安全性,限制哪些主机名可以访问应用
from fastapi import FastAPI
from starlette.middleware.trustedhost import TrustedHostMiddleware
app = FastAPI()
# 添加 TrustedHostMiddleware
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "www.example.com", "*.example.com"]
)
@app.get("/")
async def read_root():
return {"message": "Hello, World!"}
- CORSMiddleware
# 方式一
@app.middleware("http")
async def CORSMiddleware(request: Request, call_next):
response = await call_next(request)
print(response.headers)
return response
# 方式二
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
origins = [
"http://localhost:63342"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins, # *:代表所有客户端
allow_credentials=True,
allow_methods=["GET"],
allow_headers=["*"],
)
@app.get("/")
def main():
return {"message": "Hello World"}
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app", host="127.0.0.1", port=8080, debug=True, reload=True)