Akka.NET
See how Akka.NET simplifies state management in concurrent applications

From Traditional to Actor-Based Code

See how Akka.NET simplifies concurrent programming
Traditional Approach
public class UserManager
{
    private readonly Dictionary<string, UserState> _users 
        = new Dictionary<string, UserState>();
    private readonly object _lock = new object();

    public void UpdateUserState(string userId, UserState state)
    {
        lock (_lock)
        {
            if (_users.ContainsKey(userId))
            {
                _users[userId] = state;
            }
        }
    }

    public UserState GetUserState(string userId)
    {
        lock (_lock)
        {
            return _users.GetValueOrDefault(userId);
        }
    }

    // Multiple threads accessing this can cause:
    // - Deadlocks
    // - Race conditions if locks are missed
    // - Performance bottlenecks
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IStockListener
{
    void OnPriceChanged(string symbol, decimal price);
}

public class PriceChangedEventArgs : EventArgs
{
    public string Symbol { get; }
    public decimal Price { get; }

    public PriceChangedEventArgs(string symbol, decimal price)
    {
        Symbol = symbol;
        Price = price;
    }
}

public class StockTracker : IDisposable
{
    public event EventHandler<PriceChangedEventArgs>? 
        PriceChanged;

    private readonly Dictionary<string, decimal> _prices = new();
    private readonly object _lock = new();
    private readonly List<WeakReference<IStockListener>> 
        _listeners = new();
    private readonly Timer _cleanupTimer;

    private readonly Channel<(string Symbol, decimal Price)> 
        _updateChannel;
    private readonly CancellationTokenSource _cts;
    private readonly Task _processingTask;

    public StockTracker()
    {
        _updateChannel = 
            Channel.CreateUnbounded<(string, decimal)>();
        _cts = new CancellationTokenSource();

        // Start background worker to process price updates
        _processingTask = Task.Run(ProcessUpdatesAsync);

        // Periodically clean up dead event listeners
        _cleanupTimer = new Timer(
            _ => CleanupDeadListeners(), 
            null, 
            TimeSpan.FromMinutes(1), 
            TimeSpan.FromMinutes(1));
    }

    public void Subscribe(IStockListener listener)
    {
        lock (_lock)
        {
            _listeners.Add(new WeakReference<IStockListener>(listener));

            // Acknowledge subscription by sending current prices
            foreach (var kvp in _prices)
            {
                try
                {
                    listener.OnPriceChanged(kvp.Key, kvp.Value);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(
                        $"[Warning] Failed to notify listener: {ex.Message}");
                }
            }
        }
    }

    public async Task UpdatePriceAsync(string symbol, decimal price)
    {
        lock (_lock)
        {
            _prices[symbol] = price;
        }

        await _updateChannel.Writer.WriteAsync((symbol, price));
    }

