Commit 4509dcad authored by 林洋洋's avatar 林洋洋

添加异步向量化

parent 1d749be5
package com.ask.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Async thread-pool configuration.
 *
 * <p>Defines the dedicated executor referenced by
 * {@code @Async("vectorizationExecutor")} for document vectorization tasks.
 *
 * @author ai
 * @date 2024/12/20
 */
@Slf4j
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * Dedicated thread pool for vectorization work.
     *
     * @return executor used for vectorization tasks
     */
    @Bean("vectorizationExecutor")
    public Executor vectorizationExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Small, bounded pool: vectorization tasks are heavyweight, keep concurrency low.
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(3);
        // Bounded queue; overflow is handled by the rejection policy below.
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("vectorization-");
        // Caller-runs policy: when pool and queue are saturated, the submitting
        // thread executes the task itself, which provides natural back-pressure
        // instead of throwing RejectedExecutionException.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Idle threads above the core size are reclaimed after 60 seconds.
        executor.setKeepAliveSeconds(60);
        // On shutdown, wait (up to 60 seconds) for in-flight tasks to finish.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        log.info("向量化线程池初始化完成:核心线程={}, 最大线程={}, 队列容量={}",
                executor.getCorePoolSize(), executor.getMaxPoolSize(), executor.getQueueCapacity());
        return executor;
    }
}
\ No newline at end of file
package com.ask.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Configuration properties for the vectorization feature, bound from the
 * {@code vectorization.*} keys.
 *
 * <p>This class is registered through
 * {@code @EnableConfigurationProperties(VectorizationProperties.class)} on the
 * application class, so it must NOT also carry {@code @Component}: annotating
 * it with both registers the bean twice (under two different bean names),
 * which can break by-type injection.
 *
 * @author ai
 * @date 2024/12/20
 */
@Data
@ConfigurationProperties(prefix = "vectorization")
public class VectorizationProperties {

    /**
     * Startup-time processor settings ({@code vectorization.startup.*}).
     */
    private Startup startup = new Startup();

    @Data
    public static class Startup {

        /**
         * Whether the startup vectorization processor is enabled.
         */
        private boolean enabled = true;

        /**
         * Delay in seconds before the processor runs, so it does not kick in
         * while the application is still booting.
         */
        private int delaySeconds = 30;

        /**
         * Pause in milliseconds between consecutive task submissions.
         */
        private int taskIntervalMs = 100;

