输入“/”快速插入内容

5 - Dify中高质量索引模式时,通过线程池处理chunk过程

2024年12月10日修改
本文讨论了Dify中高质量索引模式下,利用线程池执行器处理chunk的过程及相关方法。关键要点包括:
1.
线程池处理chunk核心思想:假设文档拆分为12个段,chunk_size=10时,分2批提交给线程池执行器处理。
2.
线程池处理chunk判断条件:仅当数据集索引技术为 "high_quality" 时,才执行并行处理代码。
3.
线程池执行器创建:使用ThreadPoolExecutor创建最多10个并发线程的线程池执行器。
4.
任务提交与结果处理:遍历文档按chunk_size拆分,提交任务到线程池,最后累加任务结果到tokens变量。
5.
_process_chunk方法定义:在Flask应用上下文中运行,处理文档块,计算tokens数量、加载索引、更新文档段状态。
6.
_process_chunk方法参数:包含类实例、Flask应用对象、索引处理器等多个参数。
7.
_process_chunk方法操作:检查文档是否暂停,计算tokens,加载索引,提取文档ID,更新文档段状态并提交事务,返回tokens数量 。
本文主要介绍了Dify中高质量索引模式时,如何通过线程池执行器来处理chunk的过程。源码位置:dify\api\core\indexing_runner.py\IndexingRunner._load。核心思想:假设一个数据集中有一个文档,该文档可以拆分为12个段(segment)。如果chunk_size=10,那么分为2批提交给线程池执行器进行处理。
一.线程池处理chunk
1.方法处理过程
这段代码的目的是通过多线程并发处理文档集合中的每个块,提高处理效率。它创建了一个包含最多10个线程的线程池,并将文档集合按块拆分后提交给线程池执行器处理。最终,它收集所有任务的结果并累加到 tokens 变量中。这种方式可以显著加快大规模文档集合的处理速度。
if dataset.indexing_technique == 'high_quality':
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # 线程池执行器
futures = []
for i in range(0, len(documents), chunk_size): # 遍历文档
chunk_documents = documents[i:i + chunk_size] # 块文档
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance)) # 提交任务
for future in futures: # 遍历futures
tokens += future.result() # 令牌
2.判断条件
这段代码是用来并行处理文档集合的一部分。它使用了Python的 concurrent.futures 模块来创建一个线程池执行器,以便在多个线程中并发执行任务。下面是详细解释每一行代码的作用:
if dataset.indexing_technique == 'high_quality':
检查数据集的索引技术是否为 "high_quality"。只有在这种情况下,下面的并行处理代码才会被执行。
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
3.创建线程池执行器
使用 ThreadPoolExecutor 创建一个包含最多10个线程的线程池执行器。max_workers=10 表示线程池中最多可以有10个并发线程。
futures = []
4.初始化 futures 列表
用于存储每个提交的任务的 future 对象。
for i in range(0, len(documents), chunk_size):
5.遍历文档
通过步长 chunk_size 遍历文档集合 documentsi 是起始索引。
chunk_documents = documents[i:i + chunk_size]
6.块文档
从文档集合中提取一块文档,这块文档的大小为 chunk_size。这部分文档会被单独处理。
futures.append(executor.submit(self._process_chunk, current_app._get_current_object(), index_processor,
chunk_documents, dataset,
dataset_document, embedding_model_instance,
embedding_model_type_instance))
7.提交任务
使用 executor.submit() 方法提交一个任务给线程池执行器。每个任务调用 self._process_chunk 方法,并传入一系列参数。返回 future 对象会被添加到 futures 列表中。传递给 _process_chunk 参数包括:
current_app._get_current_object():获取当前应用对象。
index_processor:索引处理器。
chunk_documents:当前块的文档。