One of the first questions developers ask once they learn how Akka.NET actors work is

If actors can only process one message at a time, can I still use async methods or Task<T> objects inside my actors?

The answer is yes! You can still use asynchronous methods and Task<T> objects inside your actors - using the PipeTo pattern!

(2/05/2022) Update

We have completely revised our guidance around await vs. PipeTo in our latest post”: Async / Await vs. PipeTo in Akka.NET Actors

(8/20/2016) Update

Since Akka.NET 1.0 was released, Akka.NET actors have fully supported async / await inside actors. But there’s a catch involved. We still strongly recommend PipeTo over asyc / await for performance and cohesion reasons but there are scenarios where the latter makes life easier. Keep reading!

Actors Process Messages One at A Time

So actors process the contents of their inbox like this:

Animation - Akka.NET actors processing messages in their inbox

The actor’s mailbox pushes a new message into the actor’s OnReceive method once the previous call to OnReceive exits.

This is an important concept, because this is how Akka.NET enforces thread-safety for all of the code that executes inside an actor - by making sure an actor’s message processing code (OnReceive) can only be run one invocation at a time.

That being said, it’s still possible to take advantage of async methods and methods that return Task<T> objects inside the OnReceive method - you just have to use the PipeTo extension method!

Async Message Processing Using PipeTo

The PipeTo pattern is a dead-simple C# extension method built into Akka.NET that you can tack onto at the end of any Task<T> object.

public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, ActorRef sender = null)

The goal behind PipeTo is to treat every async operation just like any other method that can produce a message for an actor’s mailbox - and that’s the right way to think about actors and concurrent Task<T>s in Akka.NET.

A Task<T> is not something you await on in Akka.NET - it’s just something else that produces a message that your actor processes through its inbox!

The PipeTo method takes an ICanTell object as a required argument, which tells the method where to pipe the results of an asynchronous Task<T>.

Here are all of the Akka.NET classes that you can use with ICanTell:

  • ActorRef - a reference to an actor instance.
  • ActorSelection - a selection of actors at a specified address. This is what gets returned whenever you look up an actor based on its path.

Most of the time, you’re going to want to have your actors pipe the results of a task back to themselves. Here’s an example of a real-world use case for PipeTo, from our official Akka.NET PipeTo code sample.

//time to kick off the feed parsing process, and send the results to ourselves
Receive<BeginProcessFeed>(feed =>
{
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(Self);
});

View the full source for this example..

Whenever you kick off a Task<T> and use PipeTo to deliver the results to some ActorRef or ActorSelection, here’s how your actor is really processing its inbox.

Animation - Akka.NET actors processing messages asynchronously in their inbox using PipeTo

In this case we’re using PipeTo to send the results back to itself, but you can just as easily send these results to different actor.

But the important thing to notice in this animation is that the actor is still processing other messages while the asynchronous operation is happening. That’s why PipeTo is great for allowing your actors to parallelize long-running tasks, like HTTP requests.

Composing Task<T> Instances Using ContinueWith And PipeTo

Have some post-processing you need to do on a Task<T> before the result gets piped into an actor’s inbox? No problem - you can still use ContinueWith and all of the other TPL design patterns you used in procedural C# programming.

Here’s another example from our PipeTo code sample:

//asynchronously download the image and pipe the results to ourself
_httpClient.GetAsync(imageUrl).ContinueWith(httpRequest =>
{
    var response = httpRequest.Result;

    //successful img download
    if (response.StatusCode == HttpStatusCode.OK)
    {
        var contentStream = response.Content.ReadAsStreamAsync();
        try
        {
            contentStream.Wait(TimeSpan.FromSeconds(1));
            return new ImageDownloadResult(image, response.StatusCode, contentStream.Result);
        }
        catch //timeout exceptions!
        {
            return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
        }
    }

    return new ImageDownloadResult(image, response.StatusCode);
},
  TaskContinuationOptions.ExecuteSynchronously)
 .PipeTo(Self);

View the full source for this example..

