/// <summary>
/// Lock-based store of per-user state keyed by user id. Every access to the
/// underlying dictionary goes through a single private lock object.
/// </summary>
public class UserManager
{
    private readonly object _lock = new object();

    private readonly Dictionary<string, UserState> _users
        = new Dictionary<string, UserState>();

    /// <summary>
    /// Replaces the stored state of a user that is already present.
    /// Ids that are not in the dictionary are ignored (nothing is added).
    /// </summary>
    /// <param name="userId">Key of the user to update.</param>
    /// <param name="state">New state to store for that user.</param>
    public void UpdateUserState(string userId, UserState state)
    {
        lock (_lock)
        {
            if (!_users.ContainsKey(userId))
            {
                return;
            }

            _users[userId] = state;
        }
    }

    /// <summary>
    /// Looks up the state stored for <paramref name="userId"/>.
    /// </summary>
    /// <param name="userId">Key of the user to look up.</param>
    /// <returns>
    /// The stored state, or the default value of <c>UserState</c> when the id is unknown.
    /// </returns>
    public UserState GetUserState(string userId)
    {
        lock (_lock)
        {
            return _users.TryGetValue(userId, out var found) ? found : default;
        }
    }

    // Sharing this object between multiple threads can lead to:
    // - Deadlocks
    // - Race conditions if a lock is missed
    // - Performance bottlenecks
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
/// <summary>
/// Callback contract for objects that want to be told about stock prices.
/// In this file, <c>StockTracker</c> invokes it both when a listener subscribes
/// (once per known symbol) and whenever a queued price update is processed.
/// </summary>
public interface IStockListener
{
/// <summary>Receives the price reported for a symbol.</summary>
/// <param name="symbol">Symbol the price belongs to.</param>
/// <param name="price">Price being reported for <paramref name="symbol"/>.</param>
void OnPriceChanged(string symbol, decimal price);
}
/// <summary>
/// Immutable event payload carrying a symbol together with its price.
/// </summary>
public class PriceChangedEventArgs : EventArgs
{
    /// <summary>Creates the payload for one price notification.</summary>
    /// <param name="symbol">Symbol whose price is being reported.</param>
    /// <param name="price">The reported price.</param>
    public PriceChangedEventArgs(string symbol, decimal price)
        => (Symbol, Price) = (symbol, price);

    /// <summary>Symbol whose price is being reported.</summary>
    public string Symbol { get; }

    /// <summary>The reported price.</summary>
    public decimal Price { get; }
}
/// <summary>
/// Keeps the latest price per symbol and fans price updates out to subscribers.
/// Updates are queued on an unbounded channel and delivered by one background
/// task, first through the <see cref="PriceChanged"/> event and then to every
/// live <see cref="IStockListener"/>. Listeners are held through weak
/// references; dead references are pruned once a minute by a timer.
/// </summary>
public class StockTracker : IDisposable
{
    /// <summary>Raised on the background processing task for every queued update.</summary>
    public event EventHandler<PriceChangedEventArgs>? PriceChanged;

    private readonly Dictionary<string, decimal> _prices = new();
    private readonly object _lock = new();
    private readonly List<WeakReference<IStockListener>> _listeners = new();
    private readonly Timer _cleanupTimer;
    private readonly Channel<(string Symbol, decimal Price)> _updateChannel;
    private readonly CancellationTokenSource _cts;
    private readonly Task _processingTask;

    // 0 while the tracker is live, 1 once Dispose has started.
    private int _disposed;

    /// <summary>
    /// Creates the tracker, starts the background update processor and the
    /// once-a-minute cleanup timer.
    /// </summary>
    public StockTracker()
    {
        _updateChannel = Channel.CreateUnbounded<(string, decimal)>();
        _cts = new CancellationTokenSource();

        // Start background worker to process price updates
        _processingTask = Task.Run(ProcessUpdatesAsync);

        // Periodically clean up dead listener references
        _cleanupTimer = new Timer(
            _ => CleanupDeadListeners(),
            null,
            TimeSpan.FromMinutes(1),
            TimeSpan.FromMinutes(1));
    }