        /**
         * Upper bound on the number of documents processed at startup.
         */
        private int maxDocuments = 100;
    }
}
\ No newline at end of file
package com.ask.config;
import com.ask.api.entity.KnowledgeDocument;
import com.ask.service.AsyncVectorizationService;
import com.ask.service.KnowledgeDocumentService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* 项目启动时的向量化处理器
* 自动处理未完成的文档向量化任务
*
* @author ai
* @date 2024/12/20
*/
@Slf4j
@Component
@Order(1) // 设置执行顺序,数字越小优先级越高
@RequiredArgsConstructor
public class VectorizationStartupProcessor implements ApplicationRunner {
private final KnowledgeDocumentService knowledgeDocumentService;
private final AsyncVectorizationService asyncVectorizationService;
private final VectorizationProperties vectorizationProperties;
@Override
public void run(ApplicationArguments args) throws Exception {
// 检查是否启用
if (!vectorizationProperties.getStartup().isEnabled()) {
log.info("启动向量化处理器已禁用,跳过执行");
return;
}
log.info("=== 启动向量化处理器 ===");
// 延迟启动,避免启动时立即执行,等待系统完全启动
int delaySeconds = vectorizationProperties.getStartup().getDelaySeconds();
if (delaySeconds > 0) {
log.info("延迟 {} 秒后启动向量化处理器...", delaySeconds);
// 使用新线程异步执行,避免阻塞主启动流程
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(delaySeconds * 1000L);
executeVectorization();
} catch (Exception e) {
log.error("延迟启动向量化处理器时发生异常:{}", e.getMessage(), e);
}
});
} else {
// 立即执行
executeVectorization();
}
}
/**
* 执行向量化处理
*/
private void executeVectorization() {
try {
log.info("开始执行向量化处理检查...");
// 查询需要处理的文档
List<KnowledgeDocument> pendingDocuments = findPendingDocuments();
if (pendingDocuments.isEmpty()) {
log.info("没有找到需要处理的文档,向量化处理器执行完成");
return;
}
log.info("找到 {} 个需要处理的文档,开始启动向量化任务", pendingDocuments.size());
// 异步处理所有待处理的文档
processPendingDocuments(pendingDocuments);
log.info("=== 向量化处理器执行完成,已提交 {} 个异步任务 ===", pendingDocuments.size());
} catch (Exception e) {
log.error("执行向量化处理器时发生异常:{}", e.getMessage(), e);
}
}
/**
* 查询需要处理的文档
* 状态:0-待处理,1-处理中
*
* @return 待处理的文档列表
*/
private List<KnowledgeDocument> findPendingDocuments() {
LambdaQueryWrapper<KnowledgeDocument> wrapper = Wrappers.<KnowledgeDocument>lambdaQuery()
.in(KnowledgeDocument::getStatus, 0, 1) // 0-待处理,1-处理中
.eq(KnowledgeDocument::getIsEnabled, 1) // 启用状态
.isNull(KnowledgeDocument::getDelFlag) // 未删除
.orderByAsc(KnowledgeDocument::getCreateTime); // 按创建时间升序
List<KnowledgeDocument> documents = knowledgeDocumentService.list(wrapper);
if (!documents.isEmpty()) {
log.info("查询到需要处理的文档:");
for (KnowledgeDocument doc : documents) {
String statusText = getStatusText(doc.getStatus());
log.info(" - 文档ID: {}, 文档名: {}, 状态: {} ({}), 创建时间: {}",
doc.getId(), doc.getName(), doc.getStatus(), statusText, doc.getCreateTime());
}
}
return documents;
}
/**
* 处理待处理的文档
*
* @param documents 待处理的文档列表
*/
private void processPendingDocuments(List<KnowledgeDocument> documents) {
int taskInterval = vectorizationProperties.getStartup().getTaskIntervalMs();
for (int i = 0; i < documents.size(); i++) {
KnowledgeDocument document = documents.get(i);
try {
log.info("提交向量化任务 [{}/{}]:文档ID={}, 文档名={}",
i + 1, documents.size(), document.getId(), document.getName());
// 异步提交向量化任务
asyncVectorizationService.vectorizeByDocumentIdAsync(document);
// // 添加完成回调
// future.whenComplete((successCount, throwable) -> {
// if (throwable != null) {
// log.error("启动时向量化失败 - 文档ID: {}, 文档名: {}, 错误: {}",
// document.getId(), document.getName(), throwable.getMessage());
// } else {
// log.info("启动时向量化完成 - 文档ID: {}, 文档名: {}, 成功处理: {} 个向量",
// document.getId(), document.getName(), successCount);
// }
// });
// // 添加任务间隔,避免同时启动过多任务
// if (i < documents.size() - 1 && taskInterval > 0) {
// Thread.sleep(taskInterval);
// }
} catch (Exception e) {
log.error("提交向量化任务失败 - 文档ID: {}, 文档名: {}, 错误: {}",
document.getId(), document.getName(), e.getMessage(), e);
}
}
}
/**
* 获取状态文本描述
*
* @param status 状态码
* @return 状态描述
*/
private String getStatusText(Integer status) {
if (status == null) {
return "未知";
}
switch (status) {
case 0:
return "待处理";
case 1:
return "处理中";
case 2:
return "处理完成";
case 3:
return "处理失败";
default:
return "未知状态";
}
}
}
\ No newline at end of file
package com.ask.service;
import com.ask.api.entity.AskVectorStore;
import com.ask.api.entity.KnowledgeDocument;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * Asynchronous vectorization service.
 *
 * <p>Implementations are expected to run on a dedicated executor (the
 * implementation in this commit uses {@code @Async("vectorizationExecutor")})
 * and to manage the document's processing status themselves.
 *
 * @author ai
 * @date 2024/12/20
 */
