Spring AI 如何集成Spring Cloud Function构建ETL数据处理管道

后端 潘老师 2个月前 (02-21) 36 ℃ (0) 扫码查看

在开发Spring AI数据摄取微服务时,搭建高效的ETL(提取、转换、加载)管道是关键一环。本文将详细介绍如何利用Spring Cloud Function和Spring AI来构建这样的管道,通过一步步的操作指南,带你从环境搭建到功能实现,最后进行演示验证,帮助你轻松掌握这一技术,为实际项目开发提供有力支持。

一、Maven依赖配置

要基于Spring Cloud Function配置ETL管道,首先得添加Spring AI和Spring Cloud Function模块的相关依赖。在项目的pom.xml文件中进行如下配置:

<properties>
    <!-- 设置Spring Cloud版本 -->
    <spring.cloud.version>2023.0.1</spring.cloud.version> 
    <!-- 设置Spring Functions Catalog版本 -->
    <spring.functions.catalog.version>5.0.0-SNAPSHOT</spring.functions.catalog.version> 
</properties>
<dependencies>
    <!-- Spring Cloud Function上下文依赖 -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
    </dependency>
    <!-- 文件供应商依赖,用于提供文件流 -->
    <dependency>
        <groupId>org.springframework.cloud.fn</groupId>
        <artifactId>spring-file-supplier</artifactId>
    </dependency>
    <!-- Spring AI的Tika文档读取器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-tika-document-reader</artifactId>
    </dependency>
    <!-- Spring AI与OpenAI集成的启动器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring AI与Chroma数据库集成的启动器依赖 -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-chroma-store-spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot与Docker Compose集成的依赖,用于运行时管理容器 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-docker-compose</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud依赖管理 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Functions Catalog的BOM依赖管理 -->
        <dependency>
            <groupId>org.springframework.cloud.fn</groupId>
            <artifactId>spring-functions-catalog-bom</artifactId>
            <version>${spring.functions.catalog.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

通过这些依赖,项目就能引入所需的功能模块,为后续构建ETL管道打下基础。

二、文件供应商与文档读取器

spring-file-supplier模块提供了一个fileSupplier,它可以在其他应用中复用和组合。这个供应商能从指定目录生成一个文件的响应式流,开发者需要订阅这个Flux来获取数据。可以通过配置file.supplier.*相关属性,指定输入目录的位置和支持的文件格式。

# 设置输入目录路径
file.supplier.directory=c:/temp/ingestion-files 
# 设置文件名匹配正则表达式,只处理符合格式的文件
file.supplier.filename-regex=.*\.(pdf|docx|txt|pages|csv) 

当Spring检测到Maven坐标和配置属性后,会自动启用一个函数,该函数读取文件内容并返回Flux<Message<byte[]>>。但在Spring AI的ETL管道中,我们需要的是org.springframework.ai.document.Document对象的Flux。所以,要创建一个新函数documentReader来进行格式转换。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.ai.document.Document;
import org.springframework.ai.reader.tika.TikaDocumentReader;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.stream.Collectors;

@Configuration
public class CloudFunctionConfig {
    // 定义一个名为documentReader的Bean,用于将文件流转换为Document对象列表的流
    @Bean
    public Function<Flux<Message<byte[]>>, Flux<List<Document>>> documentReader() {
        // 输入是包含文件字节数据的消息流,输出是Document对象列表的流
        return resourceFlux -> resourceFlux
               // 对每个消息进行转换操作
               .map(message -> {
                    // 使用TikaDocumentReader将字节数据转换为Document对象列表
                    List<Document> documents = new TikaDocumentReader(new ByteArrayResource(message.getPayload()))
                           .get()
                           .stream()
                           // 为每个Document对象添加文件来源元数据
                           .peek(document -> {
                                document.getMetadata().put("source", message.getHeaders().get("file_name"));
                            })
                           .collect(Collectors.toList());
                    return documents;
                });
    }
    // 其他可能的配置方法
    //... 
}

三、文档转换器

文档转换器的功能是使用TokenTextSplitter将读取到的Document对象分割成文本块。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;
import reactor.core.publisher.Flux;
import java.util.List;

@Configuration
public class CloudFunctionConfig {
    // 定义一个名为documentTransformer的Bean,用于分割Document对象列表
    @Bean
    public Function<Flux<List<Document>>, Flux<List<Document>>> documentTransformer() {
        // 输入是Document对象列表的流,输出也是Document对象列表的流
        return documentListFlux -> documentListFlux
               // 对每个Document对象列表进行转换操作
               .map(unsplitList -> new TokenTextSplitter().apply(unsplitList));
    }
    // 其他可能的配置方法
    //... 
}

四、文档写入器

文档写入器负责接收分割后的Document列表,并将其文本和嵌入向量存储到向量数据库中。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;
import reactor.core.publisher.Flux;
import java.util.List;

@Configuration
public class CloudFunctionConfig {
    // 定义日志记录器
    private static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfig.class); 

    // 定义一个名为documentWriter的Bean,用于将Document对象列表写入向量数据库
    @Bean
    public Consumer<Flux<List<Document>>> documentWriter(VectorStore vectorStore) {
        // 输入是Document对象列表的流
        return documentFlux -> documentFlux
               // 对每个Document对象列表进行操作
               .doOnNext(documents -> {
                    // 记录写入向量数据库的文档数量
                    LOGGER.info("Writing {} documents to vector store.", documents.size()); 
                    // 将Document对象列表写入向量数据库
                    vectorStore.accept(documents); 
                    // 记录已写入向量数据库的文档数量
                    LOGGER.info("{} documents have been written to vector store.", documents.size()); 
                })
               // 订阅流,开始处理数据
               .subscribe(); 
    }
    // 其他可能的配置方法
    //... 
}