    /// <summary>
    /// Registers a listener (weakly referenced) and immediately replays the
    /// currently known prices to it on the calling thread.
    /// </summary>
    /// <param name="listener">Listener to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="listener"/> is null.</exception>
    public void Subscribe(IStockListener listener)
    {
        if (listener is null)
        {
            throw new ArgumentNullException(nameof(listener));
        }

        KeyValuePair<string, decimal>[] snapshot;
        lock (_lock)
        {
            _listeners.Add(new WeakReference<IStockListener>(listener));
            snapshot = _prices.ToArray();
        }

        // Acknowledge subscription by sending current prices. The callbacks run
        // outside the lock so a slow or re-entrant listener cannot block (or
        // deadlock against) price updates and the cleanup timer.
        foreach (var entry in snapshot)
        {
            NotifyListener(listener, entry.Key, entry.Value);
        }
    }

    /// <summary>
    /// Records <paramref name="price"/> as the latest price of
    /// <paramref name="symbol"/> and queues a notification for subscribers.
    /// </summary>
    /// <param name="symbol">Symbol being updated.</param>
    /// <param name="price">New price.</param>
    /// <returns>A task that completes once the update has been queued.</returns>
    public async Task UpdatePriceAsync(string symbol, decimal price)
    {
        lock (_lock)
        {
            _prices[symbol] = price;
        }

        await _updateChannel.Writer.WriteAsync((symbol, price));
    }

