Event Loop
Understanding the cooperative async generator pattern and event flow
At the heart of the ADK Runtime is an Event Loop that facilitates communication between the Runner component and your execution logic (agents, tools, callbacks).
Event-Driven Architecture
The Runtime operates on a cooperative async generator pattern where components communicate through events:
- Runner receives user query and initiates agent processing
- Agent executes logic until it has something to report (response, tool call, state change)
- Agent yields the event through its async generator
- Runner processes event, commits changes via services, forwards event upstream
- Agent resumes execution after Runner completes event processing
- Cycle repeats until agent has no more events for the current query
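The cycle above can be sketched in plain TypeScript without any ADK imports. This is a minimal illustration, not the library's implementation: `SketchEvent`, `SketchState`, `sketchAgent`, `sketchRunner`, and `collectTexts` are hypothetical names invented for this example, and the "commit" is a simple in-memory `Object.assign` standing in for the real services.

```typescript
// Hypothetical, simplified types; these are NOT the real @iqai/adk classes.
interface SketchEvent {
  author: string;
  text: string;
  stateDelta?: Record<string, unknown>;
}

type SketchState = Record<string, unknown>;

// "Agent": yields events and sees committed state only after it resumes.
async function* sketchAgent(state: SketchState): AsyncGenerator<SketchEvent> {
  yield { author: "agent", text: "Saving preference", stateDelta: { theme: "dark" } };
  // Execution resumes here only after the runner has handled the event above.
  yield { author: "agent", text: `Theme is now ${String(state["theme"])}` };
}

// "Runner": commits each event's changes, then forwards the event upstream.
async function* sketchRunner(state: SketchState): AsyncGenerator<SketchEvent> {
  for await (const event of sketchAgent(state)) {
    Object.assign(state, event.stateDelta ?? {}); // commit before the agent resumes
    yield event; // forward upstream (e.g., to a UI)
  }
}

// "Application": drains the runner and collects the text of every event.
async function collectTexts(): Promise<string[]> {
  const texts: string[] = [];
  for await (const event of sketchRunner({})) {
    texts.push(event.text);
  }
  return texts;
}
```

Calling `collectTexts()` resolves to `["Saving preference", "Theme is now dark"]`: the second event can read `theme` only because the runner committed the first event's delta before asking the agent for its next event.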
Event Processing Order
Agent execution pauses after yielding events until the Runner completes processing. This ensures state consistency and proper event ordering.
Execution Flow
Runner Role (Orchestrator)
The Runner acts as the central coordinator for a single user invocation. Its responsibilities in the loop are:
- Initiation: Receives the end user's query (new_message) and typically appends it to the session history via the SessionService.
- Kick-off: Starts the event generation process by calling the main agent's execution method (e.g., agentToRun.runAsync(...)).
- Receive & Process: Waits for the agent logic to yield an Event. Upon receiving an event, the Runner processes it before the agent resumes. This involves:
  - Using configured Services (SessionService, ArtifactService, MemoryService) to commit changes indicated in event.actions (like stateDelta, artifactDelta).
  - Performing other internal bookkeeping.
- Yield Upstream: Forwards the processed event onwards (e.g., to the calling application or UI for rendering).
- Iterate: Signals the agent logic that processing is complete for the yielded event, allowing it to resume and generate the next event.
// Simplified view of Runner's main loop logic
async function* runAsync(userId: string, sessionId: string, newMessage: Content): AsyncGenerator<Event> {
  // 1. Get or create session
  const session = await sessionService.getSession(appName, userId, sessionId);
  // 2. Create invocation context
  const invocationContext = new InvocationContext({ /* ... */ });
  // 3. Append new message to session (if provided)
  if (newMessage) {
    await appendNewMessageToSession(session, newMessage, invocationContext);
  }
  // 4. Find the agent to run for this session
  const agentToRun = findAgentToRun(session, rootAgent);
  // 5. Get agent's async generator
  const agentGenerator = agentToRun.runAsync(invocationContext);
  // 6. Process events from agent
  for await (const event of agentGenerator) {
    // 7. For non-partial events, commit changes to session and memory
    if (!event.partial) {
      await sessionService.appendEvent(session, event);
      if (memoryService) {
        await memoryService.addSessionToMemory(session);
      }
    }
    // 8. Yield event for upstream processing (e.g., UI rendering)
    yield event;
  }
}
Execution Logic Role (Agent, Tool, Callback)
Your code within agents, tools, and callbacks is responsible for the actual computation and decision-making. Its interaction with the loop involves:
- Execute: Runs its logic based on the current InvocationContext, including the session state as it was when execution resumed.
- Yield: When the logic needs to communicate (send a message, call a tool, report a state change), it constructs an Event containing the relevant content and actions, and then yields this event back to the Runner.
- Pause: Crucially, execution of the agent logic pauses immediately after the yield statement. It waits for the Runner to complete its Receive & Process step (processing and committing the event).
- Resume: Only after the Runner has processed the yielded event does the agent logic resume execution from the statement immediately following the yield.
- See Updated State: Upon resumption, the agent logic can now reliably access the session state (ctx.session.state) reflecting the changes that were committed by the Runner from the previously yielded event.
// Simplified view of logic inside Agent.runAsync, callbacks, or tools
// ... previous code runs based on current state ...
// 1. Determine a change or output is needed, construct the event
// Example: Updating state
const updateData = { 'field_1': 'value_2' };
const eventWithStateChange = new Event({
  author: this.name,
  actions: new EventActions({ stateDelta: updateData }),
  content: { parts: [{ text: "State updated." }] }
  // ... other event fields ...
});
// 2. Yield the event to the Runner for processing & commit
yield eventWithStateChange;
// <<<<<<<<<<<< EXECUTION PAUSES HERE >>>>>>>>>>>>
// <<<<<<<<<<<< RUNNER PROCESSES & COMMITS THE EVENT >>>>>>>>>>>>
// 3. Resume execution ONLY after Runner is done processing the above event.
// Now, the state committed by the Runner is reliably reflected.
// Subsequent code can safely assume the change from the yielded event happened.
const val = ctx.session.state['field_1'];
// here `val` is guaranteed to be "value_2" (assuming Runner committed successfully)
console.log(`Resumed execution. Value of field_1 is now: ${val}`);
// ... subsequent code continues ...
// Maybe yield another event later...
Event Types and Processing
Content Events
Events containing user messages or agent responses:
// User message event (generated by Runner)
const userEvent = new Event({
  invocationId: "inv_123",
  author: "user",
  content: {
    parts: [{ text: "Hello, how are you?" }]
  }
});
// Agent response event (generated by Agent)
const agentEvent = new Event({
  invocationId: "inv_123",
  author: "my_agent",
  content: {
    parts: [{ text: "I'm doing well, thank you!" }]
  }
});
Function Call Events
Events requesting tool execution:
// Function call event
const functionCallEvent = new Event({
  invocationId: "inv_123",
  author: "my_agent",
  content: {
    parts: [{
      functionCall: {
        name: "search_web",
        args: { query: "latest AI news" }
      }
    }]
  }
});
// Function response event
const functionResponseEvent = new Event({
  invocationId: "inv_123",
  author: "my_agent",
  content: {
    parts: [{
      functionResponse: {
        name: "search_web",
        response: { results: [/* ... */] }
      }
    }]
  }
});
State Change Events
Events that modify session state:
import { EventActions, Event } from '@iqai/adk';
const stateChangeEvent = new Event({
  invocationId: "inv_123",
  author: "my_agent",
  actions: new EventActions({
    stateDelta: {
      "user_preference": "dark_mode",
      "session_count": 5
    }
  })
});
Async Generator Pattern
Agent Implementation
Agents implement the async generator pattern in their runAsyncImpl method:
import { BaseAgent, InvocationContext, Event } from '@iqai/adk';
class CustomAgent extends BaseAgent {
  protected async *runAsyncImpl(
    ctx: InvocationContext
  ): AsyncGenerator<Event, void, unknown> {
    // Step 1: Analyze user input
    yield new Event({
      invocationId: ctx.invocationId,
      author: this.name,
      content: { parts: [{ text: "Let me think about this..." }] }
    });
    // Step 2: Perform some computation
    // (processUserQuery stands in for your own logic; it is not defined here)
    const result = await this.processUserQuery(ctx.userContent);
    // Step 3: Yield final response
    yield new Event({
      invocationId: ctx.invocationId,
      author: this.name,
      content: { parts: [{ text: result }] }
    });
  }
}
Runner Processing
The Runner processes each yielded event:
// Simplified Runner event processing
for await (const event of agent.runAsync(invocationContext)) {
  // 1. Skip persistence for partial (streaming) events
  if (!event.partial) {
    // 2. Persist to session
    await this.sessionService.appendEvent(session, event);
    // 3. Apply state changes
    if (event.actions?.stateDelta) {
      // Apply state changes through service
    }
    // 4. Handle artifacts
    if (event.actions?.artifactDelta) {
      // Process artifact changes
    }
  }
  // 5. Forward upstream
  yield event;
}
Event Processing Guarantees
Ordering Guarantees
- Events are processed in the order they are yielded
- State changes are applied atomically per event
- No race conditions between event processing
State Consistency
- Agent execution pauses until Runner processes events
- State changes are committed before agent resumes
- Rollback support for failed operations
Error Handling
- Exceptions in event processing are propagated to agents
- Partial state changes are rolled back on failure
- Graceful degradation for service failures
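How the Runner itself surfaces failures is not shown in this guide, so the following is only a language-level sketch of what happens to an async generator when the code consuming it fails. In standard JavaScript semantics, an exception thrown inside a `for await` loop body closes the generator, which lets the agent's `finally` block run before the error reaches the caller. The names `cleanupAwareAgent`, `consumeWithFailure`, and `log` are hypothetical and exist only for this example.

```typescript
// Records what happened, in order, so the sequence can be inspected afterwards.
const log: string[] = [];

// Hypothetical agent that releases resources in a finally block.
async function* cleanupAwareAgent(): AsyncGenerator<string> {
  try {
    yield "first";
    yield "second"; // never reached if the consumer fails on "first"
  } finally {
    // Runs when the consumer stops iterating early, including on errors.
    log.push("agent cleanup");
  }
}

// Hypothetical consumer whose processing step fails on the first event.
async function consumeWithFailure(): Promise<void> {
  try {
    for await (const event of cleanupAwareAgent()) {
      log.push(`processing ${event}`);
      throw new Error("commit failed"); // simulated failure while processing
    }
  } catch (err) {
    log.push(`caught: ${(err as Error).message}`);
  }
}
```

After `await consumeWithFailure()`, `log` contains `["processing first", "agent cleanup", "caught: commit failed"]`. Putting cleanup in `finally` inside agent logic therefore keeps it safe even when event processing stops partway through.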
Streaming and Partial Events
Partial Events
For real-time user experience, agents can yield partial events:
// Streaming response
yield new Event({
  invocationId: ctx.invocationId,
  author: this.name,
  content: { parts: [{ text: "Thinking" }] },
  partial: true // Not persisted to session
});
yield new Event({
  invocationId: ctx.invocationId,
  author: this.name,
  content: { parts: [{ text: "Thinking..." }] },
  partial: true
});
// Final response
yield new Event({
  invocationId: ctx.invocationId,
  author: this.name,
  content: { parts: [{ text: "Here's my complete response" }] },
  partial: false // Persisted to session
});
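On the consuming side, an application will typically want to render every event as it arrives while treating only non-partial events as the durable record. The sketch below illustrates that split with plain TypeScript; `StreamedEvent`, `streamingAgent`, and `consumeStream` are hypothetical names, and the `partial` flag is the only field borrowed from the events shown above.

```typescript
// Hypothetical, trimmed-down event shape; only `partial` mirrors the real field.
interface StreamedEvent {
  text: string;
  partial?: boolean;
}

// Stand-in for an agent that streams two partial updates and one final answer.
async function* streamingAgent(): AsyncGenerator<StreamedEvent> {
  yield { text: "Thinking", partial: true };
  yield { text: "Thinking...", partial: true };
  yield { text: "Here's my complete response", partial: false };
}

// Renders every event, but keeps only non-partial ones as the durable record.
async function consumeStream(
  events: AsyncIterable<StreamedEvent>
): Promise<{ rendered: string[]; persisted: string[] }> {
  const rendered: string[] = [];
  const persisted: string[] = [];
  for await (const event of events) {
    rendered.push(event.text); // e.g., update the UI in real time
    if (!event.partial) {
      persisted.push(event.text); // mirrors the Runner skipping partial events
    }
  }
  return { rendered, persisted };
}
```

With `await consumeStream(streamingAgent())`, `rendered` holds all three texts while `persisted` holds only `"Here's my complete response"`.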