Performance Guide
Optimize your DeepSeek API usage for maximum performance, efficiency, and cost-effectiveness.
Overview
This guide covers:
- Request optimization: Minimize latency and maximize throughput
- Token efficiency: Reduce costs and improve response times
- Caching strategies: Implement smart caching for repeated requests
- Batch processing: Handle multiple requests efficiently
- Rate limit management: Optimize within API constraints
- Monitoring and metrics: Track and improve performance
Request Optimization
Connection Management
python
import httpx
from openai import OpenAI
import time
class OptimizedDeepSeekClient:
"""Optimized client with connection pooling and reuse"""
def __init__(self, api_key: str):
# Configure HTTP client with connection pooling
http_client = httpx.Client(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30
),
timeout=httpx.Timeout(
connect=10.0,
read=60.0,
write=10.0,
pool=5.0
)
)
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1",
http_client=http_client
)
def create_completion(self, **kwargs):
"""Create completion with optimized settings"""
return self.client.chat.completions.create(**kwargs)
def close(self):
"""Close HTTP connections"""
self.client.close()
# Usage
client = OptimizedDeepSeekClient("sk-your-api-key")
try:
response = client.create_completion(
model="deepseek-chat",
messages=[{"role": "user", "content": "Hello!"}]
)
finally:
client.close()
Async Processing
python
import asyncio
import time
import aiohttp
from typing import List, Dict, Any
class AsyncDeepSeekClient:
"""Async client for concurrent requests"""
def __init__(self, api_key: str, max_concurrent: int = 10):
self.api_key = api_key
self.base_url = "https://api.deepseek.com/v1"
self.semaphore = asyncio.Semaphore(max_concurrent)
async def create_completion(self, session: aiohttp.ClientSession, **kwargs) -> Dict[str, Any]:
"""Create async completion"""
async with self.semaphore:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async with session.post(
f"{self.base_url}/chat/completions",
json=kwargs,
headers=headers
) as response:
return await response.json()
async def batch_completions(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process multiple requests concurrently"""
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=20,
keepalive_timeout=30
)
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
tasks = [
self.create_completion(session, **request)
for request in requests
]
return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
async def main():
client = AsyncDeepSeekClient("sk-your-api-key", max_concurrent=5)
requests = [
{
"model": "deepseek-chat",
"messages": [{"role": "user", "content": f"Question {i}"}],
"max_tokens": 100
}
for i in range(10)
]
start_time = time.time()
results = await client.batch_completions(requests)
end_time = time.time()
print(f"Processed {len(requests)} requests in {end_time - start_time:.2f} seconds")
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Request {i} failed: {result}")
else:
print(f"Request {i} succeeded")
# Run async example
# asyncio.run(main())
Request Batching
python
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
import time
from collections import defaultdict
from openai import OpenAI
@dataclass
class BatchRequest:
"""Individual request in a batch"""
id: str
model: str
messages: List[Dict[str, str]]
max_tokens: Optional[int] = None
temperature: Optional[float] = None
class RequestBatcher:
"""Batch requests for efficient processing"""
def __init__(self, client, batch_size: int = 10, max_wait_time: float = 1.0):
self.client = client  # OpenAI-compatible client used to send each request
self.batch_size = batch_size
self.max_wait_time = max_wait_time
self.pending_requests = []
self.results = {}
def add_request(self, request: BatchRequest) -> str:
"""Add request to batch"""
self.pending_requests.append(request)
# Process batch if full or timeout
if len(self.pending_requests) >= self.batch_size:
self._process_batch()
return request.id
def _process_batch(self):
"""Process current batch of requests"""
if not self.pending_requests:
return
batch = self.pending_requests.copy()
self.pending_requests.clear()
# Group by model for efficiency
model_groups = defaultdict(list)
for request in batch:
model_groups[request.model].append(request)
# Process each model group
for model, requests in model_groups.items():
self._process_model_group(model, requests)
def _process_model_group(self, model: str, requests: List[BatchRequest]):
"""Process requests for a specific model"""
for request in requests:
try:
response = self.client.chat.completions.create(
model=request.model,
messages=request.messages,
max_tokens=request.max_tokens,
temperature=request.temperature
)
self.results[request.id] = {
"success": True,
"response": response.choices[0].message.content
}
except Exception as e:
self.results[request.id] = {
"success": False,
"error": str(e)
}
def get_result(self, request_id: str, timeout: float = 10.0) -> Dict[str, Any]:
"""Get result for a specific request"""
start_time = time.time()
while request_id not in self.results:
if time.time() - start_time > timeout:
return {"success": False, "error": "Timeout waiting for result"}
# Process pending batch if timeout reached
if (time.time() - start_time > self.max_wait_time and
self.pending_requests):
self._process_batch()
time.sleep(0.1)
return self.results.pop(request_id)
def flush(self):
"""Process all pending requests"""
self._process_batch()
# Usage
api_client = OpenAI(api_key="sk-your-api-key", base_url="https://api.deepseek.com/v1")
batcher = RequestBatcher(api_client, batch_size=5, max_wait_time=2.0)
# Add requests
request_ids = []
for i in range(10):
request = BatchRequest(
id=f"req_{i}",
model="deepseek-chat",
messages=[{"role": "user", "content": f"Question {i}"}],
max_tokens=50
)
request_ids.append(batcher.add_request(request))
# Flush any remaining requests, then collect results
batcher.flush()
for req_id in request_ids:
result = batcher.get_result(req_id)
if result["success"]:
print(f"{req_id}: {result['response']}")
else:
print(f"{req_id}: Error - {result['error']}")
Token Optimization
Token Counting and Estimation
python
import tiktoken
from typing import List, Dict
class TokenOptimizer:
"""Optimize token usage for cost and performance"""
def __init__(self, model: str = "deepseek-chat"):
self.model = model
# Use cl100k_base encoding as approximation
self.encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.encoding.encode(text))
def count_message_tokens(self, messages: List[Dict[str, str]]) -> int:
"""Count tokens in message list"""
total_tokens = 0
for message in messages:
# Add tokens for role and content
total_tokens += self.count_tokens(message.get("role", ""))
total_tokens += self.count_tokens(message.get("content", ""))
# Add overhead tokens per message
total_tokens += 4
# Add overhead for the conversation
total_tokens += 2
return total_tokens
def estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""Estimate cost based on token usage"""
# Example pricing (adjust based on actual rates)
input_cost_per_1k = 0.0014 # $0.0014 per 1K input tokens
output_cost_per_1k = 0.0028 # $0.0028 per 1K output tokens
input_cost = (input_tokens / 1000) * input_cost_per_1k
output_cost = (output_tokens / 1000) * output_cost_per_1k
return input_cost + output_cost
def optimize_messages(self, messages: List[Dict[str, str]], max_tokens: int) -> List[Dict[str, str]]:
"""Optimize messages to fit within token limit"""
optimized = []
current_tokens = 0
# Always include the last message (usually the current question)
if messages:
last_message = messages[-1]
last_tokens = self.count_message_tokens([last_message])
if last_tokens <= max_tokens:
optimized.append(last_message)
current_tokens = last_tokens
# Add previous messages in reverse order
for message in reversed(messages[:-1]):
message_tokens = self.count_message_tokens([message])
if current_tokens + message_tokens <= max_tokens:
optimized.insert(0, message)
current_tokens += message_tokens
else:
break
return optimized
def truncate_content(self, content: str, max_tokens: int) -> str:
"""Truncate content to fit within token limit"""
tokens = self.encoding.encode(content)
if len(tokens) <= max_tokens:
return content
# Truncate and decode
truncated_tokens = tokens[:max_tokens]
return self.encoding.decode(truncated_tokens)
def smart_summarize_context(self, messages: List[Dict[str, str]], target_tokens: int) -> List[Dict[str, str]]:
"""Summarize older messages to reduce token count"""
current_tokens = self.count_message_tokens(messages)
if current_tokens <= target_tokens:
return messages
# Keep recent messages, summarize older ones
keep_recent = 3 # Keep last 3 messages
recent_messages = messages[-keep_recent:]
older_messages = messages[:-keep_recent]
if older_messages:
# Create summary of older messages
summary_content = "Previous conversation summary: "
for msg in older_messages:
summary_content += f"{msg['role']}: {msg['content'][:100]}... "
summary_message = {
"role": "system",
"content": self.truncate_content(summary_content, target_tokens // 4)
}
return [summary_message] + recent_messages
return recent_messages
# Usage
optimizer = TokenOptimizer()
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is machine learning?"},
{"role": "assistant", "content": "Machine learning is..."},
{"role": "user", "content": "Can you give me examples?"}
]
# Count tokens
total_tokens = optimizer.count_message_tokens(messages)
print(f"Total tokens: {total_tokens}")
# Optimize for token limit
optimized_messages = optimizer.optimize_messages(messages, max_tokens=1000)
print(f"Optimized to {len(optimized_messages)} messages")
# Estimate cost
estimated_cost = optimizer.estimate_cost(input_tokens=total_tokens, output_tokens=150)
print(f"Estimated cost: ${estimated_cost:.4f}")
Prompt Optimization
python
from typing import List, Dict
class PromptOptimizer:
"""Optimize prompts for efficiency and effectiveness"""
def __init__(self):
self.token_optimizer = TokenOptimizer()
def compress_prompt(self, prompt: str) -> str:
"""Compress prompt while maintaining meaning"""
# Remove unnecessary whitespace
compressed = " ".join(prompt.split())
# Replace verbose phrases with concise alternatives
replacements = {
"Please provide a detailed explanation of": "Explain",
"I would like you to": "Please",
"Can you help me understand": "Explain",
"It would be great if you could": "Please",
"I need assistance with": "Help with",
"Could you please tell me": "What is",
"I am interested in learning about": "Explain",
}
for verbose, concise in replacements.items():
compressed = compressed.replace(verbose, concise)
return compressed
def create_efficient_system_prompt(self, role: str, constraints: List[str] = None) -> str:
"""Create efficient system prompts"""
base_prompts = {
"assistant": "You are a helpful AI assistant.",
"coder": "You are an expert programmer.",
"analyst": "You are a data analyst.",
"writer": "You are a professional writer.",
"teacher": "You are an educational tutor."
}
prompt = base_prompts.get(role, "You are a helpful AI assistant.")
if constraints:
# Add constraints efficiently
constraint_text = " ".join(constraints)
prompt += f" {constraint_text}"
return self.compress_prompt(prompt)
def optimize_few_shot_examples(self, examples: List[Dict[str, str]], max_examples: int = 3) -> List[Dict[str, str]]:
"""Optimize few-shot examples for token efficiency"""
if len(examples) <= max_examples:
return examples
# Score examples by length and diversity
scored_examples = []
for i, example in enumerate(examples):
input_tokens = self.token_optimizer.count_tokens(example.get("input", ""))
output_tokens = self.token_optimizer.count_tokens(example.get("output", ""))
total_tokens = input_tokens + output_tokens
# Prefer shorter examples
score = 1.0 / (total_tokens + 1)
scored_examples.append((score, i, example))
# Sort by score and take top examples
scored_examples.sort(reverse=True)
return [example for _, _, example in scored_examples[:max_examples]]
def create_template_prompt(self, task: str, input_format: str, output_format: str, examples: List[Dict[str, str]] = None) -> str:
"""Create optimized template prompt"""
prompt_parts = [
f"Task: {task}",
f"Input: {input_format}",
f"Output: {output_format}"
]
if examples:
optimized_examples = self.optimize_few_shot_examples(examples)
prompt_parts.append("Examples:")
for i, example in enumerate(optimized_examples, 1):
prompt_parts.append(f"{i}. Input: {example['input']}")
prompt_parts.append(f" Output: {example['output']}")
return "\n".join(prompt_parts)
# Usage
prompt_optimizer = PromptOptimizer()
# Compress verbose prompt
verbose_prompt = "Please provide a detailed explanation of how machine learning algorithms work and I would like you to include examples."
compressed = prompt_optimizer.compress_prompt(verbose_prompt)
print(f"Original: {len(verbose_prompt)} chars")
print(f"Compressed: {len(compressed)} chars")
# Create efficient system prompt
system_prompt = prompt_optimizer.create_efficient_system_prompt(
"coder",
["Be concise.", "Include examples.", "Focus on Python."]
)
print(f"System prompt: {system_prompt}")
# Optimize few-shot examples
examples = [
{"input": "Sort list [3,1,4,1,5]", "output": "[1,1,3,4,5]"},
{"input": "Reverse string 'hello'", "output": "'olleh'"},
{"input": "Find max in [2,8,1,9,3]", "output": "9"}
]
template = prompt_optimizer.create_template_prompt(
"Python operations",
"Description of operation",
"Result or code",
examples
)
print(f"Template:\n{template}")
Caching Strategies
Response Caching
python
import hashlib
import json
import time
from pathlib import Path
from typing import Optional, Dict, Any, List
from openai import OpenAI
class ResponseCache:
"""Cache API responses for improved performance"""
def __init__(self, cache_dir: str = "./cache", ttl: int = 3600):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.ttl = ttl # Time to live in seconds
def _get_cache_key(self, model: str, messages: List[Dict[str, str]], **kwargs) -> str:
"""Generate cache key for request"""
# Create deterministic hash of request parameters
cache_data = {
"model": model,
"messages": messages,
**kwargs
}
# Sort to ensure consistent hashing
cache_string = json.dumps(cache_data, sort_keys=True)
return hashlib.md5(cache_string.encode()).hexdigest()
def get(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Optional[Dict[str, Any]]:
"""Get cached response if available and valid"""
cache_key = self._get_cache_key(model, messages, **kwargs)
cache_file = self.cache_dir / f"{cache_key}.json"
if not cache_file.exists():
return None
try:
with open(cache_file, 'r') as f:
cached_data = json.load(f)
# Check if cache is still valid
if time.time() - cached_data['timestamp'] > self.ttl:
cache_file.unlink() # Remove expired cache
return None
return cached_data['response']
except (json.JSONDecodeError, KeyError, FileNotFoundError):
return None
def set(self, model: str, messages: List[Dict[str, str]], response: Dict[str, Any], **kwargs):
"""Cache response"""
cache_key = self._get_cache_key(model, messages, **kwargs)
cache_file = self.cache_dir / f"{cache_key}.json"
cache_data = {
"timestamp": time.time(),
"response": response,
"model": model,
"messages": messages,
"kwargs": kwargs
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
def clear_expired(self):
"""Clear expired cache entries"""
current_time = time.time()
for cache_file in self.cache_dir.glob("*.json"):
try:
with open(cache_file, 'r') as f:
cached_data = json.load(f)
if current_time - cached_data['timestamp'] > self.ttl:
cache_file.unlink()
except (json.JSONDecodeError, KeyError, FileNotFoundError):
cache_file.unlink() # Remove corrupted cache
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
cache_files = list(self.cache_dir.glob("*.json"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"total_entries": len(cache_files),
"total_size_mb": total_size / (1024 * 1024),
"cache_dir": str(self.cache_dir)
}
class CachedDeepSeekClient:
"""DeepSeek client with caching"""
def __init__(self, api_key: str, cache_ttl: int = 3600):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1"
)
self.cache = ResponseCache(ttl=cache_ttl)
self.cache_hits = 0
self.cache_misses = 0
def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
"""Create chat completion with caching"""
# Try to get from cache first
cached_response = self.cache.get(**kwargs)
if cached_response:
self.cache_hits += 1
print(f"✅ Cache hit! (Hits: {self.cache_hits}, Misses: {self.cache_misses})")
return cached_response
# Make API call
self.cache_misses += 1
print(f"🔄 Cache miss, making API call (Hits: {self.cache_hits}, Misses: {self.cache_misses})")
response = self.client.chat.completions.create(**kwargs)
# Convert response to dict for caching
response_dict = {
"id": response.id,
"object": response.object,
"created": response.created,
"model": response.model,
"choices": [
{
"index": choice.index,
"message": {
"role": choice.message.role,
"content": choice.message.content
},
"finish_reason": choice.finish_reason
}
for choice in response.choices
],
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}
# Cache the response
self.cache.set(response=response_dict, **kwargs)
return response_dict
def get_cache_stats(self) -> Dict[str, Any]:
"""Get caching statistics"""
cache_stats = self.cache.get_stats()
total_requests = self.cache_hits + self.cache_misses
hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
return {
**cache_stats,
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate_percent": hit_rate
}
# Usage
cached_client = CachedDeepSeekClient("sk-your-api-key", cache_ttl=1800)
# First call - cache miss
response1 = cached_client.chat_completions_create(
model="deepseek-chat",
messages=[{"role": "user", "content": "What is Python?"}]
)
# Second call with same parameters - cache hit
response2 = cached_client.chat_completions_create(
model="deepseek-chat",
messages=[{"role": "user", "content": "What is Python?"}]
)
# Get cache statistics
stats = cached_client.get_cache_stats()
print(f"Cache statistics: {stats}")
Semantic Caching
python
import time
from typing import List, Dict, Any, Optional
import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
class SemanticCache:
"""Cache based on semantic similarity of requests"""
def __init__(self, similarity_threshold: float = 0.85, max_entries: int = 1000):
self.similarity_threshold = similarity_threshold
self.max_entries = max_entries
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.cache_entries = []
self.embeddings = []
def _get_query_text(self, messages: List[Dict[str, str]]) -> str:
"""Extract query text from messages"""
# Combine all user messages
user_messages = [msg['content'] for msg in messages if msg['role'] == 'user']
return " ".join(user_messages)
def _find_similar_entry(self, query_embedding: np.ndarray) -> Optional[int]:
"""Find semantically similar cache entry"""
if not self.embeddings:
return None
# Calculate similarities
similarities = cosine_similarity([query_embedding], self.embeddings)[0]
# Find best match above threshold
best_idx = np.argmax(similarities)
best_similarity = similarities[best_idx]
if best_similarity >= self.similarity_threshold:
return best_idx
return None
def get(self, messages: List[Dict[str, str]], **kwargs) -> Optional[Dict[str, Any]]:
"""Get semantically similar cached response"""
query_text = self._get_query_text(messages)
query_embedding = self.model.encode([query_text])[0]
similar_idx = self._find_similar_entry(query_embedding)
if similar_idx is not None:
entry = self.cache_entries[similar_idx]
# Check if other parameters match
if entry['kwargs'] == kwargs:
return entry['response']
return None
def set(self, messages: List[Dict[str, str]], response: Dict[str, Any], **kwargs):
"""Cache response with semantic indexing"""
query_text = self._get_query_text(messages)
query_embedding = self.model.encode([query_text])[0]
# Add to cache
cache_entry = {
"messages": messages,
"response": response,
"kwargs": kwargs,
"query_text": query_text,
"timestamp": time.time()
}
self.cache_entries.append(cache_entry)
self.embeddings.append(query_embedding)
# Maintain cache size limit
if len(self.cache_entries) > self.max_entries:
# Remove oldest entry
self.cache_entries.pop(0)
self.embeddings.pop(0)
def get_stats(self) -> Dict[str, Any]:
"""Get semantic cache statistics"""
return {
"total_entries": len(self.cache_entries),
"similarity_threshold": self.similarity_threshold,
"max_entries": self.max_entries
}
class SemanticCachedClient:
"""Client with semantic caching"""
def __init__(self, api_key: str, similarity_threshold: float = 0.85):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1"
)
self.semantic_cache = SemanticCache(similarity_threshold)
self.semantic_hits = 0
self.semantic_misses = 0
def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
"""Create completion with semantic caching"""
# Try semantic cache
cached_response = self.semantic_cache.get(**kwargs)
if cached_response:
self.semantic_hits += 1
print(f"🎯 Semantic cache hit! (Hits: {self.semantic_hits}, Misses: {self.semantic_misses})")
return cached_response
# Make API call
self.semantic_misses += 1
print(f"🔄 Semantic cache miss (Hits: {self.semantic_hits}, Misses: {self.semantic_misses})")
response = self.client.chat.completions.create(**kwargs)
# Convert and cache response
response_dict = {
"choices": [
{
"message": {
"content": choice.message.content
}
}
for choice in response.choices
]
}
self.semantic_cache.set(response=response_dict, **kwargs)
return response_dict
# Usage
semantic_client = SemanticCachedClient("sk-your-api-key", similarity_threshold=0.8)
# These queries are semantically similar and should hit cache
queries = [
"What is machine learning?",
"Can you explain machine learning?",
"Tell me about ML",
"What does machine learning mean?"
]
for query in queries:
response = semantic_client.chat_completions_create(
model="deepseek-chat",
messages=[{"role": "user", "content": query}]
)
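The exact-match cache and the semantic cache can also be layered: check the cheap exact-match cache first, fall back to the semantic cache, and only call the API when both miss. The sketch below is one possible arrangement, assuming the ResponseCache and SemanticCache classes defined earlier in this guide and an OpenAI SDK (v1+) response object that supports model_dump(); the class name TieredCachedClient is illustrative.
python
class TieredCachedClient:
    """Sketch: exact-match cache first, semantic cache second, API last."""
    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key, base_url="https://api.deepseek.com/v1")
        self.exact_cache = ResponseCache(ttl=1800)    # defined earlier in this guide
        self.semantic_cache = SemanticCache(0.85)     # defined earlier in this guide

    def chat_completions_create(self, **kwargs):
        # 1. Exact match on the full request parameters
        cached = self.exact_cache.get(**kwargs)
        if cached:
            return cached
        # 2. Semantically similar previous query
        cached = self.semantic_cache.get(**kwargs)
        if cached:
            return cached
        # 3. Fall back to the API and populate both caches
        response = self.client.chat.completions.create(**kwargs)
        response_dict = response.model_dump()  # assumes OpenAI SDK v1+ pydantic response objects
        self.exact_cache.set(response=response_dict, **kwargs)
        self.semantic_cache.set(response=response_dict, **kwargs)
        return response_dict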
Rate Limit Management
Rate Limiter
python
import time
from collections import deque
from threading import Lock
from typing import Dict, Any
from openai import OpenAI
class RateLimiter:
"""Manage API rate limits"""
def __init__(self, requests_per_minute: int = 60, tokens_per_minute: int = 60000):
self.requests_per_minute = requests_per_minute
self.tokens_per_minute = tokens_per_minute
self.request_times = deque()
self.token_usage = deque()
self.lock = Lock()
def _clean_old_entries(self, current_time: float):
"""Remove entries older than 1 minute"""
cutoff_time = current_time - 60
# Clean request times
while self.request_times and self.request_times[0] < cutoff_time:
self.request_times.popleft()
# Clean token usage
while self.token_usage and self.token_usage[0][0] < cutoff_time:
self.token_usage.popleft()
def can_make_request(self, estimated_tokens: int = 0) -> tuple[bool, float]:
"""Check if request can be made, return (can_make, wait_time)"""
with self.lock:
current_time = time.time()
self._clean_old_entries(current_time)
# Check request rate limit
if len(self.request_times) >= self.requests_per_minute:
wait_time = 60 - (current_time - self.request_times[0])
return False, max(0, wait_time)
# Check token rate limit
current_tokens = sum(tokens for _, tokens in self.token_usage)
if current_tokens + estimated_tokens > self.tokens_per_minute:
# Calculate wait time based on oldest token usage
if self.token_usage:
wait_time = 60 - (current_time - self.token_usage[0][0])
return False, max(0, wait_time)
return True, 0
def record_request(self, tokens_used: int = 0):
"""Record a successful request"""
with self.lock:
current_time = time.time()
self.request_times.append(current_time)
if tokens_used > 0:
self.token_usage.append((current_time, tokens_used))
def get_current_usage(self) -> Dict[str, Any]:
"""Get current rate limit usage"""
with self.lock:
current_time = time.time()
self._clean_old_entries(current_time)
current_requests = len(self.request_times)
current_tokens = sum(tokens for _, tokens in self.token_usage)
return {
"requests_used": current_requests,
"requests_limit": self.requests_per_minute,
"requests_remaining": self.requests_per_minute - current_requests,
"tokens_used": current_tokens,
"tokens_limit": self.tokens_per_minute,
"tokens_remaining": self.tokens_per_minute - current_tokens
}
class RateLimitedClient:
"""Client with automatic rate limiting"""
def __init__(self, api_key: str, requests_per_minute: int = 60, tokens_per_minute: int = 60000):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1"
)
self.rate_limiter = RateLimiter(requests_per_minute, tokens_per_minute)
self.token_optimizer = TokenOptimizer()
def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
"""Create completion with rate limiting"""
# Estimate tokens for rate limiting
messages = kwargs.get('messages', [])
estimated_tokens = self.token_optimizer.count_message_tokens(messages)
estimated_tokens += kwargs.get('max_tokens', 150) # Add estimated output tokens
# Check rate limits
can_make, wait_time = self.rate_limiter.can_make_request(estimated_tokens)
if not can_make:
print(f"⏳ Rate limit reached, waiting {wait_time:.2f} seconds...")
time.sleep(wait_time)
# Make request
try:
response = self.client.chat.completions.create(**kwargs)
# Record successful request
actual_tokens = response.usage.total_tokens
self.rate_limiter.record_request(actual_tokens)
return response
except Exception as e:
# Handle rate limit errors
if "rate limit" in str(e).lower():
print("🚫 Rate limit error from API, waiting 60 seconds...")
time.sleep(60)
return self.chat_completions_create(**kwargs)
else:
raise e
def get_rate_limit_status(self) -> Dict[str, Any]:
"""Get current rate limit status"""
return self.rate_limiter.get_current_usage()
# Usage
rate_limited_client = RateLimitedClient(
"sk-your-api-key",
requests_per_minute=50, # Conservative limit
tokens_per_minute=50000
)
# Make multiple requests
for i in range(10):
print(f"Making request {i+1}...")
response = rate_limited_client.chat_completions_create(
model="deepseek-chat",
messages=[{"role": "user", "content": f"Question {i+1}"}],
max_tokens=100
)
# Check rate limit status
status = rate_limited_client.get_rate_limit_status()
print(f"Rate limit status: {status['requests_remaining']} requests, {status['tokens_remaining']} tokens remaining")
Performance Monitoring
Metrics Collection
python
import time
import statistics
import json
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any, Optional
from openai import OpenAI
@dataclass
class RequestMetrics:
"""Metrics for a single request"""
timestamp: float
model: str
input_tokens: int
output_tokens: int
total_tokens: int
latency: float
success: bool
error: Optional[str] = None
cost: float = 0.0
class PerformanceMonitor:
"""Monitor and analyze API performance"""
def __init__(self):
self.metrics: List[RequestMetrics] = []
self.start_time = time.time()
def record_request(self, metrics: RequestMetrics):
"""Record request metrics"""
self.metrics.append(metrics)
def get_summary_stats(self, time_window: int = 3600) -> Dict[str, Any]:
"""Get summary statistics for the specified time window (seconds)"""
current_time = time.time()
cutoff_time = current_time - time_window
# Filter metrics within time window
recent_metrics = [m for m in self.metrics if m.timestamp >= cutoff_time]
if not recent_metrics:
return {"error": "No metrics in time window"}
# Calculate statistics
latencies = [m.latency for m in recent_metrics]
input_tokens = [m.input_tokens for m in recent_metrics]
output_tokens = [m.output_tokens for m in recent_metrics]
total_tokens = [m.total_tokens for m in recent_metrics]
costs = [m.cost for m in recent_metrics]
successful_requests = [m for m in recent_metrics if m.success]
failed_requests = [m for m in recent_metrics if not m.success]
return {
"time_window_hours": time_window / 3600,
"total_requests": len(recent_metrics),
"successful_requests": len(successful_requests),
"failed_requests": len(failed_requests),
"success_rate": len(successful_requests) / len(recent_metrics) * 100,
"latency": {
"mean": statistics.mean(latencies),
"median": statistics.median(latencies),
"p95": self._percentile(latencies, 95),
"p99": self._percentile(latencies, 99),
"min": min(latencies),
"max": max(latencies)
},
"tokens": {
"total_input": sum(input_tokens),
"total_output": sum(output_tokens),
"total_combined": sum(total_tokens),
"avg_input": statistics.mean(input_tokens),
"avg_output": statistics.mean(output_tokens),
"avg_total": statistics.mean(total_tokens)
},
"cost": {
"total": sum(costs),
"average_per_request": statistics.mean(costs),
"cost_per_1k_tokens": sum(costs) / (sum(total_tokens) / 1000) if sum(total_tokens) > 0 else 0
},
"throughput": {
"requests_per_minute": len(recent_metrics) / (time_window / 60),
"tokens_per_minute": sum(total_tokens) / (time_window / 60)
}
}
def _percentile(self, data: List[float], percentile: int) -> float:
"""Calculate percentile"""
sorted_data = sorted(data)
index = int(len(sorted_data) * percentile / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
def get_error_analysis(self) -> Dict[str, Any]:
"""Analyze errors and failures"""
failed_metrics = [m for m in self.metrics if not m.success]
if not failed_metrics:
return {"total_errors": 0}
# Group errors by type
error_counts = {}
for metric in failed_metrics:
error_type = metric.error or "Unknown"
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return {
"total_errors": len(failed_metrics),
"error_rate": len(failed_metrics) / len(self.metrics) * 100,
"error_types": error_counts,
"most_common_error": max(error_counts.items(), key=lambda x: x[1]) if error_counts else None
}
def export_metrics(self, filename: str):
"""Export metrics to JSON file"""
export_data = {
"export_timestamp": datetime.now().isoformat(),
"total_metrics": len(self.metrics),
"metrics": [
{
"timestamp": m.timestamp,
"model": m.model,
"input_tokens": m.input_tokens,
"output_tokens": m.output_tokens,
"total_tokens": m.total_tokens,
"latency": m.latency,
"success": m.success,
"error": m.error,
"cost": m.cost
}
for m in self.metrics
]
}
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
class MonitoredDeepSeekClient:
"""DeepSeek client with performance monitoring"""
def __init__(self, api_key: str):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.deepseek.com/v1"
)
self.monitor = PerformanceMonitor()
self.token_optimizer = TokenOptimizer()
def chat_completions_create(self, **kwargs) -> Dict[str, Any]:
"""Create completion with monitoring"""
start_time = time.time()
# Estimate input tokens
messages = kwargs.get('messages', [])
input_tokens = self.token_optimizer.count_message_tokens(messages)
try:
response = self.client.chat.completions.create(**kwargs)
# Calculate metrics
end_time = time.time()
latency = end_time - start_time
output_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
# Estimate cost
cost = self.token_optimizer.estimate_cost(input_tokens, output_tokens)
# Record metrics
metrics = RequestMetrics(
timestamp=start_time,
model=kwargs.get('model', 'unknown'),
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=total_tokens,
latency=latency,
success=True,
cost=cost
)
self.monitor.record_request(metrics)
return response
except Exception as e:
# Record failed request
end_time = time.time()
latency = end_time - start_time
metrics = RequestMetrics(
timestamp=start_time,
model=kwargs.get('model', 'unknown'),
input_tokens=input_tokens,
output_tokens=0,
total_tokens=input_tokens,
latency=latency,
success=False,
error=str(e)
)
self.monitor.record_request(metrics)
raise e
def get_performance_report(self) -> Dict[str, Any]:
"""Get comprehensive performance report"""
return {
"summary_1h": self.monitor.get_summary_stats(3600),
"summary_24h": self.monitor.get_summary_stats(86400),
"error_analysis": self.monitor.get_error_analysis()
}
def export_performance_data(self, filename: str):
"""Export performance data"""
self.monitor.export_metrics(filename)
# Usage
monitored_client = MonitoredDeepSeekClient("sk-your-api-key")
# Make some requests
for i in range(5):
try:
response = monitored_client.chat_completions_create(
model="deepseek-chat",
messages=[{"role": "user", "content": f"Test question {i+1}"}],
max_tokens=100
)
print(f"✅ Request {i+1} successful")
except Exception as e:
print(f"❌ Request {i+1} failed: {e}")
# Get performance report
report = monitored_client.get_performance_report()
print(json.dumps(report, indent=2))
# Export data
monitored_client.export_performance_data("performance_metrics.json")
Best Practices Summary
Performance Checklist
python
from typing import Dict, Any, List
class PerformanceChecklist:
"""Performance optimization checklist"""
@staticmethod
def check_request_optimization(client_config: Dict[str, Any]) -> List[str]:
"""Check request optimization"""
recommendations = []
# Connection pooling
if not client_config.get("connection_pooling"):
recommendations.append("✅ Enable HTTP connection pooling")
# Timeout configuration
if not client_config.get("timeout_configured"):
recommendations.append("✅ Configure appropriate timeouts")
# Async processing
if not client_config.get("async_support"):
recommendations.append("✅ Consider async processing for concurrent requests")
return recommendations
@staticmethod
def check_token_optimization(prompt_config: Dict[str, Any]) -> List[str]:
"""Check token optimization"""
recommendations = []
# Prompt length
if prompt_config.get("avg_prompt_tokens", 0) > 2000:
recommendations.append("✅ Consider shortening prompts")
# Context management
if not prompt_config.get("context_management"):
recommendations.append("✅ Implement context window management")
# Token counting
if not prompt_config.get("token_counting"):
recommendations.append("✅ Implement token counting and estimation")
return recommendations
@staticmethod
def check_caching_strategy(cache_config: Dict[str, Any]) -> List[str]:
"""Check caching strategy"""
recommendations = []
# Response caching
if not cache_config.get("response_caching"):
recommendations.append("✅ Implement response caching")
# Cache hit rate
hit_rate = cache_config.get("hit_rate", 0)
if hit_rate < 20:
recommendations.append("✅ Improve cache hit rate (currently {hit_rate}%)")
# Semantic caching
if not cache_config.get("semantic_caching"):
recommendations.append("✅ Consider semantic caching for similar queries")
return recommendations
@staticmethod
def check_rate_limiting(rate_config: Dict[str, Any]) -> List[str]:
"""Check rate limiting"""
recommendations = []
# Rate limiter
if not rate_config.get("rate_limiter"):
recommendations.append("✅ Implement client-side rate limiting")
# Backoff strategy
if not rate_config.get("backoff_strategy"):
recommendations.append("✅ Implement exponential backoff")
# Usage monitoring
if not rate_config.get("usage_monitoring"):
recommendations.append("✅ Monitor rate limit usage")
return recommendations
@staticmethod
def check_monitoring(monitor_config: Dict[str, Any]) -> List[str]:
"""Check monitoring setup"""
recommendations = []
# Performance monitoring
if not monitor_config.get("performance_monitoring"):
recommendations.append("✅ Implement performance monitoring")
# Error tracking
if not monitor_config.get("error_tracking"):
recommendations.append("✅ Implement error tracking and analysis")
# Metrics export
if not monitor_config.get("metrics_export"):
recommendations.append("✅ Set up metrics export and analysis")
return recommendations
@staticmethod
def generate_full_report(config: Dict[str, Any]) -> Dict[str, List[str]]:
"""Generate full performance optimization report"""
return {
"request_optimization": PerformanceChecklist.check_request_optimization(
config.get("client", {})
),
"token_optimization": PerformanceChecklist.check_token_optimization(
config.get("prompts", {})
),
"caching_strategy": PerformanceChecklist.check_caching_strategy(
config.get("cache", {})
),
"rate_limiting": PerformanceChecklist.check_rate_limiting(
config.get("rate_limits", {})
),
"monitoring": PerformanceChecklist.check_monitoring(
config.get("monitoring", {})
)
}
# Usage
config = {
"client": {
"connection_pooling": True,
"timeout_configured": True,
"async_support": False
},
"prompts": {
"avg_prompt_tokens": 1500,
"context_management": True,
"token_counting": True
},
"cache": {
"response_caching": True,
"hit_rate": 35,
"semantic_caching": False
},
"rate_limits": {
"rate_limiter": True,
"backoff_strategy": True,
"usage_monitoring": False
},
"monitoring": {
"performance_monitoring": False,
"error_tracking": True,
"metrics_export": False
}
}
report = PerformanceChecklist.generate_full_report(config)
for category, recommendations in report.items():
print(f"\n{category.replace('_', ' ').title()}:")
for rec in recommendations:
print(f" {rec}")