
Streaming

Streaming is the pipeline that delivers tokens from the API to the user’s screen in real time. It involves SSE parsing, token accumulation, markdown rendering, tool call detection, and display throttling. Each reference tool takes a different approach: Aider uses a Python generator with Rich markdown rendering, Codex uses a Rust async stream with newline-gated buffering, and OpenCode uses the Vercel AI SDK’s event-driven model with database-backed persistence.

For reasoning-token stream semantics, see Reasoning & Thinking Tokens. For overflow and truncation pressure caused by long streams, see Token Budgeting and Chat History. For the render-layer details beneath this pipeline, see Markdown Rendering.


Pinned commit: b9050e1d

Aider’s streaming is a synchronous Python generator pipeline: the litellm completion returns an iterable, and Aider iterates over it, accumulating content and rendering markdown to the terminal.

Entry: send() at aider/coders/base_coder.py:1783

def send(self, messages, model=None, functions=None):
    self.partial_response_content = ""
    self.partial_response_function_call = dict()
    completion = model.send_completion(messages, functions, self.stream, self.temperature)
    if self.stream:
        yield from self.show_send_output_stream(completion)
    else:
        self.show_send_output(completion)

model.send_completion() returns a streaming completion object when self.stream is True. This is a generator that yields OpenAI-compatible chunks.

Token Accumulation: show_send_output_stream()

File: aider/coders/base_coder.py:1900

Iterates over the completion generator:

for chunk in completion:
    # Extract text delta
    text = chunk.choices[0].delta.content
    if text:
        self.partial_response_content += text

    # Extract function call delta (for function-calling coders)
    func = chunk.choices[0].delta.function_call
    if func:
        for k, v in func.items():
            if k in self.partial_response_function_call:
                self.partial_response_function_call[k] += v
            else:
                self.partial_response_function_call[k] = v

    # Extract reasoning content (thinking tags)
    # ...

    # Render incrementally
    self.live_incremental_response(False)

    # Check for finish_reason == "length" (context exhausted)
    if chunk.choices[0].finish_reason == "length":
        raise FinishReasonLength()

File: aider/mdstream.py:92

The MarkdownStream class manages real-time markdown display using Rich’s Live display:

Key state:

  • self.printed — lines already emitted to console scrollback (stable)
  • self.live — Rich Live instance for animated window
  • self.live_window = 6 — number of lines kept in the animated window

Update logic (update() at line 149):

  1. On first call, start Rich.Live with 1.0 / min_delay refresh rate (20 FPS default)
  2. Throttle updates to maintain smooth rendering (lines 174-175)
  3. Measure render time and adjust min_delay dynamically (lines 179-184): min_delay = max(0.05, min(2.0, render_time * 10))
  4. Render the full accumulated markdown buffer through Rich’s markdown parser
  5. Split output into “stable” (older lines) and “unstable” (last 6 lines)
  6. Emit stable lines to console.print() — these go to terminal scrollback and cannot change
  7. Update the Live window with unstable lines — these are re-rendered each frame

The 6-line live window is the key insight: markdown rendering can change earlier output as new tokens arrive (e.g., a heading becomes a code block fence), so only the last few lines are kept “live” for re-rendering. Lines above the window are committed permanently.
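The window split can be reduced to a few lines. This is a hypothetical Python sketch of the idea, not Aider’s actual code; only the 6-line constant comes from the source.

```python
LIVE_WINDOW = 6  # matches Aider's self.live_window

def split_lines(rendered_lines, final=False):
    """Split rendered output into committed (scrollback) and live lines.

    Until the stream ends, the last LIVE_WINDOW lines stay live because
    later tokens may change how they render.
    """
    if final:
        # Stream finished: everything is stable and can be committed.
        return rendered_lines, []
    stable = rendered_lines[:-LIVE_WINDOW]    # printed to scrollback, immutable
    unstable = rendered_lines[-LIVE_WINDOW:]  # re-rendered each frame
    return stable, unstable
```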

show_send_output() (line 1836) handles non-streaming responses by extracting the full content, function calls, and reasoning content from the completed response object in one pass.

Both paths share the same accumulator state on the coder:

self.partial_response_content = ""           # accumulated text
self.partial_response_function_call = {}     # accumulated function call
self.got_reasoning_content = False           # seen <think> tag
self.ended_reasoning_content = False         # seen </think> tag

Pinned commit: 4ab44e2c5

Codex streams from the OpenAI Responses API via SSE, processes events in a tokio async loop, and renders to the TUI using a newline-gated buffering system.

File: codex-rs/core/src/codex.rs:5505

try_run_sampling_request() calls client_session.stream() to get an async SSE stream, then processes events in a loop:

while let Some(event) = stream.next().await {
    match event {
        ResponseEvent::Created => { /* no-op */ }
        ResponseEvent::OutputItemAdded(item) => {
            // Emit TurnItemStarted
        }
        ResponseEvent::OutputTextDelta(delta) => {
            // Emit AgentMessageContentDelta
        }
        ResponseEvent::OutputItemDone(item) => {
            // Tool call extraction and dispatch
            if let Some(tool_call) = extract_tool_call(&item) {
                let future = tool_runtime.handle_tool_call(tool_call, cancel.clone());
                in_flight.push_back(future);
            }
        }
        ResponseEvent::Completed { usage, .. } => {
            // Update token tracking, break
        }
        ResponseEvent::ReasoningSummaryDelta(delta) => {
            // Emit reasoning event
        }
    }
}

Tool calls discovered in OutputItemDone events are dispatched immediately via ToolCallRuntime, with futures collected in a FuturesOrdered. After the stream completes, drain_in_flight() waits for all tool results.

File: codex-rs/tui/src/markdown_stream.rs:7

pub(crate) struct MarkdownStreamCollector {
    buffer: String,
    committed_line_count: usize,
    width: Option<usize>,
}

The core innovation: tokens are accumulated in a buffer, but lines are only “committed” (rendered and emitted to the TUI) when a newline character is encountered.

push_delta(delta: &str) — appends to the buffer string.

commit_complete_lines() (line 35):

  1. Find the last newline position in buffer
  2. If no newline exists, return empty (nothing to commit)
  3. Render the entire buffer up to the newline as markdown
  4. Count logical rendered lines
  5. Return only newly completed lines (lines after self.committed_line_count)
  6. Increment committed_line_count

This prevents partial markdown from reaching the display. A header like ## He will not render until the full line ## Hello World\n is received.
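The gating logic can be sketched in a few lines of Python (the real collector is Rust and renders markdown before counting lines; rendering is omitted here):

```python
class NewlineGatedCollector:
    """Hypothetical Python reduction of Codex's MarkdownStreamCollector."""

    def __init__(self):
        self.buffer = ""
        self.committed_line_count = 0

    def push_delta(self, delta):
        self.buffer += delta

    def commit_complete_lines(self):
        """Return lines completed since the last commit; a partial tail stays buffered."""
        nl = self.buffer.rfind("\n")
        if nl == -1:
            return []  # no complete line yet: nothing reaches the display
        lines = self.buffer[: nl + 1].splitlines()
        new = lines[self.committed_line_count:]
        self.committed_line_count = len(lines)
        return new
```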

File: codex-rs/tui/src/streaming/controller.rs:14

pub(crate) struct StreamController {
    state: StreamState,
    finishing_after_drain: bool,
    header_emitted: bool,
}

The controller manages the flow from committed lines to TUI cells:

push(delta) (line 30):

  • Appends delta to the collector
  • If the delta contains \n: commit complete lines, enqueue them with timestamps
  • Returns true if new lines are available for display

on_commit_tick() (line 71):

  • Pops one line from the front of the queue
  • Converts it to an AgentMessageCell (ratatui Line)
  • Returns (Option<HistoryCell>, bool idle)
  • Called once per frame — rate-limits output to the terminal refresh rate
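The one-line-per-tick drain can be sketched in Python (illustrative; the real method returns a ratatui-backed HistoryCell rather than a string):

```python
from collections import deque

class LineQueue:
    """Illustrative stand-in for the controller's queued-line drain."""

    def __init__(self):
        self.queued = deque()

    def enqueue(self, lines):
        self.queued.extend(lines)

    def on_commit_tick(self):
        """Pop at most one line per frame; the second value is True when idle."""
        if not self.queued:
            return None, True
        line = self.queued.popleft()
        return line, not self.queued
```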

finalize() (line 47):

  • Flushes remaining content (adds temporary \n if needed)
  • Drains the entire queue
  • Emits final cell

File: codex-rs/tui/src/streaming/mod.rs:29

pub(crate) struct StreamState {
    pub(crate) collector: MarkdownStreamCollector,
    queued_lines: VecDeque<QueuedLine>,
    pub(crate) has_seen_delta: bool,
}

QueuedLine includes an enqueued_at: Instant timestamp, enabling age-based drain policies for adaptive rendering.
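One way such a policy could look, sketched in Python; the enqueued_at field name comes from QueuedLine, but the threshold and drain rule here are invented for illustration:

```python
import time
from collections import deque

MAX_AGE_S = 0.25  # assumed staleness threshold, not from the source

def lines_to_drain(queue):
    """Drain one line per tick normally, but catch up on stale lines.

    queue holds (enqueued_at, line) tuples, oldest first.
    """
    if not queue:
        return 0
    now = time.monotonic()
    stale = sum(1 for enqueued_at, _ in queue if now - enqueued_at > MAX_AGE_S)
    return max(1, stale)
```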

API SSE delta "Hello"  -> push("Hello")  -> no newline -> no display
API SSE delta " world" -> push(" world") -> no newline -> no display
API SSE delta "!\n"    -> push("!\n")    -> newline found -> commit_complete_lines()
                       -> renders "Hello world!" as markdown -> enqueue
Next render tick       -> on_commit_tick() -> dequeue one line -> display

Pinned commit: 7ed449974

OpenCode uses the Vercel AI SDK’s streamText() which provides a standardized event-driven stream across all providers. Events are processed in a SessionProcessor and persisted to SQLite in real-time.

File: packages/opencode/src/session/llm.ts:176

const result = streamText({
  model: languageModel,
  system: systemPrompts,
  messages: convertedMessages,
  tools: resolvedTools,
  temperature, topP, topK,
  maxOutputTokens,
  abortSignal,
  // ...
})

The AI SDK handles SSE parsing, provider-specific format translation, and tool call accumulation internally. It exposes a result.fullStream async iterable that yields typed events.

File: packages/opencode/src/session/processor.ts:55

The processor iterates stream.fullStream:

| Event | Processing |
| --- | --- |
| start | Set SessionStatus to “busy” |
| text-start | Create TextPart, persist via Session.updatePart() |
| text-delta | Accumulate: currentText.text += value.text; emit Session.updatePartDelta() |
| text-end | Finalize text part, fire plugin hook "experimental.text.complete" |
| reasoning-start | Create ReasoningPart with timestamps |
| reasoning-delta | Accumulate reasoning text, emit delta |
| reasoning-end | Finalize reasoning part |
| tool-input-start | Create ToolPart with status “pending” |
| tool-call | Transition to “running”, check doom loop guard |
| tool-result | Transition to “completed” with output, timing, metadata |
| tool-error | Transition to “error”, check for permission rejection |
| start-step | Reset step state |
| finish-step | Record finish reason, update token usage, check overflow |
| error | Throw through |
| finish | No-op (handled by finish-step) |
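The dispatch shape can be sketched as a handler table keyed on the event type; Python for brevity, with simplified stand-ins for a few of the real TypeScript handlers:

```python
def make_processor():
    """Build an event processor closed over mutable turn state."""
    state = {"text": "", "status": None}

    def on_start(ev):
        state["status"] = "busy"

    def on_text_delta(ev):
        state["text"] += ev["text"]  # accumulate, like currentText.text += value.text

    def on_finish_step(ev):
        state["status"] = "idle"  # simplified; the real handler also records usage

    handlers = {
        "start": on_start,
        "text-delta": on_text_delta,
        "finish-step": on_finish_step,
    }

    def process(event):
        handler = handlers.get(event["type"])
        if handler:
            handler(event)
        return state

    return process
```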

File: packages/opencode/src/session/message-v2.ts:465

Text deltas are broadcast via the event bus:

PartDelta: BusEvent.define("message.part.delta", z.object({
  sessionID: z.string(),
  messageID: z.string(),
  partID: z.string(),
  field: z.string(),
  delta: z.string(),
}))

The TUI subscribes to "message.part.delta" events and renders them incrementally. The HTTP/SSE server also subscribes and forwards to web clients.

At finish-step (line 244), the processor:

  1. Records finishReason on the assistant message
  2. Updates token usage via Session.getUsage() (computes cumulative from provider response)
  3. Captures a snapshot patch if files were modified
  4. Checks SessionCompaction.isOverflow() and sets needsCompaction = true if exceeded

File: packages/opencode/src/session/retry.ts

The processor wraps stream iteration in a try/catch. On retryable errors:

  1. Check SessionRetry.retryable(error) — respects isRetryable flag and rate limit headers
  2. Compute delay: SessionRetry.delay(attempt, error) — exponential backoff (2s base, 2x factor, 30s cap) or retry-after header
  3. Set SessionStatus to “retry” with attempt count and next retry timestamp
  4. Sleep and retry

Non-retryable errors (context overflow, auth errors) break the loop immediately.
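Under the parameters stated above, the delay computation would look roughly like this (a sketch, not SessionRetry’s actual API):

```python
def retry_delay(attempt, retry_after=None):
    """attempt is 1-based; a server-provided retry-after wins when present."""
    if retry_after is not None:
        return retry_after
    return min(30.0, 2.0 * (2 ** (attempt - 1)))  # 2s, 4s, 8s, ... capped at 30s
```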

Most stream state transitions are persisted with Session.updatePart() (an upsert on PartTable) and finalized with Session.updateMessage():

  • Session.updatePart() — creates or updates text/reasoning/tool/step/patch parts
  • Session.updatePartDelta() — emits message.part.delta on the bus only (no DB write)
  • Session.updateMessage() — writes assistant message state (finish/error/tokens/cost)

This gives durable turn state while still keeping high-frequency token deltas off the database write path.


Aider’s 6-line live window and Codex’s newline-gating both solve the same problem: partially received markdown can render differently than the finished text. A half-received **bold shows literal asterisks until the closing ** arrives, and an unclosed code fence swallows everything that follows it. Aider solves this by keeping recent lines in a re-renderable Live window. Codex solves it by never committing incomplete lines. OpenCode delegates to the TUI’s markdown renderer, which handles partial content gracefully.

Rendering every token at 60 FPS is wasteful and can cause UI flicker. Aider dynamically adjusts its throttle based on measured render time (lines 179-184 of mdstream.py). Codex rate-limits output to one line per frame tick via on_commit_tick(). OpenCode relies on the event bus and React/Solid reconciliation to batch updates.
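Aider’s adjustment uses the formula quoted earlier from mdstream.py; as a standalone function:

```python
def adapt_min_delay(render_time):
    """Slow renders raise the throttle; fast renders floor at 0.05s (20 FPS)."""
    return max(0.05, min(2.0, render_time * 10))
```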

Tool call arguments arrive as fragments across multiple tool-input-delta events. All three tools accumulate these fragments before dispatching:

  • Aider: self.partial_response_function_call[k] += v
  • Codex: Accumulated in the SSE parser before OutputItemDone is emitted
  • OpenCode: AI SDK handles accumulation internally, emits complete tool-call event
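The accumulation step is the same dict-merge in each case; a sketch following Aider’s loop:

```python
def merge_fragment(acc, fragment):
    """Concatenate each string field of a tool-call delta onto the accumulator."""
    for k, v in fragment.items():
        acc[k] = acc.get(k, "") + v
    return acc
```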

If the SSE connection drops mid-stream:

  • Aider: Python exception propagates to retry loop, exponential backoff, re-sends entire prompt
  • Codex: ResponseStreamFailed error classified as retryable, can fall back from WebSocket to HTTPS transport
  • OpenCode: APIError with isRetryable: true, respects retry-after headers

All three restart from scratch — none support resuming a partial stream.

Token usage is only available after the stream completes (the Completed / finish-step event includes it). During streaming, tools must rely on heuristic estimates (Codex: 4 bytes/token, OpenCode: 4 chars/token, Aider: litellm estimate). This means overflow detection is always slightly delayed.
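A chars-based estimate matching the heuristic the text attributes to OpenCode (a real tokenizer will disagree at the margins):

```python
def estimate_tokens(text, chars_per_token=4):
    """Rough in-flight estimate; exact usage arrives only at stream completion."""
    if not text:
        return 0
    return max(1, len(text) // chars_per_token)
```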

OpenCode writes every tool state transition to SQLite during streaming. Under heavy tool use (many grep/read calls), this creates write pressure. WAL mode helps, but the per-event write pattern is inherently heavier than Aider’s in-memory accumulation or Codex’s deferred JSONL persistence.


Architecture: Async Stream with Event Emission

OpenOxide combines Codex’s newline-gated buffering with OpenCode’s event-driven broadcasting:

pub struct StreamProcessor {
    collector: MarkdownStreamCollector,      // from Codex
    tx_event: Sender<StreamEvent>,           // from OpenCode
    token_accumulator: String,
    tool_calls: HashMap<String, ToolCallState>,
}

Use reqwest-eventsource or a custom SSE parser to consume the API stream. Events are parsed into a ResponseEvent enum following Codex’s pattern. The parser handles reconnection for transient network errors.

Adopt Codex’s MarkdownStreamCollector pattern:

  1. Accumulate text deltas in a buffer
  2. On newline: render accumulated markdown, emit completed lines
  3. On finalize: flush remaining content

This prevents partial markdown from reaching the display.

Stream events are emitted to an async broadcast channel:

pub enum StreamEvent {
    TextDelta { part_id: String, delta: String },
    TextComplete { part_id: String, text: String },
    ReasoningDelta { part_id: String, delta: String },
    ToolCallStart { call_id: String, tool_name: String },
    ToolCallResult { call_id: String, output: ToolOutput },
    ToolCallError { call_id: String, error: String },
    TokenUsage { input: u32, output: u32, cached: u32 },
    Finished { reason: FinishReason },
}

The TUI subscribes and renders. The MCP server subscribes and forwards to clients. The session store subscribes and persists. Each consumer operates independently.
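The fan-out can be sketched synchronously in Python; a real implementation would use bounded async channels (e.g. tokio’s broadcast channel) so a slow subscriber cannot block the stream:

```python
class Broadcast:
    """Minimal illustrative event bus: every subscriber sees every event."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def emit(self, event):
        for handler in self.subscribers:
            handler(event)
```

Each consumer registers once and then receives every event independently of the others.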

The TUI uses a frame-rate limiter (60 FPS cap) and drains one committed line per tick from the queue, following Codex’s on_commit_tick() pattern. This prevents render thrashing while maintaining perceived responsiveness.

Tool call arguments are accumulated internally by the SSE parser. Only complete tool calls (after the OutputItemDone equivalent) are dispatched to the tool registry.

| Crate | Responsibility |
| --- | --- |
| openoxide-stream | SSE parser, ResponseEvent enum, StreamProcessor |
| openoxide-markdown | MarkdownStreamCollector, newline-gated buffering |
| openoxide-tui | StreamController, frame-rate limiter, widget rendering |
| openoxide-provider | API client, transport (HTTPS/WebSocket), retry logic |

Key design decisions:
  1. Newline-gated buffering. Prevents partial markdown rendering artifacts.
  2. Event broadcast channel. Decouples streaming from display — TUI, MCP, and persistence are independent subscribers.
  3. Frame-rate limiter. One line per tick prevents render thrashing.
  4. No mid-stream persistence. Unlike OpenCode, persist only at turn boundaries (completed messages and tool results). Reduces write pressure, accepts that a crash mid-stream loses the partial turn.
  5. Transport fallback. WebSocket preferred for lower latency, HTTPS SSE as fallback.