public interface AsyncVectorizationService {
    /**
     * Asynchronously vectorizes all pending vector-store entries of the given
     * document.
     *
     * @param document document to process; implementations handle {@code null}
     * @return future completing with the number of successfully vectorized
     *         entries (0 on failure or when there is nothing to do)
     */
    CompletableFuture<Integer> vectorizeByDocumentIdAsync(KnowledgeDocument document);
}
\ No newline at end of file
package com.ask.service.impl;
import com.ask.api.entity.AskVectorStore;
import com.ask.api.entity.KnowledgeDocument;
import com.ask.mapper.KnowledgeDocumentMapper;
import com.ask.service.AskVectorStoreService;
import com.ask.service.AsyncVectorizationService;
import com.ask.service.KnowledgeDocumentService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* 异步向量化服务实现类
*
* @author ai
* @date 2024/12/20
*/
@Slf4j
@Service
public class AsyncVectorizationServiceImpl implements AsyncVectorizationService {
@Autowired
private AskVectorStoreService askVectorStoreService;
@Autowired
private KnowledgeDocumentMapper knowledgeDocumentMapper;
/**
* 根据文档异步向量化处理
*
* @param document 文档对象
* @return 异步任务结果
*/
@Override
@Async("vectorizationExecutor")
public CompletableFuture<Integer> vectorizeByDocumentIdAsync(KnowledgeDocument document) {
if (document == null) {
log.warn("异步向量化:文档对象为空");
return CompletableFuture.completedFuture(0);
}
try {
log.info("开始根据文档异步向量化,线程:{},文档ID:{}",
Thread.currentThread().getName(), document.getId());
long startTime = System.currentTimeMillis();
// 更新文档状态为处理中
knowledgeDocumentMapper.update(Wrappers.<KnowledgeDocument>lambdaUpdate()
.eq(KnowledgeDocument::getId, document.getId())
.set(KnowledgeDocument::getStatus, 1));
// 查询该文档下所有未向量化的数据
LambdaQueryWrapper<AskVectorStore> wrapper = new LambdaQueryWrapper<AskVectorStore>()
.apply("metadata::jsonb ->> 'documentId' = {0}", document.getId())
.isNull(AskVectorStore::getEmbedding); // 假设embedding为null表示未向量化
List<AskVectorStore> vectorStores = askVectorStoreService.list(wrapper);
if (CollectionUtils.isEmpty(vectorStores)) {
log.info("文档ID:{}下没有需要向量化的数据", document.getId());
// 更新文档状态为处理失败
knowledgeDocumentMapper.update(Wrappers.<KnowledgeDocument>lambdaUpdate()
.eq(KnowledgeDocument::getId, document.getId())
.set(KnowledgeDocument::getStatus, 3));
return CompletableFuture.completedFuture(0);
}
// 批量处理向量化
int successCount = askVectorStoreService.batchUpdateVectorEmbedding(vectorStores);
long endTime = System.currentTimeMillis();
log.info("根据文档异步向量化完成,线程:{},文档ID:{},耗时:{}ms,成功:{}/{}",
Thread.currentThread().getName(), document.getId(), (endTime - startTime),
successCount, vectorStores.size());
// 更新文档状态为处理完成
knowledgeDocumentMapper.update(Wrappers.<KnowledgeDocument>lambdaUpdate()
.eq(KnowledgeDocument::getId, document.getId())
.set(KnowledgeDocument::getStatus, 2));
return CompletableFuture.completedFuture(successCount);
} catch (Exception e) {
// 更新文档状态为处理失败
knowledgeDocumentMapper.update(Wrappers.<KnowledgeDocument>lambdaUpdate()
.eq(KnowledgeDocument::getId, document.getId())
.set(KnowledgeDocument::getStatus, 3));
log.error("根据文档异步向量化失败,线程:{},文档ID:{},错误:{}",
Thread.currentThread().getName(), document.getId(), e.getMessage(), e);
return CompletableFuture.completedFuture(0);
}
}
}
\ No newline at end of file
...@@ -7,6 +7,7 @@ import com.ask.api.entity.KnowledgeDocument; ...@@ -7,6 +7,7 @@ import com.ask.api.entity.KnowledgeDocument;
import com.ask.api.entity.SysFile; import com.ask.api.entity.SysFile;
import com.ask.mapper.KnowledgeDocumentMapper; import com.ask.mapper.KnowledgeDocumentMapper;
import com.ask.service.AskVectorStoreService; import com.ask.service.AskVectorStoreService;
import com.ask.service.AsyncVectorizationService;
import com.ask.service.KnowledgeDocumentService; import com.ask.service.KnowledgeDocumentService;
import com.ask.service.SysFileService; import com.ask.service.SysFileService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
...@@ -27,6 +28,7 @@ import org.springframework.util.StringUtils; ...@@ -27,6 +28,7 @@ import org.springframework.util.StringUtils;
import java.io.InputStream; import java.io.InputStream;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
/** /**
* 知识库文档服务实现类 * 知识库文档服务实现类
...@@ -41,6 +43,7 @@ public class KnowledgeDocumentServiceImpl extends ServiceImpl<KnowledgeDocumentM ...@@ -41,6 +43,7 @@ public class KnowledgeDocumentServiceImpl extends ServiceImpl<KnowledgeDocumentM
private final SysFileService sysFileService; private final SysFileService sysFileService;
private final AskVectorStoreService askVectorStoreService; private final AskVectorStoreService askVectorStoreService;
private final AsyncVectorizationService asyncVectorizationService;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
/** /**
...@@ -471,7 +474,11 @@ public class KnowledgeDocumentServiceImpl extends ServiceImpl<KnowledgeDocumentM ...@@ -471,7 +474,11 @@ public class KnowledgeDocumentServiceImpl extends ServiceImpl<KnowledgeDocumentM
}); });
askVectorStoreService.saveBatch(askVectorStores); askVectorStoreService.saveBatch(askVectorStores);
//需要异步 执行向量化 // 异步执行向量化处理
log.info("开始异步向量化处理,文档ID:{},分段数量:{}", document.getId(), askVectorStores.size());
// 提交异步向量化任务,状态管理由异步服务处理
asyncVectorizationService.vectorizeByDocumentIdAsync(document);
} }
return true; return true;
} }
......
package com.ask; package com.ask;
import com.ask.config.VectorizationProperties;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
/** /**
...@@ -11,6 +13,7 @@ import org.springframework.scheduling.annotation.EnableAsync; ...@@ -11,6 +13,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
*/ */
@EnableAsync @EnableAsync
@SpringBootApplication @SpringBootApplication
@EnableConfigurationProperties({VectorizationProperties.class})
public class AskDataAiApplication { public class AskDataAiApplication {
public static void main(String[] args) { public static void main(String[] args) {
......
...@@ -49,15 +49,31 @@ mybatis-plus: ...@@ -49,15 +49,31 @@ mybatis-plus:
global-config: global-config:
banner: false # 是否打印 mybatis-plus banner banner: false # 是否打印 mybatis-plus banner
db-config: db-config:
id-type: auto # 主键类型 table-prefix: ask_ # 表名前缀
where-strategy: not_empty # where 条件策略 logic-delete-field: delFlag # 全局逻辑删除字段
insert-strategy: not_empty # 插入策略 logic-delete-value: 1 # 逻辑已删除值(默认为 1)
update-strategy: not_null # 更新策略 logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
type-handlers-package: com.ask.common.mybatis.handler # 类型处理器包 insert-strategy: not_null # 插入策略
configuration: update-strategy: not_null # 更新策略
jdbc-type-for-null: 'null' # 是否设置字段为null where-strategy: not_null # 查询策略
call-setters-on-nulls: true # 是否调用set方法时传入null值 id-type: assign_uuid # 主键策略
shrink-whitespaces-in-sql: true # 去掉sql中多余的空格报错 configuration:
map-underscore-to-camel-case: true # 开启驼峰命名
cache-enabled: false # 是否开启二级缓存
call-setters-on-nulls: true # 是否在字段为 null 时调用 setter 方法
jdbc-type-for-null: 'null' # 指定当结果集中值为 null 的时候如何处理
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 日志实现
# 启动时处理器配置
# NOTE(review): these keys must live under a top-level `vectorization:` parent to
# bind to VectorizationProperties (@ConfigurationProperties(prefix = "vectorization")).
vectorization:
  startup:
    # 是否启用启动时向量化处理器
    enabled: true
    # 启动延迟时间(秒),避免启动时立即执行
    delay-seconds: 30
    # 每次提交任务间的延迟(毫秒)
    task-interval-ms: 100
    # 最大处理文档数量限制
    max-documents: 100
# springdoc-openapi项目配置 # springdoc-openapi项目配置
springdoc: springdoc:
...@@ -105,9 +121,13 @@ logging: ...@@ -105,9 +121,13 @@ logging:
level: level:
root: INFO root: INFO
com.ask: DEBUG com.ask: DEBUG
org.springframework.ai: INFO
org.postgresql: WARN
pattern:
console: "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
# 本地文件系统 # 本地文件系统
file: file:
local: local:
enable: true enable: true
base-path: /app/upFiles base-path: /app/upFiles
\ No newline at end of file
...@@ -49,15 +49,31 @@ mybatis-plus: ...@@ -49,15 +49,31 @@ mybatis-plus:
global-config: global-config:
banner: false # 是否打印 mybatis-plus banner banner: false # 是否打印 mybatis-plus banner
db-config: db-config:
id-type: auto # 主键类型 table-prefix: ask_ # 表名前缀
where-strategy: not_empty # where 条件策略 logic-delete-field: delFlag # 全局逻辑删除字段
insert-strategy: not_empty # 插入策略 logic-delete-value: 1 # 逻辑已删除值(默认为 1)
update-strategy: not_null # 更新策略 logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
type-handlers-package: com.ask.common.mybatis.handler # 类型处理器包 insert-strategy: not_null # 插入策略
configuration: update-strategy: not_null # 更新策略
jdbc-type-for-null: 'null' # 是否设置字段为null where-strategy: not_null # 查询策略
call-setters-on-nulls: true # 是否调用set方法时传入null值 id-type: assign_uuid # 主键策略
shrink-whitespaces-in-sql: true # 去掉sql中多余的空格报错 configuration:
map-underscore-to-camel-case: true # 开启驼峰命名
cache-enabled: false # 是否开启二级缓存
call-setters-on-nulls: true # 是否在字段为 null 时调用 setter 方法
jdbc-type-for-null: 'null' # 指定当结果集中值为 null 的时候如何处理
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 日志实现
# 启动时处理器配置
# NOTE(review): these keys must live under a top-level `vectorization:` parent to
# bind to VectorizationProperties (@ConfigurationProperties(prefix = "vectorization")).
vectorization:
  startup:
    # 是否启用启动时向量化处理器
    enabled: true
    # 启动延迟时间(秒),避免启动时立即执行
    delay-seconds: 30
    # 每次提交任务间的延迟(毫秒)
    task-interval-ms: 100
    # 最大处理文档数量限制
    max-documents: 100
# springdoc-openapi项目配置 # springdoc-openapi项目配置
springdoc: springdoc:
...@@ -105,9 +121,13 @@ logging: ...@@ -105,9 +121,13 @@ logging:
level: level:
root: INFO root: INFO
com.ask: DEBUG com.ask: DEBUG
org.springframework.ai: INFO
org.postgresql: WARN
pattern:
console: "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
# 本地文件系统 # 本地文件系统
file: file:
local: local:
enable: true enable: true
base-path: /app/upFiles base-path: /app/upFiles
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment