⚡ Concurrency Model Specification
Actor-Based Concurrency with Async/Await for AI/ML Workloads
Overview
STARK employs a concurrency model that combines async/await with an actor-based message-passing system, optimized for AI/ML workloads.
Concurrency Philosophy
STARK's concurrency is built around:
- Structured Concurrency - Clear hierarchical task management with automatic cleanup
- Actor-Based Isolation - Message passing for safe concurrent state management
- Data Parallelism - First-class support for parallel tensor operations
- Work Stealing - Efficient load balancing across CPU cores and devices
- Zero-Cost Abstractions - Compile-time optimization of async operations
- ML-Optimized - Specialized primitives for training and inference workloads
High-Level Concurrency Overview
// High-level concurrency overview
async fn ml_training_pipeline() -> Result<(), TrainingError> {
    // Structured concurrency with automatic cleanup
    let training_scope = async_scope! {
        // Data loading in parallel
        let data_loader = spawn_task("data", async {
            load_and_preprocess_data("train.csv").await
        });

        // Model initialization
        let model = spawn_task("model", async {
            create_and_initialize_model(&config).await
        });

        // Wait for both to complete
        let (dataset, model) = join!(data_loader, model);

        // Actor-based training coordinator
        let trainer = TrainingActor::new(model?, dataset?);
        trainer.start_training(epochs: 100).await
    };

    training_scope.await?;
    Ok(())
}
Async/Await Execution Model
Future and Task Abstractions
// Core Future trait
trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending
}

// Task spawning and management
struct Task<T> {
    id: TaskId,
    name: Option<String>,
    future: Pin<Box<dyn Future<Output = T> + Send>>,
    waker: Option<Waker>,
    executor: ExecutorRef
}

// Task spawning functions
fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T>;
fn spawn_task<T: Send + 'static>(name: &str, future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T>;
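Leaf futures implement poll directly rather than being composed from async blocks. A minimal sketch, assuming the Future and Poll definitions above (YieldNow is an illustrative name, not a standard type): a future that reschedules itself once before completing, which is how a task cooperatively yields to the scheduler.

// Hypothetical leaf future: yields control to the scheduler exactly once
struct YieldNow {
    yielded: bool
}

impl Future for YieldNow {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        if this.yielded {
            Poll::Ready(())
        } else {
            this.yielded = true;
            cx.waker().wake_by_ref();  // re-queue this task before returning Pending
            Poll::Pending
        }
    }
}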
Structured Concurrency
// Structured concurrency scope
macro_rules! async_scope {
    ($body:block) => {
        async move {
            let scope = ConcurrencyScope::new();
            let result = async move $body.await;
            scope.shutdown().await;
            result
        }
    }
}

// Join combinators
async fn join<A, B>(a: impl Future<Output = A>, b: impl Future<Output = B>) -> (A, B);

async fn try_join<A, B, E>(
    a: impl Future<Output = Result<A, E>>,
    b: impl Future<Output = Result<B, E>>
) -> Result<(A, B), E>;

// Select combinators
async fn select<A, B>(a: impl Future<Output = A>, b: impl Future<Output = B>) -> Either<A, B>;

// Timeout and cancellation
async fn timeout<T>(duration: Duration, future: impl Future<Output = T>) -> Result<T, TimeoutError>;

// Cancellation token for cooperative cancellation
struct CancellationToken {
    inner: Arc<CancellationState>
}

impl CancellationToken {
    fn new() -> Self;
    fn child_token(&self) -> Self;
    fn cancel(&self);
    fn is_cancelled(&self) -> bool;
    async fn cancelled(&self);
    async fn run_until_cancelled<T>(&self, future: impl Future<Output = T>) -> Option<T>;
}
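As a usage sketch combining the two primitives above (train_step, Metrics, and the 30-second budget are illustrative):

// Hypothetical cancellable step with a time budget: None if the token fired
// first, Some(Ok(metrics)) on success, Some(Err(_)) if the timeout elapsed
async fn run_step(token: &CancellationToken) -> Option<Result<Metrics, TimeoutError>> {
    token
        .run_until_cancelled(timeout(Duration::from_secs(30), train_step()))
        .await
}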
Runtime Configuration
// Runtime configuration
struct RuntimeConfig {
    worker_threads: Option<usize>,
    max_blocking_threads: usize,
    thread_stack_size: Option<usize>,
    thread_name_prefix: String,
    enable_io: bool,
    enable_time: bool,
    enable_metrics: bool,
    scheduler: SchedulerType
}

enum SchedulerType {
    WorkStealing,
    FIFO,
    Custom(Box<dyn Scheduler + Send + Sync>)
}

// Runtime execution
struct Runtime {
    config: RuntimeConfig,
    executor: Arc<Executor>
}

impl Runtime {
    fn new() -> Self {
        RuntimeBuilder::new().build()
    }

    fn block_on<T>(&self, future: impl Future<Output = T>) -> T;
    fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T>;
    fn spawn_blocking<T: Send + 'static>(&self, f: impl FnOnce() -> T + Send + 'static) -> JoinHandle<T>;
    async fn shutdown(self);
    fn metrics(&self) -> RuntimeMetrics;
}
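A setup sketch; the builder methods are assumed to mirror the RuntimeConfig fields above:

// Hypothetical runtime setup for a training host
fn main() {
    let runtime = RuntimeBuilder::new()
        .worker_threads(8)                        // e.g. one per physical core
        .max_blocking_threads(32)                 // headroom for dataset I/O
        .scheduler(SchedulerType::WorkStealing)
        .enable_metrics(true)
        .build();

    // Drive the async entry point to completion on the current thread
    let _ = runtime.block_on(ml_training_pipeline());
}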
Actor System Implementation
Actor Trait and Lifecycle
// Core Actor trait
trait Actor {
    type Message: Send + 'static;
    type Error: std::error::Error + Send + Sync + 'static;

    // Actor initialization
    async fn started(&mut self) -> Result<(), Self::Error> { Ok(()) }

    // Handle incoming messages
    async fn handle_message(&mut self, message: Self::Message) -> Result<(), Self::Error>;

    // Actor cleanup
    async fn stopped(&mut self) -> Result<(), Self::Error> { Ok(()) }

    // Error handling
    async fn handle_error(&mut self, error: Self::Error) -> ErrorAction {
        ErrorAction::Stop
    }
}

enum ErrorAction {
    Continue,  // Continue processing messages
    Restart,   // Restart the actor
    Stop       // Stop the actor
}

// Actor reference for sending messages
struct ActorRef<M: Send + 'static> {
    id: ActorId,
    sender: mpsc::UnboundedSender<M>,
    system: WeakActorSystem
}

impl<M: Send + 'static> ActorRef<M> {
    async fn send(&self, message: M) -> Result<(), SendError>;
    fn try_send(&self, message: M) -> Result<(), TrySendError>;
    async fn ask<R>(&self, message: impl FnOnce(oneshot::Sender<R>) -> M) -> Result<R, AskError>;
    fn id(&self) -> ActorId;
    fn is_alive(&self) -> bool;
    async fn stop(&self);
}
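A minimal actor sketch against the trait above (Counter, CounterMessage, and CounterError are illustrative):

// Hypothetical counter actor
enum CounterMessage {
    Increment(u64),
    Get { reply_to: oneshot::Sender<u64> }
}

struct Counter {
    count: u64
}

impl Actor for Counter {
    type Message = CounterMessage;
    type Error = CounterError;

    async fn handle_message(&mut self, message: CounterMessage) -> Result<(), CounterError> {
        match message {
            CounterMessage::Increment(n) => self.count += n,
            CounterMessage::Get { reply_to } => {
                let _ = reply_to.send(self.count);  // the asker may have gone away
            }
        }
        Ok(())
    }
}

With an ActorRef<CounterMessage> in hand, ask wires the oneshot reply into the message: let count = counter.ask(|tx| CounterMessage::Get { reply_to: tx }).await?;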
Message Passing Protocols
// Request-Response pattern
struct Request<T, R> {
    data: T,
    reply_to: oneshot::Sender<R>
}

impl<T, R> Request<T, R> {
    fn new(data: T) -> (Self, oneshot::Receiver<R>) {
        let (tx, rx) = oneshot::channel();
        (Request { data, reply_to: tx }, rx)
    }

    fn respond(self, response: R) {
        let _ = self.reply_to.send(response);
    }
}

// Event pattern (fire-and-forget)
struct Event<T> {
    data: T,
    timestamp: Instant,
    source: ActorId
}

// Command pattern
struct Command<T> {
    data: T,
    correlation_id: Option<CorrelationId>
}

// Publish-Subscribe messaging
struct EventBus<T: Clone + Send + 'static> {
    subscribers: HashMap<ActorId, ActorRef<Event<T>>>
}

impl<T: Clone + Send + 'static> EventBus<T> {
    fn new() -> Self;
    fn subscribe(&mut self, subscriber: ActorRef<Event<T>>);
    fn unsubscribe(&mut self, subscriber: &ActorRef<Event<T>>);
    fn publish(&self, event: T);
}
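A round-trip sketch tying Request to an actor mailbox (Image and Label are placeholder types):

// Hypothetical request-response round trip
async fn classify(server: &ActorRef<Request<Image, Label>>, image: Image) -> Option<Label> {
    let (request, response) = Request::new(image);
    server.send(request).await.ok()?;  // deliver the request to the serving actor
    response.await.ok()                // resolves when the actor calls respond()
}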
Supervision and Fault Tolerance
// Supervisor strategies
enum SupervisionStrategy {
    OneForOne,   // Restart only the failed actor
    OneForAll,   // Restart all children when one fails
    RestForOne,  // Restart the failed actor and all actors started after it
    Custom(Box<dyn SupervisionPolicy + Send + Sync>)
}

enum SupervisorAction {
    Restart,
    Stop,
    Escalate,
    Ignore
}

// Supervisor configuration
struct SupervisorConfig {
    strategy: SupervisionStrategy,
    max_restarts: u32,
    restart_window: Duration,
    backoff_strategy: BackoffStrategy
}

enum BackoffStrategy {
    None,
    Linear { initial: Duration, increment: Duration, max: Duration },
    Exponential { initial: Duration, factor: f64, max: Duration },
    Custom(Box<dyn Fn(u32) -> Duration + Send + Sync>)
}

// Circuit breaker for actor communication
struct ActorCircuitBreaker {
    failure_threshold: u32,
    success_threshold: u32,
    timeout: Duration,
    state: CircuitState,
    failure_count: u32,
    last_failure: Option<Instant>
}
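A configuration sketch combining the pieces above (the specific thresholds are illustrative):

// Hypothetical supervisor: restart only the failed child, with exponential
// backoff, giving up after 5 restarts within any 60-second window
let supervisor = SupervisorConfig {
    strategy: SupervisionStrategy::OneForOne,
    max_restarts: 5,
    restart_window: Duration::from_secs(60),
    backoff_strategy: BackoffStrategy::Exponential {
        initial: Duration::from_millis(100),
        factor: 2.0,
        max: Duration::from_secs(10)
    }
};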
ML-Specific Concurrency Patterns
Data Parallel Processing
// Parallel data processing for ML workloads
struct DataParallel<T: Send + 'static> {
    data: Vec<T>,
    chunk_size: usize,
    num_workers: usize
}

impl<T: Send + 'static> DataParallel<T> {
    fn new(data: Vec<T>) -> Self;
    fn with_chunk_size(mut self, size: usize) -> Self;
    fn with_workers(mut self, workers: usize) -> Self;

    async fn map<U, F>(self, f: F) -> Vec<U>
    where
        U: Send + 'static,
        F: Fn(T) -> U + Send + Sync + 'static;

    async fn map_async<U, F, Fut>(self, f: F) -> Vec<U>
    where
        U: Send + 'static,
        F: Fn(T) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = U> + Send;
}

// Tensor-aware parallel operations
trait TensorParallel<T> {
    async fn parallel_apply<F>(&self, f: F) -> Self
    where
        F: Fn(&[T]) -> Vec<T> + Send + Sync + 'static;

    async fn parallel_map_chunks<U, F>(&self, chunk_size: usize, f: F) -> Tensor<U>
    where
        F: Fn(&[T]) -> Vec<U> + Send + Sync + 'static,
        U: Send + Sync + 'static;
}
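A usage sketch for the builder above (Sample, normalize, and the chunk size are illustrative):

// Hypothetical parallel preprocessing across 8 workers
async fn preprocess(samples: Vec<Sample>) -> Vec<Tensor> {
    DataParallel::new(samples)
        .with_chunk_size(1024)            // each task handles 1024 samples
        .with_workers(8)
        .map(|sample| normalize(sample))  // CPU-bound, so the synchronous map suffices
        .await
}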
Training Pipeline Coordination
// Training coordinator actor
struct TrainingCoordinator {
    model: Model,
    optimizer: Optimizer,
    dataset: Arc<Dataset>,
    config: TrainingConfig,
    metrics: TrainingMetrics,
    workers: Vec<ActorRef<WorkerMessage>>,
    epoch: u32,
    global_step: u64
}

impl Actor for TrainingCoordinator {
    type Message = TrainingMessage;
    type Error = TrainingError;

    async fn handle_message(&mut self, message: TrainingMessage) -> Result<(), TrainingError> {
        match message {
            TrainingMessage::StartEpoch { epoch } => {
                self.start_epoch(epoch).await
            }
            TrainingMessage::BatchCompleted { worker_id, batch_id, gradients, metrics } => {
                self.aggregate_gradients(worker_id, batch_id, gradients, metrics).await
            }
            TrainingMessage::EpochCompleted { epoch, metrics } => {
                self.finish_epoch(epoch, metrics).await
            }
        }
    }
}

// Training worker actor
struct TrainingWorker {
    worker_id: usize,
    model: Model,
    dataset: Arc<Dataset>,
    coordinator: ActorRef<TrainingMessage>,
    current_batch: Option<BatchId>
}

// Distributed training coordination
struct DistributedTrainer {
    world_size: usize,
    rank: usize,
    coordinator: ActorRef<DistributedMessage>,
    all_reduce_group: CommunicationGroup
}

enum DistributedMessage {
    AllReduce { tensors: Vec<Tensor>, reply_to: oneshot::Sender<Vec<Tensor>> },
    AllGather { tensor: Tensor, reply_to: oneshot::Sender<Vec<Tensor>> },
    Broadcast { tensor: Tensor, root: usize, reply_to: oneshot::Sender<Tensor> },
    Barrier { reply_to: oneshot::Sender<()> }
}
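A gradient-synchronization sketch using the DistributedMessage protocol above (error handling is simplified, and scalar division on Tensor is assumed):

// Hypothetical data-parallel gradient synchronization for one step
async fn sync_gradients(trainer: &DistributedTrainer, gradients: Vec<Tensor>) -> Vec<Tensor> {
    let (tx, rx) = oneshot::channel();
    trainer.coordinator
        .send(DistributedMessage::AllReduce { tensors: gradients, reply_to: tx })
        .await
        .expect("distributed coordinator stopped");

    // All-reduce yields the element-wise sum; divide by world_size for the mean
    rx.await
        .expect("all-reduce reply dropped")
        .into_iter()
        .map(|t| t / trainer.world_size as f32)
        .collect()
}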
Inference Pipeline
// Inference server actor
struct InferenceServer {
    model: Arc<Model>,
    batch_processor: BatchProcessor,
    request_queue: ActorRef<Request<InferenceInput, InferenceOutput>>,
    response_router: ActorRef<InferenceResponse>,
    metrics: InferenceMetrics
}

// Request batching actor
struct BatchingActor {
    max_batch_size: usize,
    max_wait_time: Duration,
    pending_requests: Vec<Request<InferenceInput, InferenceOutput>>,
    batch_timer: Option<Timer>,
    processor: ActorRef<InferenceBatch>
}

// Load balancing actor
struct LoadBalancer {
    workers: Vec<ActorRef<InferenceBatch>>,
    strategy: LoadBalancingStrategy,
    health_checker: ActorRef<HealthCheck>,
    metrics: LoadBalancerMetrics
}

enum LoadBalancingStrategy {
    RoundRobin,
    LeastConnections,
    WeightedRoundRobin { weights: Vec<f32> },
    ConsistentHashing,
    Custom(Box<dyn Fn(&InferenceRequest) -> usize + Send + Sync>)
}
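A dispatch sketch for the strategies above (it assumes a next: usize cursor and a least_loaded_worker metrics helper that are not shown in the struct definition):

// Hypothetical worker selection inside LoadBalancer
impl LoadBalancer {
    fn pick_worker(&mut self) -> &ActorRef<InferenceBatch> {
        let index = match &self.strategy {
            LoadBalancingStrategy::RoundRobin => {
                self.next = (self.next + 1) % self.workers.len();
                self.next
            }
            LoadBalancingStrategy::LeastConnections => self.metrics.least_loaded_worker(),
            _ => 0  // remaining strategies elided in this sketch
        };
        &self.workers[index]
    }
}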
Key Benefits
🏗️ Structured Concurrency
Hierarchical task management with automatic cleanup and resource management
🛡️ Actor Isolation
Message passing for safe concurrent state management without data races
⚡ Work Stealing
Efficient load balancing and task scheduling across CPU cores and devices
🤖 ML-Optimized
Specialized patterns for data/model parallelism and distributed training
🔧 Fault Tolerance
Supervision strategies, circuit breakers, and error recovery mechanisms
📊 Production Ready
Load balancing, monitoring integration, and performance metrics