Writing Sagas with Actors, Message Stashing, and IWithTimers
25 minutes to read
The final lesson of Unit 1 of Akka.NET Bootcamp! In this lesson we’re going to write a small “saga” actor, the WordCountJobActor, and use it to complete AkkaWordCounter2.
Sagas and Actors
Actors are a great fit for modeling sagas - and if you’re not sure what a saga is, it’s basically a term invented by message queue and cloud vendors to turn finite state machines into purchaseable products.
Ok, less cynically: sagas usually deal with distributed systems-type activity, ensuring that work is carried out in an orchestrated / coordinated fashion across multiple services. Sagas are a type of finite state machine.
In our case, the “distributed services” our saga coordinates are two sets of independent actors: the WordCounterManager (and its children) and the ParserActors. The WordCountJobActor is our implementation of this saga.
Workflow and Design
When it comes to modeling stateful actor behavior, such as the WordCountJobActor’s, there are two schools of behavior:
- Finite streams - entities that have well-defined beginnings and ends; and
- Infinite streams - entities that don’t have well-defined beginnings or ends [1].
Our WordCountJobActor is a finite stream:
- We begin by sending the DocumentCommands.ScanDocuments(IReadOnlyList<AbsoluteUri> DocumentIds) message to the WordCountJobActor - which kicks off all of the ParserActor and WordCounterManager activity.
- We finish by returning the DocumentEvents.CountsTabulatedForDocuments back to all subscribers who sent us a DocumentQueries.SubscribeToAllCounts message.
Let’s illustrate how the WordCountJobActor is going to operate:
stateDiagram-v2
state "OnReceive" as OnReceive {
[*] --> Stashing
Stashing --> Stashing: All other messages
Stashing --> [*]: ScanDocuments
}
%% Top-level transition out of OnReceive
OnReceive --> Running: ScanDocuments
state "Running" as Running {
[*] --> Processing
state "Processing" as Processing
note right of Processing
Receives WordsFound events, forwards to WordCounterManager
end note
Processing --> Processing: CountsTabulatedForDocument
Processing --> [*]: Context.Stop(Self)
}
note right of Running
Streams ScanDocument commands to ParserActors and
streams FetchCounts queries to WordCounterManager
end note
- Once we receive the DocumentCommands.ScanDocuments message, we’re going to feed all of those AbsoluteUris to the ParserActors to begin their scans, and we’ll send a DocumentQueries.FetchCounts message for each document to the WordCounterManager so it’ll send us a notification back once each document has been fully tabulated.
- The ParserActors are going to stream DocumentEvents.WordsFound messages back to us, which we’re going to forward to the WordCounterManager for tabulation.
- While all of this is happening we may or may not receive one or more DocumentQueries.SubscribeToAllCounts messages. Any actor who sends us this message will be added to our list of subscribers and will receive a DocumentEvents.CountsTabulatedForDocuments with our completed output once we’re finished.
- As ParserActors reach the end of their documents, we receive DocumentEvents.EndOfDocumentReached - which we will also forward to the WordCounterManager.
- Once each individual DocumentWordCounter actor completes its tabulation it will send us a DocumentEvents.CountsTabulatedForDocument event in response, which we will save and incorporate into our totals.
- After we’ve received a DocumentEvents.CountsTabulatedForDocument for every document id, we are done. We will merge all of the counts together, deliver the result to our subscribers, and shut ourselves down.
None of this code is novel or new - these are all actor interactions we have both designed and tested in previous lessons. We’re just putting them all together.
Implementing Prerequisites
Before we write the WordCountJobActor we need to add one small convenience method for merging all of the outputs at the end.
Inside AkkaWordCounter2.App please create a new file called CollectionUtilities.cs and then type in the following:
using System.Collections.Immutable;
namespace AkkaWordCounter2.App;
public static class CollectionUtilities
{
public static IImmutableDictionary<string, int> MergeWordCounts(IEnumerable<IDictionary<string, int>> counts)
{
var mergedCounts = counts.Aggregate(ImmutableDictionary<string, int>.Empty,
(acc, next) =>
{
foreach (var (word, count) in next)
{
acc = acc.SetItem(word, acc.GetValueOrDefault(word, 0) + count);
}
return acc;
});
return mergedCounts;
}
}
This is just a bit of LINQ to make it easier to aggregate all of the DocumentEvents.CountsTabulatedForDocument outputs at the end.
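To see what the merge does, here is a small usage sketch. The two input dictionaries are made-up sample data, and the class is repeated (without its namespace) only so the snippet compiles on its own as a single top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical per-document counts (sample data, not real program output).
var doc1 = new Dictionary<string, int> { ["akka"] = 2, ["actor"] = 1 };
var doc2 = new Dictionary<string, int> { ["akka"] = 3, ["saga"] = 4 };

var merged = CollectionUtilities.MergeWordCounts(new IDictionary<string, int>[] { doc1, doc2 });

// Words that appear in both documents have their counts summed.
Console.WriteLine(merged["akka"]);  // 5
Console.WriteLine(merged["actor"]); // 1
Console.WriteLine(merged["saga"]);  // 4

// Same method as in CollectionUtilities.cs, repeated so this snippet stands alone.
public static class CollectionUtilities
{
    public static IImmutableDictionary<string, int> MergeWordCounts(IEnumerable<IDictionary<string, int>> counts)
    {
        return counts.Aggregate(ImmutableDictionary<string, int>.Empty,
            (acc, next) =>
            {
                foreach (var (word, count) in next)
                {
                    acc = acc.SetItem(word, acc.GetValueOrDefault(word, 0) + count);
                }
                return acc;
            });
    }
}
```

Because the accumulator is an ImmutableDictionary, each SetItem call returns a new dictionary rather than mutating the inputs, so the per-document counts are left untouched.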
Coding the WordCountJobActor
Please create a new file inside AkkaWordCounter2.App/Actors called WordCountJobActor.cs and type in the following:
using System.Collections.Immutable;
using Akka.Hosting;
using static AkkaWordCounter2.App.DocumentCommands;
using static AkkaWordCounter2.App.DocumentQueries;
using static AkkaWordCounter2.App.CollectionUtilities;
namespace AkkaWordCounter2.App.Actors;
/// <summary>
/// Responsible for processing a batch of documents.
/// </summary>
public sealed class WordCountJobActor : UntypedActor, IWithStash, IWithTimers
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IRequiredActor<WordCounterManager> _wordCounterManager;
private readonly IRequiredActor<ParserActor> _parserActor;
public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!;
private readonly HashSet<IActorRef> _subscribers = new();
private readonly Dictionary<AbsoluteUri, ProcessingStatus> _documentsToProcess = new();
private readonly Dictionary<AbsoluteUri, ImmutableDictionary<string, int>> _wordCounts = new();
public enum ProcessingStatus
{
Processing = 0,
Completed = 1,
FailedError = 2,
FailedTimeout = 3
}
public sealed class JobTimeout
{
public static readonly JobTimeout Instance = new();
private JobTimeout(){ }
}
public WordCountJobActor(
IRequiredActor<WordCounterManager> wordCounterManager,
IRequiredActor<ParserActor> parserActor)
{
_wordCounterManager = wordCounterManager;
_parserActor = parserActor;
}
protected override void OnReceive(object message)
{
switch (message)
{
case ScanDocuments scan:
_log.Info("Received scan request for {0}", scan.DocumentIds.Count);
foreach (var document in scan.DocumentIds)
{
_documentsToProcess[document] = ProcessingStatus.Processing;
// begin processing
_parserActor.ActorRef.Tell(new ScanDocument(document));
// get back to us once processing is completed
_wordCounterManager.ActorRef.Tell(new FetchCounts(document));
}
Become(Running);
Timers.StartSingleTimer("job-timeout", JobTimeout.Instance, TimeSpan.FromSeconds(30));
Stash.UnstashAll();
break;
default:
{
// buffer any other messages until the job starts
Stash.Stash();
break;
}
}
}
private void Running(object message)
{
switch (message)
{
case DocumentEvents.WordsFound found:
_wordCounterManager.ActorRef.Forward(found);
break;
case DocumentEvents.EndOfDocumentReached eof:
_wordCounterManager.ActorRef.Forward(eof);
break;
case DocumentEvents.CountsTabulatedForDocument counts:
_log.Info("Received word counts for {0}", counts.DocumentId);
_wordCounts[counts.DocumentId] = counts.WordFrequencies;
_documentsToProcess[counts.DocumentId] = ProcessingStatus.Completed;
HandleJobCompletedMaybe();
break;
case DocumentEvents.DocumentScanFailed failed:
_log.Error("Document scan failed for {0}: {1}", failed.DocumentId, failed.Reason);
_documentsToProcess[failed.DocumentId] = ProcessingStatus.FailedError;
HandleJobCompletedMaybe();
break;
case JobTimeout _:
_log.Error("Job timed out");
// Set all documents that haven't been processed yet to timed out
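// iterate over a snapshot of the keys so we never modify the dictionary while enumerating it
foreach (var document in _documentsToProcess.Keys.ToList())
{
if (_documentsToProcess[document] == ProcessingStatus.Processing)
{
_documentsToProcess[document] = ProcessingStatus.FailedTimeout;
}
}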
HandleJobCompletedMaybe(true);
break;
case SubscribeToAllCounts:
_subscribers.Add(Sender);
break;
default:
Unhandled(message);
break;
}
}
private void HandleJobCompletedMaybe(bool force = false)
{
if (!IsJobCompleted() && !force) return;
// log statuses of each page
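foreach (var (document, status) in _documentsToProcess)
{
// failed or timed-out documents never produced counts, so fall back to 0 instead of throwing
var totalWords = _wordCounts.TryGetValue(document, out var counts) ? counts.Values.Sum() : 0;
_log.Info("Document {0} status: {1}, total words: {2}", document, status, totalWords);
}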
// need to merge all the word counts
var mergedCounts = MergeWordCounts(_wordCounts.Values);
var finalOutput =
new DocumentEvents.
CountsTabulatedForDocuments(_documentsToProcess.Keys.ToList(), mergedCounts);
foreach (var subscriber in _subscribers)
{
subscriber.Tell(finalOutput);
}
Context.Stop(Self);
}
private bool IsJobCompleted()
{
return _documentsToProcess.Values.All(x => x > ProcessingStatus.Processing);
}
}
The bulk of that code is the HandleJobCompletedMaybe function, which is just a basic C# method - but it has an important job: determining when and if the job is finished.
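The completion check is easy to try in isolation. The sketch below is a stand-alone approximation, not the actor itself: the document names are made up, and ShouldFinish is a hypothetical helper that mirrors the guard at the top of HandleJobCompletedMaybe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical job state: one finished document, one still in flight.
var statuses = new Dictionary<string, ProcessingStatus>
{
    ["doc-a"] = ProcessingStatus.Completed,
    ["doc-b"] = ProcessingStatus.Processing
};

// Every status above Processing (value 0) counts as finished, whether it succeeded or failed.
bool IsJobCompleted() => statuses.Values.All(s => s > ProcessingStatus.Processing);

// Mirrors "if (!IsJobCompleted() && !force) return;" in the actor.
bool ShouldFinish(bool force = false) => IsJobCompleted() || force;

Console.WriteLine(ShouldFinish());            // False: doc-b is still processing
Console.WriteLine(ShouldFinish(force: true)); // True: a timeout forces completion

statuses["doc-b"] = ProcessingStatus.FailedError;
Console.WriteLine(ShouldFinish());            // True: a failed document is still a finished one

enum ProcessingStatus { Processing = 0, Completed = 1, FailedError = 2, FailedTimeout = 3 }
```

Ordering the enum so that Processing is the lowest value is what lets a single comparison cover all three terminal states.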
You may notice that this actor’s type declaration has the IWithStash and IWithTimers interfaces. These are two important actor behavior mix-ins we can use to augment our actors and give them additional abilities. Let’s break these down first.
Defer Processing of Messages with IWithStash
When you decorate an actor with the IWithStash interface, it adds the following property to the actor:
public IStash Stash { get; set; } = null!;
The IStash will be populated via property dependency injection by Akka.NET itself.
The IStash is typically used in combination with behavior-switching to allow actors to defer processing messages until they’re ready, which is exactly what we’re doing inside the OnReceive behavior:
protected override void OnReceive(object message)
{
switch (message)
{
case ScanDocuments scan:
_log.Info("Received scan request for {0}", scan.DocumentIds.Count);
foreach (var document in scan.DocumentIds)
{
_documentsToProcess[document] = ProcessingStatus.Processing;
// begin processing
_parserActor.ActorRef.Tell(new ScanDocument(document));
// get back to us once processing is completed
_wordCounterManager.ActorRef.Tell(new FetchCounts(document));
}
Become(Running);
Timers.StartSingleTimer("job-timeout", JobTimeout.Instance, TimeSpan.FromSeconds(30));
Stash.UnstashAll();
break;
default:
{
// buffer any other messages until the job starts
Stash.Stash();
break;
}
}
}
We can’t do much with any of the other messages this actor can receive until after it receives the ScanDocuments command - so we just save the message and its Sender inside the Stash via a Stash.Stash() call.
Once we do receive the ScanDocuments command, we call Stash.UnstashAll() - which will empty the contents of the stash and put all of those messages at the front of the actor’s mailbox.
This is because actors try to preserve message order, and any messages that are in the actor’s Stash are guaranteed to be older than any unprocessed messages still sitting inside the mailbox.
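The stash-then-switch pattern can be boiled down to a much smaller actor. The GateActor and its Open message below are hypothetical and not part of AkkaWordCounter2; this is a minimal sketch that assumes the Akka NuGet package is referenced.

```csharp
using Akka.Actor;

// Hypothetical actor: buffers every message until it is told to open, then echoes messages back.
public sealed class GateActor : UntypedActor, IWithStash
{
    public sealed class Open
    {
        public static readonly Open Instance = new();
        private Open() { }
    }

    // Populated by Akka.NET when the actor is created.
    public IStash Stash { get; set; } = null!;

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case Open:
                // Switch behaviors first, then put the stashed messages back at the front of the mailbox.
                Become(Opened);
                Stash.UnstashAll();
                break;
            default:
                // Not ready yet: keep the message (and its Sender) for later.
                Stash.Stash();
                break;
        }
    }

    private void Opened(object message)
    {
        // Stashed messages arrive here with their original Sender intact.
        Sender.Tell(message);
    }
}
```

If a single sender sends this actor "a", "b", Open.Instance, and then "c", the replies come back as "a", "b", "c": the two stashed messages are processed ahead of anything that arrived after Open.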
Scheduling Future or Recurring Messages with IWithTimers
The next detail we want to examine with the WordCountJobActor is the other mix-in it’s using, the IWithTimers interface - which also adds a property that will be automatically filled in by Akka.NET:
public ITimerScheduler Timers { get; set; } = null!;
The ITimerScheduler is an Akka.NET construct that allows the actor to create either single use timers or recurring timers that will automatically be cancelled in the event that the actor terminates or restarts.
In the case of the WordCountJobActor we use it to schedule a JobTimeout message, which will fire if we don’t receive all of the output from the WordCounterManager within 30 seconds.
Timers.StartSingleTimer("job-timeout", JobTimeout.Instance, TimeSpan.FromSeconds(30));
In the event that we DO receive all of the data we need, we shut this actor down anyway - but if we wanted to explicitly cancel that timer and keep the WordCountJobActor alive, we could do that via the following code:
Timers.Cancel("job-timeout");
To schedule a recurring message that appears periodically, something we are not doing in this code sample, just use the ITimerScheduler.StartPeriodicTimer method:
Timers.StartPeriodicTimer(object key, object msg, TimeSpan interval, IActorRef sender)
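As a rough illustration of a recurring timer, here is a hypothetical HeartbeatActor (not part of AkkaWordCounter2) that uses the three-argument StartPeriodicTimer(key, msg, interval) overload and cancels its own timer after a few ticks. The actor name, the Tick message, the one-second interval, and the five-tick limit are all assumptions made for the example.

```csharp
using System;
using Akka.Actor;
using Akka.Event;

// Hypothetical actor: logs a heartbeat once per second and stops ticking after five beats.
public sealed class HeartbeatActor : UntypedActor, IWithTimers
{
    private sealed class Tick
    {
        public static readonly Tick Instance = new();
        private Tick() { }
    }

    private const string TimerKey = "heartbeat";
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private int _ticks;

    // Populated by Akka.NET when the actor is created.
    public ITimerScheduler Timers { get; set; } = null!;

    protected override void PreStart()
    {
        // Deliver Tick.Instance to ourselves every second until cancelled or stopped.
        Timers.StartPeriodicTimer(TimerKey, Tick.Instance, TimeSpan.FromSeconds(1));
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case Tick:
                _ticks++;
                _log.Info("Heartbeat {0}", _ticks);
                if (_ticks >= 5)
                {
                    // Cancel by key; the actor itself keeps running.
                    Timers.Cancel(TimerKey);
                }
                break;
            default:
                Unhandled(message);
                break;
        }
    }
}
```

Timers are identified by their key, so starting a new timer with the same key replaces the previous one rather than stacking a second timer on top of it.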
Akka.Hosting and IRequiredActor<TActor>
We mentioned in “Akka.Hosting, Routers, and Dependency Injection” that one of the reasons for all of these ActorRegistry.Register<TActor>(IActorRef) calls is so we can inject actors using Microsoft.Extensions.DependencyInjection / the IServiceProvider. The IRequiredActor<TActor> type is how we do that:
public WordCountJobActor(
IRequiredActor<WordCounterManager> wordCounterManager,
IRequiredActor<ParserActor> parserActor)
{
_wordCounterManager = wordCounterManager;
_parserActor = parserActor;
}
The IRequiredActor<TActor> gets injected into the constructor of the WordCountJobActor in this instance, but this could just as easily be an ASP.NET Core controller, SignalR Hub, a BackgroundService, or anything else that can support Microsoft.Extensions.
We access the actual IActorRef from the IRequiredActor<TActor> via the .ActorRef property:
// begin processing
_parserActor.ActorRef.Tell(new ScanDocument(document));
Launching the WordCountJobActor
We are almost done with AkkaWordCounter2 - the next thing we need to do is open the AkkaWordCounter2.App/Config/ActorConfigurations.cs file and add the following lines to the bottom of the ActorConfigurations class:
public static AkkaConfigurationBuilder AddJobActor(this AkkaConfigurationBuilder builder)
{
return builder.WithActors((system, registry, resolver) =>
{
var props = resolver.Props<WordCountJobActor>();
var actor = system.ActorOf(props, "job");
registry.Register<WordCountJobActor>(actor);
});
}
public static AkkaConfigurationBuilder AddApplicationActors(this AkkaConfigurationBuilder builder)
{
return builder
.AddWordCounterActor()
.AddParserActors()
.AddJobActor();
}
This will add the AkkaConfigurationBuilder extension methods we want for configuring the WordCountJobActor specifically and an “aggregate” configuration that composes all of the individual actor methods together.
Next, let’s open AkkaWordCounter2.App/Program.cs and add the following code to our AddAkka method:
services.AddAkka("MyActorSystem", (builder, sp) =>
{
builder
.ConfigureLoggers(logConfig =>
{
logConfig.AddLoggerFactory();
})
.AddApplicationActors();
});
Now if we launch our program via dotnet run we should see the following
(nothing)
Nothing!
That’s because, even though we’ve started all of our actors and our ActorSystem via Akka.Hosting - we actually haven’t told them to do anything yet.
Configuring Startup Tasks in Akka.Hosting
What we need to do here is extract the WordCounterSettings we defined in “Using IOptions and Microsoft.Extensions.Configuration” and then use that configuration to populate the DocumentCommands.ScanDocuments message that the WordCountJobActor needs in order to run.
Akka.Hosting has just the thing we need for this: startup tasks, which launch after all of our ActorSystem configuration and actor initialization calls have run!
Add the following code to the AkkaConfigurationBuilder in AkkaWordCounter2.App/Program.cs:
.AddStartup(async (system, registry) =>
{
var settings = sp.GetRequiredService<IOptions<WordCounterSettings>>();
var jobActor = await registry.GetAsync<WordCountJobActor>();
var absoluteUris = settings.Value.DocumentUris.Select(uri => new AbsoluteUri(new Uri(uri))).ToArray();
jobActor.Tell(new DocumentCommands.ScanDocuments(absoluteUris));
// wait for the job to complete
var counts = await jobActor.Ask<DocumentEvents.CountsTabulatedForDocuments>(DocumentQueries.SubscribeToAllCounts.Instance, TimeSpan.FromMinutes(1));
foreach (var (word, count) in counts.WordFrequencies)
{
Console.WriteLine($"Word count for {word}: {count}");
}
});
This code will use the IServiceProvider to resolve the IOptions<WordCounterSettings> we defined earlier.
We will then use the ActorRegistry.GetAsync<TActor> method to retrieve the WordCountJobActor - and from there, we will send it the DocumentCommands.ScanDocuments message with all of the relevant document URIs.
And finally, we will send the DocumentQueries.SubscribeToAllCounts message via Ask and await the DocumentEvents.CountsTabulatedForDocuments reply.
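One thing the startup task above does not handle is the Ask timing out, for example if the job actor has already stopped by the time we subscribe. In that case the awaited Ask faults with an AskTimeoutException. The helper below is a hypothetical sketch of one way to deal with that; AskOrDefault is not an Akka.NET API, just a thin wrapper around Ask.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

public static class AskExtensions
{
    // Hypothetical helper: returns null instead of throwing when no reply arrives within the timeout.
    public static async Task<T?> AskOrDefault<T>(this ICanTell target, object message, TimeSpan timeout)
        where T : class
    {
        try
        {
            return await target.Ask<T>(message, timeout);
        }
        catch (AskTimeoutException)
        {
            // No reply in time: let the caller decide how to report it.
            return null;
        }
    }
}
```

Inside the startup task, the jobActor.Ask<DocumentEvents.CountsTabulatedForDocuments>(...) call could then be swapped for jobActor.AskOrDefault<DocumentEvents.CountsTabulatedForDocuments>(...) with the same arguments, followed by a null check that prints a short message before returning.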
Running AkkaWordCounter2
Let’s run AkkaWordCounter2:
dotnet run
And we should see lots of output that looks like this:
Word count for directly: 1
Word count for message: 2
Word count for [contact: 1
Word count for templates: 4
Word count for failures: 2
Word count for includes: 1
Word count for (@AkkaDotNet): 1
Word count for 10s: 1
Word count for over: 1
Word count for locking: 1
Word count for use: 3
Word count for "Akka.Templates::*": 1
Word count for level: 1
Word count for automatically: 1
Word count for as: 4
Word count for Builds: 1
Word count for **Highly: 1
If your application doesn’t show this output please refer to the Unit-1 source code on https://github.com/petabridge/akka-bootcamp/
Wrapping Up
And there we have it!
You’ve finished Unit-1 of Akka.NET Bootcamp. Well done.
Further Reading
- Real World Akka.NET Clustering: State Machines
- Real World Akka.NET Clustering: Process Managers and Long-Running Operations - “Process Managers” and “Sagas” are interchangeable terms.
- Akka.NET Actors’ Hidden Super Power: Behavior Switching
[1] In the real world, both infinite and finite streams are pretty common. Finite streams tend to be temporary, bounded jobs, e.g. an order that needs to be fulfilled. Infinite streams tend to represent entities that can have long lifespans, e.g. a customer in the system. And a really important piece of nuance to bear in mind here: just because an object has an “infinite” lifespan doesn’t mean that it’s constantly busy or in use. Infinite streams tend to have quick bursts of activity periodically, e.g. when the customer is actively placing an order, and then very long periods of inactivity. The actors representing these infinite streams tend to get shut down and then recreated once activity resumes. This is something that Akka.Cluster.Sharding automates.