    private async Task ProcessUpdatesAsync()
    {
        try
        {
            await foreach (var (symbol, price) 
                in _updateChannel.Reader
                    .ReadAllAsync(_cts.Token))
            {
                var args = new PriceChangedEventArgs(
                    symbol, 
                    price);
                PriceChanged?.Invoke(this, args);

                List<WeakReference<IStockListener>> listeners;
                lock (_lock)
                {
                    listeners = _listeners.ToList();
                }

                // Notify subscribers asynchronously
                foreach (var weakListener in listeners)
                {
                    if (weakListener.TryGetTarget(out var listener))
                    {
                        try
                        {
                            listener.OnPriceChanged(
                                symbol, 
                                price);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(
                                $"[Warning] Failed to notify listener: " + 
                                $"{ex.Message}");
                        }
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown
        }
        catch (Exception ex)
        {
            Console.WriteLine(
                $"[Error] ProcessUpdatesAsync encountered " +
                $"an error: {ex}");
        }
    }

    private void CleanupDeadListeners()
    {
        lock (_lock)
        {
            _listeners.RemoveAll(w => !w.TryGetTarget(out _));
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        _updateChannel.Writer.Complete();
        _processingTask.Wait();

        _cleanupTimer?.Dispose();
        _cts.Dispose();
    }
}
public class WorkProcessor
{
    private readonly object _lock = new object();
    private bool _isProcessing;
    private int _failureCount;

    public async Task ProcessWork()
    {
        lock (_lock)
        {
            if (_isProcessing) 
                throw new InvalidOperationException();
            _isProcessing = true;
        }

        try
        {
            await DoWork();
            _failureCount = 0;
        }
        catch (Exception ex)
        {
            _failureCount++;
            if (_failureCount > 3)
            {
                // Now what? Entire system is in unknown state
                throw;
            }
            // Retry? Reset? Notify someone?
        }
        finally
        {
            lock (_lock)
            {
                _isProcessing = false;
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class WorkItem
{
    public int Id { get; }
    public WorkItem(int id) => Id = id;
    
    public async Task ExecuteAsync()
    {
        Console.WriteLine($"Processing WorkItem {Id}...");
        await Task.Delay(TimeSpan.FromSeconds(2)); // Simulate work
        Console.WriteLine($"Finished WorkItem {Id}");
    }
}

public class WorkerPool : IDisposable
{
    private readonly Channel<WorkItem> _workQueue;
    private readonly List<Task> _workers = new();
    private readonly int _maxWorkers;
    private readonly CancellationTokenSource _cts;

    public WorkerPool(int maxWorkers)
    {
        _maxWorkers = maxWorkers;
        _workQueue = Channel.CreateUnbounded<WorkItem>();
        _cts = new CancellationTokenSource();

        // Start workers as long-running detached tasks
        for (int i = 0; i < _maxWorkers; i++)
        {
            _workers.Add(Task.Run(
                () => WorkerLoopAsync(_cts.Token), 
                _cts.Token));
        }
    }

    public async Task EnqueueWorkAsync(WorkItem work)
    {
        await _workQueue.Writer.WriteAsync(work);
    }

    private async Task WorkerLoopAsync(CancellationToken token)
    {
        while (await _workQueue.Reader.WaitToReadAsync(token))
        {
            while (_workQueue.Reader.TryRead(out var workItem))
            {
                try
                {
                    await workItem.ExecuteAsync();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(
                        $"[Error] Worker encountered an error: {ex.Message}");
                }
            }
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        _workQueue.Writer.Complete();
        Task.WhenAll(_workers).Wait();
        _cts.Dispose();
    }
}
Akka.NET Approach
public class UserActor : ReceiveActor
{
    private readonly Dictionary<string, UserState> _users 
        = new Dictionary<string, UserState>();

    public UserActor()
    {
        // Message handlers are processed one at a time
        // No locks needed!
        Receive<UpdateUser>(msg =>
        {
            if (_users.ContainsKey(msg.UserId))
            {
                _users[msg.UserId] = msg.State;
                Sender.Tell(new UserUpdated(msg.UserId));
            }
        });

        Receive<GetUser>(msg =>
        {
            var state = _users.GetValueOrDefault(msg.UserId);
            Sender.Tell(new UserState(msg.UserId, state));
        });
    }

    // State is automatically thread-safe
    // No locks, no race conditions
}
using System;
using System.Collections.Generic;
using Akka.Actor;

public sealed class StockTrackerActor : ReceiveActor
{
    // Message classes
    public sealed record SubscribeToUpdates(string[] Symbols);
    public sealed record UnsubscribeFromUpdates();
    public sealed record UpdateStockPrice(string Symbol, decimal Price);
    public sealed record StockPriceChanged(string Symbol, decimal Price, DateTimeOffset Timestamp);
    public sealed record SubscriptionAck(string[] Symbols);
    public sealed record UnsubscriptionAck();

    private readonly Dictionary<string, decimal> _prices = new();
    private readonly HashSet<IActorRef> _subscribers = new();

    public StockTrackerActor()
    {
        // Handle subscription requests
        Receive<SubscribeToUpdates>(msg =>
        {
            _subscribers.Add(Sender);
            Context.Watch(Sender); // Watch for actor termination

            // Acknowledge subscription
            Sender.Tell(new SubscriptionAck(msg.Symbols));

            // Send current prices for requested symbols
            foreach (var symbol in msg.Symbols)
            {
                if (_prices.TryGetValue(symbol, out var price))
                {
                    Sender.Tell(new StockPriceChanged(symbol, price, DateTimeOffset.UtcNow));
                }
            }
        });

        // Handle unsubscription requests
        Receive<UnsubscribeFromUpdates>(_ =>
        {
            _subscribers.Remove(Sender);
            Context.Unwatch(Sender); // Stop watching this actor
            Sender.Tell(new UnsubscriptionAck());
        });

        // Handle stock price updates
        Receive<UpdateStockPrice>(msg =>
        {
            _prices[msg.Symbol] = msg.Price;
            var notification = new StockPriceChanged(msg.Symbol, msg.Price, DateTimeOffset.UtcNow);

            // Broadcast to all subscribers
            foreach (var subscriber in _subscribers)
            {
                subscriber.Tell(notification);
            }
        });

        // Handle actor termination (when a watched subscriber terminates)
        Receive<Terminated>(msg =>
        {
            _subscribers.Remove(msg.ActorRef);
        });

        // (Optional) Forward externally received StockPriceChanged messages to subscribers.
        Receive<StockPriceChanged>(msg =>
        {
            foreach (var subscriber in _subscribers)
            {
                subscriber.Tell(msg);
            }
        });

        // (Optional) Log SubscriptionAck messages.
        Receive<SubscriptionAck>(msg =>
        {
            Context.GetLogger().Debug($"Subscription acknowledged for symbols: {string.Join(", ", msg.Symbols)}");
        });

        // (Optional) Log UnsubscriptionAck messages.
        Receive<UnsubscriptionAck>(_ =>
        {
            Context.GetLogger().Debug("Unsubscription acknowledged");
        });
    }
}
public class WorkProcessorActor : ReceiveActor
{
    private readonly IActorRef _worker;
    private int _failureCount;

    public WorkProcessorActor()
    {
        _worker = Context.ActorOf(
            Props.Create(() => new WorkerActor()));

        Receive<ProcessWork>(msg =>
        {
            _worker.Tell(msg);
        });
    }

    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 3,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: ex =>
            {
                switch (ex)
                {
                    case RecoverableException _:
                        return Directive.Restart;
                    default:
                        return Directive.Stop;
                }
            });
    }

    // Built-in supervision and recovery
    // Clear error handling policies
    // Automatic restart/stop decisions
}
using System;
using System.Threading.Tasks;
using Akka.Actor;

public class WorkItem
{
    public int Id { get; }
    public WorkItem(int id) => Id = id;
}

public class WorkerActor : ReceiveActor
{
    public WorkerActor()
    {
        Receive<WorkItem>(async msg =>
        {
            Console.WriteLine($"Processing WorkItem {msg.Id}...");
            await Task.Delay(TimeSpan.FromSeconds(2)); // Simulate work
            Console.WriteLine($"Finished WorkItem {msg.Id}");
        });
    }
}

public class WorkerPoolActor : ReceiveActor
{
    private readonly IActorRef _router;

    public WorkerPoolActor()
    {
        _router = Context.ActorOf(
            Props.Create<WorkerActor>()
                .WithRouter(new RoundRobinPool(
                    nrOfInstances: 10,
                    supervisorStrategy: new OneForOneStrategy(
                        maxNrOfRetries: 3,
                        withinTimeRange: TimeSpan.FromMinutes(1),
                        decider: ex => Directive.Restart
                    )
                ))
        );

        Receive<WorkItem>(msg =>
        {
            _router.Tell(msg);
        });
    }

    // Automatic load balancing
    // Built-in scaling
    // Fault tolerance included
    // Easy local => cluster transition
}