One of the first questions developers ask once they learn how Akka.NET actors work is:
“If actors can only process one message at a time, can I still use async methods or Task<T> objects inside my actors?”
The answer is yes! You can still use asynchronous methods and Task<T>
objects inside your actors - using the PipeTo
pattern!
(2/05/2022) Update
We have completely revised our guidance around await vs. PipeTo in our latest post: “Async / Await vs. PipeTo in Akka.NET Actors”
(8/20/2016) Update
Since Akka.NET 1.0 was released, Akka.NET actors have fully supported async
/ await
inside actors. But there’s a catch involved. We still strongly recommend PipeTo over async / await for performance and cohesion reasons, but there are scenarios where the latter makes life easier. Keep reading!
Actors Process Messages One at A Time
So actors process the contents of their inbox like this:
The actor’s mailbox pushes a new message into the actor’s OnReceive
method once the previous call to OnReceive
exits.
This is an important concept, because this is how Akka.NET enforces thread-safety for all of the code that executes inside an actor - by making sure an actor’s message processing code (OnReceive) can only be run one invocation at a time.
That being said, it’s still possible to take advantage of async
methods and methods that return Task<T>
objects inside the OnReceive
method - you just have to use the PipeTo
extension method!
Async Message Processing Using PipeTo
The PipeTo pattern is a dead-simple C# extension method built into Akka.NET that you can tack onto the end of any Task<T> object.
public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, ActorRef sender = null)
The goal behind PipeTo is to treat every async operation just like any other method that can produce a message for an actor’s mailbox - and that’s the right way to think about actors and concurrent Task<T>s in Akka.NET.
A Task<T> is not something you await in Akka.NET - it’s just something else that produces a message that your actor processes through its inbox!
The PipeTo
method takes an ICanTell
object as a required argument, which tells the method where to pipe the results of an asynchronous Task<T>
.
Here are all of the Akka.NET classes that you can use with ICanTell
:
- ActorRef - a reference to an actor instance.
- ActorSelection - a selection of actors at a specified address. This is what gets returned whenever you look up an actor based on its path.
Most of the time, you’re going to want to have your actors pipe the results of a task back to themselves. Here’s an example of a real-world use case for PipeTo
, from our official Akka.NET PipeTo code sample.
//time to kick off the feed parsing process, and send the results to ourselves
Receive<BeginProcessFeed>(feed =>
{
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(Self);
});
View the full source for this example.
Whenever you kick off a Task<T>
and use PipeTo
to deliver the results to some ActorRef
or ActorSelection
, here’s how your actor is really processing its inbox.
In this case the actor is using PipeTo to send the results back to itself, but you can just as easily send these results to a different actor.
But the important thing to notice in this animation is that the actor is still processing other messages while the asynchronous operation is happening. That’s why
PipeTo
is great for allowing your actors to parallelize long-running tasks, like HTTP requests.
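To make the round trip concrete, here is a minimal sketch of an actor that pipes a task's outcome back to itself and then handles it as an ordinary message. The message types, the DownloaderActor class, and the fetchAsync delegate are all hypothetical names invented for illustration. The sketch also assumes the optional success and failure mapping arguments that PipeTo accepts in Akka.NET 1.x releases, which are newer than the signature quoted earlier in this post.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical message types for this sketch; they are not part of the PipeTo sample.
public sealed class Download
{
    public Download(string url) { Url = url; }
    public string Url { get; }
}

public sealed class Downloaded
{
    public Downloaded(string url, string body) { Url = url; Body = body; }
    public string Url { get; }
    public string Body { get; }
}

public sealed class DownloadFailed
{
    public DownloadFailed(string url, Exception cause) { Url = url; Cause = cause; }
    public string Url { get; }
    public Exception Cause { get; }
}

public sealed class GetCompletedCount { }

public class DownloaderActor : ReceiveActor
{
    private int _completed; // only touched inside message handlers

    // fetchAsync is an assumed stand-in for any Task<string>-returning call.
    public DownloaderActor(Func<string, Task<string>> fetchAsync)
    {
        Receive<Download>(msg =>
        {
            var url = msg.Url;
            // Start the work and return right away; the mailbox keeps flowing.
            fetchAsync(url).PipeTo(
                Self,
                success: body => new Downloaded(url, body),
                failure: ex => new DownloadFailed(url, ex));
        });

        // The task's outcome arrives later as an ordinary message,
        // so it is safe to update actor state here.
        Receive<Downloaded>(msg => { _completed++; });
        Receive<DownloadFailed>(msg => { /* log or retry here */ });

        // Can be answered even while downloads are still in flight.
        Receive<GetCompletedCount>(_ => Sender.Tell(_completed));
    }
}
```

If no failure mapping is supplied, a faulted task is delivered to the recipient as a Status.Failure message instead, so the actor should have a handler for one or the other.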
Composing Task<T> Instances Using ContinueWith And PipeTo
Have some post-processing you need to do on a Task<T>
before the result gets piped into an actor’s inbox? No problem - you can still use ContinueWith
and all of the other TPL design patterns you used in procedural C# programming.
Here’s another example from our PipeTo code sample:
//asynchronously download the image and pipe the results to ourself
_httpClient.GetAsync(imageUrl).ContinueWith(httpRequest =>
{
    var response = httpRequest.Result;
    //successful img download
    if (response.StatusCode == HttpStatusCode.OK)
    {
        var contentStream = response.Content.ReadAsStreamAsync();
        try
        {
            //Wait returns false on timeout rather than throwing, so check the result
            if (contentStream.Wait(TimeSpan.FromSeconds(1)))
                return new ImageDownloadResult(image, response.StatusCode, contentStream.Result);
            return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
        }
        catch //the read itself failed
        {
            return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
        }
    }
    return new ImageDownloadResult(image, response.StatusCode);
},
TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(Self);
View the full source for this example.
So in this case, we’re downloading an image via an HttpClient inside an Akka.NET actor, and we want to check the status code of the HTTP response before we use PipeTo to deliver a message back to this actor.
So we do the HTTP code handling inside a ContinueWith block and use that to return an ImageDownloadResult message that will be piped to the actor by the PipeTo call. Pretty easy!
Actors + Async FAQ
Re-posted from the Questions section of our Akka.NET PipeTo
sample.
Why can’t you use async and await inside the actor’s OnReceive methods?
As we discussed in “Akka.NET: What is an Actor?” - the mailbox pushes messages into your actor’s OnReceive
method as soon as the previous iteration of the OnReceive
function exits. So whenever you await
an async
operation inside the OnReceive
method, you prematurely exit the OnReceive
method and the mailbox will push a new message into it.
Update 8/20/2016: The below is no longer true. You can use async
and await
inside actors now and have been able to since Akka.NET 1.0 (released 4 months or so after this blog post.)
Here’s how:
public class MyActor : ReceiveActor
{
    public MyActor()
    {
        ReceiveAsync<string>(async _ =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}
On a ReceiveActor, use ReceiveAsync<T>, where T is the type of message this receive handler expects. From there you can use async and await inside the actor to your heart’s desire.
However, there is a cost associated with this. While your actor awaits any given Task<T>, the actor will not be able to process any other messages sent to it until it finishes processing the current message in its entirety. The reason is that each code segment after an await statement is effectively a Task continuation, and if that code were to modify or use any of the actor’s internal state it would violate the actor’s state isolation guarantees; i.e. your continuation might be using state that was modified by the actor handling a different message. To avoid this problem, we suspend the actor’s mailbox when using async and await until the last continuation has executed.
Rest of the original post as follows.
Await breaks the “actors process one message at a time” guarantee, and suddenly your actor’s context might be different. Variables such as the Sender of the previous message may be different, or the actor might even be shutting down when the await call returns to the previous context.
So just don’t do it. Await
is evil inside an actor. Await
is just syntactic sugar anyway. Use ContinueWith
and PipeTo
instead. Turn the results of async
operations into messages that get delivered into your actor’s inbox and you can take advantage of Task
and TPL methods just like you did before.
What is this TaskContinuationOptions.ExecuteSynchronously flag you keep using on ContinueWith inside an actor?
This is a task continuation hint we pass into the TPL when executing a Task; in this case we’re telling the TPL to make sure that the ContinueWith task gets executed “inline,” meaning on the same thread as the one that completed the preceding task. The TPL scheduler can’t always honor this, but it will take the hint and try to inline the task depending on how the underlying scheduler is implemented.
Really, this is just a type of performance optimization I use for tasks that are extremely simple and not resource intensive, e.g. mapping one type of result object to another type.
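As a rough illustration of that "mapping one result to another" case, here is a minimal TPL-only sketch with no Akka.NET dependency. ContinuationSketch and Describe are hypothetical names made up for this example; the only library call is the standard ContinueWith overload that accepts TaskContinuationOptions.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Maps a Task<int> (say, an HTTP status code) to a Task<string>.
    // The lambda is cheap, so we hint that it may run inline on the
    // thread that completes statusTask instead of being rescheduled.
    public static Task<string> Describe(Task<int> statusTask)
    {
        return statusTask.ContinueWith(
            t => t.Result == 200 ? "ok" : "failed (" + t.Result + ")",
            TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

Because the flag is only a hint, the result is the same whether or not the continuation is inlined; only the scheduling overhead differs.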
Do I need to worry about closing over (closures) my actor’s internal state when using PipeTo
?
Yes, you need to close over any state whose value might change between messages that you need to use inside your ContinueWith
or PipeTo
calls.
So for instance, the Sender
property of your actor will almost definitely change between messages. You’ll need to use a C# closure for this property in order to guarantee that any asynchronous methods that depend on this property get the right value.
Here’s an example:
//time to kick off the feed parsing process, and send the results to ourselves
Receive<BeginProcessFeed>(feed =>
{
    //local variable for closure
    var senderClosure = Sender;
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
    //reply back to the sender
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(senderClosure);
});
Doing a closure is as simple as stuffing the property into a local variable (var) and using that local variable in your call instead of the field or property defined on your actor.
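The capture matters most when a value is used inside a ContinueWith lambda, because that lambda runs after the message handler has already returned. The following is a minimal sketch under assumed names: Lookup, LookupResult, LookupActor, and the lookupAsync delegate are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical message types for this sketch.
public sealed class Lookup
{
    public Lookup(string key) { Key = key; }
    public string Key { get; }
}

public sealed class LookupResult
{
    public LookupResult(string key, int value) { Key = key; Value = value; }
    public string Key { get; }
    public int Value { get; }
}

public class LookupActor : ReceiveActor
{
    // lookupAsync is an assumed stand-in for any asynchronous query.
    public LookupActor(Func<string, Task<int>> lookupAsync)
    {
        Receive<Lookup>(msg =>
        {
            // Read Sender and the key now, while this message is being processed.
            var replyTo = Sender;
            var key = msg.Key;

            lookupAsync(key)
                // The continuation only touches the captured locals,
                // never the actor's own properties or fields.
                .ContinueWith(
                    t => new LookupResult(key, t.Result),
                    TaskContinuationOptions.ExecuteSynchronously)
                .PipeTo(replyTo);
        });
    }
}
```

A caller could then use something like actor.Ask<LookupResult>(new Lookup("akka")) and receive the reply addressed to the original sender, regardless of how many other messages the actor handled in the meantime.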
More About PipeTo
Want to see PipeTo
in action? Check out our official Akka.NET PipeTo code sample for a thoroughly documented and explained example.
And feel free to ask us any questions in the comments!