章
目
录
在开发Spring AI数据摄取微服务时,搭建高效的ETL(提取、转换、加载)管道是关键一环。本文将详细介绍如何利用Spring Cloud Function和Spring AI来构建这样的管道,通过一步步的操作指南,带你从环境搭建到功能实现,最后进行演示验证,帮助你轻松掌握这一技术,为实际项目开发提供有力支持。
一、Maven依赖配置
要基于Spring Cloud Function配置ETL管道,首先得添加Spring AI和Spring Cloud Function模块的相关依赖。在项目的pom.xml
文件中进行如下配置:
<properties>
<!-- 设置Spring Cloud版本 -->
<spring.cloud.version>2023.0.1</spring.cloud.version>
<!-- 设置Spring Functions Catalog版本 -->
<spring.functions.catalog.version>5.0.0-SNAPSHOT</spring.functions.catalog.version>
</properties>
<dependencies>
<!-- Spring Cloud Function上下文依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
</dependency>
<!-- 文件供应商依赖,用于提供文件流 -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-file-supplier</artifactId>
</dependency>
<!-- Spring AI的Tika文档读取器依赖 -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-tika-document-reader</artifactId>
</dependency>
<!-- Spring AI与OpenAI集成的启动器依赖 -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- Spring AI与Chroma数据库集成的启动器依赖 -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-chroma-store-spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot与Docker Compose集成的依赖,用于运行时管理容器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- Spring Cloud依赖管理 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Functions Catalog的BOM依赖管理 -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-functions-catalog-bom</artifactId>
<version>${spring.functions.catalog.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
通过这些依赖,项目就能引入所需的功能模块,为后续构建ETL管道打下基础。
二、文件供应商与文档读取器
spring-file-supplier
模块提供了一个fileSupplier
,它可以在其他应用中复用和组合。这个供应商能从指定目录生成一个文件的响应式流,开发者需要订阅这个Flux
来获取数据。可以通过配置file.supplier.*
相关属性,指定输入目录的位置和支持的文件格式。
# 设置输入目录路径
file.supplier.directory=c:/temp/ingestion-files
# 设置文件名匹配正则表达式,只处理符合格式的文件
file.supplier.filename-regex=.*\.(pdf|docx|txt|pages|csv)
当Spring检测到Maven坐标和配置属性后,会自动启用一个函数,该函数读取文件内容并返回Flux<Message<byte[]>>
。但在Spring AI的ETL管道中,我们需要的是org.springframework.ai.document.Document
对象的Flux
。所以,要创建一个新函数documentReader
来进行格式转换。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.ai.document.Document;
import org.springframework.ai.reader.tika.TikaDocumentReader;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.stream.Collectors;
@Configuration
public class CloudFunctionConfig {
// 定义一个名为documentReader的Bean,用于将文件流转换为Document对象列表的流
@Bean
public Function<Flux<Message<byte[]>>, Flux<List<Document>>> documentReader() {
// 输入是包含文件字节数据的消息流,输出是Document对象列表的流
return resourceFlux -> resourceFlux
// 对每个消息进行转换操作
.map(message -> {
// 使用TikaDocumentReader将字节数据转换为Document对象列表
List<Document> documents = new TikaDocumentReader(new ByteArrayResource(message.getPayload()))
.get()
.stream()
// 为每个Document对象添加文件来源元数据
.peek(document -> {
document.getMetadata().put("source", message.getHeaders().get("file_name"));
})
.collect(Collectors.toList());
return documents;
});
}
// 其他可能的配置方法
//...
}
三、文档转换器
文档转换器的功能是使用TokenTextSplitter
将读取到的Document
对象分割成文本块。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;
import reactor.core.publisher.Flux;
import java.util.List;
@Configuration
public class CloudFunctionConfig {
// 定义一个名为documentTransformer的Bean,用于分割Document对象列表
@Bean
public Function<Flux<List<Document>>, Flux<List<Document>>> documentTransformer() {
// 输入是Document对象列表的流,输出也是Document对象列表的流
return documentListFlux -> documentListFlux
// 对每个Document对象列表进行转换操作
.map(unsplitList -> new TokenTextSplitter().apply(unsplitList));
}
// 其他可能的配置方法
//...
}
四、文档写入器
文档写入器负责接收分割后的Document
列表,并将其文本和嵌入向量存储到向量数据库中。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;
import reactor.core.publisher.Flux;
import java.util.List;
@Configuration
public class CloudFunctionConfig {
// 定义日志记录器
private static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfig.class);
// 定义一个名为documentWriter的Bean,用于将Document对象列表写入向量数据库
@Bean
public Consumer<Flux<List<Document>>> documentWriter(VectorStore vectorStore) {
// 输入是Document对象列表的流
return documentFlux -> documentFlux
// 对每个Document对象列表进行操作
.doOnNext(documents -> {
// 记录写入向量数据库的文档数量
LOGGER.info("Writing {} documents to vector store.", documents.size());
// 将Document对象列表写入向量数据库
vectorStore.accept(documents);
// 记录已写入向量数据库的文档数量
LOGGER.info("{} documents have been written to vector store.", documents.size());
})
// 订阅流,开始处理数据
.subscribe();
}
// 其他可能的配置方法
//...
}
五、配置ETL管道
当所有的Bean都添加到@Configuration
类中后,就可以在属性文件中组合函数,形成ETL管道。在属性文件中添加如下配置:
# 定义ETL管道中函数的执行顺序
spring.cloud.function.definition=fileSupplier|documentReader|documentTransformer|documentWriter
这样就指定了文件供应商、文档读取器、文档转换器和文档写入器的执行顺序,构建起了完整的ETL管道。
六、执行ETL管道
要执行一个函数,可以通过FunctionCatalog.lookup()
方法查找其实例,然后调用run()
方法来执行。如果上下文中只有一个组合函数,catalog.lookup(null)
就能返回这个组合函数;否则,需要在lookup()
方法中指定函数名。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.catalog.FunctionCatalog;
import org.springframework.stereotype.Service;
@Service
public class IngestionService {
// 注入FunctionCatalog实例
private final FunctionCatalog catalog;
// 构造函数,初始化FunctionCatalog实例
public IngestionService(FunctionCatalog catalog) {
this.catalog = catalog;
}
// 定义数据摄取方法
public void ingest() {
// 获取组合函数实例
Runnable composedFunction = catalog.lookup(null);
// 执行组合函数
composedFunction.run();
}
}
七、演示
完成上述配置后,可以从多种方式调用ingest()
方法,比如通过REST端点、批处理定时任务,或者任何支持流处理的应用。在本文示例中,通过REST端点来调用。
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class IngestionController {
// 注入IngestionService实例
@Resource
IngestionService ingestionService;
// 处理POST请求,执行数据摄取操作
@PostMapping("run-ingestion")
public ResponseEntity<?> run() {
// 调用IngestionService的ingest方法执行数据摄取
ingestionService.ingest();
// 返回表示请求已被接受的响应
return ResponseEntity.accepted().build();
}
}
在本地运行应用,调用/run-ingestion
端点,它会按顺序执行所有函数,并将文件内容的嵌入向量存储到向量数据库中。通过查看控制台日志,可以验证操作是否成功。例如,日志中会显示类似如下信息:
Writing 7 documents to vector store.
7 documents have been written to vector store.
八、总结
通过这个Spring Cloud Function的教程,我们成功构建了一个适用于Spring AI应用的数据摄取ETL管道。该应用利用内置的供应商函数,从指定的文件系统目录读取多种格式的文档,然后通过自定义的函数将文档内容处理成文本块,并把嵌入向量存储到Chroma向量数据库中。
文章目录 Spring AI是什么?有啥优势? 如何在项目中使用Spring AI? Spring AI详细功 […]