
Streaming Guide

Learn how to implement real-time streaming responses with the DeepSeek API for improved user experience and responsiveness.

Overview

Streaming allows you to receive partial responses as they're generated, providing:

  • Real-time feedback: Users watch the response appear as it is produced
  • Improved perceived performance: Much shorter time to first token
  • Better user experience: Content loads progressively instead of arriving all at once
  • Reduced effective latency: Downstream processing can begin on partial output
  • Interactive applications: Build chat interfaces and real-time tools

Basic Streaming

Simple Streaming Example

python
from openai import OpenAI

client = OpenAI(
    api_key="sk-your-api-key",
    base_url="https://api.deepseek.com/v1"
)

def basic_streaming():
    """Basic streaming example"""
    
    stream = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "user", "content": "Write a short story about a robot learning to paint."}
        ],
        stream=True,  # Enable streaming
        max_tokens=500
    )
    
    print("🤖 DeepSeek: ", end="", flush=True)
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
    
    print("\n")  # New line at the end

# Run the example
basic_streaming()
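
Each chunk the SDK yields is an OpenAI-compatible chat completion chunk: the text lives in choices[0].delta.content, which is None on housekeeping chunks (the role-only first chunk and the final chunk that carries finish_reason). A minimal sketch of the fields this guide relies on:

python
# Anatomy of a streamed chunk (OpenAI-compatible shape):
#   chunk.choices[0].delta.role    -> "assistant" on the first chunk only
#   chunk.choices[0].delta.content -> partial text, or None on housekeeping chunks
#   chunk.choices[0].finish_reason -> None until the final chunk ("stop", "length", ...)
stream = client.chat.completions.create(
    model="deepseek-chat",
    messages=[{"role": "user", "content": "Say hello."}],
    stream=True
)

for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content is not None:
        print(choice.delta.content, end="", flush=True)
    if choice.finish_reason is not None:
        print(f"\n[finish_reason: {choice.finish_reason}]")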

Streaming with Error Handling

python
import time
from typing import Generator, Optional

def robust_streaming(
    messages: list,
    model: str = "deepseek-chat",
    max_tokens: int = 500,
    temperature: float = 0.7
) -> Generator[str, None, None]:
    """Robust streaming with error handling and retries"""
    
    max_retries = 3
    retry_delay = 1.0
    
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                max_tokens=max_tokens,
                temperature=temperature
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
                
                # Check for finish reason
                if chunk.choices[0].finish_reason is not None:
                    break
            
            return  # Success, exit retry loop
            
        except Exception as e:
            print(f"❌ Streaming error (attempt {attempt + 1}): {e}")
            
            if attempt < max_retries - 1:
                print(f"⏳ Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                print("🚫 Max retries exceeded")
                raise e

# Usage
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing in simple terms."}
]

print("🤖 DeepSeek: ", end="", flush=True)

try:
    for content in robust_streaming(messages):
        print(content, end="", flush=True)
        time.sleep(0.01)  # Small delay for visual effect
    print("\n")
except Exception as e:
    print(f"\n❌ Streaming failed: {e}")

Advanced Streaming Techniques

Streaming with Token Counting

python
import tiktoken
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class StreamingStats:
    """Statistics for streaming response"""
    total_tokens: int = 0
    chunks_received: int = 0
    time_to_first_token: float = 0
    total_time: float = 0
    tokens_per_second: float = 0

class StreamingTokenCounter:
    """Count tokens during streaming"""
    
    def __init__(self, model: str = "deepseek-chat"):
        # cl100k_base approximates DeepSeek's tokenizer; counts are
        # estimates, not the exact numbers the API bills against
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self.model = model
    
    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))
    
    def stream_with_stats(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> tuple[Generator[str, None, None], StreamingStats]:
        """Stream with token counting and statistics"""
        
        stats = StreamingStats()
        start_time = time.time()
        first_token_time = None
        accumulated_content = ""
        
        def token_generator():
            nonlocal first_token_time, accumulated_content
            
            stream = client.chat.completions.create(
                messages=messages,
                stream=True,
                **kwargs
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    accumulated_content += content
                    
                    # Record first token time
                    if first_token_time is None:
                        first_token_time = time.time()
                        stats.time_to_first_token = first_token_time - start_time
                    
                    stats.chunks_received += 1
                    yield content
            
            # Calculate final statistics
            end_time = time.time()
            stats.total_time = end_time - start_time
            stats.total_tokens = self.count_tokens(accumulated_content)
            
            if stats.total_time > 0:
                stats.tokens_per_second = stats.total_tokens / stats.total_time
        
        return token_generator(), stats

# Usage
counter = StreamingTokenCounter()

messages = [
    {"role": "user", "content": "Write a detailed explanation of machine learning algorithms."}
]

print("🤖 DeepSeek: ", end="", flush=True)

token_stream, stats = counter.stream_with_stats(
    messages,
    model="deepseek-chat",
    max_tokens=800
)

for content in token_stream:
    print(content, end="", flush=True)

print(f"\n\n📊 Streaming Statistics:")
print(f"   Total tokens: {stats.total_tokens}")
print(f"   Chunks received: {stats.chunks_received}")
print(f"   Time to first token: {stats.time_to_first_token:.2f}s")
print(f"   Total time: {stats.total_time:.2f}s")
print(f"   Tokens per second: {stats.tokens_per_second:.1f}")

Streaming with Content Processing

python
import re
import json
from typing import Callable, Any

class StreamingProcessor:
    """Process streaming content in real-time"""
    
    def __init__(self):
        self.processors = []
        self.accumulated_content = ""
    
    def add_processor(self, processor: Callable[[str], Any]):
        """Add a content processor"""
        self.processors.append(processor)
    
    def stream_with_processing(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream with real-time content processing"""
        
        stream = client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                self.accumulated_content += content
                
                # Process content with all processors
                processed_data = {
                    "content": content,
                    "accumulated": self.accumulated_content,
                    "processed": {}
                }
                
                for i, processor in enumerate(self.processors):
                    try:
                        result = processor(self.accumulated_content)
                        processed_data["processed"][f"processor_{i}"] = result
                    except Exception as e:
                        processed_data["processed"][f"processor_{i}"] = f"Error: {e}"
                
                yield processed_data

# Example processors
def extract_code_blocks(content: str) -> List[str]:
    """Extract code blocks from content"""
    pattern = r'```[\w]*\n(.*?)\n```'
    return re.findall(pattern, content, re.DOTALL)

def count_sentences(content: str) -> int:
    """Count sentences in content"""
    sentences = re.split(r'[.!?]+', content)
    return len([s for s in sentences if s.strip()])

def extract_keywords(content: str) -> List[str]:
    """Extract potential keywords (simple implementation)"""
    words = re.findall(r'\b[A-Z][a-z]+\b', content)
    return list(set(words))

# Usage
processor = StreamingProcessor()
processor.add_processor(extract_code_blocks)
processor.add_processor(count_sentences)
processor.add_processor(extract_keywords)

messages = [
    {"role": "user", "content": "Write a Python function to calculate fibonacci numbers with examples."}
]

print("🤖 Streaming with real-time processing:\n")

for data in processor.stream_with_processing(
    messages,
    model="deepseek-chat",
    max_tokens=600
):
    # Display content
    print(data["content"], end="", flush=True)
    
    # Show processing results roughly every 100 characters to avoid spam
    if len(data["accumulated"]) % 100 < 10:
        print(f"\n[Processing: {len(data['processed']['processor_0'])} code blocks, "
              f"{data['processed']['processor_1']} sentences, "
              f"{len(data['processed']['processor_2'])} keywords]", end="")

print("\n")

Streaming for Different Use Cases

Chat Interface Streaming

python
import asyncio
from datetime import datetime
from typing import AsyncGenerator, Optional

class StreamingChatInterface:
    """Streaming chat interface implementation"""
    
    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.conversation_history = []
    
    async def stream_chat_response(
        self,
        user_message: str,
        system_prompt: Optional[str] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream chat response with conversation context.
        
        Note: this iterates a synchronous stream inside an async generator
        for simplicity, which blocks the event loop between chunks; see the
        AsyncOpenAI sketch after the demo below for a non-blocking variant.
        """
        
        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message,
            "timestamp": datetime.now().isoformat()
        })
        
        # Prepare messages for API
        messages = []
        
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        
        # Add conversation history (limit to last 10 messages for context)
        for msg in self.conversation_history[-10:]:
            messages.append({
                "role": msg["role"],
                "content": msg["content"]
            })
        
        # Stream response
        accumulated_response = ""
        
        try:
            stream = self.client.chat.completions.create(
                model="deepseek-chat",
                messages=messages,
                stream=True,
                max_tokens=1000,
                temperature=0.7
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    accumulated_response += content
                    
                    yield {
                        "type": "content",
                        "content": content,
                        "accumulated": accumulated_response,
                        "timestamp": datetime.now().isoformat()
                    }
                
                # Check for completion
                if chunk.choices[0].finish_reason is not None:
                    yield {
                        "type": "complete",
                        "finish_reason": chunk.choices[0].finish_reason,
                        "total_content": accumulated_response,
                        "timestamp": datetime.now().isoformat()
                    }
                    break
            
            # Add assistant response to history
            self.conversation_history.append({
                "role": "assistant",
                "content": accumulated_response,
                "timestamp": datetime.now().isoformat()
            })
            
        except Exception as e:
            yield {
                "type": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
    
    def get_conversation_history(self) -> List[Dict[str, Any]]:
        """Get conversation history"""
        return self.conversation_history.copy()
    
    def clear_history(self):
        """Clear conversation history"""
        self.conversation_history.clear()

# Usage example
async def chat_demo():
    """Demo chat interface"""
    
    chat = StreamingChatInterface("sk-your-api-key")
    
    system_prompt = "You are a helpful AI assistant. Be concise but informative."
    
    questions = [
        "What is artificial intelligence?",
        "How does machine learning work?",
        "Can you give me a simple example?"
    ]
    
    for question in questions:
        print(f"\n👤 User: {question}")
        print("🤖 DeepSeek: ", end="", flush=True)
        
        async for response in chat.stream_chat_response(question, system_prompt):
            if response["type"] == "content":
                print(response["content"], end="", flush=True)
            elif response["type"] == "complete":
                print(f"\n   [Completed: {response['finish_reason']}]")
            elif response["type"] == "error":
                print(f"\n   [Error: {response['error']}]")
        
        await asyncio.sleep(1)  # Brief pause between questions

# Run the demo
# asyncio.run(chat_demo())
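
The interface above drives a synchronous stream from async code, which blocks the event loop between chunks. A non-blocking sketch using the SDK's AsyncOpenAI client (assuming openai>=1.0):

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="sk-your-api-key",
    base_url="https://api.deepseek.com/v1"
)

async def async_stream_demo():
    """Fully asynchronous streaming that yields control between chunks"""
    
    stream = await async_client.chat.completions.create(
        model="deepseek-chat",
        messages=[{"role": "user", "content": "What is streaming?"}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

# asyncio.run(async_stream_demo())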

Document Generation Streaming

python
class StreamingDocumentGenerator:
    """Generate documents with streaming progress"""
    
    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
    
    def generate_document_sections(
        self,
        document_type: str,
        topic: str,
        sections: List[str]
    ) -> Generator[Dict[str, Any], None, None]:
        """Generate document sections with streaming"""
        
        for i, section in enumerate(sections):
            print(f"\n📝 Generating section {i+1}/{len(sections)}: {section}")
            
            prompt = f"""
            Write a detailed {section} section for a {document_type} about {topic}.
            Make it comprehensive and well-structured.
            """
            
            messages = [
                {"role": "system", "content": f"You are writing a {document_type} about {topic}."},
                {"role": "user", "content": prompt}
            ]
            
            section_content = ""
            
            try:
                stream = self.client.chat.completions.create(
                    model="deepseek-chat",
                    messages=messages,
                    stream=True,
                    max_tokens=800,
                    temperature=0.3
                )
                
                for chunk in stream:
                    if chunk.choices[0].delta.content is not None:
                        content = chunk.choices[0].delta.content
                        section_content += content
                        
                        yield {
                            "section_index": i,
                            "section_name": section,
                            "content_chunk": content,
                            "accumulated_content": section_content,
                            "progress": (i + 1) / len(sections) * 100
                        }
                
                yield {
                    "section_index": i,
                    "section_name": section,
                    "content_chunk": "",
                    "accumulated_content": section_content,
                    "section_complete": True,
                    "progress": (i + 1) / len(sections) * 100
                }
                
            except Exception as e:
                yield {
                    "section_index": i,
                    "section_name": section,
                    "error": str(e),
                    "progress": (i + 1) / len(sections) * 100
                }

# Usage
generator = StreamingDocumentGenerator("sk-your-api-key")

document_sections = [
    "Introduction",
    "Background and Context",
    "Main Analysis",
    "Key Findings",
    "Recommendations",
    "Conclusion"
]

print("📄 Generating Technical Report: 'AI in Healthcare'")
print("=" * 50)

complete_document = {}

for update in generator.generate_document_sections(
    "technical report",
    "AI in Healthcare",
    document_sections
):
    if "content_chunk" in update and update["content_chunk"]:
        print(update["content_chunk"], end="", flush=True)
    
    if update.get("section_complete"):
        section_name = update["section_name"]
        complete_document[section_name] = update["accumulated_content"]
        print(f"\n\n✅ Section '{section_name}' completed")
        print(f"📊 Progress: {update['progress']:.1f}%")
    
    if update.get("error"):
        print(f"\n❌ Error in section '{update['section_name']}': {update['error']}")

print("\n🎉 Document generation complete!")
print(f"📋 Generated {len(complete_document)} sections")

Code Generation Streaming

python
class StreamingCodeGenerator:
    """Generate code with streaming and syntax highlighting"""
    
    def __init__(self, api_key: str):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.current_code_block = ""
        self.in_code_block = False
    
    def stream_code_generation(
        self,
        task_description: str,
        language: str = "python",
        include_tests: bool = True
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream code generation with syntax detection"""
        
        prompt = f"""
        Write {language} code for the following task: {task_description}
        
        Requirements:
        - Include clear comments
        - Follow best practices
        - Make the code production-ready
        """
        
        if include_tests:
            prompt += "\n- Include unit tests"
        
        messages = [
            {"role": "system", "content": f"You are an expert {language} developer."},
            {"role": "user", "content": prompt}
        ]
        
        accumulated_content = ""
        
        stream = self.client.chat.completions.create(
            model="deepseek-coder",  # Use code-specific model
            messages=messages,
            stream=True,
            max_tokens=1500,
            temperature=0.1  # Lower temperature for code
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                accumulated_content += content
                
                # Detect code blocks
                code_blocks = self._extract_code_blocks(accumulated_content)
                
                yield {
                    "content_chunk": content,
                    "accumulated_content": accumulated_content,
                    "code_blocks": code_blocks,
                    "language": language,
                    "task": task_description
                }
    
    def _extract_code_blocks(self, content: str) -> List[Dict[str, str]]:
        """Extract code blocks from content"""
        
        pattern = r'```(\w+)?\n(.*?)\n```'
        matches = re.findall(pattern, content, re.DOTALL)
        
        code_blocks = []
        for lang, code in matches:
            code_blocks.append({
                "language": lang or "text",
                "code": code.strip()
            })
        
        return code_blocks

# Usage
code_generator = StreamingCodeGenerator("sk-your-api-key")

task = "Create a Python class for managing a simple todo list with add, remove, and list functionality"

print(f"💻 Generating code for: {task}")
print("=" * 60)

all_code_blocks = []

for update in code_generator.stream_code_generation(
    task,
    language="python",
    include_tests=True
):
    # Display streaming content
    print(update["content_chunk"], end="", flush=True)
    
    # Track code blocks
    if update["code_blocks"]:
        all_code_blocks = update["code_blocks"]

print(f"\n\n🎉 Code generation complete!")
print(f"📝 Generated {len(all_code_blocks)} code blocks")

# Display extracted code blocks
for i, block in enumerate(all_code_blocks, 1):
    print(f"\n--- Code Block {i} ({block['language']}) ---")
    print(block['code'])
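
Extracted blocks can be sanity-checked before use; for Python, compile() catches syntax errors without executing anything:

python
# Optional sanity check: verify extracted Python blocks at least parse
for i, block in enumerate(all_code_blocks, 1):
    if block["language"] == "python":
        try:
            compile(block["code"], f"<block_{i}>", "exec")
            print(f"✅ Block {i}: syntax OK")
        except SyntaxError as e:
            print(f"❌ Block {i}: syntax error on line {e.lineno}: {e.msg}")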

Streaming Performance Optimization

Buffered Streaming

python
import threading
import queue
from typing import Optional

class BufferedStreamer:
    """Buffer streaming content for smoother display"""
    
    def __init__(self, buffer_size: int = 10, flush_interval: float = 0.1):
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.buffer_lock = threading.Lock()  # buffer is shared by two threads
        self.output_queue = queue.Queue()
        self.stop_event = threading.Event()
    
    def start_buffering(self, stream_generator: Generator[str, None, None]):
        """Start buffering stream content"""
        
        def buffer_worker():
            """Worker thread for buffering"""
            
            for content in stream_generator:
                with self.buffer_lock:
                    self.buffer.append(content)
                    buffer_full = len(self.buffer) >= self.buffer_size
                
                # Flush buffer when full
                if buffer_full:
                    self._flush_buffer()
                
                if self.stop_event.is_set():
                    break
            
            # Flush remaining content
            self._flush_buffer()
            self.output_queue.put(None)  # Signal completion
        
        def flush_worker():
            """Worker thread for periodic flushing"""
            
            while not self.stop_event.is_set():
                time.sleep(self.flush_interval)
                if self.buffer:
                    self._flush_buffer()
        
        # Start worker threads
        buffer_thread = threading.Thread(target=buffer_worker)
        flush_thread = threading.Thread(target=flush_worker)
        
        buffer_thread.start()
        flush_thread.start()
        
        return buffer_thread, flush_thread
    
    def _flush_buffer(self):
        """Flush buffer to output queue (thread-safe)"""
        
        with self.buffer_lock:
            if self.buffer:
                content = "".join(self.buffer)
                self.output_queue.put(content)
                self.buffer.clear()
    
    def get_buffered_content(self) -> Generator[str, None, None]:
        """Get buffered content"""
        
        while True:
            try:
                content = self.output_queue.get(timeout=1.0)
                if content is None:  # Completion signal
                    break
                yield content
            except queue.Empty:
                continue
    
    def stop(self):
        """Stop buffering"""
        self.stop_event.set()

# Usage
def create_stream():
    """Create a sample stream"""
    
    messages = [
        {"role": "user", "content": "Write a long essay about the future of artificial intelligence."}
    ]
    
    stream = client.chat.completions.create(
        model="deepseek-chat",
        messages=messages,
        stream=True,
        max_tokens=1000
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            yield chunk.choices[0].delta.content

# Create buffered streamer
buffered_streamer = BufferedStreamer(buffer_size=5, flush_interval=0.2)

print("🔄 Starting buffered streaming...")

# Start buffering
buffer_thread, flush_thread = buffered_streamer.start_buffering(create_stream())

print("🤖 DeepSeek: ", end="", flush=True)

# Display buffered content
try:
    for content in buffered_streamer.get_buffered_content():
        print(content, end="", flush=True)
        time.sleep(0.05)  # Smooth display
finally:
    buffered_streamer.stop()
    buffer_thread.join()
    flush_thread.join()

print("\n✅ Buffered streaming complete!")

Streaming with Backpressure

python
import asyncio
from asyncio import Queue

class BackpressureStreamer:
    """Handle streaming with backpressure control"""
    
    def __init__(self, max_queue_size: int = 100):
        self.max_queue_size = max_queue_size
        self.content_queue = Queue(maxsize=max_queue_size)
        self.processing_speed = 200.0  # Simulated consumer throughput (characters per second)
    
    async def stream_with_backpressure(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> AsyncGenerator[str, None, None]:
        """Stream with backpressure control"""
        
        # Start streaming task
        stream_task = asyncio.create_task(
            self._stream_producer(messages, **kwargs)
        )
        
        # Start consumer
        try:
            while True:
                try:
                    # Get content with timeout
                    content = await asyncio.wait_for(
                        self.content_queue.get(),
                        timeout=1.0
                    )
                    
                    if content is None:  # End signal
                        break
                    
                    yield content
                    
                    # Simulate processing time (backpressure)
                    processing_time = len(content) / self.processing_speed
                    await asyncio.sleep(processing_time)
                    
                except asyncio.TimeoutError:
                    # Check if stream is still active
                    if stream_task.done():
                        break
                    continue
        
        finally:
            if not stream_task.done():
                stream_task.cancel()
                try:
                    await stream_task
                except asyncio.CancelledError:
                    pass
    
    async def _stream_producer(self, messages: List[Dict[str, str]], **kwargs):
        """Producer coroutine for streaming"""
        
        # Note: this bridges the synchronous SDK stream into asyncio.
        # In practice, you'd use an async HTTP client instead.
        loop = asyncio.get_running_loop()
        
        def sync_stream():
            stream = client.chat.completions.create(
                messages=messages,
                stream=True,
                **kwargs
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    # Blocking put from the worker thread: this is the
                    # backpressure point when the queue is full
                    asyncio.run_coroutine_threadsafe(
                        self.content_queue.put(chunk.choices[0].delta.content),
                        loop
                    ).result()
        
        try:
            await loop.run_in_executor(None, sync_stream)
            
            # Signal completion
            await self.content_queue.put(None)
            
        except Exception as e:
            # Signal error
            await self.content_queue.put(f"Error: {e}")
            await self.content_queue.put(None)

# Usage
async def backpressure_demo():
    """Demo backpressure streaming"""
    
    streamer = BackpressureStreamer(max_queue_size=50)
    
    messages = [
        {"role": "user", "content": "Write a detailed technical explanation of neural networks."}
    ]
    
    print("🔄 Starting backpressure streaming...")
    print("🤖 DeepSeek: ", end="", flush=True)
    
    async for content in streamer.stream_with_backpressure(
        messages,
        model="deepseek-chat",
        max_tokens=800
    ):
        if content.startswith("Error:"):
            print(f"\n{content}")
            break
        
        print(content, end="", flush=True)
    
    print("\n✅ Backpressure streaming complete!")

# Run the demo
# asyncio.run(backpressure_demo())

Error Handling and Recovery

Robust Streaming with Recovery

python
class RobustStreamer:
    """Robust streaming with error recovery"""
    
    def __init__(self, api_key: str, max_retries: int = 3):
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com/v1"
        )
        self.max_retries = max_retries
    
    def stream_with_recovery(
        self,
        messages: List[Dict[str, str]],
        **kwargs
    ) -> Generator[Dict[str, Any], None, None]:
        """Stream with automatic error recovery"""
        
        retry_count = 0
        accumulated_content = ""
        
        while retry_count <= self.max_retries:
            try:
                # If retrying, add context about previous content
                if retry_count > 0 and accumulated_content:
                    recovery_message = {
                        "role": "system",
                        "content": f"Continue from where you left off. Previous content: ...{accumulated_content[-100:]}"
                    }
                    recovery_messages = [recovery_message] + messages
                else:
                    recovery_messages = messages
                
                stream = self.client.chat.completions.create(
                    messages=recovery_messages,
                    stream=True,
                    **kwargs
                )
                
                chunk_count = 0
                
                for chunk in stream:
                    if chunk.choices[0].delta.content is not None:
                        content = chunk.choices[0].delta.content
                        accumulated_content += content
                        chunk_count += 1
                        
                        yield {
                            "type": "content",
                            "content": content,
                            "accumulated": accumulated_content,
                            "chunk_count": chunk_count,
                            "retry_count": retry_count
                        }
                    
                    if chunk.choices[0].finish_reason is not None:
                        yield {
                            "type": "complete",
                            "finish_reason": chunk.choices[0].finish_reason,
                            "total_content": accumulated_content,
                            "total_chunks": chunk_count,
                            "retry_count": retry_count
                        }
                        return  # Success, exit retry loop
                
                # Stream ended without an explicit finish_reason; treat as complete
                return
                
            except Exception as e:
                retry_count += 1
                
                yield {
                    "type": "error",
                    "error": str(e),
                    "retry_count": retry_count,
                    "max_retries": self.max_retries,
                    "accumulated_content": accumulated_content
                }
                
                if retry_count <= self.max_retries:
                    wait_time = 2 ** retry_count  # Exponential backoff
                    
                    yield {
                        "type": "retry",
                        "wait_time": wait_time,
                        "retry_count": retry_count,
                        "max_retries": self.max_retries
                    }
                    
                    time.sleep(wait_time)
                else:
                    yield {
                        "type": "failed",
                        "error": "Max retries exceeded",
                        "final_content": accumulated_content
                    }
                    return

# Usage
robust_streamer = RobustStreamer("sk-your-api-key", max_retries=3)

messages = [
    {"role": "user", "content": "Write a comprehensive guide to Python web development."}
]

print("🛡️ Starting robust streaming with recovery...")

for update in robust_streamer.stream_with_recovery(
    messages,
    model="deepseek-chat",
    max_tokens=1000
):
    if update["type"] == "content":
        print(update["content"], end="", flush=True)
    
    elif update["type"] == "error":
        print(f"\n⚠️ Error (attempt {update['retry_count']}): {update['error']}")
    
    elif update["type"] == "retry":
        print(f"\n🔄 Retrying in {update['wait_time']} seconds... (attempt {update['retry_count']}/{update['max_retries']})")
    
    elif update["type"] == "complete":
        print(f"\n✅ Streaming completed successfully!")
        print(f"   Total chunks: {update['total_chunks']}")
        print(f"   Retry count: {update['retry_count']}")
    
    elif update["type"] == "failed":
        print(f"\n❌ Streaming failed after {update['retry_count']} attempts")
        if update["final_content"]:
            print(f"   Partial content received: {len(update['final_content'])} characters")

Best Practices

Streaming Optimization Tips

python
class StreamingBestPractices:
    """Best practices for streaming implementation"""
    
    @staticmethod
    def optimize_streaming_parameters() -> Dict[str, Any]:
        """Recommended parameters for optimal streaming"""
        
        return {
            "stream": True,
            "max_tokens": 1000,  # Reasonable limit for streaming
            "temperature": 0.7,  # Balanced creativity
            "top_p": 0.9,       # Focused but diverse
            "frequency_penalty": 0.1,  # Reduce repetition
            "presence_penalty": 0.1    # Encourage variety
        }
    
    @staticmethod
    def streaming_checklist() -> List[str]:
        """Checklist for streaming implementation"""
        
        return [
            "✅ Enable streaming with stream=True",
            "✅ Handle partial content gracefully",
            "✅ Implement error handling and retries",
            "✅ Use appropriate buffer sizes",
            "✅ Monitor token usage during streaming",
            "✅ Implement timeout handling",
            "✅ Provide user feedback during streaming",
            "✅ Handle network interruptions",
            "✅ Optimize for perceived performance",
            "✅ Test with various content lengths",
            "✅ Implement proper cleanup on errors",
            "✅ Consider backpressure for slow consumers"
        ]
    
    @staticmethod
    def common_pitfalls() -> List[str]:
        """Common streaming pitfalls to avoid"""
        
        return [
            "❌ Not handling None content in chunks",
            "❌ Blocking the main thread during streaming",
            "❌ Not implementing proper error recovery",
            "❌ Ignoring finish_reason signals",
            "❌ Not providing user feedback during delays",
            "❌ Accumulating too much content in memory",
            "❌ Not handling network timeouts",
            "❌ Missing proper cleanup on cancellation",
            "❌ Not testing with poor network conditions",
            "❌ Overwhelming users with too-fast streaming"
        ]

# Display best practices
practices = StreamingBestPractices()

print("🎯 Streaming Best Practices:")
for tip in practices.streaming_checklist():
    print(f"   {tip}")

print("\n⚠️ Common Pitfalls to Avoid:")
for pitfall in practices.common_pitfalls():
    print(f"   {pitfall}")

print("\n⚙️ Recommended Parameters:")
params = practices.optimize_streaming_parameters()
for key, value in params.items():
    print(f"   {key}: {value}")
