ETL Pipeline for AI
Transform raw documents into AI-ready embeddings. ETL pipelines are the foundation of every production RAG system—extracting content, chunking intelligently, and loading into vector stores.
ETL (Extract, Transform, Load) in the AI context means taking your documents—PDFs, Word files, databases, web pages—and converting them into vector embeddings stored in a searchable database. The quality of your ETL pipeline directly determines the quality of your RAG answers. Poor chunking = poor retrieval.
Spring AI provides document readers for common formats, text splitters for intelligent chunking, and seamless integration with vector stores. Build pipelines that handle thousands of documents with proper error handling and monitoring.
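In code, the whole flow can be as small as three calls. Below is a minimal sketch, assuming a configured VectorStore bean and a text file on the classpath; the reader, splitter, and the read()/split() calls mirror the listings later on this page, and the package names follow Spring AI 1.x (they may differ slightly between versions).

```java
import org.springframework.ai.document.Document;
import org.springframework.ai.reader.TextReader;
import org.springframework.ai.transformer.splitter.TokenTextSplitter;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.core.io.ClassPathResource;

import java.util.List;

public class MinimalEtl {

    public void ingest(VectorStore vectorStore) {
        // Extract: read the raw file into Document objects
        List<Document> raw = new TextReader(new ClassPathResource("docs/handbook.txt")).read();

        // Transform: split into retrieval-sized chunks (defaults; tuning comes later)
        List<Document> chunks = new TokenTextSplitter().split(raw);

        // Load: embed and store for similarity search
        vectorStore.add(chunks);
    }
}
```

Everything that follows is about doing each of these three steps well: choosing readers, tuning the splitter, enriching metadata, and running the pipeline reliably.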
The ETL Flow
Extract
Load raw content from various sources into Spring AI's Document abstraction.
- PDF, Word, Markdown, HTML
- JSON, CSV, databases
- Web pages, APIs
Transform
Clean, chunk, and enrich documents with metadata for better retrieval.
- Text cleaning & normalization
- Intelligent chunking
- Metadata enrichment
Load
Generate embeddings and store in vector database for similarity search.
- Embedding generation
- Batch insertion
- Deduplication
Document Readers (Extract)
Built-in Readers
Spring AI supports many document formats out of the box
```java
@Service
public class DocumentExtractor {

    // PDF documents (with page-level extraction)
    public List<Document> extractPdf(Resource pdfResource) {
        PagePdfDocumentReader reader = new PagePdfDocumentReader(
            pdfResource,
            PdfDocumentReaderConfig.builder()
                .withPageTopMargin(50)       // Skip headers
                .withPageBottomMargin(50)    // Skip footers
                .withPagesPerDocument(1)     // One doc per page
                .build());
        return reader.read();
    }

    // Markdown / text files
    public List<Document> extractText(Resource textResource) {
        TextReader reader = new TextReader(textResource);
        reader.getCustomMetadata().put("source", textResource.getFilename());
        return reader.read();
    }

    // JSON documents (specify content and metadata fields)
    public List<Document> extractJson(Resource jsonResource) {
        JsonReader reader = new JsonReader(
            jsonResource,
            new JsonReader.JsonMetadataGenerator() {
                @Override
                public Map<String, Object> generate(Map<String, Object> jsonMap) {
                    return Map.of(
                        "title", jsonMap.get("title"),
                        "author", jsonMap.get("author"));
                }
            },
            "content"    // Field containing main text
        );
        return reader.read();
    }

    // Tika for complex formats (Word, PowerPoint, etc.)
    public List<Document> extractWithTika(Resource resource) {
        TikaDocumentReader reader = new TikaDocumentReader(resource);
        return reader.read();
    }
}
```
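A quick usage sketch for the extractor above, assuming it is injected as a Spring bean; the classpath location is just an example, and getContent() follows the accessor used throughout this article.

```java
import java.util.List;

import org.springframework.ai.document.Document;
import org.springframework.core.io.ClassPathResource;

class ExtractorUsageExample {

    // Hypothetical caller: reads a classpath PDF and inspects the extracted pages.
    void printPdfPages(DocumentExtractor extractor) {
        List<Document> pages = extractor.extractPdf(
                new ClassPathResource("docs/user-manual.pdf"));   // example path

        for (Document page : pages) {
            System.out.printf("metadata=%s, chars=%d%n",
                    page.getMetadata(), page.getContent().length());
        }
    }
}
```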
Text Splitters (Transform)
Chunking strategy massively impacts retrieval quality. Too small = lost context. Too large = irrelevant noise in results.
TokenTextSplitter
Splits by token count with overlap. Most common choice for general use.
```java
// 500 tokens per chunk, 100 token overlap
TokenTextSplitter splitter = new TokenTextSplitter(500, 100);
List<Document> chunks = splitter.split(documents);
```
CharacterTextSplitter
Splits by character count at natural boundaries (sentences, paragraphs).
```java
// Split at 1000 chars with 200 overlap
CharacterTextSplitter splitter = new CharacterTextSplitter();
splitter.setChunkSize(1000);
splitter.setChunkOverlap(200);
splitter.setSeparator("\n\n");   // Prefer paragraph breaks
```
Rule of Thumb: Start with 500 tokens, 100 overlap. For code use 300 tokens. For narratives/legal docs try 800 tokens. Always test with real queries.
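One way to apply the rule of thumb without hardcoding a single size is to pick the splitter per content type. A sketch that reuses the two-argument (chunk size, overlap) constructor from the example above; the content-type labels and the exact numbers are assumptions to tune against real queries.

```java
import org.springframework.ai.transformer.splitter.TokenTextSplitter;

class SplitterFactory {

    // Rough starting points from the rule of thumb; adjust after testing retrieval quality.
    TokenTextSplitter forContentType(String contentType) {
        return switch (contentType) {
            case "code"               -> new TokenTextSplitter(300, 60);   // keep functions intact
            case "legal", "narrative" -> new TokenTextSplitter(800, 160);  // longer passages need more context
            default                   -> new TokenTextSplitter(500, 100);  // general-purpose default
        };
    }
}
```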
Metadata Enrichment
Add Context for Better Filtering
Rich metadata enables precise filtering during retrieval
```java
@Service
public class MetadataEnricher {

    private final VectorStore vectorStore;

    public MetadataEnricher(VectorStore vectorStore) {
        this.vectorStore = vectorStore;
    }

    public void enrichDocument(Document doc, Resource source) {
        Map<String, Object> metadata = doc.getMetadata();

        // Source information
        metadata.put("source", source.getFilename());
        metadata.put("source_type", getFileType(source));
        metadata.put("ingested_at", Instant.now().toString());

        // Content statistics
        String content = doc.getContent();
        metadata.put("word_count", content.split("\\s+").length);
        metadata.put("char_count", content.length());

        // Content hash for deduplication
        metadata.put("content_hash", DigestUtils.sha256Hex(content));

        // Extract dates mentioned in content
        List<String> dates = extractDates(content);
        if (!dates.isEmpty()) {
            metadata.put("mentioned_dates", dates);
        }

        // Business-specific metadata
        if (isLegalDocument(source)) {
            metadata.put("document_type", "legal");
            metadata.put("requires_review", true);
        }
    }

    // Later, use metadata in queries
    public List<Document> searchLegalDocs(String query) {
        FilterExpressionBuilder b = new FilterExpressionBuilder();
        return vectorStore.similaritySearch(
            SearchRequest.query(query)
                .withFilterExpression(
                    b.eq("document_type", "legal").build()));
    }
}
```
Complete ETL Pipeline
Production-Ready Implementation
```java
@Service
@Slf4j
public class DocumentETLPipeline {

    private final VectorStore vectorStore;
    private final TokenTextSplitter splitter;

    public DocumentETLPipeline(VectorStore vectorStore) {
        this.vectorStore = vectorStore;
        this.splitter = new TokenTextSplitter(500, 100);
    }

    @Transactional
    public ETLResult process(List<Resource> sources) {
        int processed = 0;
        int failed = 0;
        List<String> errors = new ArrayList<>();

        for (Resource source : sources) {
            try {
                // EXTRACT
                List<Document> documents = extract(source);

                // TRANSFORM
                List<Document> chunks = transform(documents, source);

                // Deduplicate
                chunks = deduplicate(chunks);

                // LOAD
                if (!chunks.isEmpty()) {
                    vectorStore.add(chunks);
                    processed += chunks.size();
                }
                log.info("Processed {} chunks from {}", chunks.size(), source.getFilename());
            } catch (Exception e) {
                failed++;
                errors.add(source.getFilename() + ": " + e.getMessage());
                log.error("Failed to process {}", source.getFilename(), e);
            }
        }
        return new ETLResult(processed, failed, errors);
    }

    private List<Document> extract(Resource source) {
        String filename = source.getFilename().toLowerCase();
        if (filename.endsWith(".pdf")) {
            return new PagePdfDocumentReader(source).read();
        } else if (filename.endsWith(".json")) {
            return new JsonReader(source).read();
        } else {
            return new TextReader(source).read();
        }
    }

    private List<Document> transform(List<Document> docs, Resource source) {
        return docs.stream()
            .map(this::cleanDocument)
            .flatMap(doc -> splitter.split(List.of(doc)).stream())
            .peek(doc -> enrichMetadata(doc, source))
            .toList();
    }

    private List<Document> deduplicate(List<Document> docs) {
        Set<String> seen = new HashSet<>();
        return docs.stream()
            .filter(doc -> {
                String hash = (String) doc.getMetadata().get("content_hash");
                return seen.add(hash);
            })
            .toList();
    }
}

record ETLResult(int processed, int failed, List<String> errors) {}
```
Scheduling & Incremental Updates
```java
@Component
@Slf4j
@RequiredArgsConstructor
public class ScheduledETLJob {

    private final DocumentETLPipeline pipeline;
    private final DocumentRepository documentRepo;
    private final VectorStore vectorStore;

    @Value("${etl.source.directory}")
    private String sourceDirectory;

    // Full sync daily at 2 AM
    @Scheduled(cron = "0 0 2 * * *")
    public void fullSync() {
        log.info("Starting full ETL sync");
        List<Resource> allFiles = loadAllFiles(sourceDirectory);
        ETLResult result = pipeline.process(allFiles);
        log.info("Full sync complete: {} processed, {} failed",
            result.processed(), result.failed());
    }

    // Incremental sync every hour
    @Scheduled(fixedRate = 3600000)
    public void incrementalSync() {
        Instant lastRun = getLastRunTimestamp();
        List<Resource> newFiles = loadFilesSince(sourceDirectory, lastRun);

        if (newFiles.isEmpty()) {
            log.debug("No new files since {}", lastRun);
            return;
        }

        log.info("Processing {} new files", newFiles.size());
        pipeline.process(newFiles);
        saveLastRunTimestamp(Instant.now());
    }

    // Handle deleted documents
    @Scheduled(cron = "0 0 3 * * *")
    public void cleanupDeleted() {
        List<String> existingFiles = getExistingFileHashes(sourceDirectory);
        List<String> indexedHashes = documentRepo.findAllContentHashes();

        List<String> toDelete = indexedHashes.stream()
            .filter(hash -> !existingFiles.contains(hash))
            .toList();

        if (!toDelete.isEmpty()) {
            vectorStore.delete(toDelete);
            log.info("Removed {} stale documents", toDelete.size());
        }
    }
}
```
Monitoring & Observability
Metrics to Track
- Documents processed per hour
- Chunks generated per document
- Embedding generation latency
- Vector store insertion time
- Error rate by document type
Alerts to Configure
- ETL job failure
- Error rate exceeds threshold
- Processing queue backup
- Vector store connection issues
- Embedding API rate limits
```java
@Component
public class ETLMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter documentsProcessed;
    private final Counter documentsFailed;
    private final Timer processingTime;

    public ETLMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.documentsProcessed = Counter.builder("etl.documents.processed")
            .description("Number of documents successfully processed")
            .register(meterRegistry);
        this.documentsFailed = Counter.builder("etl.documents.failed")
            .description("Number of documents that failed processing")
            .register(meterRegistry);
        this.processingTime = Timer.builder("etl.processing.time")
            .description("Time to process each document")
            .register(meterRegistry);
    }

    public void recordSuccess(int count) {
        documentsProcessed.increment(count);
    }

    public void recordFailure() {
        documentsFailed.increment();
    }
}
```
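Wiring these meters into the pipeline is mostly a matter of wrapping calls. A sketch of a runner that feeds the counters from the ETLResult and times each batch; looking the timer up again by name from the MeterRegistry is an assumption of this sketch, not something ETLMetrics exposes.

```java
import java.util.List;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;

@Component
class InstrumentedEtlRunner {

    private final DocumentETLPipeline pipeline;
    private final ETLMetrics metrics;
    private final Timer processingTime;

    InstrumentedEtlRunner(DocumentETLPipeline pipeline, ETLMetrics metrics, MeterRegistry registry) {
        this.pipeline = pipeline;
        this.metrics = metrics;
        // Looks up the timer ETLMetrics registered under the same name (no tags)
        this.processingTime = registry.timer("etl.processing.time");
    }

    ETLResult run(List<Resource> sources) {
        // Times the batch under the timer above; per-document timing would go inside the pipeline loop
        ETLResult result = processingTime.record(() -> pipeline.process(sources));
        metrics.recordSuccess(result.processed());
        for (int i = 0; i < result.failed(); i++) {
            metrics.recordFailure();   // ETLMetrics only exposes a per-document failure counter
        }
        return result;
    }
}
```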
Best Practices
✓ Do
- Process documents in batches (100-500 at a time; see the batching sketch after these lists)
- Add content hashes for deduplication
- Store source file reference in metadata
- Use overlap in chunking (10-20%)
- Log processing details for debugging
- Handle document deletions in vector store
✗ Avoid
- Processing all documents in memory at once
- Chunking with zero overlap
- Ignoring failed documents without alerting
- Re-processing unchanged documents
- Hardcoding chunk sizes without testing
- Skipping metadata enrichment
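For the batching recommendation above, a sketch that loads chunks in fixed-size batches and skips content hashes that are already indexed; the indexedHashes set is assumed to come from your own bookkeeping (for example the content_hash metadata stored earlier).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;

class BatchedLoader {

    private static final int BATCH_SIZE = 200;  // within the 100-500 guideline above

    // Adds chunks in batches, skipping content hashes that are already indexed.
    void load(VectorStore vectorStore, List<Document> chunks, Set<String> indexedHashes) {
        List<Document> fresh = chunks.stream()
                .filter(doc -> !indexedHashes.contains((String) doc.getMetadata().get("content_hash")))
                .toList();

        for (int from = 0; from < fresh.size(); from += BATCH_SIZE) {
            int to = Math.min(from + BATCH_SIZE, fresh.size());
            // Copy the sublist so each insert is an independent, bounded batch
            vectorStore.add(new ArrayList<>(fresh.subList(from, to)));
        }
    }
}
```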