DStream is a robust Change Data Capture (CDC) streaming solution designed to capture changes from Microsoft SQL Server and reliably deliver them to downstream systems through Azure Service Bus. It follows a two-stage architecture with separate ingestion and routing components, ensuring reliable delivery and proper sequencing of changes.
DStream operates in two main stages:
- Ingestion Stage (Ingester):
- Monitors SQL Server tables enabled with CDC
- Captures changes (inserts, updates, deletes)
- Publishes changes to a central ingest queue
- Updates CDC offsets only after successful queue publish
- Uses distributed locking for high availability
- Routing Stage (Router):
- Consumes messages from the ingest queue
- Routes messages to their destination topics
- Pre-creates publishers at startup for optimal performance
- Ensures reliable delivery to downstream systems
This architecture provides several benefits:
- Reliable capture and delivery of changes
- Proper sequencing of messages
- High availability through distributed locking
- Optimized performance with connection pooling
- Clear separation of concerns between ingestion and routing
- CDC Monitoring: Tracks changes (inserts, updates, deletes) on MS SQL Server tables enabled with CDC
- Reliable Offset Management: Updates CDC offsets only after successful publish to ingest queue
- Distributed Locking: Uses Azure Blob Storage for distributed locking in multi-instance deployments
- Adaptive Polling: Features adaptive backoff for table monitoring based on change frequency
- Automatic Topic Creation: Creates topics and subscriptions for each monitored table
- Optimized Publishing: Pre-creates and caches publishers at startup for better performance
- Reliable Delivery: Ensures messages are properly delivered to destination topics
- Message Preservation: Maintains original message properties during routing
- Automatic Topic Management: Creates topics and subscriptions as needed
- Flexible Configuration: HCL-based configuration with environment variable support
- Structured Logging: Built-in structured logging with configurable levels
- High Availability: Supports running multiple instances for redundancy
- Message Metadata: Includes rich metadata for proper message routing and tracking
- MS SQL Server with CDC enabled on target tables
- Azure Service Bus for message streaming
- Azure Blob Storage for distributed locking
- Go (latest version recommended)
- Clone the repository:
git clone https://github.com/katasec/dstream.git
cd dstream
- Install dependencies:
go mod tidy
- Configure environment variables:
export DSTREAM_DB_CONNECTION_STRING="sqlserver://user:pass@localhost:1433?database=TestDB"
export DSTREAM_INGEST_CONNECTION_STRING="your-azure-service-bus-connection-string"
export DSTREAM_BLOB_CONNECTION_STRING="your-azure-blob-storage-connection-string"
export DSTREAM_PUBLISHER_CONNECTION_STRING="your-azure-service-bus-connection-string"
export DSTREAM_LOG_LEVEL="debug" # Optional, defaults to info
DStream uses HCL for configuration. Here's an example dstream.hcl:
ingester {
db_type = "sqlserver"
db_connection_string = "{{ env \"DSTREAM_DB_CONNECTION_STRING\" }}"
poll_interval_defaults {
poll_interval = "5s"
max_poll_interval = "2m"
}
queue {
type = "servicebus"
name = "ingest-queue"
connection_string = "{{ env \"DSTREAM_INGEST_CONNECTION_STRING\" }}"
}
locks {
type = "azure_blob"
connection_string = "{{ env \"DSTREAM_BLOB_CONNECTION_STRING\" }}"
container_name = "locks"
}
tables = ["Persons"]
tables_overrides {
overrides {
table_name = "Persons"
poll_interval = "5s"
max_poll_interval = "10m"
}
}
}
publisher {
source {
type = "azure_service_bus"
connection_string = "{{ env \"DSTREAM_PUBLISHER_CONNECTION_STRING\" }}"
}
output {
type = "azure_service_bus"
connection_string = "{{ env \"DSTREAM_PUBLISHER_CONNECTION_STRING\" }}"
}
}
The ingester captures changes from SQL Server and publishes them to the ingest queue:
# Start with debug logging
go run . ingester --log-level debug
# Start with info logging (default)
go run . ingester
The ingester will:
- Create topics for each monitored table
- Create a 'sub1' subscription for each topic
- Begin monitoring tables for changes
- Publish changes to the ingest queue
- Update CDC offsets after successful publish
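The publish-before-commit ordering in the last two steps is what makes delivery at-least-once: the CDC offset only advances after the queue has accepted the batch, so a crash mid-batch replays changes instead of losing them. A minimal sketch of the loop, with illustrative interfaces that are not DStream's real API:

```go
package main

import "fmt"

// Queue and OffsetStore are illustrative stand-ins for the ingest queue
// and CDC offset tracker; DStream's actual types differ.
type Queue interface {
	Publish(msg string) error
}

type OffsetStore interface {
	Commit(lsn string) error
}

// processBatch publishes every change and only then commits the new LSN.
// If any publish fails, the offset is untouched and the batch is re-read.
func processBatch(q Queue, o OffsetStore, changes []string, lsn string) error {
	for _, c := range changes {
		if err := q.Publish(c); err != nil {
			return err
		}
	}
	return o.Commit(lsn)
}

type memQueue struct{ msgs []string }

func (m *memQueue) Publish(msg string) error { m.msgs = append(m.msgs, msg); return nil }

type memOffsets struct{ lsn string }

func (m *memOffsets) Commit(lsn string) error { m.lsn = lsn; return nil }

func main() {
	q := &memQueue{}
	o := &memOffsets{}
	_ = processBatch(q, o, []string{"insert:1", "update:1"}, "0000003600000b200003")
	fmt.Println(len(q.msgs), o.lsn)
}
```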
The router consumes messages from the ingest queue and routes them to destination topics:
# Start with debug logging
go run . router --log-level debug
# Start with info logging (default)
go run . router
The router will:
- Pre-create publishers for all configured tables
- Begin consuming messages from the ingest queue
- Route messages to their destination topics
- Ensure reliable delivery with proper sequencing
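Pre-creating publishers means routing is a map lookup rather than a connection setup per message. A hedged sketch of that lookup, with names that are illustrative rather than DStream's actual API:

```go
package main

import "fmt"

// Publisher is an illustrative stand-in for a per-topic sender.
type Publisher interface {
	Send(body string) error
}

type consolePublisher struct{ topic string }

func (p *consolePublisher) Send(body string) error {
	fmt.Printf("[%s] %s\n", p.topic, body)
	return nil
}

// routerPublish resolves a destination to its pre-created publisher;
// an unknown destination is an error rather than a lazy connection.
func routerPublish(publishers map[string]Publisher, destination, body string) error {
	pub, ok := publishers[destination]
	if !ok {
		return fmt.Errorf("no publisher for destination %q", destination)
	}
	return pub.Send(body)
}

func main() {
	// Built once at startup, keyed by destination topic.
	publishers := map[string]Publisher{
		"server.database.Persons.events": &consolePublisher{topic: "server.database.Persons.events"},
	}
	_ = routerPublish(publishers, "server.database.Persons.events", `{"ID":"180"}`)
}
```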
DStream uses a consistent message format throughout the pipeline:
{
"data": {
"FirstName": "Diana",
"ID": "180",
"LastName": "Williams"
},
"metadata": {
"Destination": "server.database.table.events",
"IngestQueue": "ingest-queue",
"LSN": "0000003600000b200003",
"OperationID": 2,
"OperationType": "Insert",
"TableName": "Persons"
}
}
- Contains the actual change data
- Includes all columns from the monitored table
- Values are preserved in their original types
- Destination: Fully qualified destination topic name
- IngestQueue: Name of the central ingest queue
- LSN: Log Sequence Number from SQL Server CDC
- OperationID: Type of change (1=delete, 2=insert, 3=update before, 4=update after)
- OperationType: Human-readable operation type
- TableName: Source table name
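The OperationID values follow SQL Server CDC's `__$operation` column, so the mapping to a readable name is a simple switch. The exact strings DStream emits for the update rows are an assumption here:

```go
package main

import "fmt"

// operationName maps SQL Server CDC operation IDs (__$operation) to a
// human-readable name; the update-row labels are illustrative.
func operationName(id int) string {
	switch id {
	case 1:
		return "Delete"
	case 2:
		return "Insert"
	case 3:
		return "UpdateBefore" // row image before the update
	case 4:
		return "UpdateAfter" // row image after the update
	default:
		return "Unknown"
	}
}

func main() {
	fmt.Println(operationName(2))
}
```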
For production deployments:
go run . server --log-level info
DStream follows a modular architecture with clear separation of concerns:
- CDC Monitor
- Monitors SQL Server tables for changes using CDC
- Uses adaptive polling with configurable intervals
- Tracks changes using LSN (Log Sequence Numbers)
- Publisher Adapter
- Wraps publishers with additional metadata
- Adds destination routing information
- Provides a unified interface for all publishers
- Publishers
- Pluggable components that handle message delivery
- Implementations available for:
- Azure Service Bus
- Azure Event Hubs
- Console (for debugging)
- Easy to add new implementations via the Publisher interface
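A new destination only needs to satisfy the Publisher interface. The method set below is a sketch for illustration, not DStream's exact interface definition:

```go
package main

import "fmt"

// Publisher is a sketch of the pluggable delivery interface; DStream's
// actual method set may differ.
type Publisher interface {
	Publish(data []byte) error
	Close() error
}

// ConsolePublisher is the simplest implementation, useful for debugging.
type ConsolePublisher struct{}

func (c *ConsolePublisher) Publish(data []byte) error {
	fmt.Println(string(data))
	return nil
}

func (c *ConsolePublisher) Close() error { return nil }

func main() {
	var p Publisher = &ConsolePublisher{}
	defer p.Close()
	_ = p.Publish([]byte(`{"TableName":"Persons"}`))
}
```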
[SQL Server] --> [CDC Monitor] --> [Publisher Adapter] --> [Publisher] --> [Destination]
| | | |
| | | |- Service Bus
| | | |- Event Hubs
| | | |- Console
| | |
| | |- Add Metadata
| | |- Route Information
| |
| |- Track LSN
| |- Adaptive Polling
|
|- CDC Enabled Tables
- Modularity
- Clear separation between components
- Pluggable publishers for different destinations
- Easy to extend and maintain
- Reliability
- Distributed locking for multiple instances
- Message queuing for reliable delivery
- Graceful shutdown handling
- Observability
- Structured logging throughout
- Configurable log levels
- Clear error reporting
- Configuration
- HCL-based configuration
- Environment variable support
- Per-table configuration options
DStream publishes changes in a standardized JSON format:
{
"data": {
"Field1": "Value1",
"Field2": "Value2"
},
"metadata": {
"Destination": "topic-or-queue-name",
"LSN": "00000034000025c80003",
"OperationID": 2,
"OperationType": "Insert|Update|Delete",
"TableName": "TableName"
}
}
The metadata includes:
- Destination for routing
- LSN for tracking
- Operation type (1=delete, 2=insert, 3=update before, 4=update after)
- Source table name
Contributions are welcome! Please submit a pull request or create an issue if you encounter bugs or have suggestions for new features.
This project is licensed under the MIT License. See the LICENSE file for details.