Batch Processing Guide
Learn how to process multiple requests efficiently with DeepSeek's batch processing, reducing costs and improving throughput.
Overview
Batch processing allows you to:
- Reduce costs: Up to 50% savings compared to real-time requests
- Increase throughput: Process thousands of requests efficiently
- Optimize resources: Better resource utilization for large workloads
- Simplify workflows: Handle bulk operations with ease
When to Use Batch Processing
Ideal Use Cases
- Data analysis: Processing large datasets
- Content generation: Bulk content creation
- Translation: Translating multiple documents
- Summarization: Summarizing many articles
- Classification: Categorizing large volumes of text
- Code analysis: Analyzing multiple code files
Not Suitable For
- Real-time applications: Interactive chatbots, live support
- Time-sensitive tasks: Urgent processing requirements
- Single requests: Individual, one-off requests
Getting Started
1. Prepare Your Batch File
Create a JSONL (JSON Lines) file where each line contains a request:
```jsonl
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "Summarize the benefits of renewable energy."}], "max_tokens": 200}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "Explain quantum computing in simple terms."}], "max_tokens": 200}}
{"custom_id": "request-3", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "deepseek-chat", "messages": [{"role": "user", "content": "What are the main causes of climate change?"}], "max_tokens": 200}}
```
2. Upload the Batch File
```python
from openai import OpenAI

client = OpenAI(
    api_key="YOUR_API_KEY",
    base_url="https://api.deepseek.com/v1"
)

# Upload the batch file
batch_file = client.files.create(
    file=open("batch_requests.jsonl", "rb"),
    purpose="batch"
)

print(f"File uploaded: {batch_file.id}")
```
3. Create a Batch Job
```python
# Create the batch job
batch_job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)

print(f"Batch job created: {batch_job.id}")
print(f"Status: {batch_job.status}")
```
4. Monitor Batch Status
```python
# Check batch status
batch_status = client.batches.retrieve(batch_job.id)

print(f"Status: {batch_status.status}")
print(f"Progress: {batch_status.request_counts}")

# Possible statuses:
# - validating: checking the input file format
# - failed: validation failed
# - in_progress: processing requests
# - finalizing: preparing the output file
# - completed: all requests processed
# - expired: the 24-hour completion window elapsed
# - cancelled: manually cancelled
```
5. Retrieve Results
```python
import json

# Once completed, download and process the results
if batch_status.status == "completed":
    # Download the output file
    output_file_id = batch_status.output_file_id
    output_content = client.files.content(output_file_id)

    # Save results to a file
    with open("batch_results.jsonl", "wb") as f:
        f.write(output_content.content)

    # Process results
    with open("batch_results.jsonl", "r") as f:
        for line in f:
            result = json.loads(line)
            custom_id = result["custom_id"]
            response = result["response"]["body"]
            print(f"Request {custom_id}: {response['choices'][0]['message']['content']}")
```
Advanced Usage
Batch File Format
Each line in your JSONL file must contain:
```json
{
  "custom_id": "unique-identifier",
  "method": "POST",
  "url": "/v1/chat/completions",
  "body": {
    "model": "deepseek-chat",
    "messages": [...],
    "max_tokens": 1000,
    "temperature": 0.7
  }
}
```
Required Fields
- custom_id: Unique identifier for each request (max 64 characters)
- method: HTTP method (always "POST" for chat completions)
- url: API endpoint (always "/v1/chat/completions")
- body: The request payload
Custom ID Best Practices
```python
# Good custom IDs
"doc-001-summary"
"user-12345-query"
"batch-2024-01-15-item-001"

# Avoid special characters and keep IDs descriptive
```
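If you generate IDs programmatically, a small helper keeps them descriptive, consistent, and under the 64-character limit. This is a minimal sketch; the `make_custom_id` helper and its prefix scheme are illustrative, not part of the API.

```python
import re

def make_custom_id(prefix: str, index: int) -> str:
    """Build a descriptive custom_id that stays within the 64-character limit."""
    # Keep only letters, digits, and dashes in the prefix
    safe_prefix = re.sub(r"[^A-Za-z0-9-]+", "-", prefix).strip("-").lower()
    return f"{safe_prefix}-{index:06d}"[:64]

# Produces "doc-summaries-000001", "doc-summaries-000002", ...
ids = [make_custom_id("doc summaries", i + 1) for i in range(3)]
```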
Large-Scale Processing
For processing thousands of requests:
```python
import json
from typing import List, Dict

def create_batch_file(requests: List[Dict], filename: str):
    """Create a batch file from a list of requests."""
    with open(filename, 'w') as f:
        for i, request in enumerate(requests):
            batch_request = {
                "custom_id": f"request-{i+1:06d}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "deepseek-chat",
                    "messages": [{"role": "user", "content": request["prompt"]}],
                    "max_tokens": request.get("max_tokens", 500),
                    "temperature": request.get("temperature", 0.7)
                }
            }
            f.write(json.dumps(batch_request) + '\n')

# Example: process 1000 documents
documents = [
    {"prompt": "Summarize this document: ...", "max_tokens": 200},
    {"prompt": "Translate this text: ...", "max_tokens": 300},
    # ... more documents
]

create_batch_file(documents, "large_batch.jsonl")
```
Error Handling
```python
import json
import time

def process_batch_with_error_handling(batch_file_path: str):
    try:
        # Upload file
        with open(batch_file_path, "rb") as f:
            batch_file = client.files.create(file=f, purpose="batch")

        # Create batch job
        batch_job = client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        # Monitor progress
        while True:
            batch_status = client.batches.retrieve(batch_job.id)
            print(f"Status: {batch_status.status}")

            if batch_status.status in ["completed", "failed", "expired", "cancelled"]:
                break

            time.sleep(60)  # Check every minute

        # Handle results
        if batch_status.status == "completed":
            print("Batch completed successfully!")
            return process_results(batch_status.output_file_id)
        else:
            print(f"Batch did not complete; final status: {batch_status.status}")
            if batch_status.error_file_id:
                error_content = client.files.content(batch_status.error_file_id)
                print("Error details:", error_content.content.decode())
            return None

    except Exception as e:
        print(f"Error processing batch: {e}")
        return None

def process_results(output_file_id: str):
    """Process batch results and separate successes from errors."""
    output_content = client.files.content(output_file_id)

    results = []
    errors = []

    for line in output_content.content.decode().strip().split('\n'):
        result = json.loads(line)

        if result.get("error"):
            errors.append({
                "custom_id": result["custom_id"],
                "error": result["error"]
            })
        else:
            results.append({
                "custom_id": result["custom_id"],
                "response": result["response"]["body"]["choices"][0]["message"]["content"]
            })

    print(f"Successful requests: {len(results)}")
    print(f"Failed requests: {len(errors)}")

    return {"results": results, "errors": errors}
```
Batch Management
List All Batches
```python
# List recent batches
batches = client.batches.list(limit=10)

for batch in batches.data:
    print(f"Batch {batch.id}: {batch.status}")
```
Cancel a Batch
```python
# Cancel a running batch
cancelled_batch = client.batches.cancel(batch_job.id)
print(f"Batch cancelled: {cancelled_batch.status}")
```
Retrieve Batch Details
```python
# Get detailed information about a batch
batch_details = client.batches.retrieve(batch_job.id)

print(f"Created at: {batch_details.created_at}")
print(f"Request counts: {batch_details.request_counts}")
print(f"Metadata: {batch_details.metadata}")
```
Cost Optimization
Pricing Benefits
Batch processing offers significant cost savings:
```python
# Illustrative cost comparison (example per-request prices, not actual rates)
real_time_cost = 1000 * 0.002  # 1000 requests at $0.002 each
batch_cost = 1000 * 0.001      # 50% discount for batch processing

savings = real_time_cost - batch_cost
print(f"Real-time cost: ${real_time_cost}")
print(f"Batch cost: ${batch_cost}")
print(f"Savings: ${savings} ({savings / real_time_cost * 100:.1f}%)")
```
Optimization Strategies
- Batch Size: Optimize batch sizes for your use case
- Request Grouping: Group similar requests together
- Token Management: Use appropriate max_tokens settings
- Model Selection: Choose the right model for each task
```python
def optimize_batch_requests(requests: List[Dict]):
    """Optimize requests for batch processing."""
    optimized = []

    for request in requests:
        # Estimate a reasonable max_tokens based on the task type
        if "summarize" in request["prompt"].lower():
            max_tokens = min(200, len(request["prompt"].split()) // 2)
        elif "translate" in request["prompt"].lower():
            max_tokens = len(request["prompt"].split()) * 2
        else:
            max_tokens = 500

        optimized.append({
            **request,
            "max_tokens": max_tokens,
            "temperature": 0.3  # Lower temperature for more consistent results
        })

    return optimized
```
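The optimizer can be chained with `create_batch_file` from the Large-Scale Processing section before uploading; a short usage sketch, reusing the `documents` list defined there:

```python
# Tune max_tokens and temperature first, then write the batch file
optimized_requests = optimize_batch_requests(documents)
create_batch_file(optimized_requests, "optimized_batch.jsonl")
```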
Use Case Examples
Document Summarization
```python
def batch_summarize_documents(documents: List[str]):
    """Summarize multiple documents in batch."""
    requests = []

    for i, doc in enumerate(documents):
        requests.append({
            "custom_id": f"summary-{i+1:04d}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "deepseek-chat",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a professional summarizer. Provide concise, accurate summaries."
                    },
                    {
                        "role": "user",
                        "content": f"Summarize this document in 2-3 sentences:\n\n{doc}"
                    }
                ],
                "max_tokens": 150,
                "temperature": 0.3
            }
        })

    # Save to file
    with open("summarization_batch.jsonl", "w") as f:
        for request in requests:
            f.write(json.dumps(request) + '\n')

    return "summarization_batch.jsonl"
```
Content Translation
```python
def batch_translate_content(texts: List[str], target_language: str):
    """Translate multiple texts in batch."""
    requests = []

    for i, text in enumerate(texts):
        requests.append({
            "custom_id": f"translate-{i+1:04d}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "deepseek-chat",
                "messages": [
                    {
                        "role": "user",
                        "content": f"Translate the following text to {target_language}:\n\n{text}"
                    }
                ],
                "max_tokens": len(text.split()) * 2,
                "temperature": 0.1
            }
        })

    with open("translation_batch.jsonl", "w") as f:
        for request in requests:
            f.write(json.dumps(request) + '\n')

    return "translation_batch.jsonl"
```
Data Classification
```python
def batch_classify_data(items: List[str], categories: List[str]):
    """Classify multiple items in batch."""
    category_list = ", ".join(categories)
    requests = []

    for i, item in enumerate(items):
        requests.append({
            "custom_id": f"classify-{i+1:04d}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "deepseek-chat",
                "messages": [
                    {
                        "role": "system",
                        "content": f"Classify the given text into one of these categories: {category_list}. Respond with only the category name."
                    },
                    {
                        "role": "user",
                        "content": item
                    }
                ],
                "max_tokens": 50,
                "temperature": 0.1
            }
        })

    with open("classification_batch.jsonl", "w") as f:
        for request in requests:
            f.write(json.dumps(request) + '\n')

    return "classification_batch.jsonl"
```
Best Practices
File Management
- Validate input files before uploading
- Use descriptive filenames for tracking
- Clean up old files to manage storage
- Backup important batch files
```python
import json

def validate_batch_file(filename: str) -> bool:
    """Validate batch file format before uploading."""
    seen_ids = set()
    try:
        with open(filename, 'r') as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    continue
                try:
                    request = json.loads(line)

                    required_fields = ["custom_id", "method", "url", "body"]
                    for field in required_fields:
                        if field not in request:
                            print(f"Line {line_num}: Missing required field '{field}'")
                            return False

                    # Validate custom_id length and uniqueness
                    custom_id = request["custom_id"]
                    if len(custom_id) > 64:
                        print(f"Line {line_num}: custom_id too long (max 64 characters)")
                        return False
                    if custom_id in seen_ids:
                        print(f"Line {line_num}: duplicate custom_id '{custom_id}'")
                        return False
                    seen_ids.add(custom_id)

                except json.JSONDecodeError:
                    print(f"Line {line_num}: Invalid JSON format")
                    return False
        return True
    except FileNotFoundError:
        print(f"File not found: {filename}")
        return False
```
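A quick usage sketch: run the check before uploading so malformed files never reach the API.

```python
# Upload only after the file passes validation
if validate_batch_file("large_batch.jsonl"):
    with open("large_batch.jsonl", "rb") as f:
        batch_file = client.files.create(file=f, purpose="batch")
```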
Monitoring and Logging
```python
import logging
import time

def setup_batch_logging():
    """Set up logging for batch operations."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('batch_processing.log'),
            logging.StreamHandler()
        ]
    )

def monitor_batch_progress(batch_id: str, check_interval: int = 60):
    """Monitor batch progress with logging."""
    logger = logging.getLogger(__name__)

    while True:
        try:
            batch_status = client.batches.retrieve(batch_id)
            logger.info(f"Batch {batch_id} status: {batch_status.status}")

            if batch_status.request_counts:
                counts = batch_status.request_counts
                logger.info(f"Progress: {counts.completed}/{counts.total} completed")

            if batch_status.status in ["completed", "failed", "expired", "cancelled"]:
                logger.info(f"Batch {batch_id} finished with status: {batch_status.status}")
                break

            time.sleep(check_interval)

        except Exception as e:
            logger.error(f"Error monitoring batch {batch_id}: {e}")
            break
```
Performance Tips
- Optimal batch sizes: 100-1000 requests per batch
- Parallel processing: Run multiple smaller batches
- Request optimization: Minimize token usage
- Error handling: Plan for partial failures
```python
import json

def split_large_batch(requests: List[Dict], batch_size: int = 500):
    """Split a large request list into smaller batches."""
    batches = []
    for i in range(0, len(requests), batch_size):
        batch = requests[i:i + batch_size]
        batches.append(batch)
    return batches

def process_multiple_batches(all_requests: List[Dict]):
    """Submit several smaller batch jobs; they run concurrently on the server."""
    batches = split_large_batch(all_requests)
    batch_jobs = []

    for i, batch_requests in enumerate(batches):
        # Create batch file
        filename = f"batch_{i+1:03d}.jsonl"
        with open(filename, 'w') as f:
            for request in batch_requests:
                f.write(json.dumps(request) + '\n')

        # Upload and create batch job
        with open(filename, "rb") as f:
            batch_file = client.files.create(file=f, purpose="batch")

        batch_job = client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        batch_jobs.append({
            "job_id": batch_job.id,
            "filename": filename,
            "batch_number": i + 1
        })

    return batch_jobs
```
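To gather the output, poll each submitted job until it reaches a terminal state and reuse `process_results` from the Error Handling section. A minimal sketch; the `collect_batch_results` helper is illustrative:

```python
import time

def collect_batch_results(batch_jobs):
    """Wait for each submitted batch job and aggregate its results."""
    all_results = []
    for job in batch_jobs:
        # Poll until the job reaches a terminal state
        while True:
            status = client.batches.retrieve(job["job_id"])
            if status.status in ["completed", "failed", "expired", "cancelled"]:
                break
            time.sleep(60)

        if status.status == "completed":
            # process_results is defined in the Error Handling section
            all_results.append(process_results(status.output_file_id))
        else:
            print(f"Batch {job['job_id']} ended with status: {status.status}")
    return all_results
```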
Troubleshooting
Common Issues
- File format errors: Ensure proper JSONL format
- Custom ID conflicts: Use unique identifiers
- Request validation: Check required fields
- Timeout issues: Monitor batch progress
Error Resolution
```python
import json

def diagnose_batch_errors(batch_id: str):
    """Diagnose and report batch errors."""
    batch_status = client.batches.retrieve(batch_id)

    print(f"Batch Status: {batch_status.status}")

    if batch_status.error_file_id:
        error_content = client.files.content(batch_status.error_file_id)
        errors = error_content.content.decode().strip().split('\n')

        print(f"Found {len(errors)} errors:")
        for error_line in errors[:5]:  # Show the first 5 errors
            error = json.loads(error_line)
            print(f"- Request {error['custom_id']}: {error['error']['message']}")

    if batch_status.request_counts:
        counts = batch_status.request_counts
        print("Request Summary:")
        print(f"- Total: {counts.total}")
        print(f"- Completed: {counts.completed}")
        print(f"- Failed: {counts.failed}")
```