So in this case, we’re downloading an image via a HttpClient inside an Akka.NET actor, and we want to check the status code of the HTTP response before we use PipeTo to deliver a message back to this actor.

So we do the HTTP code handling inside a ContinueWith block and use that to return an ImageDownloadResult message that will be piped to the actor using the PipeTo block. Pretty easy!

Actors + Async FAQ

Re-posted from the Questions section of our Akka.NET PipeTo sample.

Why can’t you use async and await inside the actor’s OnReceive methods?

As we discussed in “Akka.NET: What is an Actor?” - the mailbox pushes messages into your actor’s OnReceive method as soon as the previous iteration of the OnReceive function exits. So whenever you await an async operation inside the OnReceive method, you prematurely exit the OnReceive method and the mailbox will push a new message into it.

Update 8/20/2016: The below is no longer true. You can use async and await inside actors now and have been able to since Akka.NET 1.0 (released 4 months or so after this blog post.)

Here’s how:

public class MyActor : ReceiveActor{
    public MyActor(){
        ReceiveAsync<string>(async _ =>
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                Sender.Tell("done");
            });
    }
}

On a ReceiveActor, use ReceiveAsync<T> where T is the type of message this receive handler expects. From there you can use async and await inside the actor to your hearts’ desire.

However, there is a cost associated with this. While your actor awaits any given Task<T>, the actor will not be able to process any other messages sent to it until it finishes processing the message in its entirety. The reason for this is that effectively each code segment after an await statement is a Task continuation, and therefore if that code were to modify or use any of the actor’s internal state it would violate the actor’s state isolation guarantees. I.E. your continuation might be using state that was modified by the actor handling a different message. We suspend the mailbox on actors when using async and await until the last continuation is executed in order to avoid this problem.

Rest of the original post as follows.

Await breaks the “actors process one message at a time” guarantee, and suddenly your actor’s context might be different. Variables such as the Sender of the previous message may be different, or the actor might even be shutting down when the await call returns to the previous context.

So just don’t do it. Await is evil inside an actor. Await is just syntactic sugar anyway. Use ContinueWith and PipeTo instead. Turn the results of async operations into messages that get delivered into your actor’s inbox and you can take advantage of Task and TPL methods just like you did before.

What is this TTaskContinuationOptions.ExecuteSynchronously flag you keep using on ContinueWith inside an actor?

This is a task continuation hint we pass into the TPL when executing a Task; and in this case we’re telling the TPL to make sure that the ContinueWith task gets executed “inline,” meaning on the same thread as the one that completed the preceding task. Sometimes the TPL scheduler can’t always honor this, but it will take the hint and try to inline the task depending on how the underlying scheduler is implemented.

Really, this is just a type of performance optimization I use for tasks that are extremely simple and not resource intensive, i.e. mapping one type of result object to another type.

Do I need to worry about closing over (closures) my actor’s internal state when using PipeTo?

Yes, you need to close over any state whose value might change between messages that you need to use inside your ContinueWith or PipeTo calls.

So for instance, the Sender property of your actor will almost definitely change between messages. You’ll need to use a C# closure for this property in order to guarantee that any asynchronous methods that depend on this property get the right value.

Here’s an example:

//time to kick off the feed parsing process, and send the results to ourselves
Receive<BeginProcessFeed>(feed =>
{
	//instance variable for closure
	var senderClosure = Sender;
    SendMessage(string.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));

	//reply back to the sender
    _feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(senderClosure);
});

Doing a closure is as simple as stuffing the property into an instance variable (var) and using that instance variable in your call instead of the field or property defined on your actor.

More About PipeTo

Want to see PipeTo in action? Check out our official Akka.NET PipeTo code sample for a thoroughly documented and explained example.

And feel free to ask us any questions in the comments!

If you liked this post, you can share it with your followers or follow us on Twitter!
Written by Aaron Stannard on January 27, 2015

 

 

Observe and Monitor Your Akka.NET Applications with Phobos

Did you know that Phobos can automatically instrument your Akka.NET applications with OpenTelemetry?

Click here to learn more.