    /// <summary>
    /// Background loop: drains the update channel until cancellation and
    /// delivers each update to event handlers and live listeners.
    /// </summary>
    private async Task ProcessUpdatesAsync()
    {
        try
        {
            await foreach (var (symbol, price)
                in _updateChannel.Reader.ReadAllAsync(_cts.Token))
            {
                RaisePriceChanged(new PriceChangedEventArgs(symbol, price));

                List<WeakReference<IStockListener>> listeners;
                lock (_lock)
                {
                    listeners = _listeners.ToList();
                }

                // Notify subscribers from the snapshot, outside the lock.
                foreach (var weakListener in listeners)
                {
                    if (weakListener.TryGetTarget(out var listener))
                    {
                        NotifyListener(listener, symbol, price);
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown
        }
        catch (Exception ex)
        {
            Console.WriteLine(
                $"[Error] ProcessUpdatesAsync encountered " +
                $"an error: {ex}");
        }
    }

    /// <summary>
    /// Invokes every <see cref="PriceChanged"/> handler individually so that a
    /// throwing handler neither skips the remaining handlers nor terminates
    /// the processing loop.
    /// </summary>
    private void RaisePriceChanged(PriceChangedEventArgs args)
    {
        var handlers = PriceChanged;
        if (handlers is null)
        {
            return;
        }

        foreach (EventHandler<PriceChangedEventArgs> handler in handlers.GetInvocationList())
        {
            try
            {
                handler(this, args);
            }
            catch (Exception ex)
            {
                Console.WriteLine(
                    $"[Warning] PriceChanged handler failed: {ex.Message}");
            }
        }
    }

    /// <summary>Calls one listener and logs (rather than propagates) any failure.</summary>
    private static void NotifyListener(IStockListener listener, string symbol, decimal price)
    {
        try
        {
            listener.OnPriceChanged(symbol, price);
        }
        catch (Exception ex)
        {
            Console.WriteLine(
                $"[Warning] Failed to notify listener: {ex.Message}");
        }
    }

    /// <summary>Drops weak references whose listener has been collected.</summary>
    private void CleanupDeadListeners()
    {
        lock (_lock)
        {
            _listeners.RemoveAll(w => !w.TryGetTarget(out _));
        }
    }

    /// <summary>
    /// Stops the cleanup timer, cancels the processing loop, closes the update
    /// channel and blocks until the background task has finished. Calling it
    /// more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        // Only the first caller performs the shutdown; later calls would hit
        // an already-disposed CancellationTokenSource.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cleanupTimer.Dispose();
        _cts.Cancel();

        // TryComplete does not throw if the writer was already completed.
        _updateChannel.Writer.TryComplete();

        // ProcessUpdatesAsync handles its own exceptions, so this only waits.
        _processingTask.Wait();
        _cts.Dispose();
    }
}
/// <summary>
/// Runs <c>DoWork</c> with a lock-protected "busy" flag so that only one
/// invocation is in flight at a time, and counts consecutive failures.
/// </summary>
public class WorkProcessor
{
    private readonly object _lock = new object();
    private bool _isProcessing;
    private int _failureCount;

    /// <summary>
    /// Executes one unit of work. The first three consecutive failures are
    /// swallowed; from the fourth on the exception is rethrown.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Another call is still in progress.
    /// </exception>
    public async Task ProcessWork()
    {
        MarkBusy();

        try
        {
            await DoWork();
            _failureCount = 0;
        }
        catch (Exception)
        {
            _failureCount += 1;

            if (_failureCount <= 3)
            {
                // Retry? Reset? Notify someone?
                return;
            }

            // Now what? Entire system is in unknown state
            throw;
        }
        finally
        {
            MarkIdle();
        }
    }

    // Claims the busy flag, failing if it is already taken.
    private void MarkBusy()
    {
        lock (_lock)
        {
            if (_isProcessing)
            {
                throw new InvalidOperationException();
            }

            _isProcessing = true;
        }
    }

    // Releases the busy flag so the next call may proceed.
    private void MarkIdle()
    {
        lock (_lock)
        {
            _isProcessing = false;
        }
    }
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
/// <summary>
/// A unit of work identified by an integer id. Executing it writes a start
/// line to the console, waits two seconds, then writes a completion line.
/// </summary>
public class WorkItem
{
    /// <summary>Creates a work item with the given identifier.</summary>
    /// <param name="id">Identifier echoed in the console output.</param>
    public WorkItem(int id)
    {
        Id = id;
    }

    /// <summary>Identifier supplied at construction.</summary>
    public int Id { get; }

    /// <summary>Performs the (simulated) work.</summary>
    /// <returns>A task that completes after the simulated work has finished.</returns>
    public async Task ExecuteAsync()
    {
        // Stand-in for real work.
        TimeSpan simulatedDuration = TimeSpan.FromSeconds(2);

        Console.WriteLine($"Processing WorkItem {Id}...");
        await Task.Delay(simulatedDuration);
        Console.WriteLine($"Finished WorkItem {Id}");
    }
}
/// <summary>
/// Fixed-size pool of background workers that pull <see cref="WorkItem"/>s
/// from an unbounded channel and execute them.
/// </summary>
public class WorkerPool : IDisposable
{
    private readonly Channel<WorkItem> _workQueue;
    private readonly List<Task> _workers = new();
    private readonly int _maxWorkers;
    private readonly CancellationTokenSource _cts;

    // 0 while the pool is live, 1 once Dispose has started.
    private int _disposed;

    /// <summary>Creates the pool and starts its workers.</summary>
    /// <param name="maxWorkers">Number of worker loops to start; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxWorkers"/> is less than 1 (such a pool would never process anything).
    /// </exception>
    public WorkerPool(int maxWorkers)
    {
        if (maxWorkers < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxWorkers), maxWorkers, "At least one worker is required.");
        }

        _maxWorkers = maxWorkers;
        _workQueue = Channel.CreateUnbounded<WorkItem>();
        _cts = new CancellationTokenSource();

        // Start the worker loops. The token is deliberately NOT passed to
        // Task.Run: a worker that has not started yet would otherwise end up
        // in the Canceled state and make Dispose throw. Each loop observes the
        // token itself.
        for (int i = 0; i < _maxWorkers; i++)
        {
            _workers.Add(Task.Run(() => WorkerLoopAsync(_cts.Token)));
        }
    }

    /// <summary>Queues a work item for execution by one of the workers.</summary>
    /// <param name="work">Item to execute.</param>
    /// <exception cref="ArgumentNullException"><paramref name="work"/> is null.</exception>
    public async Task EnqueueWorkAsync(WorkItem work)
    {
        if (work is null)
        {
            throw new ArgumentNullException(nameof(work));
        }

        await _workQueue.Writer.WriteAsync(work);
    }

    /// <summary>
    /// Worker loop: waits for items, executes whatever is available, and ends
    /// when the channel completes or <paramref name="token"/> is cancelled.
    /// </summary>
    private async Task WorkerLoopAsync(CancellationToken token)
    {
        try
        {
            while (await _workQueue.Reader.WaitToReadAsync(token))
            {
                while (_workQueue.Reader.TryRead(out var workItem))
                {
                    try
                    {
                        await workItem.ExecuteAsync();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(
                            $"[Error] Worker encountered an error: {ex.Message}");
                    }
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Normal shutdown: cancellation interrupts WaitToReadAsync. Ending
            // the loop quietly keeps the worker task from finishing as
            // Canceled, which Dispose would otherwise rethrow.
        }
    }

    /// <summary>
    /// Cancels the workers, closes the queue and blocks until every worker
    /// loop has exited. Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            _cts.Cancel();

            // TryComplete does not throw if the writer was already completed.
            _workQueue.Writer.TryComplete();
            Task.WhenAll(_workers).Wait();
        }
        finally
        {
            _cts.Dispose();
        }
    }
}
/// <summary>
/// Actor that owns the user-state dictionary and answers
/// <c>UpdateUser</c> and <c>GetUser</c> messages.
/// </summary>
public class UserActor : ReceiveActor
{
    private readonly Dictionary<string, UserState> _users
        = new Dictionary<string, UserState>();

