Event Patterns
Common architectural patterns and best practices for event-driven ADK applications
This guide covers common architectural patterns for building robust, scalable event-driven applications with ADK.
Architectural Patterns
Event Sourcing
Store all application state changes as a sequence of events:
class EventStore {
private events: Event[] = [];
private snapshots = new Map<string, any>();
append(event: Event) {
this.events.push(event);
this.updateSnapshot(event);
}
getEventHistory(sessionId: string): Event[] {
return this.events.filter(e =>
e.invocationId === sessionId ||
e.content?.sessionId === sessionId
);
}
rebuildState(sessionId: string): Record<string, any> {
const state: Record<string, any> = {};
const events = this.getEventHistory(sessionId);
for (const event of events) {
if (event.actions?.stateDelta) {
Object.assign(state, event.actions.stateDelta);
}
}
return state;
}
private updateSnapshot(event: Event) {
if (event.actions?.stateDelta) {
const sessionId = event.invocationId;
const currentSnapshot = this.snapshots.get(sessionId) || {};
this.snapshots.set(sessionId, {
...currentSnapshot,
...event.actions.stateDelta
});
}
}
}
Command Query Responsibility Segregation (CQRS)
Separate event writing from reading:
interface Command {
type: string;
payload: any;
sessionId: string;
}
class CommandHandler {
constructor(private eventStore: EventStore) {}
async execute(command: Command): Promise<Event[]> {
const events: Event[] = [];
switch (command.type) {
case 'USER_MESSAGE':
events.push(this.createUserMessageEvent(command));
break;
case 'AGENT_RESPONSE':
events.push(this.createAgentResponseEvent(command));
break;
case 'STATE_UPDATE':
events.push(this.createStateUpdateEvent(command));
break;
}
for (const event of events) {
this.eventStore.append(event);
}
return events;
}
private createUserMessageEvent(command: Command): Event {
return new Event({
author: 'user',
invocationId: command.sessionId,
content: { parts: [{ text: command.payload.message }] }
});
}
}
class QueryHandler {
constructor(private eventStore: EventStore) {}
getConversationHistory(sessionId: string): Event[] {
return this.eventStore.getEventHistory(sessionId)
.filter(event => event.isFinalResponse());
}
getSessionState(sessionId: string): Record<string, any> {
return this.eventStore.rebuildState(sessionId);
}
getEventsByTimeRange(start: Date, end: Date): Event[] {
const startTimestamp = start.getTime() / 1000;
const endTimestamp = end.getTime() / 1000;
return this.eventStore.getEventHistory('')
.filter(event =>
event.timestamp >= startTimestamp &&
event.timestamp <= endTimestamp
);
}
getEventStatistics(sessionId: string) {
const events = this.eventStore.getEventHistory(sessionId);
return {
totalEvents: events.length,
messageCount: events.filter(e => e.content?.parts?.[0]?.text).length,
toolCalls: events.reduce((sum, e) => sum + e.getFunctionCalls().length, 0),
averageResponseTime: this.calculateAverageResponseTime(events)
};
}
}
class EventBus {
private subscribers = new Map<string, Array<(event: Event) => void>>();
subscribe(eventType: string, handler: (event: Event) => void) {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
this.subscribers.get(eventType)!.push(handler);
}
publish(event: Event) {
const eventType = this.getEventType(event);
const handlers = this.subscribers.get(eventType) || [];
handlers.forEach(handler => {
try {
handler(event);
} catch (error) {
console.error('Event handler error:', error);
}
});
}
private getEventType(event: Event): string {
if (event.author === 'user') return 'USER_EVENT';
if (event.getFunctionCalls().length > 0) return 'TOOL_REQUEST';
if (event.getFunctionResponses().length > 0) return 'TOOL_RESPONSE';
if (event.isFinalResponse()) return 'FINAL_RESPONSE';
return 'GENERAL_EVENT';
}
}
State Management Patterns
State Machine
Manage complex state transitions:
interface StateTransition {
from: string;
to: string;
event: string;
condition?: (event: Event) => boolean;
}
class StateMachine {
private currentState: string;
private transitions: StateTransition[] = [];
constructor(initialState: string) {
this.currentState = initialState;
this.defineTransitions();
}
private defineTransitions() {
this.transitions = [
{ from: 'idle', to: 'processing', event: 'USER_MESSAGE' },
{ from: 'processing', to: 'waiting_tool', event: 'TOOL_REQUEST' },
{ from: 'waiting_tool', to: 'processing', event: 'TOOL_RESPONSE' },
{ from: 'processing', to: 'complete', event: 'FINAL_RESPONSE' },
{ from: 'complete', to: 'idle', event: 'USER_MESSAGE' },
{
from: 'processing',
to: 'error',
event: 'ERROR',
condition: (event) => !!event.errorCode
}
];
}
processEvent(event: Event): string {
const eventType = this.getEventType(event);
const transition = this.transitions.find(t =>
t.from === this.currentState &&
t.event === eventType &&
(!t.condition || t.condition(event))
);
if (transition) {
console.log(`State transition: ${this.currentState} → ${transition.to}`);
this.currentState = transition.to;
this.onStateChange(transition.to, event);
}
return this.currentState;
}
private onStateChange(newState: string, event: Event) {
// Handle state-specific logic
switch (newState) {
case 'processing':
this.startProcessingIndicator();
break;
case 'waiting_tool':
this.showToolExecutionStatus();
break;
case 'complete':
this.hideLoadingIndicators();
break;
case 'error':
this.showErrorMessage(event.errorMessage || 'Unknown error');
break;
}
}
}
Saga Pattern
Manage complex, long-running workflows:
class ConversationSaga {
private steps: SagaStep[] = [];
private currentStep = 0;
private compensations: Array<() => Promise<void>> = [];
async execute(events: AsyncGenerator<Event>): Promise<void> {
try {
for await (const event of events) {
await this.processStep(event);
}
} catch (error) {
await this.compensate();
throw error;
}
}
private async processStep(event: Event) {
const step = this.steps[this.currentStep];
if (!step) return;
try {
const result = await step.execute(event);
if (result.shouldProceed) {
this.currentStep++;
}
// Add compensation for this step
if (step.compensate) {
this.compensations.push(() => step.compensate!(result));
}
} catch (error) {
throw new SagaError(`Step ${this.currentStep} failed`, error);
}
}
private async compensate() {
console.log('Executing saga compensation');
// Execute compensations in reverse order
for (const compensation of this.compensations.reverse()) {
try {
await compensation();
} catch (error) {
console.error('Compensation failed:', error);
}
}
}
}
Observer Patterns
Event Listeners
Implement reactive behaviors:
class EventObserver {
private listeners = new Map<string, Set<EventListener>>();
interface EventListener {
predicate: (event: Event) => boolean;
handler: (event: Event) => void | Promise<void>;
once?: boolean;
}
addListener(
name: string,
predicate: (event: Event) => boolean,
handler: (event: Event) => void | Promise<void>,
once = false
) {
if (!this.listeners.has(name)) {
this.listeners.set(name, new Set());
}
this.listeners.get(name)!.add({
predicate,
handler,
once
});
}
async notify(event: Event) {
for (const [name, listenerSet] of this.listeners) {
const listenersToRemove: EventListener[] = [];
for (const listener of listenerSet) {
if (listener.predicate(event)) {
try {
await listener.handler(event);
} catch (error) {
console.error(`Listener ${name} error:`, error);
}
if (listener.once) {
listenersToRemove.push(listener);
}
}
}
// Remove one-time listeners
listenersToRemove.forEach(listener => listenerSet.delete(listener));
}
}
}
// Usage example
const observer = new EventObserver();
observer.addListener(
'userMessages',
(event) => event.author === 'user',
(event) => console.log('User said:', event.content?.parts?.[0]?.text)
);
observer.addListener(
'toolCalls',
(event) => event.getFunctionCalls().length > 0,
(event) => logToolUsage(event.getFunctionCalls())
);
Reactive Streams
Create reactive event processing pipelines:
class EventStream {
private transforms: Array<(event: Event) => Event | null> = [];
private filters: Array<(event: Event) => boolean> = [];
private subscribers: Array<(event: Event) => void> = [];
filter(predicate: (event: Event) => boolean): EventStream {
this.filters.push(predicate);
return this;
}
map(transform: (event: Event) => Event): EventStream {
this.transforms.push(transform);
return this;
}
subscribe(handler: (event: Event) => void): () => void {
this.subscribers.push(handler);
// Return unsubscribe function
return () => {
const index = this.subscribers.indexOf(handler);
if (index > -1) {
this.subscribers.splice(index, 1);
}
};
}
push(event: Event) {
// Apply filters
for (const filter of this.filters) {
if (!filter(event)) {
return; // Event filtered out
}
}
// Apply transforms
let transformedEvent = event;
for (const transform of this.transforms) {
const result = transform(transformedEvent);
if (result === null) return; // Transform filtered out event
transformedEvent = result;
}
// Notify subscribers
this.subscribers.forEach(subscriber => {
try {
subscriber(transformedEvent);
} catch (error) {
console.error('Stream subscriber error:', error);
}
});
}
}
// Usage example
const userMessageStream = new EventStream()
.filter(event => event.author === 'user')
.map(event => ({
...event,
content: {
...event.content,
timestamp: Date.now()
}
}))
.subscribe(event => processUserMessage(event));
Error Handling Patterns
Circuit Breaker
Prevent cascade failures:
class EventCircuitBreaker {
private failures = 0;
private lastFailure = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private maxFailures = 5,
private timeout = 60000 // 1 minute
) {}
async execute(event: Event, handler: (event: Event) => Promise<any>) {
if (this.state === 'open') {
if (Date.now() - this.lastFailure > this.timeout) {
this.state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await handler(event);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess() {
this.failures = 0;
this.state = 'closed';
}
private onFailure() {
this.failures++;
this.lastFailure = Date.now();
if (this.failures >= this.maxFailures) {
this.state = 'open';
}
}
}
Retry with Backoff
Handle transient failures:
class RetryHandler {
async withRetry<T>(
operation: () => Promise<T>,
maxRetries = 3,
baseDelay = 1000
): Promise<T> {
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
if (attempt === maxRetries) {
break; // Don't delay on last attempt
}
const delay = this.calculateDelay(attempt, baseDelay);
console.warn(`Attempt ${attempt + 1} failed, retrying in ${delay}ms:`, error);
await this.delay(delay);
}
}
throw new Error(`Operation failed after ${maxRetries + 1} attempts: ${lastError!.message}`);
}
private calculateDelay(attempt: number, baseDelay: number): number {
// Exponential backoff with jitter
const exponentialDelay = baseDelay * Math.pow(2, attempt);
const jitter = Math.random() * 0.1 * exponentialDelay;
return exponentialDelay + jitter;
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Performance Patterns
Event Batching
Process events in batches for efficiency:
class EventBatcher {
private batch: Event[] = [];
private timer: NodeJS.Timeout | null = null;
constructor(
private batchSize = 10,
private flushInterval = 100, // ms
private processor: (events: Event[]) => Promise<void>
) {}
add(event: Event) {
this.batch.push(event);
if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
private async flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.batch.length === 0) return;
const eventsToProcess = [...this.batch];
this.batch = [];
try {
await this.processor(eventsToProcess);
} catch (error) {
console.error('Batch processing error:', error);
// Optionally re-queue failed events
}
}
}
Event Deduplication
Prevent duplicate event processing:
class EventDeduplicator {
private processedEvents = new Set<string>();
private cleanupInterval: NodeJS.Timeout;
constructor(private ttl = 300000) { // 5 minutes
// Periodic cleanup of old event IDs
this.cleanupInterval = setInterval(() => {
this.cleanup();
}, ttl);
}
isDuplicate(event: Event): boolean {
const key = this.getEventKey(event);
return this.processedEvents.has(key);
}
markProcessed(event: Event) {
const key = this.getEventKey(event);
this.processedEvents.add(key);
}
private getEventKey(event: Event): string {
// Create unique key based on content and context
return `${event.id}-${event.timestamp}-${event.author}`;
}
private cleanup() {
// Simple cleanup - in production, use time-based expiry
if (this.processedEvents.size > 10000) {
this.processedEvents.clear();
}
}
destroy() {
clearInterval(this.cleanupInterval);
}
}
Best Practices
Event Design
- Use past tense: Events represent things that happened
- Include context: Provide enough information for processing
- Keep events immutable: Never modify events after creation
- Version events: Plan for schema evolution
Error Handling
- Graceful degradation: Continue processing other events when one fails
- Idempotent handlers: Design handlers to be safely retried
- Dead letter queues: Store failed events for analysis
Performance
- Filter early: Apply filters before expensive operations
- Batch when possible: Group related operations
- Use appropriate data structures: Choose efficient storage for your access patterns
These patterns provide building blocks for sophisticated event-driven architectures. Choose patterns that fit your specific use case and complexity requirements.
Event patterns enable you to build maintainable, scalable agent applications that can handle complex workflows and provide excellent user experiences.