Package com.bytedesk.kbase.llm_chunk.mq
Class ChunkIndexConsumer
java.lang.Object
com.bytedesk.kbase.llm_chunk.mq.ChunkIndexConsumer
Chunk索引消费者
用于处理Chunk索引队列中的消息
解决大文件切分时的并发问题和重复内容问题
-
Field Summary
FieldsModifier and TypeFieldDescriptionprivate final ChunkElasticServiceprivate final ChunkRestServiceprivate ChunkVectorServiceprivate org.springframework.jms.core.JmsTemplateprivate final Random -
Constructor Summary
ConstructorsConstructorDescriptionChunkIndexConsumer(ChunkElasticService chunkElasticService, ChunkRestService chunkRestService) -
Method Summary
Modifier and TypeMethodDescriptionprivate voidacknowledgeMessage(jakarta.jms.Message message) 安全地确认消息 只有在消息处理成功后才调用此方法private StringgetBatchInfo(jakarta.jms.Message jmsMessage) 从JMS消息中获取批处理信息private voidhandleDeleteOperation(ChunkEntity chunk, ChunkIndexMessage message, String batchInfo) 处理删除操作 将全文索引和向量索引的删除操作分开处理private voidhandleIndexOperation(ChunkEntity chunk, ChunkIndexMessage message, String batchInfo) 处理索引操作 将全文索引和向量索引的操作分开处理,避免一个操作失败影响另一个操作private booleanisDuplicateContent(ChunkEntity chunk) 检查是否为重复内容 简单的重复检查策略:基于内容哈希值voidmarkChunkAsProcessed(ChunkEntity chunk) 标记chunk为已处理processElasticDelete(String chunkUid) 在单独事务中处理全文索引删除voidprocessElasticIndex(ChunkEntity chunk) 在单独事务中处理全文索引voidprocessIndexMessage(jakarta.jms.Message jmsMessage, ChunkIndexMessage message) 处理Chunk索引队列中的消息 使用客户端确认模式,增强错误处理和重试机制processVectorDelete(ChunkEntity chunk) 在单独事务中处理向量索引删除voidprocessVectorIndex(ChunkEntity chunk) 处理向量索引(使用独立的事务管理策略)private voidsendChunkCompleteNotification(ChunkEntity chunk, String status, String errorMessage, String processType) 发送chunk处理完成通知
-
Field Details
-
chunkElasticService
-
chunkRestService
-
random
-
chunkVectorService
-
jmsTemplate
@Autowired private org.springframework.jms.core.JmsTemplate jmsTemplate
-
-
Constructor Details
-
ChunkIndexConsumer
public ChunkIndexConsumer(ChunkElasticService chunkElasticService, ChunkRestService chunkRestService)
-
-
Method Details
-
processIndexMessage
@JmsListener(destination="bytedesk.queue.chunk.index", containerFactory="jmsArtemisQueueFactory", concurrency="2-5") public void processIndexMessage(jakarta.jms.Message jmsMessage, ChunkIndexMessage message) 处理Chunk索引队列中的消息 使用客户端确认模式,增强错误处理和重试机制- Parameters:
jmsMessage- JMS原始消息message- Chunk索引消息
-
getBatchInfo
从JMS消息中获取批处理信息 -
handleIndexOperation
处理索引操作 将全文索引和向量索引的操作分开处理,避免一个操作失败影响另一个操作 -
processElasticIndex
在单独事务中处理全文索引 -
processVectorIndex
处理向量索引(使用独立的事务管理策略) -
isDuplicateContent
检查是否为重复内容 简单的重复检查策略:基于内容哈希值 -
markChunkAsProcessed
标记chunk为已处理 -
sendChunkCompleteNotification
private void sendChunkCompleteNotification(ChunkEntity chunk, String status, String errorMessage, String processType) 发送chunk处理完成通知 -
handleDeleteOperation
处理删除操作 将全文索引和向量索引的删除操作分开处理 -
processElasticDelete
在单独事务中处理全文索引删除 -
processVectorDelete
在单独事务中处理向量索引删除 -
acknowledgeMessage
private void acknowledgeMessage(jakarta.jms.Message message) 安全地确认消息 只有在消息处理成功后才调用此方法- Parameters:
message- JMS消息
-