    public UserActor()
    {
        // Message handlers are processed one at a time
        // No locks needed!
        Receive<UpdateUser>(msg => OnUpdateUser(msg));
        Receive<GetUser>(msg => OnGetUser(msg));
    }

    // Overwrites the state of a known user and confirms with UserUpdated;
    // unknown ids are ignored and get no reply.
    private void OnUpdateUser(UpdateUser msg)
    {
        if (!_users.ContainsKey(msg.UserId))
        {
            return;
        }

        _users[msg.UserId] = msg.State;
        Sender.Tell(new UserUpdated(msg.UserId));
    }

    // Replies with the stored state (or the default value when the id is unknown).
    private void OnGetUser(GetUser msg)
    {
        var state = _users.TryGetValue(msg.UserId, out var found) ? found : default;
        Sender.Tell(new UserState(msg.UserId, state));
    }

    // State is automatically thread-safe
    // No locks, no race conditions
}
using System;
using System.Collections.Generic;
using Akka.Actor;
/// <summary>
/// Actor that keeps the latest price per symbol and broadcasts price changes
/// to every subscribed actor. Subscribers are death-watched so they are
/// removed automatically when they terminate.
/// </summary>
public sealed class StockTrackerActor : ReceiveActor
{
    // Message classes
    public sealed record SubscribeToUpdates(string[] Symbols);
    public sealed record UnsubscribeFromUpdates();
    public sealed record UpdateStockPrice(string Symbol, decimal Price);
    public sealed record StockPriceChanged(string Symbol, decimal Price, DateTimeOffset Timestamp);
    public sealed record SubscriptionAck(string[] Symbols);
    public sealed record UnsubscriptionAck();

    private readonly Dictionary<string, decimal> _prices = new();
    private readonly HashSet<IActorRef> _subscribers = new();

    public StockTrackerActor()
    {
        // Handle subscription requests
        Receive<SubscribeToUpdates>(msg =>
        {
            var subscriber = Sender;

            // A null symbol list is treated as empty instead of throwing
            // inside the handler.
            var symbols = msg.Symbols ?? Array.Empty<string>();

            _subscribers.Add(subscriber);
            Context.Watch(subscriber); // Watch for actor termination

            // Acknowledge subscription
            subscriber.Tell(new SubscriptionAck(symbols));

            // Send current prices for requested symbols
            foreach (var symbol in symbols)
            {
                if (_prices.TryGetValue(symbol, out var price))
                {
                    subscriber.Tell(new StockPriceChanged(symbol, price, DateTimeOffset.UtcNow));
                }
            }

            // NOTE(review): the requested symbols only select the initial
            // snapshot above; later broadcasts go to every subscriber for
            // every symbol. Confirm whether per-symbol filtering is intended.
        });

        // Handle unsubscription requests
        Receive<UnsubscribeFromUpdates>(_ =>
        {
            var subscriber = Sender;
            _subscribers.Remove(subscriber);
            Context.Unwatch(subscriber); // Stop watching this actor
            subscriber.Tell(new UnsubscriptionAck());
        });

        // Handle stock price updates
        Receive<UpdateStockPrice>(msg =>
        {
            _prices[msg.Symbol] = msg.Price;
            Broadcast(new StockPriceChanged(msg.Symbol, msg.Price, DateTimeOffset.UtcNow));
        });

        // Handle actor termination (when a watched subscriber terminates)
        Receive<Terminated>(msg =>
        {
            _subscribers.Remove(msg.ActorRef);
        });

        // (Optional) Forward externally received StockPriceChanged messages to subscribers.
        Receive<StockPriceChanged>(msg => Broadcast(msg));

        // (Optional) Log SubscriptionAck messages.
        Receive<SubscriptionAck>(msg =>
        {
            // GetLogger is called through its declaring class so this compiles
            // without a `using Akka.Event;` directive; format arguments leave
            // the message formatting to the logging adapter.
            Akka.Event.Logging.GetLogger(Context).Debug(
                "Subscription acknowledged for symbols: {0}",
                string.Join(", ", msg.Symbols ?? Array.Empty<string>()));
        });

        // (Optional) Log UnsubscriptionAck messages.
        Receive<UnsubscriptionAck>(_ =>
        {
            Akka.Event.Logging.GetLogger(Context).Debug("Unsubscription acknowledged");
        });
    }

