Class ChunkIndexConsumer

java.lang.Object
com.bytedesk.kbase.llm_chunk.mq.ChunkIndexConsumer

@Component public class ChunkIndexConsumer extends Object
Chunk索引消费者 用于处理Chunk索引队列中的消息 解决大文件切分时的并发问题和重复内容问题
  • Field Details

    • chunkElasticService

      private final ChunkElasticService chunkElasticService
    • chunkRestService

      private final ChunkRestService chunkRestService
    • random

      private final Random random
    • chunkVectorService

      @Autowired(required=false) private ChunkVectorService chunkVectorService
    • jmsTemplate

      @Autowired private org.springframework.jms.core.JmsTemplate jmsTemplate
  • Constructor Details

  • Method Details

    • processIndexMessage

      @JmsListener(destination="bytedesk.queue.chunk.index", containerFactory="jmsArtemisQueueFactory", concurrency="2-5") public void processIndexMessage(jakarta.jms.Message jmsMessage, ChunkIndexMessage message)
      处理Chunk索引队列中的消息 使用客户端确认模式,增强错误处理和重试机制
      Parameters:
      jmsMessage - JMS原始消息
      message - Chunk索引消息
    • getBatchInfo

      private String getBatchInfo(jakarta.jms.Message jmsMessage)
      从JMS消息中获取批处理信息
    • handleIndexOperation

      private void handleIndexOperation(ChunkEntity chunk, ChunkIndexMessage message, String batchInfo)
      处理索引操作 将全文索引和向量索引的操作分开处理,避免一个操作失败影响另一个操作
    • processElasticIndex

      @Transactional(propagation=REQUIRES_NEW) public void processElasticIndex(ChunkEntity chunk)
      在单独事务中处理全文索引
    • processVectorIndex

      public void processVectorIndex(ChunkEntity chunk)
      处理向量索引(使用独立的事务管理策略)
    • isDuplicateContent

      private boolean isDuplicateContent(ChunkEntity chunk)
      检查是否为重复内容 简单的重复检查策略:基于内容哈希值
    • markChunkAsProcessed

      @Transactional(propagation=REQUIRES_NEW) public void markChunkAsProcessed(ChunkEntity chunk)
      标记chunk为已处理
    • sendChunkCompleteNotification

      private void sendChunkCompleteNotification(ChunkEntity chunk, String status, String errorMessage, String processType)
      发送chunk处理完成通知
    • handleDeleteOperation

      private void handleDeleteOperation(ChunkEntity chunk, ChunkIndexMessage message, String batchInfo)
      处理删除操作 将全文索引和向量索引的删除操作分开处理
    • processElasticDelete

      @Transactional(propagation=REQUIRES_NEW) public Boolean processElasticDelete(String chunkUid)
      在单独事务中处理全文索引删除
    • processVectorDelete

      @Transactional(propagation=REQUIRES_NEW) public Boolean processVectorDelete(ChunkEntity chunk)
      在单独事务中处理向量索引删除
    • acknowledgeMessage

      private void acknowledgeMessage(jakarta.jms.Message message)
      安全地确认消息 只有在消息处理成功后才调用此方法
      Parameters:
      message - JMS消息