ETL Pipeline for AI

    Transform raw documents into AI-ready embeddings. ETL pipelines are the foundation of every production RAG system—extracting content, chunking intelligently, and loading into vector stores.

    ETL (Extract, Transform, Load) in the AI context means taking your documents—PDFs, Word files, databases, web pages—and converting them into vector embeddings stored in a searchable database. The quality of your ETL pipeline directly determines the quality of your RAG answers. Poor chunking = poor retrieval.

    Spring AI provides document readers for common formats, text splitters for intelligent chunking, and seamless integration with vector stores. Build pipelines that handle thousands of documents with proper error handling and monitoring.
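
    The core flow fits in a few lines. The sketch below is a minimal, hedged example assuming a configured VectorStore bean, the Spring AI PDF reader on the classpath, and Spring AI 1.x package names; the class and its ingest method are illustrative, not part of the Spring AI API. Each step is expanded in the sections that follow.

    SimpleIngestionService.java (illustrative)
    import java.util.List;
    import org.springframework.ai.document.Document;
    import org.springframework.ai.reader.pdf.PagePdfDocumentReader;
    import org.springframework.ai.transformer.splitter.TokenTextSplitter;
    import org.springframework.ai.vectorstore.VectorStore;
    import org.springframework.core.io.Resource;
    import org.springframework.stereotype.Service;

    // Minimal end-to-end sketch: extract -> transform -> load.
    @Service
    public class SimpleIngestionService {

        private final VectorStore vectorStore;

        public SimpleIngestionService(VectorStore vectorStore) {
            this.vectorStore = vectorStore;
        }

        public void ingest(Resource pdf) {
            List<Document> pages = new PagePdfDocumentReader(pdf).read();   // Extract: one Document per page
            List<Document> chunks = new TokenTextSplitter().apply(pages);   // Transform: split into chunks
            vectorStore.add(chunks);                                        // Load: embed and store
        }
    }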

    The ETL Flow


    Extract

    Load raw content from various sources into Spring AI's Document abstraction.

    • PDF, Word, Markdown, HTML
    • JSON, CSV, databases
    • Web pages, APIs

    Transform

    Clean, chunk, and enrich documents with metadata for better retrieval.

    • Text cleaning & normalization
    • Intelligent chunking
    • Metadata enrichment

    Load

    Generate embeddings and store in vector database for similarity search.

    • Embedding generation
    • Batch insertion
    • Deduplication

    Document Readers (Extract)

    Built-in Readers

    Spring AI supports many document formats out of the box

    DocumentExtractor.java
    @Service
    public class DocumentExtractor {

        // PDF documents (with page-level extraction)
        public List<Document> extractPdf(Resource pdfResource) {
            PagePdfDocumentReader reader = new PagePdfDocumentReader(
                    pdfResource,
                    PdfDocumentReaderConfig.builder()
                            .withPageTopMargin(50)      // Skip headers
                            .withPageBottomMargin(50)   // Skip footers
                            .withPagesPerDocument(1)    // One doc per page
                            .build());
            return reader.read();
        }

        // Markdown / text files
        public List<Document> extractText(Resource textResource) {
            TextReader reader = new TextReader(textResource);
            reader.getCustomMetadata().put("source", textResource.getFilename());
            return reader.read();
        }

        // JSON documents (specify content and metadata fields)
        public List<Document> extractJson(Resource jsonResource) {
            JsonReader reader = new JsonReader(
                    jsonResource,
                    new JsonMetadataGenerator() {
                        @Override
                        public Map<String, Object> generate(Map<String, Object> jsonMap) {
                            return Map.of(
                                    "title", jsonMap.get("title"),
                                    "author", jsonMap.get("author"));
                        }
                    },
                    "content");   // Field containing the main text
            return reader.read();
        }

        // Tika for complex formats (Word, PowerPoint, etc.)
        public List<Document> extractWithTika(Resource resource) {
            TikaDocumentReader reader = new TikaDocumentReader(resource);
            return reader.read();
        }
    }

    Text Splitters (Transform)

    Chunking strategy massively impacts retrieval quality. Too small = lost context. Too large = irrelevant noise in results.

    TokenTextSplitter

    Splits by token count with overlap. Most common choice for general use.

    Token-based
    // 500 tokens per chunk, 100 token overlap
    TokenTextSplitter splitter = new TokenTextSplitter(500, 100);
    List<Document> chunks = splitter.split(documents);

    CharacterTextSplitter

    Splits by character count at natural boundaries (sentences, paragraphs).

    Character-based
    // Split at 1000 chars with 200 overlap
    CharacterTextSplitter splitter = new CharacterTextSplitter();
    splitter.setChunkSize(1000);
    splitter.setChunkOverlap(200);
    splitter.setSeparator("\n\n");   // Prefer paragraph breaks

    Rule of Thumb: Start with 500 tokens, 100 overlap. For code use 300 tokens. For narratives/legal docs try 800 tokens. Always test with real queries.
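
    One hedged way to encode that rule of thumb is a small factory that picks splitter settings per content type. It reuses the (chunkSize, overlap) constructor style shown above; the factory class and the content-type labels are illustrative, so verify the constructor against your Spring AI version.

    SplitterFactory.java (illustrative)
    import org.springframework.ai.transformer.splitter.TokenTextSplitter;

    public final class SplitterFactory {

        private SplitterFactory() {}

        // Pick chunking parameters by content type, keeping roughly 20% overlap.
        public static TokenTextSplitter forContentType(String contentType) {
            return switch (contentType) {
                case "code"              -> new TokenTextSplitter(300, 60);   // small chunks for code
                case "narrative", "legal" -> new TokenTextSplitter(800, 160); // larger chunks keep context
                default                  -> new TokenTextSplitter(500, 100);  // general-purpose default
            };
        }
    }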

    Metadata Enrichment

    Add Context for Better Filtering

    Rich metadata enables precise filtering during retrieval

    MetadataEnricher.java
    @Service
    public class MetadataEnricher {

        private final VectorStore vectorStore;

        public MetadataEnricher(VectorStore vectorStore) {
            this.vectorStore = vectorStore;
        }

        public void enrichDocument(Document doc, Resource source) {
            Map<String, Object> metadata = doc.getMetadata();

            // Source information
            metadata.put("source", source.getFilename());
            metadata.put("source_type", getFileType(source));
            metadata.put("ingested_at", Instant.now().toString());

            // Content statistics
            String content = doc.getContent();
            metadata.put("word_count", content.split("\\s+").length);
            metadata.put("char_count", content.length());

            // Content hash for deduplication
            metadata.put("content_hash", DigestUtils.sha256Hex(content));

            // Extract dates mentioned in content
            List<String> dates = extractDates(content);
            if (!dates.isEmpty()) {
                metadata.put("mentioned_dates", dates);
            }

            // Business-specific metadata
            if (isLegalDocument(source)) {
                metadata.put("document_type", "legal");
                metadata.put("requires_review", true);
            }
        }

        // Later, use metadata in queries
        public List<Document> searchLegalDocs(String query) {
            FilterExpressionBuilder b = new FilterExpressionBuilder();
            return vectorStore.similaritySearch(
                    SearchRequest.query(query)
                            .withFilterExpression(b.eq("document_type", "legal").build()));
        }
    }

    Complete ETL Pipeline

    Production-Ready Implementation

    DocumentETLPipeline.java
    @Service
    @Slf4j
    public class DocumentETLPipeline {

        private final VectorStore vectorStore;
        private final TokenTextSplitter splitter;

        public DocumentETLPipeline(VectorStore vectorStore) {
            this.vectorStore = vectorStore;
            this.splitter = new TokenTextSplitter(500, 100);
        }

        @Transactional
        public ETLResult process(List<Resource> sources) {
            int processed = 0;
            int failed = 0;
            List<String> errors = new ArrayList<>();

            for (Resource source : sources) {
                try {
                    // EXTRACT
                    List<Document> documents = extract(source);

                    // TRANSFORM
                    List<Document> chunks = transform(documents, source);

                    // Deduplicate
                    chunks = deduplicate(chunks);

                    // LOAD
                    if (!chunks.isEmpty()) {
                        vectorStore.add(chunks);
                        processed += chunks.size();
                    }
                    log.info("Processed {} chunks from {}", chunks.size(), source.getFilename());
                }
                catch (Exception e) {
                    failed++;
                    errors.add(source.getFilename() + ": " + e.getMessage());
                    log.error("Failed to process {}", source.getFilename(), e);
                }
            }
            return new ETLResult(processed, failed, errors);
        }

        private List<Document> extract(Resource source) {
            String filename = source.getFilename().toLowerCase();
            if (filename.endsWith(".pdf")) {
                return new PagePdfDocumentReader(source).read();
            }
            else if (filename.endsWith(".json")) {
                return new JsonReader(source).read();
            }
            else {
                return new TextReader(source).read();
            }
        }

        private List<Document> transform(List<Document> docs, Resource source) {
            return docs.stream()
                    .map(this::cleanDocument)
                    .flatMap(doc -> splitter.split(List.of(doc)).stream())
                    .peek(doc -> enrichMetadata(doc, source))
                    .toList();
        }

        private List<Document> deduplicate(List<Document> docs) {
            Set<String> seen = new HashSet<>();
            return docs.stream()
                    .filter(doc -> {
                        String hash = (String) doc.getMetadata().get("content_hash");
                        return seen.add(hash);
                    })
                    .toList();
        }
    }

    record ETLResult(int processed, int failed, List<String> errors) {}

    Scheduling & Incremental Updates

    ScheduledETLJob.java
    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class ScheduledETLJob {

        private final DocumentETLPipeline pipeline;
        private final DocumentRepository documentRepo;
        private final VectorStore vectorStore;

        @Value("${etl.source.directory}")
        private String sourceDirectory;

        // Full sync daily at 2 AM
        @Scheduled(cron = "0 0 2 * * *")
        public void fullSync() {
            log.info("Starting full ETL sync");
            List<Resource> allFiles = loadAllFiles(sourceDirectory);
            ETLResult result = pipeline.process(allFiles);
            log.info("Full sync complete: {} processed, {} failed",
                    result.processed(), result.failed());
        }

        // Incremental sync every hour
        @Scheduled(fixedRate = 3600000)
        public void incrementalSync() {
            Instant lastRun = getLastRunTimestamp();
            List<Resource> newFiles = loadFilesSince(sourceDirectory, lastRun);
            if (newFiles.isEmpty()) {
                log.debug("No new files since {}", lastRun);
                return;
            }
            log.info("Processing {} new files", newFiles.size());
            pipeline.process(newFiles);
            saveLastRunTimestamp(Instant.now());
        }

        // Handle deleted documents
        @Scheduled(cron = "0 0 3 * * *")
        public void cleanupDeleted() {
            List<String> existingFiles = getExistingFileHashes(sourceDirectory);
            List<String> indexedHashes = documentRepo.findAllContentHashes();
            List<String> toDelete = indexedHashes.stream()
                    .filter(hash -> !existingFiles.contains(hash))
                    .toList();
            if (!toDelete.isEmpty()) {
                vectorStore.delete(toDelete);
                log.info("Removed {} stale documents", toDelete.size());
            }
        }
    }

    Monitoring & Observability

    Metrics to Track

    • Documents processed per hour
    • Chunks generated per document
    • Embedding generation latency
    • Vector store insertion time
    • Error rate by document type

    Alerts to Configure

    • ETL job failure
    • Error rate exceeds threshold
    • Processing queue backup
    • Vector store connection issues
    • Embedding API rate limits
    Micrometer Metrics
    @Component
    public class ETLMetrics {

        private final MeterRegistry meterRegistry;
        private final Counter documentsProcessed;
        private final Counter documentsFailed;
        private final Timer processingTime;

        public ETLMetrics(MeterRegistry meterRegistry) {
            this.meterRegistry = meterRegistry;
            this.documentsProcessed = Counter.builder("etl.documents.processed")
                    .description("Number of documents successfully processed")
                    .register(meterRegistry);
            this.documentsFailed = Counter.builder("etl.documents.failed")
                    .description("Number of documents that failed processing")
                    .register(meterRegistry);
            this.processingTime = Timer.builder("etl.processing.time")
                    .description("Time to process each document")
                    .register(meterRegistry);
        }

        public void recordSuccess(int count) {
            documentsProcessed.increment(count);
        }

        public void recordFailure() {
            documentsFailed.increment();
        }
    }
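
    The processingTime timer above is declared but never used in the snippet. One hedged way to wire it in is a small helper on ETLMetrics plus calls from the pipeline's try/catch; the recordTimed helper, the etlMetrics field in DocumentETLPipeline, and the java.util.function.Supplier import are assumptions on top of the code shown above.

    // Assumed addition to ETLMetrics: time a unit of work with the Timer.
    public <T> T recordTimed(Supplier<T> work) {
        return processingTime.record(work);
    }

    // Assumed usage inside DocumentETLPipeline's per-source loop
    // (etlMetrics injected via the constructor):
    try {
        List<Document> chunks = etlMetrics.recordTimed(
                () -> transform(extract(source), source));
        vectorStore.add(chunks);
        etlMetrics.recordSuccess(chunks.size());
    }
    catch (Exception e) {
        etlMetrics.recordFailure();
        log.error("Failed to process {}", source.getFilename(), e);
    }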

    Best Practices

    ✓ Do

    • Process documents in batches (100-500 at a time; see the batching sketch after this list)
    • Add content hashes for deduplication
    • Store source file reference in metadata
    • Use overlap in chunking (10-20%)
    • Log processing details for debugging
    • Handle document deletions in vector store
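
    For the first point, a minimal batching sketch: instead of one huge vectorStore.add call, insert chunks in fixed-size slices. The batch size and the helper name are illustrative; tune the size to your embedding provider's limits.

    // Could live in DocumentETLPipeline alongside the load step.
    private static final int BATCH_SIZE = 200;

    private void loadInBatches(List<Document> chunks) {
        for (int start = 0; start < chunks.size(); start += BATCH_SIZE) {
            int end = Math.min(start + BATCH_SIZE, chunks.size());
            vectorStore.add(chunks.subList(start, end));   // one embed + insert round trip per batch
        }
    }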

    ✗ Avoid

    • Processing all documents in memory at once
    • Chunking with zero overlap
    • Ignoring failed documents without alerting
    • Re-processing unchanged documents (see the sketch after this list)
    • Hardcoding chunk sizes without testing
    • Skipping metadata enrichment
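
    The re-processing point can be handled with the content hashes added during enrichment: compare incoming chunk hashes against what is already indexed. The documentRepo call mirrors the one used in ScheduledETLJob above; the helper itself is illustrative.

    // Illustrative: drop chunks whose content hash is already indexed,
    // so unchanged documents are not re-embedded.
    private List<Document> skipAlreadyIndexed(List<Document> chunks) {
        Set<String> indexed = new HashSet<>(documentRepo.findAllContentHashes());
        return chunks.stream()
                .filter(doc -> !indexed.contains(doc.getMetadata().get("content_hash")))
                .toList();
    }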

    Build Your Data Pipeline

    A solid ETL pipeline is the difference between a demo and a production RAG system. Start with the basics, then add monitoring and incremental updates.