    // Sends one notification to every current subscriber.
    private void Broadcast(StockPriceChanged notification)
    {
        foreach (var subscriber in _subscribers)
        {
            subscriber.Tell(notification);
        }
    }
}
/// <summary>
/// Parent actor that creates one <c>WorkerActor</c> child, hands every
/// <c>ProcessWork</c> message to it, and supervises it: the child is restarted
/// on <c>RecoverableException</c> and stopped on any other exception.
/// </summary>
public class WorkProcessorActor : ReceiveActor
{
    private readonly IActorRef _worker;

    public WorkProcessorActor()
    {
        _worker = Context.ActorOf(
            Props.Create(() => new WorkerActor()));

        Receive<ProcessWork>(msg =>
        {
            _worker.Tell(msg);
        });
    }

    /// <summary>
    /// Supervision policy for the child worker: one-for-one, configured with
    /// 3 retries within a one-minute window.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        // Arguments are positional (max retries, time window, decider) so the
        // lambda binds to the Func<Exception, Directive> overload regardless of
        // how that overload names its parameter.
        return new OneForOneStrategy(
            3,
            TimeSpan.FromMinutes(1),
            ex => ex switch
            {
                RecoverableException _ => Directive.Restart,
                _ => Directive.Stop,
            });
    }

    // Built-in supervision and recovery
    // Clear error handling policies
    // Automatic restart/stop decisions
}
using System;
using System.Threading.Tasks;
using Akka.Actor;
/// <summary>
/// Message describing one unit of work, identified by an integer id.
/// </summary>
public class WorkItem
{
    /// <summary>Creates a work item with the given identifier.</summary>
    /// <param name="id">Identifier of the work item.</param>
    public WorkItem(int id)
    {
        Id = id;
    }

    /// <summary>Identifier supplied at construction.</summary>
    public int Id { get; }
}
/// <summary>
/// Actor that processes <c>WorkItem</c> messages: it writes a start line to
/// the console, waits two seconds to simulate work, then writes a completion
/// line.
/// </summary>
public class WorkerActor : ReceiveActor
{
    public WorkerActor()
    {
        // ReceiveAsync (not Receive) is required for an awaiting handler.
        // Passing an async lambda to Receive<T> turns it into `async void`,
        // so the actor would start the next message before this one finished
        // and a failure after the await would not reach the supervisor.
        ReceiveAsync<WorkItem>(async msg =>
        {
            Console.WriteLine($"Processing WorkItem {msg.Id}...");
            await Task.Delay(TimeSpan.FromSeconds(2)); // Simulate work
            Console.WriteLine($"Finished WorkItem {msg.Id}");
        });
    }
}
/// <summary>
/// Actor that fronts a round-robin pool of <c>WorkerActor</c> routees and
/// passes every <c>WorkItem</c> it receives to that pool. Failing routees are
/// restarted by the pool's one-for-one supervisor strategy.
/// </summary>
public class WorkerPoolActor : ReceiveActor
{
    // Number of WorkerActor routees created by the pool.
    private const int WorkerCount = 10;

    private readonly IActorRef _router;

    public WorkerPoolActor()
    {
        // Positional arguments (max retries, time window, decider) so the
        // lambda binds to the Func<Exception, Directive> overload.
        var restartOnFailure = new OneForOneStrategy(
            3,
            TimeSpan.FromMinutes(1),
            ex => Directive.Restart);

        // The pool is built from its instance count and the supervisor
        // strategy is attached with WithSupervisorStrategy. The type is fully
        // qualified so no `using Akka.Routing;` directive is needed.
        var pool = new Akka.Routing.RoundRobinPool(WorkerCount)
            .WithSupervisorStrategy(restartOnFailure);

        _router = Context.ActorOf(
            Props.Create<WorkerActor>().WithRouter(pool));

        Receive<WorkItem>(msg =>
        {
            _router.Tell(msg);
        });
    }

    // Automatic load balancing
    // Built-in scaling
    // Fault tolerance included
    // Easy local => cluster transition
}