Effective Actor Messaging
15 minutes to readActor-to-Actor Messaging
We have our basic AkkaWordCounter
project from the previous lesson and we’re going to modify it to do some more interesting things.
In this particular lesson, we’re going to focus on leveraging actor-to-actor messaging to build something that looks a lot closer to a real-world Akka.NET application.
Goal: Analyze Word Count Frequencies
The goal of this application is to count the frequencies of words as they appear in bodies of text. Therefore, we’re going to need a few components in order to make this happen:
- An actor to store and process the actual frequencies;
- An actor to retrieve and parse a document containing text; and
- A messaging protocol designed to propagate information between these actors and display the final results when completed.
In our live Akka.NET training courses we also recommend starting with the design of your messaging protocols, so what’s what we’re going to do first.
Building Messaging Protocols
In the subsequent units of Akka.NET Bootcamp we’ll be building more sophisticated messaging protocols, but for now we’re going to stick with a fairly simple one. Simple and well-organized.
public static class DocumentCommands {
public sealed record ProcessDocument(string RawText);
}
// Counter Inputs
public static class CounterCommands {
public sealed record CountTokens(IReadOnlyList<string> Tokens);
// parser reached the end of the document
public sealed record ExpectNoMoreTokens();
}
// Counter Queries
public static class CounterQueries {
// Send this actor a notification once counting is complete
public sealed record FetchCounts(IActorRef Subscriber);
}
Copy and paste this code or type it into a new file, Messages.cs
, inside your AkkaWordCounter
project.
Messages as Effects Systems
We’re following a fairly common message organization pattern in Akka.NET: defining messages as effects systems.
- Commands are instructions for actors that they need to do something. Commands are often validated before they’re processed. When a command is successfully processed it will often product an effect, represented as an “event” message typically.
- Events are often the output of commands - they are matters of fact: things that have already happened and need to be incorporated into the state model. Our
AkkaWordCounter
application doesn’t use any events at the moment. - Queries are read-only instructions to actors to either fetch the value of some state now or to notify us about state changes when / if / as they happen.
There are other types of messages that are commonly used too, such as an acknowledgement messages indicating that a command has been accepted or rejected for processing.
This manner of thinking about message definitions is advantageous because it makes it really easy to understand which messages are potentially dangerous (commands) and which ones are not (queries and events.)
Organizing Messages by Entity Type
The first method for organizing messages is by their role in the effects system we described earlier; we combine that method with a second one: grouping messages by the type of entity they act against.
In our case we know we’re going to have two different types of “things” in our application, each represented by its own actor type:
- A document parser actor and
- A counter actor.
Therefore, we’re going to group messages like so:
- Commands: Document ==
static class DocumentCommands
- Commands: Counter ==
static class CounterCommands
- Queries: Counter ==
static class CounterQueries
We’re nesting all of our individual message definitions inside a static class
with a corresponding name just to make it really, really, really clear what type of entity this message is intended for AND what sorts of effects this message might have. That’s why we’re classifying things this way - and it’s good habit to get into from the beginning.
Message Types Should Always Be Immutable
Unless you explicitly configure the Props
to do otherwise, every time you call ActorOf
in Akka.NET you’re going to spawn a local IActorRef
that runs inside the current process.
This means that if you broadcast the message to multiple actors those actors will have access to the same instance of that message in-memory. Therefore, if your messages are mutable (i.e. public setters, uses a mutable collection as a public property, etc) then you run the risk of creating unpredictable side-effects between actors.
The easiest way to avoid this problem is to do the following:
- Always declare your messages as
record
types when working in C# and - Never use a mutable collection - always use either read-only collections or System.Collections.Immutable.
Execution Flow
We’ve defined our messages - so how do we expect these messages to be used within our application? We should imagine this first before we start writing new actor code.
sequenceDiagram
participant Client
participant ParserActor
participant CounterActor
Client->>ParserActor: ProcessDocument(RawText)
Client->>CounterActor: FetchCounts(Subscriber)
ParserActor->>ParserActor: Chunk Tokens
ParserActor->>CounterActor: CountTokens(Tokens)
CounterActor->>CounterActor: Tabulate Frequencies
ParserActor->>CounterActor: ExpectNoMoreTokens()
CounterActor->>CounterActor: Finalize Totals
CounterActor->>Client: Dispense Counts
A really important and powerful detail about this messaging protocol we’ve designed: we are going to send our CounterQueries.FetchCounts
message to the CounterActor
before it processes any data and we will still get the correct result back.
This is a massive difference between actors and regular procedural or object-oriented code: we can defer replying to messages until we’re good and ready in the actor model, wheras a function call has to return something right now. That thing we return could be a promise to fulfill the request later, i.e. a Task<T>
, but as we’ll see throughout bootcamp the actor model is much, much, much more flexible by comparison.
Otherwise, the messaging protocol is simple:
- We have the
ParserActor
start chunking a document intoCountTokens
messages; - The
ParserActor
streams theCountTokens
messages to theCounterActor
; - Once the
ParserActor
reaches the end of the file, we send aExpectNoMoreTokens
message to theCounterActor
; and - The
CounterActor
responds to anyone who has sent it aFetchCounts
message with the final tabluations of all word frequencies.
That’s pretty slick - let’s implement it!
Actor Implementations
First thing we are going to do is implement the CounterActor
. Please create a new file called CounterActor.cs
and then type out (or copy-and-paste) the following:
using static CounterQueries;
using static CounterCommands; // make message handlers less verbose
public sealed class CounterActor : UntypedActor {
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Dictionary<string, int> _tokenCounts = new();
private bool _doneCounting = false;
// for actors who sent us a FetchCounts before we were done counting
private readonly HashSet<IActorRef> _subscribers = new();
protected override void OnReceive(object message) {
switch(message){
case CountTokens tokens:
{
foreach(var t in tokens.Tokens){
if(!_tokenCounts.TryAdd(t, 1)){
_tokenCounts[t] += 1;
}
}
break;
}
case ExpectNoMoreTokens:
{
_doneCounting = true;
_log.Info(
"Completed counting tokens - found [{0}] unique tokens",
_tokenCounts.Count);
// ensure the output is immutable
// cheaper to do this once at the end versus every time we count
var totals = _tokenCounts.ToImmutableDictionary();
foreach(var s in _subscribers)
{
s.Tell(totals);
}
// don't need to track subscribers anymore
_subscribers.Clear();
break;
}
case FetchCounts when _doneCounting:
// instantly reply with the results
Sender.Tell(_tokenCounts.ToImmutableDictionary());
break;
case FetchCounts fetch:
{
_subscribers.Add(fetch.Subscriber);
break;
}
default:
Unhandled(message);
break;
}
}
}
Next, we’re going to implement our ParserActor
- create a new file called ParserActor.cs
and type in the following code:
using static DocumentCommands;
using static CounterCommands; // make message handlers less verbose
public sealed class ParserActor : UntypedActor {
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IActorRef _countingActor;
public ParserActor(IActorRef countingActor)
{
_countingActor = countingActor;
}
private const int TokenBatchSize = 10;
protected override void OnReceive(object message){
switch(message){
case ProcessDocument process:
{
// chunk tokens into buckets of 10
foreach(var tokenBatch in process.RawText.Split(" ").Chunk(TokenBatchSize)){
_countingActor.Tell(new CountTokens(tokenBatch));
}
// we are finished
_countingActor.Tell(new ExpectNoMoreTokens());
break;
}
default:
Unhandled(message);
break;
}
}
}
This is all pretty straightforward based on the description of our messages and our execution flow.
Now with both of these actors implements, let’s put it all together.
Starting the Actors
We need to modify Program.cs
to start both the CounterActor
and the ParserActor
.
Type out this code so it executes just before we call .Terminate
on the ActorSystem
.
var counterActor = myActorSystem.ActorOf(Props.Create<CounterActor>(),
"CounterActor");
var parserActor = myActorSystem.ActorOf(Props.Create(() => new ParserActor(counterActor)),
"ParserActor");
Task<IDictionary<string, int>> completionPromise = counterActor
.Ask<IDictionary<string, int>>(@ref => new CounterQueries.FetchCounts(@ref), null,
CancellationToken.None);
parserActor.Tell(new DocumentCommands.ProcessDocument(
"""
This is a test of the Akka.NET Word Counter.
I would go
"""
));
IDictionary<string, int> counts = await completionPromise;
foreach(var kvp in counts)
{
// going to use string interpolation here because we don't care about perf
myActorSystem.Log.Info($"{kvp.Key}: {kvp.Value} instances");
}
/* THIS CODE WAS ALREADY HERE BEFORE */
await myActorSystem.Terminate();
One bit of funny-looking code is the overload of Ask<T>
that we’re using - remember earlier how we said that Ask<T>
creates a temporary, one-time use IActorRef
and uses replies to that IActorRef
to complete the Task<T>
? What this overload is doing is it allows us to explicitly capture that temporary actor reference and pass it into our CounterQueries.FetchCounts
message as an argument.
Otherwise, this code does exactly what we planned in our diagram. Let’s run it and see the results:
[INFO][{DateTime}][Thread 0011][akka://LocalSystem/user/CounterActor] Completed counting tokens - found [11] unique tokens
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] the: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] of: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] Counter.
I: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] Word: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] is: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] test: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] Akka.NET: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] go: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] This: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] a: 1 instances
[INFO][{DateTime}][Thread 0011][ActorSystem(LocalSystem)] would: 1 instances
Looks good! The text isn’t super-interesting yet, but we’re headed in the right direction.
Wrapping Up
So that’ll do for Unit 0! This covers the very basics of Akka.NET and should be enough to get you started designing simple actor applications.
In the next unit, Unit 1 - we’re going to take our Akka.NET education even further and start learning how to integrate Akka.NET with Microsoft.Extensions, create child actors, work with await
, create scheduled messages, and much more.
Further Reading
- Akka.NET Application Design: Don’t Create Bespoke Frameworks; Use Repeatable Patterns - explains the “effects system” paradigm for message definitions in more detail, but this also gets covered more in Lesson 1.