A comprehensive analysis of streaming and async patterns has been completed and documented in /Users/apple/Desktop/code/AI/gateway/litellm-rs/docs/analysis/streaming_async_analysis.md.
Key findings:

- Buffer Flush Missing on Stream End (HIGH) - line 294 in src/core/providers/base/sse.rs
- Carriage Return Not Trimmed (MEDIUM) - SSE parsing in multiple files
- Potential Busy Loop with Immediate Wake (MEDIUM) - line 274 in src/core/providers/base/sse.rs
- Task Spawn Without Cancellation Handling (HIGH) - line 51 in src/core/streaming/handler.rs
- String Buffer Reallocation (LOW) - src/core/providers/databricks/streaming.rs
- Missing Backpressure Handling (MEDIUM) - src/core/streaming/handler.rs
- OCI Stream Data Loss (MEDIUM) - lines 193-208 in src/core/providers/oci/streaming.rs
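The carriage-return finding can be reproduced in isolation. This standalone sketch (not code from the repository) shows how SSE delivered with Windows-style `\r\n` line endings leaves a trailing `\r` on every line when splitting on `\n` alone, which silently breaks sentinel comparisons like `[DONE]`:

```rust
fn main() {
    let frame = "data: [DONE]\r\n";
    for raw in frame.split('\n') {
        if let Some(payload) = raw.strip_prefix("data: ") {
            // Without trimming, the payload is "[DONE]\r" and the
            // sentinel comparison silently fails.
            assert_ne!(payload, "[DONE]");
            assert_eq!(payload.trim_end_matches('\r'), "[DONE]");
        }
    }
}
```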
After line 222 (the end of the `process_event` method), add:

```rust
/// Flush any remaining buffered data.
///
/// Call this when the stream ends to process any incomplete events
/// that don't end with a blank line.
pub fn flush(&mut self) -> Result<Vec<ChatChunk>, ProviderError> {
    let mut chunks = Vec::new();

    // Process any buffered incomplete line as a potential event.
    // Take the buffer first so process_line() can borrow self mutably.
    if !self.buffer.is_empty() {
        let buffered = std::mem::take(&mut self.buffer);
        // Trim trailing line endings that may have been left behind
        let buffered = buffered.trim_end_matches(|c| c == '\r' || c == '\n');
        if !buffered.is_empty() {
            // Try to parse the remainder as complete lines
            for line in buffered.lines() {
                if let Some(chunk) = self.process_line(line)? {
                    chunks.push(chunk);
                }
            }
        }
    }

    // Process any pending event
    if let Some(event) = self.current_event.take() {
        if let Some(chunk) = self.process_event(event)? {
            chunks.push(chunk);
        }
    }

    Ok(chunks)
}
```

Replace line 326:
```rust
Poll::Ready(None) => Poll::Ready(None),
```

with:

```rust
Poll::Ready(None) => {
    // Flush any remaining buffered events before ending the stream
    match this.parser.flush() {
        Ok(final_chunks) if !final_chunks.is_empty() => {
            this.chunk_buffer.extend(final_chunks);
            if let Some(chunk) = this.chunk_buffer.pop_front() {
                Poll::Ready(Some(Ok(chunk)))
            } else {
                Poll::Ready(None)
            }
        }
        Ok(_) => Poll::Ready(None),
        Err(e) => Poll::Ready(Some(Err(e))),
    }
}
```

Replace line 306:
```rust
cx.waker().wake_by_ref();
```

with:

```rust
// Let the runtime poll again naturally
```

Replace lines 160-162:
```rust
let line = buffer[..pos].trim().to_string();
buffer = buffer[pos + 1..].to_string();
```

with:

```rust
let line = buffer[..pos].trim_end_matches('\r').to_string();
buffer.drain(..=pos); // Efficient in-place removal of the consumed prefix
```

(`line` stays an owned `String` so it does not borrow `buffer` while `drain` mutates it; line 164 can therefore keep passing `&line` unchanged.)
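The drain-based replacement can be sanity-checked in isolation. This standalone sketch (variable names local to the example) shows that `drain(..=pos)` removes the consumed line and its `\n` in place, instead of reallocating the tail as `buffer[pos + 1..].to_string()` does:

```rust
fn main() {
    let mut buffer = String::from("first line\r\nsecond");
    let pos = buffer.find('\n').expect("newline present");
    // Copy out the line (minus the trailing '\r') before mutating the buffer
    let line = buffer[..pos].trim_end_matches('\r').to_string();
    buffer.drain(..=pos); // drop the line and its '\n' without a new tail allocation
    assert_eq!(line, "first line");
    assert_eq!(buffer, "second");
}
```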
Replace lines 196-207 with:

```rust
if !self.buffer.is_empty() {
    let remaining = std::mem::take(&mut self.buffer);
    // Try to process the remainder as complete events
    for line in remaining.lines() {
        if let Some(data) = line.strip_prefix("data: ") {
            let data = data.trim();
            if data != "[DONE]" {
                if let Some(result) = self.process_data(data) {
                    return Poll::Ready(Some(result));
                }
            }
        }
    }
}
```

After applying the fixes, run:
```shell
cargo test --all-features -- streaming
```

Suggested commit message:

```
fix(streaming): improve async and streaming patterns

Critical fixes:
- Add flush() method to UnifiedSSEParser to process remaining buffered data
- Call flush() when the inner stream ends to prevent data loss
- Remove the immediate wake to prevent a busy loop
- Fix buffer management in the Databricks stream (use drain instead of reallocation)
- Fix the OCI stream to use mem::take for efficient buffer handling
- Trim carriage returns for Windows compatibility

Medium priority fixes:
- Improve error handling in stream-end scenarios
- Add proper cleanup for incomplete events

Issues addressed:
- Buffer flush missing on stream end (HIGH)
- Potential busy loop with immediate wake (MEDIUM)
- String buffer reallocation (LOW)
- OCI stream data loss (MEDIUM)

Signed-off-by: majiayu000 <1835304752@qq.com>
```
Files changed:

- docs/analysis/streaming_async_analysis.md (new)
- src/core/providers/base/sse.rs
- src/core/providers/databricks/streaming.rs
- src/core/providers/oci/streaming.rs
- Apply the code fixes listed above
- Run `cargo test --all-features -- streaming` to verify
- Run `cargo clippy --all-targets --all-features` to check for warnings
- Commit with the message above
- Consider adding integration tests for:
  - Stream end with partial data
  - Windows line endings (`\r\n`)
  - Client disconnection scenarios
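As a sketch of what the first two integration tests could exercise, here is a standalone stand-in parser (hypothetical, not the codebase's `UnifiedSSEParser`) that demonstrates both scenarios: CRLF line endings must not leak `\r` into payloads, and data buffered when the stream ends mid-event is only recovered via `flush()`:

```rust
struct TinyParser {
    buffer: String,
}

impl TinyParser {
    /// Consume incoming bytes and return the payloads of complete lines.
    fn feed(&mut self, bytes: &str) -> Vec<String> {
        self.buffer.push_str(bytes);
        let mut out = Vec::new();
        while let Some(pos) = self.buffer.find('\n') {
            let line = self.buffer[..pos].trim_end_matches('\r').to_string();
            self.buffer.drain(..=pos); // in-place removal, as in the fix above
            if let Some(data) = line.strip_prefix("data: ") {
                out.push(data.to_string());
            }
        }
        out
    }

    /// Mirrors the flush() fix: surface whatever is still buffered at stream end.
    fn flush(&mut self) -> Vec<String> {
        let remaining = std::mem::take(&mut self.buffer);
        remaining
            .lines()
            .filter_map(|l| l.strip_prefix("data: ").map(str::to_string))
            .collect()
    }
}

fn main() {
    let mut p = TinyParser { buffer: String::new() };
    // Windows line endings: '\r' must not leak into the payload
    assert_eq!(p.feed("data: one\r\n"), vec!["one".to_string()]);
    // Stream ends with partial data: only flush() recovers it
    assert!(p.feed("data: two").is_empty());
    assert_eq!(p.flush(), vec!["two".to_string()]);
}
```

The client-disconnection scenario would additionally need a cancellable task harness, which this sketch does not cover.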