Streaming Response in Spring AI ChatClient
Implement real-time streaming responses for better user experience using Flux and Server-Sent Events
1. Why Use Streaming?
When an LLM generates a response, it produces tokens one at a time. Streaming sends these tokens to the client as they're generated, rather than waiting for the complete response.
Without Streaming
User waits 5-30 seconds staring at a loading spinner until the entire response is ready
With Streaming
User sees words appearing in real-time, like watching someone type — feels instant and interactive
Perceived Speed: Response feels immediate
Better UX: ChatGPT-like experience
Early Cancel: Stop if the response is wrong
2. Basic Streaming with ChatClient
Using stream() Instead of call()
Replace .call() with .stream() to get a reactive Flux:
    import org.springframework.ai.chat.client.ChatClient;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Flux;

    @Service
    public class StreamingChatService {

        private final ChatClient chatClient;

        public StreamingChatService(ChatClient.Builder builder) {
            this.chatClient = builder.build();
        }

        // Non-streaming (blocks until complete)
        public String chat(String message) {
            return chatClient.prompt()
                    .user(message)
                    .call()
                    .content();  // Returns complete String
        }

        // Streaming (emits tokens as they arrive)
        public Flux<String> chatStream(String message) {
            return chatClient.prompt()
                    .user(message)
                    .stream()
                    .content();  // Returns Flux<String>
        }
    }

Flux Explained
Flux<String> is a Reactor type representing 0 to N items emitted over time. Each emitted item is a token (word or word fragment) from the LLM.
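To see the "0 to N items over time" idea without pulling in Reactor or Spring AI, here is a minimal plain-Java sketch. The class, method, and token values are all hypothetical, invented for illustration: a callback receives tokens one at a time, the way a `Flux<String>` subscriber's `onNext` does, and a completion callback fires at the end, like Reactor's `onComplete` signal.

```java
import java.util.List;
import java.util.function.Consumer;

public class TokenStreamDemo {

    // Simulates an LLM emitting tokens one at a time to a subscriber callback,
    // analogous to flux.subscribe(token -> ...) in Reactor (names are hypothetical)
    static void emitTokens(List<String> tokens, Consumer<String> onNext, Runnable onComplete) {
        for (String token : tokens) {
            onNext.accept(token);   // each call is one emission ("onNext")
        }
        onComplete.run();           // stream finished, like Flux's onComplete signal
    }

    public static void main(String[] args) {
        StringBuilder rendered = new StringBuilder();
        emitTokens(
            List.of("Hel", "lo", ", ", "wor", "ld"),
            rendered::append,                   // UI update per token
            () -> System.out.println(rendered)  // prints "Hello, world"
        );
    }
}
```

The real `Flux` adds asynchrony, backpressure, and error signals on top of this, but the consumer-side shape is the same: handle each token as it arrives rather than waiting for the whole string.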
3. REST Endpoint with Server-Sent Events
Create a Streaming Controller
Use MediaType.TEXT_EVENT_STREAM_VALUE for SSE:
    import lombok.RequiredArgsConstructor;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.*;
    import reactor.core.publisher.Flux;

    @RestController
    @RequestMapping("/api/chat")
    @RequiredArgsConstructor
    public class StreamingChatController {

        private final StreamingChatService chatService;

        // Standard endpoint (waits for complete response)
        @PostMapping
        public String chat(@RequestBody ChatRequest request) {
            return chatService.chat(request.message());
        }

        // Streaming endpoint using Server-Sent Events
        @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> chatStream(@RequestBody ChatRequest request) {
            return chatService.chatStream(request.message());
        }
    }

    record ChatRequest(String message) {}

Frontend JavaScript Consumer
    // Consume the SSE endpoint with fetch and a ReadableStream
    // (EventSource only supports GET, so we read the response body directly)
    async function streamChat(message) {
      const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message })
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let fullResponse = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        const chunk = decoder.decode(value);
        fullResponse += chunk;
        // Update UI with each chunk
        document.getElementById('response').textContent = fullResponse;
      }
    }

Real-Time Effect
The response text will appear word by word, creating the familiar ChatGPT-style typing effect.
4. Advanced Streaming Patterns
Access Full ChatResponse Metadata
Get token usage, finish reason, and other metadata from streaming responses:
    public Flux<ChatResponse> chatStreamWithMetadata(String message) {
        return chatClient.prompt()
                .user(message)
                .stream()
                .chatResponse();  // Returns Flux<ChatResponse> with full metadata
    }

    // Process responses with metadata
    public void processStream(String message) {
        chatStreamWithMetadata(message)
                .doOnNext(response -> {
                    // Access content
                    String content = response.getResult().getOutput().getContent();

                    // Access metadata (available on the final chunk)
                    var metadata = response.getMetadata();
                    if (metadata != null) {
                        Long promptTokens = metadata.getUsage().getPromptTokens();
                        Long completionTokens = metadata.getUsage().getGenerationTokens();
                        System.out.println("Tokens used: " + (promptTokens + completionTokens));
                    }
                })
                .subscribe();
    }

Buffering for Word Boundaries
Buffer tokens to emit complete words instead of fragments:
    public Flux<String> streamByWords(String message) {
        return chatClient.prompt()
                .user(message)
                .stream()
                .content()
                .bufferUntil(token -> token.contains(" ") || token.contains("\n"))
                .map(tokens -> String.join("", tokens));
    }

Timeout and Error Handling
    public Flux<String> streamWithTimeout(String message) {
        return chatClient.prompt()
                .user(message)
                .stream()
                .content()
                .timeout(Duration.ofSeconds(60))
                .onErrorResume(TimeoutException.class,
                        e -> Flux.just("[Response timed out]"))
                .onErrorResume(Exception.class,
                        e -> Flux.just("[Error: " + e.getMessage() + "]"));
    }

5. Best Practices
Always Set Timeouts: Prevent hanging connections with an explicit .timeout(...) on the stream.
Handle Client Disconnection: Spring WebFlux cancels the subscription when an SSE client disconnects; use .doOnCancel() to release resources, or .takeUntilOther() to tie the stream to an explicit stop signal.
Backpressure Handling: Use .onBackpressureBuffer() for slow consumers.
Memory with Tools: Chat memory advisors may not work reliably with streaming; test thoroughly.
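The tips above lean on Reactor operators. As a rough plain-JDK analogy (no Reactor dependency; class and token names are hypothetical), a bounded BlockingQueue shows the problem .onBackpressureBuffer(n) addresses: a fast producer is held back once a slow consumer falls a fixed number of items behind, so memory use stays bounded instead of growing with the backlog.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded buffer: the producer cannot run unboundedly ahead of the consumer,
        // which is roughly the role a bounded backpressure buffer plays for a Flux
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(8);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    buffer.put("token-" + i);   // blocks while the buffer is full
                }
                buffer.put("<done>");           // sentinel marking end of stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        int consumed = 0;
        while (true) {
            String token = buffer.take();       // slow consumer drains at its own pace
            if (token.equals("<done>")) break;
            consumed++;
        }
        producer.join();
        System.out.println("Consumed " + consumed + " tokens without unbounded buffering");
        // prints "Consumed 100 tokens without unbounded buffering"
    }
}
```

Reactor's operator family also offers dropping and latest-only strategies (.onBackpressureDrop(), .onBackpressureLatest()) when blocking the producer is not acceptable; which strategy fits depends on whether every token must reach the client.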
WebSocket Alternative
For bidirectional communication (e.g., cancellation signals), consider WebSocket instead of SSE:
    import org.springframework.stereotype.Controller;
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketSession;
    import reactor.core.publisher.Mono;

    @Controller
    public class ChatWebSocketHandler implements WebSocketHandler {

        private final ChatClient chatClient;

        public ChatWebSocketHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return session.receive()
                    .flatMap(message -> {
                        String userMessage = message.getPayloadAsText();
                        return chatClient.prompt()
                                .user(userMessage)
                                .stream()
                                .content()
                                .map(session::textMessage)
                                .as(session::send);
                    })
                    .then();
        }
    }

What You've Learned
Streaming Benefits: Real-time UX, early cancellation
Basic Streaming: .stream() returns Flux<String>
SSE Endpoints: TEXT_EVENT_STREAM media type
Frontend Integration: Fetch API with ReadableStream
Advanced Patterns: Buffering, metadata, timeouts
Best Practices: Error handling, backpressure