A powerful and scalable solution for translating text columns in large datasets using OpenAI's language models with PySpark distributed processing.
This system allows you to translate text columns in data files (Stata, CSV, Parquet, JSON) with these key features:
- Distributed processing with PySpark for handling large datasets efficiently
- Smart caching to avoid redundant API calls and reduce costs
- Fault tolerance with checkpointing to resume interrupted processes
- Multiple file formats with preserved metadata (especially for Stata files)
- Language detection for automatic source language identification
- Batch processing for optimized throughput
The repository is organized as follows:

```
distributed-translation/
├── main.py               # Main orchestrator script
├── config.py             # Configuration management
├── requirements.txt      # Python dependencies
├── README.md             # This document
└── modules/              # Core functionality modules
    ├── __init__.py
    ├── translator.py     # Translation logic and API integration
    ├── cache.py          # Translation caching implementation
    ├── checkpoint.py     # Process state management for fault tolerance
    ├── file_manager.py   # Data I/O operations for various formats
    └── utilities.py      # Common utility functions
```
The system uses a modular architecture with well-defined interfaces:
- `TranslationOrchestrator` (in `main.py`): Central controller coordinating all components
- `ConfigManager` (in `config.py`): Manages configuration loading and validation
- `TranslationManager` (in `modules/translator.py`): Manages the translation process
- `CacheManager` (in `modules/cache.py`): Coordinates caching operations
- `CheckpointManager` (in `modules/checkpoint.py`): Handles state persistence
- `DataReader`/`DataWriter` (in `modules/file_manager.py`): Handle I/O operations
The system follows this flow (a sketch of how the components fit together appears after the list):
- Data is read from input files
- Translation is applied to specified columns
- Results are cached to avoid redundant API calls
- Results are checkpointed for fault tolerance
- Translated data is written to output files
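The actual interfaces are defined in the modules themselves; as a rough illustration of how these pieces fit together (the constructor and method signatures below are assumptions, not the project's real API), the orchestration could look like this:

```python
# Illustrative only: the class names come from the project layout above, but the
# constructor and method signatures are assumptions, not the actual API.
from config import ConfigManager
from modules.translator import TranslationManager
from modules.cache import CacheManager
from modules.checkpoint import CheckpointManager
from modules.file_manager import DataReader, DataWriter

def run_pipeline(config_path: str) -> None:
    config = ConfigManager(config_path)                 # load and validate the YAML config
    cache = CacheManager(config)                        # reuse previously translated text
    checkpoints = CheckpointManager(config)             # persist progress between runs
    translator = TranslationManager(config, cache)      # wraps the OpenAI API calls

    df = DataReader(config).read()                      # Spark DataFrame from the input file
    translated = translator.translate(df, checkpoints)  # add *_<target_language> columns
    DataWriter(config).write(translated)                # write output, preserving metadata
```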
Requirements:
- Python 3.7+
- Java 8+ (for PySpark)
- An OpenAI API key
```bash
# Clone the repository
git clone https://github.com/JuanLara18/distributed-translation.git
cd distributed-translation

# Create and activate virtual environment
python -m venv venv
source venv/bin/activate   # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Set up your API key
export OPENAI_API_KEY=your-api-key-here
# On Windows: set OPENAI_API_KEY=your-api-key-here
```
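Before starting a long translation run, it can be worth confirming that the key is actually visible to Python; this small check relies only on the environment variable named above:

```python
import os

# The variable name matches the api_key_env setting used throughout this README.
if not os.environ.get("OPENAI_API_KEY"):
    raise SystemExit("OPENAI_API_KEY is not set; export it before running main.py")
print("OpenAI API key found.")
```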
- Create a configuration file `config.yaml`:
```yaml
input_file: "data/input.dta"        # Supports .dta, .csv, .parquet, .json
output_file: "data/output.dta"
columns_to_translate: ["text_column1", "text_column2"]
target_language: "english"

openai:
  model: "gpt-3.5-turbo"            # Or any supported OpenAI model
  temperature: 0.1
  max_tokens: 1500
  api_key_env: "OPENAI_API_KEY"
```
- Run the translation process:

```bash
python main.py --config config.yaml
```
The script supports several command-line options to override configuration:
```bash
# Basic usage
python main.py --config config.yaml

# Override configuration settings
python main.py --config config.yaml --input_file new_input.dta --target_language spanish

# Resume from checkpoint after interruption
python main.py --config config.yaml --resume

# Force restart (ignore existing checkpoints)
python main.py --config config.yaml --force_restart

# Enable verbose logging
python main.py --config config.yaml --verbose
```
| Setting | Description | Example |
|---|---|---|
| `input_file` | Path to your input data file | `"data/input.dta"` |
| `output_file` | Where to save the translated data | `"data/output.dta"` |
| `columns_to_translate` | List of columns to translate | `["text_col", "description_col"]` |
| `target_language` | Language to translate into | `"english"` |
| `source_language_column` | Column containing source languages (optional) | `"language_col"` |
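`ConfigManager` in `config.py` is responsible for loading and validating these settings; a minimal sketch of that kind of check (not the project's actual implementation) might be:

```python
import yaml

# Core settings from the table above; source_language_column remains optional.
REQUIRED_KEYS = ["input_file", "output_file", "columns_to_translate", "target_language"]

def load_config(path: str) -> dict:
    """Load the YAML file and make sure the core settings are present."""
    with open(path, "r", encoding="utf-8") as fh:
        config = yaml.safe_load(fh)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return config

# Example: config = load_config("config.yaml")
```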
The OpenAI settings control which model is used and how deterministic the output is:

```yaml
openai:
  model: "gpt-3.5-turbo"   # or "gpt-4" for higher quality
  temperature: 0.1         # Lower for more consistent translations
  max_tokens: 1500
  api_key_env: "OPENAI_API_KEY"
```
Spark resources should be sized to the dataset:

```yaml
# For smaller datasets (<100MB)
spark:
  executor_memory: "2g"
  driver_memory: "2g"
  executor_cores: 1
  default_parallelism: 2
```

```yaml
# For larger datasets (>1GB)
spark:
  executor_memory: "8g"
  driver_memory: "6g"
  executor_cores: 4
  default_parallelism: 8
```
Caching significantly improves performance and reduces API costs by storing translations:
```yaml
cache:
  type: "sqlite"                        # Options: "sqlite", "postgres", "memory"
  location: "./cache/translations.db"   # for SQLite
  ttl: 2592000                          # Cache lifetime in seconds (30 days)
```
For a PostgreSQL cache:

```yaml
cache:
  type: "postgres"
  connection_string: "postgresql://user:password@localhost/translations"
```
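The real schema and key format are defined in `modules/cache.py`; the idea behind the SQLite backend can be sketched as follows (the table layout, key hashing, and TTL handling here are assumptions for illustration):

```python
import hashlib
import sqlite3
import time

class SimpleTranslationCache:
    """Illustrative SQLite cache: key = hash of (target language, text)."""

    def __init__(self, location: str, ttl: int = 2_592_000):
        self.conn = sqlite3.connect(location)
        self.ttl = ttl
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS translations "
            "(key TEXT PRIMARY KEY, value TEXT, created REAL)"
        )

    def _key(self, text: str, target_language: str) -> str:
        return hashlib.sha256(f"{target_language}:{text}".encode("utf-8")).hexdigest()

    def get(self, text: str, target_language: str):
        row = self.conn.execute(
            "SELECT value, created FROM translations WHERE key = ?",
            (self._key(text, target_language),),
        ).fetchone()
        # Treat entries older than the TTL as misses.
        if row and time.time() - row[1] < self.ttl:
            return row[0]
        return None

    def set(self, text: str, target_language: str, translation: str) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO translations VALUES (?, ?, ?)",
            (self._key(text, target_language), translation, time.time()),
        )
        self.conn.commit()
```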
Checkpointing enables fault tolerance and the ability to resume interrupted processes:
```yaml
checkpoint:
  enabled: true
  interval: 1                # Save every N partitions
  directory: "./checkpoints"
  max_checkpoints: 5
```
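The checkpoint file format is an implementation detail of `modules/checkpoint.py`; the underlying idea, remembering which partitions are already finished so a rerun can skip them, can be sketched like this (the file name and fields are assumptions):

```python
import json
import os

class PartitionCheckpoint:
    """Illustrative checkpoint: record which partition indices are finished."""

    def __init__(self, directory: str = "./checkpoints"):
        os.makedirs(directory, exist_ok=True)
        self.path = os.path.join(directory, "state.json")
        self.done = set()
        if os.path.exists(self.path):
            with open(self.path) as fh:
                self.done = set(json.load(fh)["completed_partitions"])

    def is_done(self, partition_id: int) -> bool:
        return partition_id in self.done

    def mark_done(self, partition_id: int) -> None:
        # Simplified: writes on every partition rather than every N partitions.
        self.done.add(partition_id)
        with open(self.path, "w") as fh:
            json.dump({"completed_partitions": sorted(self.done)}, fh)
```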
The end-to-end process works as follows (a sketch of the per-column step appears after the list):
- The system reads your input file and splits it into partitions for distributed processing
- For each text column to translate, it:
  - First checks whether the translation is already in the cache
  - If not cached, sends the text to OpenAI's API
  - Saves results to the cache to avoid future redundant API calls
- Translations are added as new columns with a `_[target_language]` suffix
- The processed data is written to your output file
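The column-naming convention can be illustrated with a plain PySpark UDF; the `translate_text` function below is only a stand-in for the real cache-then-API lookup in `modules/translator.py`:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("suffix-demo").getOrCreate()
df = spark.createDataFrame([("hola mundo", "bonjour")], ["text_column1", "text_column2"])

target_language = "english"

def translate_text(text):
    # Stand-in for the real cache-then-API translation logic.
    return None if text is None else f"[{target_language}] {text}"

translate_udf = F.udf(translate_text, StringType())

# Each configured column gets a companion column with the _<target_language> suffix.
for column in ["text_column1", "text_column2"]:
    df = df.withColumn(f"{column}_{target_language}", translate_udf(F.col(column)))

df.show(truncate=False)  # adds text_column1_english and text_column2_english
```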
For large datasets, consider:
- Increasing partitions: Set a higher `default_parallelism` in the Spark config
- Using checkpointing: Enable checkpoints to resume after interruptions
- PostgreSQL cache: For multi-node setups, use a central PostgreSQL cache instead of SQLite
Example configuration for large datasets:
```yaml
spark:
  executor_memory: "10g"
  driver_memory: "8g"
  executor_cores: 6
  default_parallelism: 12

cache:
  type: "postgres"
  connection_string: "postgresql://user:password@centraldb/translations"

checkpoint:
  enabled: true
  interval: 2
  directory: "/shared/checkpoints"
```
The system includes special support for Stata (.dta) files:
- Preserves variable labels and value labels
- Handles metadata correctly
- Supports different Stata versions (13-18)
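The exact metadata handling lives in `modules/file_manager.py`; a common pandas pattern for carrying variable labels through a round trip, with the translated column and its label invented for the example, looks roughly like this:

```python
import pandas as pd

# Read the data and capture the variable labels from the Stata metadata.
with pd.read_stata("data/input.dta", iterator=True) as reader:
    df = reader.read()
    labels = reader.variable_labels()

# ... translation would add a column such as text_column1_english here ...
df["text_column1_english"] = df["text_column1"]  # placeholder for the translated text
labels["text_column1_english"] = "text_column1 (English translation)"

# Write back with the labels attached; version 118 covers modern Stata releases.
df.to_stata("data/output.dta", variable_labels=labels, version=118, write_index=False)
```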
If `source_language_column` is not specified, the system automatically detects source languages, but you can customize detection behavior:
```yaml
# Automatic detection (default)
source_language_column: null

# Use a specific column for source language
source_language_column: "language_code"
```
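The project's detection logic is internal to the translator module; the general approach, guessing a language code per value when no source column is given, can be sketched with the langdetect package (whether this exact library is used is an assumption):

```python
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException

def detect_source_language(text: str, fallback: str = "unknown") -> str:
    """Return an ISO 639-1 code such as 'es' or 'fr', or a fallback for short/empty text."""
    try:
        return detect(text)
    except (LangDetectException, TypeError):
        return fallback

print(detect_source_language("Hola, ¿cómo estás?"))  # typically 'es'
```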
Common issues and how to resolve them:
- API key issues: Make sure your OpenAI API key is set in your environment
- Memory errors: Reduce `batch_size` or increase `spark.executor_memory`
- Missing translations: Verify source language detection or specify a source language column
- Corrupted checkpoints: Use `--force_restart` to start fresh
For debugging, enable detailed logging in the configuration:

```yaml
logging:
  level: "DEBUG"
  log_file: "debug.log"
```
Best practices:
- **Caching strategy**:
  - Use SQLite for single-machine processing
  - Use PostgreSQL for distributed setups
  - Consider the in-memory cache only for small datasets
- **Batch size tuning**:
  - Start with `batch_size: 10`
  - Decrease for large texts, increase for short texts
- **Spark configuration**:
  - Increase parallelism for more concurrent processing
  - Allocate sufficient memory to avoid OOM errors
Contributions are welcome! Please feel free to submit a pull request or open an issue.
To contribute:
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.