Commit e85f3e8a authored by 林洋洋's avatar 林洋洋

添加知识库相关类

parent 2a74dc05
......@@ -194,7 +194,6 @@ class UserDal(DalBase):
await self.flush(user)
return "修改密码成功"
async def update_current_info(self, user: models.VadminUser, data: schemas.UserUpdateBaseInfo) -> Any:
"""
更新当前用户基本信息
......@@ -782,312 +781,320 @@ class DeptDal(DalBase):
return result
class TestDal(DalBase):
class KnowledgeDal(DalBase):
def __init__(self, db: AsyncSession):
super(TestDal, self).__init__()
super(KnowledgeDal, self).__init__()
self.db = db
self.model = models.VadminUser
async def test_session_cache(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
:return:
"""
print("==================================会话缓存====================================")
await self.test_session_cache1()
print("=============查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存=============")
await self.test_session_cache2()
print("=============================数据列表会话缓存==================================")
await self.test_session_cache3()
print("=============================expire 单个对象过期==============================")
await self.test_session_cache4()
print("=========expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系==========")
await self.test_session_cache5()
async def test_session_cache1(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
示例:会话缓存
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
async def test_session_cache2(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
示例:查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存
:return:
"""
# 第一次查询,并加载用户的所有关联部门项,但是不访问用户的属性
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"没有访问属性,也会产生缓存")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
async def test_session_cache3(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
示例:数据列表会话缓存
:return:
"""
# 第一次查询出所有用户,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
datas1 = queryset1.unique().all()
for data in datas1:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser)
queryset2 = await self.db.scalars(sql2)
datas2 = queryset2.all()
for data in datas2:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
async def test_session_cache4(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
示例:expire 单个对象过期
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
# 使当前会话(Session)中的 user1 对象过期,再次访问就会重新查询数据库数据
self.db.expire(user1)
# 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
try:
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
except StatementError:
print("访问部门报错了!!!!!")
async def test_session_cache5(self):
"""
SQLAlchemy 会话(Session)缓存机制:
当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
示例:expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset = await self.db.scalars(sql)
user = queryset.unique().first()
print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
# 使当前会话(Session)中的 user9 对象过期,再次访问就会重新查询数据库数据
self.db.expire(user)
# 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询,但是不会重新加载关系
try:
print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
except StatementError:
print("访问部门报错了!!!!!")
async def test_join_form(self):
"""
join_form 使用示例:通过关联表的查询条件反查询出主表的数据
官方描述:在当前 Select 的左侧不符合我们想要从中进行连接的情况下,可以使用 Select.join_from() 方法
官方文档:https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#setting-the-leftmost-from-clause-in-a-join
查询条件:获取指定用户所关联的所有部门列表数据,只返回关联的部门列表数据
:return:
"""
# 设定用户编号为:1
user_id = 1
sql = select(models.VadminDept).where(models.VadminDept.is_delete == false())
sql = sql.join_from(models.VadminUser, models.VadminUser.depts).where(models.VadminUser.id == user_id)
queryset = await self.db.scalars(sql)
result = queryset.unique().all()
for dept in result:
print(f"部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
# 转换后的 SQL:
# SELECT
# vadmin_auth_dept.NAME,
# vadmin_auth_dept.dept_key,
# vadmin_auth_dept.disabled,
# vadmin_auth_dept.order,
# vadmin_auth_dept.desc,
# vadmin_auth_dept.OWNER,
# vadmin_auth_dept.phone,
# vadmin_auth_dept.email,
# vadmin_auth_dept.parent_id,
# vadmin_auth_dept.id,
# vadmin_auth_dept.create_datetime,
# vadmin_auth_dept.update_datetime,
# vadmin_auth_dept.delete_datetime,
# vadmin_auth_dept.is_delete
# FROM
# vadmin_auth_user
# JOIN vadmin_auth_user_depts ON vadmin_auth_user.id = vadmin_auth_user_depts.user_id
# JOIN vadmin_auth_dept ON vadmin_auth_dept.id = vadmin_auth_user_depts.dept_id
# WHERE
# vadmin_auth_dept.is_delete = FALSE
# AND vadmin_auth_user.id = 1
async def test_left_join(self):
"""
多对多左连接查询示例:
查询出所有用户信息,并加载用户关联所有部门,左连接条件:只需要查询出该用户关联的部门负责人为"张伟"的部门即可,其他部门不需要显示,
:return:
"""
# 封装查询语句
dept_alias = aliased(models.VadminDept)
v_options = [contains_eager(self.model.depts, alias=dept_alias)]
v_outer_join = [
[models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id],
[dept_alias, and_(dept_alias.id == models.vadmin_auth_user_depts.c.dept_id, dept_alias.owner == "张伟")]
]
datas: list[models.VadminUser] = await self.get_datas(
limit=0,
v_outer_join=v_outer_join,
v_options=v_options,
v_return_objs=True,
v_expire_all=True
)
for data in datas:
print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
for dept in data.depts:
print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
# 原查询语句:
# DeptAlias = aliased(models.VadminDept)
# sql = select(self.model).where(self.model.is_delete == false())
# sql = sql.options(contains_eager(self.model.depts, alias=DeptAlias))
# sql = sql.outerjoin(models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id)
# sql = sql.outerjoin(
# DeptAlias,
# and_(DeptAlias.id == models.vadmin_auth_user_depts.c.dept_id, DeptAlias.owner == "张伟")
# )
# self.db.expire_all()
# queryset = await self.db.scalars(sql)
# result = queryset.unique().all()
# for data in result:
# print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
# for dept in data.depts:
# print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
async def get_user_depts(self):
"""
获取用户部门列表
:return:
"""
sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
datas1 = queryset1.unique().all()
for data in datas1:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
async def relationship_where_operations_any(self):
"""
关系运算符操作:any 方法使用示例
官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
any 方法用于一对多关系中,允许在 any 方法中指定一个条件,该条件会生成一个 SQL 表达式,只有满足该条件的元素才会被查询出来。
:return:
"""
print("==============================any 方法使用案例1=========================================")
# 用户表(models.VadminUser)与 部门表(VadminDept)为多对多关系
# 查找出只有满足关联了部门名称为 "人事一部" 的所有用户,没有关联的则不会查询出来
sql1 = select(models.VadminUser).where(models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
queryset1 = await self.db.scalars(sql1)
result1 = queryset1.unique().all()
for data in result1:
print(f"用户编号:{data.id} 用户名称:{data.name}")
print("==============================any 方法使用案例2=========================================")
# 案例1 取反,查找出只有满足没有关联了部门名称为 "人事一部" 的所有用户,关联的则不会查询出来
sql2 = select(models.VadminUser).where(~models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
queryset2 = await self.db.scalars(sql2)
result2 = queryset2.unique().all()
for data in result2:
print(f"用户编号:{data.id} 用户名称:{data.name}")
print("==============================any 方法使用案例3=========================================")
# 查询出没有关联部门的所有用户
sql3 = select(models.VadminUser).where(~models.VadminUser.depts.any())
queryset3 = await self.db.scalars(sql3)
result3 = queryset3.unique().all()
for data in result3:
print(f"用户编号:{data.id} 用户名称:{data.name}")
async def relationship_where_operations_has(self):
"""
关系运算符操作: has 方法使用示例
官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
has 方法用于多对一关系中,与 any 方法使用方式同理,只有满足条件的元素才会被查询出来。
对多关系中使用 has 方法会报错,报错内容如下:
sqlalchemy.exc.InvalidRequestError: 'has()' not implemented for collections. Use any().
:return:
"""
print("==============================has 方法使用案例1=========================================")
# 用户(models.VadminUser)与 帮助问题(models.VadminIssue)为多对一关系
# 查找出只有满足关联了用户名称为 "kinit" 的所有帮助问题,没有关联的则不会查询出来
sql1 = select(vadmin_help_models.VadminIssue).where(
vadmin_help_models.VadminIssue.create_user.has(models.VadminUser.name == "kinit")
)
queryset1 = await self.db.scalars(sql1)
result1 = queryset1.unique().all()
for data in result1:
print(f"问题编号:{data.id} 问题标题:{data.title}")
self.model = models.KnowledgeDept
self.schema = schemas.KnowledgeOut
# class TestDal(DalBase):
#
# def __init__(self, db: AsyncSession):
# super(TestDal, self).__init__()
# self.db = db
# self.model = models.VadminUser
#
# async def test_session_cache(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
# :return:
# """
# print("==================================会话缓存====================================")
# await self.test_session_cache1()
# print("=============查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存=============")
# await self.test_session_cache2()
# print("=============================数据列表会话缓存==================================")
# await self.test_session_cache3()
# print("=============================expire 单个对象过期==============================")
# await self.test_session_cache4()
# print("=========expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系==========")
# await self.test_session_cache5()
#
# async def test_session_cache1(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
#
# 示例:会话缓存
#
# :return:
# """
# # 第一次查询,并加载用户的所有关联部门项
# sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
# queryset1 = await self.db.scalars(sql1)
# user1 = queryset1.unique().first()
# print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
#
# # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
# sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
# queryset2 = await self.db.scalars(sql2)
# user2 = queryset2.first()
# print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
#
# async def test_session_cache2(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
#
# 示例:查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存
#
# :return:
# """
# # 第一次查询,并加载用户的所有关联部门项,但是不访问用户的属性
# sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
# queryset1 = await self.db.scalars(sql1)
# user1 = queryset1.unique().first()
# print(f"没有访问属性,也会产生缓存")
#
# # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
# sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
# queryset2 = await self.db.scalars(sql2)
# user2 = queryset2.first()
# print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
#
# async def test_session_cache3(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
#
# 示例:数据列表会话缓存
#
# :return:
# """
# # 第一次查询出所有用户,并加载用户的所有关联部门项
# sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
# queryset1 = await self.db.scalars(sql1)
# datas1 = queryset1.unique().all()
# for data in datas1:
# print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
#
# # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
# sql2 = select(models.VadminUser)
# queryset2 = await self.db.scalars(sql2)
# datas2 = queryset2.all()
# for data in datas2:
# print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
#
# async def test_session_cache4(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
#
# 示例:expire 单个对象过期
#
# :return:
# """
# # 第一次查询,并加载用户的所有关联部门项
# sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
# queryset1 = await self.db.scalars(sql1)
# user1 = queryset1.unique().first()
# print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
#
# # 使当前会话(Session)中的 user1 对象过期,再次访问就会重新查询数据库数据
# self.db.expire(user1)
#
# # 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询
# sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
# queryset2 = await self.db.scalars(sql2)
# user2 = queryset2.first()
# try:
# print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
# except StatementError:
# print("访问部门报错了!!!!!")
#
# async def test_session_cache5(self):
# """
# SQLAlchemy 会话(Session)缓存机制:
# 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。
# 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。
#
# 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059
#
# 示例:expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系
#
# :return:
# """
# # 第一次查询,并加载用户的所有关联部门项
# sql = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
# queryset = await self.db.scalars(sql)
# user = queryset.unique().first()
# print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
#
# # 使当前会话(Session)中的 user9 对象过期,再次访问就会重新查询数据库数据
# self.db.expire(user)
#
# # 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询,但是不会重新加载关系
# try:
# print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
# except StatementError:
# print("访问部门报错了!!!!!")
#
# async def test_join_form(self):
# """
# join_form 使用示例:通过关联表的查询条件反查询出主表的数据
#
# 官方描述:在当前 Select 的左侧不符合我们想要从中进行连接的情况下,可以使用 Select.join_from() 方法
# 官方文档:https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#setting-the-leftmost-from-clause-in-a-join
#
# 查询条件:获取指定用户所关联的所有部门列表数据,只返回关联的部门列表数据
# :return:
# """
# # 设定用户编号为:1
# user_id = 1
#
# sql = select(models.VadminDept).where(models.VadminDept.is_delete == false())
# sql = sql.join_from(models.VadminUser, models.VadminUser.depts).where(models.VadminUser.id == user_id)
# queryset = await self.db.scalars(sql)
# result = queryset.unique().all()
# for dept in result:
# print(f"部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
#
# # 转换后的 SQL:
# # SELECT
# # vadmin_auth_dept.NAME,
# # vadmin_auth_dept.dept_key,
# # vadmin_auth_dept.disabled,
# # vadmin_auth_dept.order,
# # vadmin_auth_dept.desc,
# # vadmin_auth_dept.OWNER,
# # vadmin_auth_dept.phone,
# # vadmin_auth_dept.email,
# # vadmin_auth_dept.parent_id,
# # vadmin_auth_dept.id,
# # vadmin_auth_dept.create_datetime,
# # vadmin_auth_dept.update_datetime,
# # vadmin_auth_dept.delete_datetime,
# # vadmin_auth_dept.is_delete
# # FROM
# # vadmin_auth_user
# # JOIN vadmin_auth_user_depts ON vadmin_auth_user.id = vadmin_auth_user_depts.user_id
# # JOIN vadmin_auth_dept ON vadmin_auth_dept.id = vadmin_auth_user_depts.dept_id
# # WHERE
# # vadmin_auth_dept.is_delete = FALSE
# # AND vadmin_auth_user.id = 1
#
# async def test_left_join(self):
# """
# 多对多左连接查询示例:
# 查询出所有用户信息,并加载用户关联所有部门,左连接条件:只需要查询出该用户关联的部门负责人为"张伟"的部门即可,其他部门不需要显示,
# :return:
# """
# # 封装查询语句
# dept_alias = aliased(models.VadminDept)
# v_options = [contains_eager(self.model.depts, alias=dept_alias)]
# v_outer_join = [
# [models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id],
# [dept_alias, and_(dept_alias.id == models.vadmin_auth_user_depts.c.dept_id, dept_alias.owner == "张伟")]
# ]
# datas: list[models.VadminUser] = await self.get_datas(
# limit=0,
# v_outer_join=v_outer_join,
# v_options=v_options,
# v_return_objs=True,
# v_expire_all=True
# )
# for data in datas:
# print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
# for dept in data.depts:
# print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
#
# # 原查询语句:
# # DeptAlias = aliased(models.VadminDept)
# # sql = select(self.model).where(self.model.is_delete == false())
# # sql = sql.options(contains_eager(self.model.depts, alias=DeptAlias))
# # sql = sql.outerjoin(models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id)
# # sql = sql.outerjoin(
# # DeptAlias,
# # and_(DeptAlias.id == models.vadmin_auth_user_depts.c.dept_id, DeptAlias.owner == "张伟")
# # )
# # self.db.expire_all()
# # queryset = await self.db.scalars(sql)
# # result = queryset.unique().all()
# # for data in result:
# # print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
# # for dept in data.depts:
# # print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
#
# async def get_user_depts(self):
# """
# 获取用户部门列表
# :return:
# """
# sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
# queryset1 = await self.db.scalars(sql1)
# datas1 = queryset1.unique().all()
# for data in datas1:
# print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
#
# async def relationship_where_operations_any(self):
# """
# 关系运算符操作:any 方法使用示例
# 官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
#
# any 方法用于一对多关系中,允许在 any 方法中指定一个条件,该条件会生成一个 SQL 表达式,只有满足该条件的元素才会被查询出来。
# :return:
# """
# print("==============================any 方法使用案例1=========================================")
# # 用户表(models.VadminUser)与 部门表(VadminDept)为多对多关系
# # 查找出只有满足关联了部门名称为 "人事一部" 的所有用户,没有关联的则不会查询出来
# sql1 = select(models.VadminUser).where(models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
# queryset1 = await self.db.scalars(sql1)
# result1 = queryset1.unique().all()
# for data in result1:
# print(f"用户编号:{data.id} 用户名称:{data.name}")
#
# print("==============================any 方法使用案例2=========================================")
# # 案例1 取反,查找出只有满足没有关联了部门名称为 "人事一部" 的所有用户,关联的则不会查询出来
# sql2 = select(models.VadminUser).where(~models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
# queryset2 = await self.db.scalars(sql2)
# result2 = queryset2.unique().all()
# for data in result2:
# print(f"用户编号:{data.id} 用户名称:{data.name}")
#
# print("==============================any 方法使用案例3=========================================")
# # 查询出没有关联部门的所有用户
# sql3 = select(models.VadminUser).where(~models.VadminUser.depts.any())
# queryset3 = await self.db.scalars(sql3)
# result3 = queryset3.unique().all()
# for data in result3:
# print(f"用户编号:{data.id} 用户名称:{data.name}")
#
# async def relationship_where_operations_has(self):
# """
# 关系运算符操作: has 方法使用示例
# 官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
#
# has 方法用于多对一关系中,与 any 方法使用方式同理,只有满足条件的元素才会被查询出来。
#
# 对多关系中使用 has 方法会报错,报错内容如下:
# sqlalchemy.exc.InvalidRequestError: 'has()' not implemented for collections. Use any().
# :return:
# """
# print("==============================has 方法使用案例1=========================================")
# # 用户(models.VadminUser)与 帮助问题(models.VadminIssue)为多对一关系
# # 查找出只有满足关联了用户名称为 "kinit" 的所有帮助问题,没有关联的则不会查询出来
# sql1 = select(vadmin_help_models.VadminIssue).where(
# vadmin_help_models.VadminIssue.create_user.has(models.VadminUser.name == "kinit")
# )
# queryset1 = await self.db.scalars(sql1)
# result1 = queryset1.unique().all()
# for data in result1:
# print(f"问题编号:{data.id} 问题标题:{data.title}")
......@@ -12,3 +12,4 @@ from .menu import VadminMenu
from .role import VadminRole
from .user import VadminUser
from .dept import VadminDept
from .knowledge import KnowledgeDept
......@@ -16,7 +16,7 @@ class VadminDept(BaseModel):
__table_args__ = ({'comment': '部门表'})
name: Mapped[str] = mapped_column(String(50), index=True, nullable=False, comment="部门名称")
dept_key: Mapped[str] = mapped_column(String(50), index=True, nullable=False, comment="部门标识")
dept_key: Mapped[str | None] = mapped_column(String(50), comment="部门标识")
disabled: Mapped[bool] = mapped_column(Boolean, default=False, comment="是否禁用")
order: Mapped[int | None] = mapped_column(Integer, comment="显示排序")
desc: Mapped[str | None] = mapped_column(String(255), comment="描述")
......
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/10/23 13:41
# @File : dept.py
# @IDE : PyCharm
# @desc : 部门模型
from sqlalchemy.orm import Mapped, mapped_column
from dbgpt.app.apps.db.db_base import BaseModel
from sqlalchemy import String, Integer, ForeignKey
class KnowledgeDept(BaseModel):
__tablename__ = "knowledge_space"
__table_args__ = ({'comment': '知识库表'})
name: Mapped[str] = mapped_column(String(100), index=True, nullable=False, comment="名称")
# vector_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="vector_type")
# domain_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="domain_type")
# desc: Mapped[int | str] = mapped_column(String(500), nullable=False,comment="desc")
......@@ -39,3 +39,10 @@ vadmin_auth_role_depts = Table(
Column("dept_id", Integer, ForeignKey("vadmin_auth_dept.id", ondelete="CASCADE")),
)
vadmin_auth_role_knowledge = Table(
"vadmin_auth_role_knowledge",
Base.metadata,
Column("role_id", Integer, ForeignKey("vadmin_auth_role.id", ondelete="CASCADE")),
Column("knowledge_id", Integer, ForeignKey("knowledge_space.id", ondelete="CASCADE")),
)
from fastapi import Depends, Query
from dbgpt.app.apps.core.dependencies import Paging, QueryParams
class KnowledgeParams(QueryParams):
"""
列表分页
"""
def __init__(
self,
name: str | None = Query(None, title="部门名称"),
params: Paging = Depends()
):
super().__init__(params)
self.name = ("like", name)
......@@ -2,3 +2,4 @@ from .user import UserOut, UserUpdate, User, UserIn, UserSimpleOut, ResetPwd, Us
from .role import Role, RoleOut, RoleIn, RoleOptionsOut, RoleSimpleOut
from .menu import Menu, MenuSimpleOut, RouterOut, Meta, MenuTreeListOut
from .dept import Dept, DeptSimpleOut, DeptTreeListOut
from .knowledge import Knowledge, KnowledgeOut
\ No newline at end of file
......@@ -13,7 +13,7 @@ from dbgpt.app.apps.core.data_types import DatetimeStr
class Dept(BaseModel):
name: str
dept_key: str
dept_key: str | None = None
disabled: bool = False
order: int | None = None
desc: str | None = None
......
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/10/25 12:19
# @File : dept.py
# @IDE : PyCharm
# @desc : pydantic 模型,用于数据库序列化操作
from pydantic import BaseModel, ConfigDict
from dbgpt.app.apps.core.data_types import DatetimeStr
class Knowledge(BaseModel):
name: str
class KnowledgeOut(Knowledge):
model_config = ConfigDict(from_attributes=True)
id: int
create_datetime: DatetimeStr
......@@ -106,7 +106,6 @@ async def user_current_reset_password(data: schemas.ResetPwd, auth: Auth = Depen
@router.post("/user/current/update/password", summary="修改用户密码")
async def user_current_update_password(data: schemas.UpdatePwd, auth: Auth = Depends(AllUserAuth())):
return SuccessResponse(await crud.UserDal(auth.db).update_password_id(data))
......@@ -327,3 +326,12 @@ async def put_dept(
auth: Auth = Depends(FullAdminAuth())
):
return SuccessResponse(await crud.DeptDal(auth.db).put_data(data_id, data))
@router.get("/knowledges", summary="获取全部知识库")
async def get_knowledges(auth: Auth = Depends(OpenAuth())):
schema = schemas.KnowledgeOut
knowledge_list = await crud.KnowledgeDal(auth.db).get_datas(v_schema=schema)
role_knowledge_list = await crud.KnowledgeDal(auth.db).get_datas(v_schema=schema)
return SuccessResponse({"role_knowledge_list": role_knowledge_list, "knowledge_list": knowledge_list})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment