Event Streaming
Real-time event processing and streaming patterns in ADK
ADK's event streaming enables real-time interaction with agents: your application can process and display output as it is generated instead of waiting for the complete response.
Understanding Streaming Events
Streaming events allow for progressive content delivery, improving user experience by showing partial results immediately.
Event Properties for Streaming
Key properties that control streaming behavior:
// Streaming-related fields of an event as used by the examples in this
// document; the remaining properties are omitted from this excerpt.
interface Event {
// The examples in this document append the text of each partial event to a
// running buffer before displaying it.
partial?: boolean; // True for incomplete streaming chunks
turnComplete?: boolean; // True when the agent's turn is finished
// Typed loosely here; the examples read text via content?.parts?.[0]?.text.
content?: any; // Contains the partial or complete content
// ... other properties
}
Basic Streaming Patterns
Simple Text Streaming
Handle streaming text responses:
/**
 * Consumes `runner.runAsync(query, session)` and renders its text as it arrives.
 *
 * Text from every event that carries some is appended to a running buffer.
 * Partial events push the buffer to `displayPartialText`; a non-partial event
 * that reports `isFinalResponse()` pushes it to `displayFinalText` and then
 * clears the buffer. Events without text in their first content part are
 * skipped entirely.
 *
 * @param runner  Object exposing `runAsync(query, session)` as an async iterable of events.
 * @param query   Forwarded unchanged to `runner.runAsync`.
 * @param session Forwarded unchanged to `runner.runAsync`.
 */
async function handleStreamingResponse(runner, query, session) {
  let buffer = '';

  for await (const event of runner.runAsync(query, session)) {
    const chunk = event.content?.parts?.[0]?.text;
    if (!chunk) {
      continue;
    }

    // Both partial and complete chunks extend the buffer.
    buffer += chunk;

    if (event.partial) {
      displayPartialText(buffer);
      continue;
    }

    // A complete chunk is only shown once the response is final.
    if (event.isFinalResponse()) {
      displayFinalText(buffer);
      buffer = ''; // Start clean for the next response
    }
  }
}
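Progressive UI Updates
Update the UI incrementally as chunks arrive. The examples below show a React hook, a framework-free DOM renderer, and a Vue composable: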
import { useState, useEffect } from 'react';
/**
 * React hook that runs `runner.runAsync(query, session)` whenever `query`
 * changes and exposes the streamed text, every received event, and a flag
 * telling whether a stream is currently being consumed.
 *
 * The effect returns a cleanup function, so a stream started for a previous
 * query (or for a component that has since unmounted) stops updating state
 * and is abandoned at the next event instead of racing the current stream.
 * Stream failures are logged rather than surfacing as unhandled rejections.
 *
 * @param runner  Object exposing `runAsync(query, session)` as an async iterable of events.
 * @param query   Forwarded to `runner.runAsync`; a change restarts the stream.
 * @param session Forwarded to `runner.runAsync`.
 * @returns `{ streamingText, events, isStreaming }`.
 */
function useEventStream(runner, query, session) {
  const [streamingText, setStreamingText] = useState('');
  const [events, setEvents] = useState<Event[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  useEffect(() => {
    // Set by the cleanup function below; once true this run must not touch state.
    let cancelled = false;
    let currentText = '';

    async function processStream() {
      setIsStreaming(true);
      try {
        for await (const event of runner.runAsync(query, session)) {
          // Leaving the loop also lets the async iterator release its resources.
          if (cancelled) break;

          setEvents(prev => [...prev, event]);

          const text = event.content?.parts?.[0]?.text;
          if (!text) continue;

          if (event.partial) {
            currentText += text;
            setStreamingText(currentText);
          } else if (event.isFinalResponse()) {
            currentText += text;
            setStreamingText(currentText);
            currentText = ''; // Reset for next response
          }
        }
      } catch (error) {
        if (!cancelled) {
          console.error('Event stream failed:', error);
        }
      } finally {
        // A superseded run must not clear the flag owned by its successor.
        if (!cancelled) {
          setIsStreaming(false);
        }
      }
    }

    void processStream();

    return () => {
      cancelled = true;
    };
    // Only a new query restarts the stream; runner and session are taken from
    // the render that started it.
  }, [query]);

  return { streamingText, events, isStreaming };
}
/**
 * Renders streamed agent text into a container element, one child `<div>`
 * per response. While a response is streaming its element has the class
 * `streaming-text`; once the final event arrives it becomes `final-text`.
 *
 * Elements are created lazily, so a response that arrives as a single final
 * event (no partial chunks) is still rendered and no empty placeholder is
 * left behind after the last response.
 */
class StreamingRenderer {
  private container: HTMLElement;
  private currentElement: HTMLElement | null = null;

  /**
   * @param containerId id of the element that receives the response elements.
   * @throws Error when no element with that id exists.
   */
  constructor(containerId: string) {
    const container = document.getElementById(containerId);
    if (!container) {
      throw new Error(`StreamingRenderer: no element with id "${containerId}"`);
    }
    this.container = container;
  }

  /**
   * Consumes `runner.runAsync(query, session)` and renders each text chunk.
   * Events without text in their first content part are ignored, as are
   * non-partial events that are not the final response.
   */
  async processStream(runner, query, session) {
    let currentText = '';

    for await (const event of runner.runAsync(query, session)) {
      const text = event.content?.parts?.[0]?.text;
      if (!text) continue;

      if (event.partial) {
        currentText += text;
        this.updateCurrentElement(currentText);
      } else if (event.isFinalResponse()) {
        currentText += text;
        this.finalizeCurrentElement(currentText);
        currentText = '';
      }
    }
  }

  /** Shows `text` in the in-progress element, creating it on first use. */
  private updateCurrentElement(text: string) {
    const element = this.currentElement ?? this.createNewElement();
    element.textContent = text;
  }

  /** Appends a fresh in-progress element to the container and makes it current. */
  private createNewElement(): HTMLElement {
    const element = document.createElement('div');
    element.className = 'streaming-text';
    this.container.appendChild(element);
    this.currentElement = element;
    return element;
  }

  /**
   * Writes the complete text, marks the element final, and detaches it so the
   * next response gets its own element.
   */
  private finalizeCurrentElement(text: string) {
    // No partial chunk may have been seen yet, so the element might not exist.
    const element = this.currentElement ?? this.createNewElement();
    element.textContent = text;
    element.className = 'final-text';
    this.currentElement = null;
  }
}
import { ref, onMounted } from 'vue';
/**
 * Vue composable that, once the component is mounted, consumes
 * `runner.runAsync(query, session)` and exposes reactive refs for the
 * streamed text, every received event, and a streaming flag.
 *
 * Text from partial events and from the final response is appended to a
 * running buffer that is mirrored into `streamingText`; the buffer is cleared
 * after the final response. Text on other non-partial events is ignored.
 *
 * @returns `{ streamingText, events, isStreaming }` refs.
 */
export function useEventStream(runner, query, session) {
  const streamingText = ref('');
  const events = ref<Event[]>([]);
  const isStreaming = ref(false);

  async function consume() {
    let pending = '';
    isStreaming.value = true;

    try {
      for await (const incoming of runner.runAsync(query, session)) {
        events.value.push(incoming);

        const chunk = incoming.content?.parts?.[0]?.text;
        if (!chunk) continue;

        const isPartial = Boolean(incoming.partial);
        // Only partial chunks and the final response contribute text.
        if (!isPartial && !incoming.isFinalResponse()) continue;

        pending += chunk;
        streamingText.value = pending;

        if (!isPartial) {
          pending = ''; // final response shown; start over for the next one
        }
      }
    } finally {
      isStreaming.value = false;
    }
  }

  onMounted(consume);

  return { streamingText, events, isStreaming };
}
Advanced Streaming Patterns
Multi-Stream Handling
Handle multiple concurrent streams:
/**
 * Accumulation state kept per stream by MultiStreamManager.
 * Declared at top level because TypeScript does not allow an interface
 * declaration inside a class body.
 */
interface StreamState {
  /** Text accumulated so far for this stream. */
  text: string;
  /** False once the final response has been received. */
  isActive: boolean;
  /** `Date.now()` of the most recent partial chunk (or of creation). */
  lastUpdate: number;
}

/**
 * Tracks several concurrent streams, keyed by each event's `invocationId`,
 * and accumulates their text independently.
 */
class MultiStreamManager {
  private readonly streams = new Map<string, StreamState>();

  /**
   * Routes one event to the stream identified by `event.invocationId`,
   * creating that stream's state on first sight.
   *
   * Partial text is appended and reported through `onStreamUpdate`; the text
   * of the final response is appended, reported through `onStreamComplete`,
   * and the stream's state is then discarded. Events without text in their
   * first content part only ensure the stream entry exists.
   */
  async processEvent(event: Event) {
    const streamId = event.invocationId;

    let stream = this.streams.get(streamId);
    if (!stream) {
      stream = { text: '', isActive: true, lastUpdate: Date.now() };
      this.streams.set(streamId, stream);
    }

    const text = event.content?.parts?.[0]?.text;
    if (!text) return;

    if (event.partial) {
      stream.text += text;
      stream.lastUpdate = Date.now();
      this.onStreamUpdate(streamId, stream.text);
    } else if (event.isFinalResponse()) {
      stream.text += text;
      stream.isActive = false;
      this.onStreamComplete(streamId, stream.text);
    }
  }

  /** Reports the text accumulated so far for one stream. */
  private onStreamUpdate(streamId: string, text: string) {
    // Update UI for specific stream
    console.log(`Stream ${streamId} updated:`, text);
  }

  /** Reports the complete text of a stream and forgets the stream. */
  private onStreamComplete(streamId: string, text: string) {
    // Finalize stream display
    console.log(`Stream ${streamId} completed:`, text);
    this.streams.delete(streamId);
  }
}
Buffered Streaming
Implement buffering for smoother display:
/**
 * Queues incoming text chunks and writes them to the element with id
 * `output` one at a time, pausing `displayInterval` milliseconds between
 * chunks so bursts of input are paced out.
 */
class BufferedStreamRenderer {
  private buffer: string[] = [];
  private isDisplaying = false;
  private displayInterval = 50; // ms

  /** Queues a chunk and makes sure the display loop is running. */
  addChunk(text: string) {
    this.buffer.push(text);
    this.startDisplayLoop();
  }

  /** Starts the display loop unless one is already in progress. */
  private startDisplayLoop() {
    if (!this.isDisplaying) {
      this.isDisplaying = true;
      this.drainOne();
    }
  }

  /**
   * Shows the oldest queued chunk and schedules the next step; when the queue
   * is empty the loop stops and the next addChunk restarts it.
   */
  private drainOne() {
    if (this.buffer.length === 0) {
      this.isDisplaying = false;
      return;
    }

    const head = this.buffer.shift() as string;
    this.displayChunk(head);
    setTimeout(() => this.drainOne(), this.displayInterval);
  }

  /** Appends the chunk to `#output`; does nothing if that element is absent. */
  private displayChunk(chunk: string) {
    const target = document.getElementById('output');
    if (!target) {
      return;
    }
    target.textContent += chunk;
  }
}
Event Queue Management
Priority Queue
Handle events with different priorities:
/**
 * Event queue with three priority levels. Events are handled one per timer
 * tick, always taking the oldest event of the highest non-empty priority.
 *
 * Handling is deferred rather than done inside `enqueue`, so events that are
 * enqueued close together are actually ordered by priority; handling each
 * event synchronously on arrival would leave nothing in the queues to
 * prioritise. `setTimeout` is used for the deferral so the class works in
 * both browsers and Node.
 *
 * Subclasses supply behaviour by overriding `handleStreamingEvent` and
 * `handleFinalEvent`.
 */
class PriorityEventQueue {
  private queues = {
    high: [] as Event[],
    normal: [] as Event[],
    low: [] as Event[]
  };

  /** True while a processing step is already scheduled. */
  private drainScheduled = false;

  /**
   * Adds an event to the queue for the given priority and schedules
   * processing.
   *
   * @param event    The event to handle later.
   * @param priority Queue to place it in; defaults to `'normal'`.
   */
  enqueue(event: Event, priority: 'high' | 'normal' | 'low' = 'normal') {
    this.queues[priority].push(event);
    this.scheduleDrain();
  }

  /** Schedules one processing step unless one is already pending. */
  private scheduleDrain() {
    if (this.drainScheduled) return;
    this.drainScheduled = true;
    setTimeout(() => this.processNext(), 0);
  }

  /** Handles the next event by priority and reschedules while work remains. */
  private processNext() {
    this.drainScheduled = false;

    const event = this.dequeue();
    if (!event) return;

    try {
      this.handleEvent(event);
    } finally {
      // Keep draining even if a handler threw, so later events are not stranded.
      if (this.hasPending()) {
        this.scheduleDrain();
      }
    }
  }

  /** Whether any queue still holds an event. */
  private hasPending(): boolean {
    return Object.values(this.queues).some(queue => queue.length > 0);
  }

  /** Removes and returns the oldest event of the highest non-empty priority. */
  private dequeue(): Event | null {
    return (
      this.queues.high.shift() ??
      this.queues.normal.shift() ??
      this.queues.low.shift() ??
      null
    );
  }

  /** Dispatches partial events and final responses to their hooks. */
  private handleEvent(event: Event) {
    // Process the event based on its type
    if (event.partial) {
      this.handleStreamingEvent(event);
    } else if (event.isFinalResponse()) {
      this.handleFinalEvent(event);
    }
  }

  /** Hook for partial (streaming) events; does nothing by default. */
  protected handleStreamingEvent(_event: Event): void {}

  /** Hook for final-response events; does nothing by default. */
  protected handleFinalEvent(_event: Event): void {}
}
Real-Time Features
Live Status Updates
Show real-time processing status:
/**
 * Mirrors the processing state implied by incoming events into a status
 * element: its text becomes a human-readable message and its class becomes
 * `status <state>`.
 *
 * Two seconds after a final response the status falls back to idle. That
 * fallback is cancelled by any later status change, so a response that starts
 * within the two seconds is not overwritten with "Ready" mid-stream.
 */
class LiveStatusManager {
  private statusElement: HTMLElement;
  private currentStatus = 'idle';
  /** Pending "back to idle" timer, if any. */
  private idleTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param elementId id of the element that displays the status.
   * @throws Error when no element with that id exists.
   */
  constructor(elementId: string) {
    const element = document.getElementById(elementId);
    if (!element) {
      throw new Error(`LiveStatusManager: no element with id "${elementId}"`);
    }
    this.statusElement = element;
  }

  /**
   * Updates the status from one event. The first matching rule wins:
   * partial chunk, function calls, function responses, final response.
   * Events matching none of them leave the status untouched.
   */
  updateFromEvent(event: Event) {
    if (event.partial) {
      this.setStatus('streaming', 'Receiving response...');
    } else if (event.getFunctionCalls().length > 0) {
      this.setStatus('processing', 'Using tools...');
    } else if (event.getFunctionResponses().length > 0) {
      this.setStatus('processing', 'Processing results...');
    } else if (event.isFinalResponse()) {
      this.setStatus('complete', 'Response complete');
      this.idleTimer = setTimeout(() => {
        this.idleTimer = null;
        this.setStatus('idle', 'Ready');
      }, 2000);
    }
  }

  /** Applies a status and drops any pending idle fallback it supersedes. */
  private setStatus(status: string, message: string) {
    if (this.idleTimer !== null) {
      clearTimeout(this.idleTimer);
      this.idleTimer = null;
    }
    this.currentStatus = status;
    this.statusElement.textContent = message;
    this.statusElement.className = `status ${status}`;
  }
}
Typing Indicators
Show when agents are "thinking":
/**
 * Shows an animated "<agent> is thinking..." message in a DOM element.
 * The trailing dots cycle every 500 ms while the indicator is visible.
 */
class TypingIndicator {
  private element: HTMLElement;
  // ReturnType keeps the handle type correct in browsers (number) and Node.
  private timer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param elementId id of the element that displays the indicator.
   * @throws Error when no element with that id exists.
   */
  constructor(elementId: string) {
    const element = document.getElementById(elementId);
    if (!element) {
      throw new Error(`TypingIndicator: no element with id "${elementId}"`);
    }
    this.element = element;
  }

  /** Makes the indicator visible for `agentName` and (re)starts the animation. */
  show(agentName: string) {
    this.element.textContent = `${agentName} is thinking...`;
    this.element.style.display = 'block';
    this.startAnimation();
  }

  /** Hides the indicator and stops the animation. */
  hide() {
    this.element.style.display = 'none';
    this.stopAnimation();
  }

  /** Starts cycling the trailing dots, replacing any animation already running. */
  private startAnimation() {
    // Without this, calling show() twice would orphan the first interval.
    this.stopAnimation();

    let dots = '';
    this.timer = setInterval(() => {
      dots = dots.length >= 3 ? '' : dots + '.';
      // textContent may be null; treat that as empty rather than "undefined".
      const base = (this.element.textContent ?? '').replace(/\.+$/, '');
      this.element.textContent = base + dots;
    }, 500);
  }

  /** Stops the dot animation if it is running. */
  private stopAnimation() {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}
Performance Optimization
Debounced Updates
Reduce UI update frequency:
/**
 * Wraps `func` so that it runs only after `delay` milliseconds have passed
 * without another call; each call cancels the previously scheduled run and
 * the eventual run receives the arguments of the most recent call.
 *
 * @param func  Function to invoke once calls have settled.
 * @param delay Quiet period in milliseconds.
 * @returns A function with the same parameters as `func` that returns nothing.
 */
function debounce<T extends (...args: any[]) => void>(
  func: T,
  delay: number
): (...args: Parameters<T>) => void {
  // Handle of the run currently scheduled, if any.
  let pending: ReturnType<typeof setTimeout> | undefined;

  const debounced = (...args: Parameters<T>): void => {
    clearTimeout(pending);
    pending = setTimeout(() => {
      func(...args);
    }, delay);
  };

  return debounced;
}
/**
 * Renders streamed text into the element with id `output`, routing UI writes
 * through `debounce` so that bursts of chunks cause a single DOM update.
 *
 * Chunk texts are accumulated, so the element shows everything received so
 * far rather than only the most recent chunk. Because the update is
 * debounced, it runs once chunks pause for the configured delay.
 */
class OptimizedStreamRenderer {
  /** Text of the partial chunks received since the last non-partial event. */
  private accumulatedText = '';

  private debouncedUpdate = debounce((text: string) => {
    this.updateUI(text);
  }, 16); // ~60fps

  /**
   * Feeds one event to the renderer. Partial events with text extend the
   * accumulated text and schedule a UI update; any non-partial event ends the
   * current run of chunks and clears the accumulator for the next one.
   */
  handleStreamingEvent(event: Event) {
    if (!event.partial) {
      this.accumulatedText = '';
      return;
    }

    const text = event.content?.parts?.[0]?.text;
    if (!text) return;

    this.accumulatedText += text;
    this.debouncedUpdate(this.accumulatedText);
  }

  /** Writes `text` to `#output`; does nothing if that element is absent. */
  private updateUI(text: string) {
    // Expensive UI update operation
    const output = document.getElementById('output');
    if (output) {
      output.textContent = text;
    }
  }
}
Memory Management
Handle large streams efficiently:
/**
 * Bounded event history: keeps only the most recent `maxEvents` events so a
 * long-running stream cannot grow memory without limit.
 */
class MemoryEfficientStream {
  private readonly maxEvents: number;
  private events: Event[] = [];

  /**
   * @param maxEvents Maximum number of events retained; defaults to 1000.
   * @throws RangeError when `maxEvents` is not a positive integer.
   */
  constructor(maxEvents = 1000) {
    if (!Number.isInteger(maxEvents) || maxEvents < 1) {
      throw new RangeError(`maxEvents must be a positive integer, got ${maxEvents}`);
    }
    this.maxEvents = maxEvents;
  }

  /** Appends an event, discarding the oldest ones beyond the capacity. */
  addEvent(event: Event) {
    this.events.push(event);

    // Keep only recent events
    const overflow = this.events.length - this.maxEvents;
    if (overflow > 0) {
      this.events.splice(0, overflow);
    }
  }

  /**
   * Returns a copy of the newest events, oldest first.
   *
   * @param count How many events to return at most; defaults to 100.
   *              Zero, negative, or NaN yields an empty array.
   */
  getRecentEvents(count: number = 100): Event[] {
    // slice(-0) is slice(0) and would return everything, so guard explicitly.
    if (!(count > 0)) {
      return [];
    }
    return this.events.slice(-count);
  }

  /** Drops all retained events. */
  cleanup() {
    this.events = [];
  }
}
Error Handling in Streams
Stream Recovery
Handle stream interruptions gracefully:
/**
 * Consumes `runner.runAsync(query, session)`, passing every event to
 * `processEvent`, and starts over when the stream (or `processEvent`) throws.
 *
 * Each retry calls `runner.runAsync` again from the beginning, so events that
 * were already processed before the interruption are processed again.
 * The wait before retry number k is `baseDelayMs * k`.
 *
 * @param runner  Object exposing `runAsync(query, session)` as an async iterable of events.
 * @param query   Forwarded unchanged to `runner.runAsync`.
 * @param session Forwarded unchanged to `runner.runAsync`.
 * @param options Optional tuning: `maxRetries` is the total number of attempts
 *                (default 3) and `baseDelayMs` the backoff unit (default 1000).
 * @throws RangeError when `maxRetries` is not a positive integer.
 * @throws The last stream error once all attempts have failed.
 */
async function robustStreamProcessing(
  runner,
  query,
  session,
  options: { maxRetries?: number; baseDelayMs?: number } = {}
) {
  const { maxRetries = 3, baseDelayMs = 1000 } = options;

  // With fewer than one attempt nothing would run and nothing would be reported.
  if (!Number.isInteger(maxRetries) || maxRetries < 1) {
    throw new RangeError(`maxRetries must be a positive integer, got ${maxRetries}`);
  }

  for (let attempt = 1; ; attempt++) {
    try {
      for await (const event of runner.runAsync(query, session)) {
        await processEvent(event);
      }
      return; // Success, no retry needed
    } catch (error) {
      console.warn(`Stream interrupted (attempt ${attempt}):`, error);

      if (attempt >= maxRetries) {
        console.error('Max retries exceeded, stream failed');
        throw error;
      }

      // Linear backoff before the next attempt.
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * attempt));
    }
  }
}
When implementing streaming, always handle network interruptions and provide fallback mechanisms for critical functionality.
Event streaming enables responsive, real-time agent interactions that significantly improve user experience in ADK applications.