五、配置ETL管道

当所有的Bean都添加到@Configuration类中后,就可以在属性文件中组合函数,形成ETL管道。在属性文件中添加如下配置:

# 定义ETL管道中函数的执行顺序
spring.cloud.function.definition=fileSupplier|documentReader|documentTransformer|documentWriter 

这样就指定了文件供应商、文档读取器、文档转换器和文档写入器的执行顺序,构建起了完整的ETL管道。

六、执行ETL管道

要执行一个函数,可以通过FunctionCatalog.lookup()方法查找其实例,然后调用run()方法来执行。如果上下文中只有一个组合函数,catalog.lookup(null)就能返回这个组合函数;否则,需要在lookup()方法中指定函数名。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.catalog.FunctionCatalog;
import org.springframework.stereotype.Service;

@Service
public class IngestionService {
    // 注入FunctionCatalog实例
    private final FunctionCatalog catalog; 

    // 构造函数,初始化FunctionCatalog实例
    public IngestionService(FunctionCatalog catalog) {
        this.catalog = catalog;
    }

    // 定义数据摄取方法
    public void ingest() {
        // 获取组合函数实例
        Runnable composedFunction = catalog.lookup(null); 
        // 执行组合函数
        composedFunction.run(); 
    }
}

七、演示

完成上述配置后,可以从多种方式调用ingest()方法,比如通过REST端点、批处理定时任务,或者任何支持流处理的应用。在本文示例中,通过REST端点来调用。

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class IngestionController {
    // 注入IngestionService实例
    @Resource
    IngestionService ingestionService; 

    // 处理POST请求,执行数据摄取操作
    @PostMapping("run-ingestion")
    public ResponseEntity<?> run() {
        // 调用IngestionService的ingest方法执行数据摄取
        ingestionService.ingest(); 
        // 返回表示请求已被接受的响应
        return ResponseEntity.accepted().build(); 
    }
}

在本地运行应用,调用/run-ingestion端点,它会按顺序执行所有函数,并将文件内容的嵌入向量存储到向量数据库中。通过查看控制台日志,可以验证操作是否成功。例如,日志中会显示类似如下信息:

Writing 7 documents to vector store.
7 documents have been written to vector store.

八、总结

通过这个Spring Cloud Function的教程,我们成功构建了一个适用于Spring AI应用的数据摄取ETL管道。该应用利用内置的供应商函数,从指定的文件系统目录读取多种格式的文档,然后通过自定义的函数将文档内容处理成文本块,并把嵌入向量存储到Chroma向量数据库中。

归属教程 Spring AI 快速入门教程汇总

文章目录 Spring AI是什么?有啥优势? 如何在项目中使用Spring AI? Spring AI详细功 […]


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/14872.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】