We put together a new YouTube video on our Petabridge channel (which you should subscribe to) yesterday, “Akka.NET Actors’ Hidden Super Power: Switchable Behaviors” - all about one of my favorite features in Akka.NET: the Become method for swapping actor message-processing behavior dynamically at runtime.

In addition to the video, I wanted to expand on why behavior-switching is so powerful and how you can use it to transform inherently complex domain problems into something that is approachable, understandable, and expressible with a very small amount of purpose-built code.

Make Stateful Programming Easy with Behaviors

One of the primary drivers for adopting the actor model in the first place is the need to build stateful applications. Stateful applications can be client or embedded systems reacting to a combination of network / device / user inputs; or perhaps they’re server-side applications that need to perform concurrent, low-latency processing for best results.

If you’re going down the road of stateful processing you are inevitably going to run into domain problems that call for finite state machines - and this is where Become and actor behavior-switching can really simplify software development.

Example: Long-Running Workflows

Let’s take this example from our video (from the yet-to-be-announced product we’re working on) - conducting a scan of all package versions for a given NuGet package id (e.g. Akka).

[Figure: NuGet package scanning workflow]

A little bit about this domain case:

  • “Fast” scans can be completed with a trivial amount of I/O and can usually find the most interesting NuGet package supply chain security issues - changes in latest version / ownership / license / signing key;
  • “Slow” scans can take a tremendous amount of I/O if there’s a large number of package versions, and they’re used to find issues like retroactive Common Vulnerabilities and Exposures (CVE) notices or deprecation reports.

We don’t want to run “slow” scans constantly - this system is designed to track hundreds of thousands of unique packages, so we need to spread that load out over time.

Slow scans are long-running workflows that might take several minutes to complete; the fast scans might take a few seconds. In either case this is a state-driven exercise: diffing our current view of a package’s state with the latest view available on a NuGet feed such as https://nuget.org/.

Therefore, we’re going to use some behavior-switching with Become to make it easy for us to atomize this complexity.

public sealed class PackageDifferenceChecker : UntypedActor, IWithTimers, IWithStash
{

	// ctor, properties, other stuff

	protected override void OnReceive(object message)
	{
		switch (message)
		{
		    case PackageScanCommands.CheckPackage checkPackage:
		    {
		        _log.Info("Checking package {0} with query type [{1}]",
		            checkPackage.PackageId, checkPackage.CheckType);
		        _packageCheckerState.ChangeFeedActor.Tell(
		            new ChangeFeedQueries.FetchChangeFeedState(checkPackage.PackageId));
		        Become(FetchingPackageState(checkPackage));
		        break;
		    }
		    default:
		        Unhandled(message);
		        break;
		}
	}
}

All actors must have a default behavior - in the case of UntypedActor classes that’s going to be your OnReceive method. For ReceiveActors, it’s usually the Receive<T> methods defined inside your actor’s constructor.

Behavior-switching comes into play because both scan types start the same way: we need to fetch the current model from our ChangeFeedActor and then decide what to do with that data depending upon the instructions included inside the PackageScanCommands.CheckPackage message. So we’ll define a FetchingPackageState behavior that stays active while we wait for the results of our Tell to the ChangeFeedActor.

private Receive FetchingPackageState(PackageScanCommands.CheckPackage command)
{
    return message =>
    {
        switch (message)
        {
            case ChangeFeedQueries.ChangeFeedStateQueryResponse { Status: QueryResponseStatus.Success } s:
            {
                _log.Debug("Success: received initial state for query [Query: {0}] [State: {1}]", command,
                    s.Result);
                switch (command.CheckType)
                {
                    case CheckType.Fast:
                        Debug.Assert(s.Result != null, "s.Result != null");
                        Become(DoFastScan(command, s.Result));
                        break;
                    case CheckType.Slow:
                        Become(PerformSlowScan(command, s.Result));
                        break;
                }

                Self.Tell(DoCheck.Instance);
                return true;
            }
            case ChangeFeedQueries.ChangeFeedStateQueryResponse { Status: QueryResponseStatus.NotFound } s:
            {
                // we always do a slow command for the initial scan
                var slowCommand = command with { CheckType = CheckType.Slow };
                // NotFound carries no result, so only the command is logged
                _log.Info("Not found: received state for query [{0}] - need to perform initial scan",
                    slowCommand);

                Become(PerformSlowScan(slowCommand, null));
                Self.Tell(DoCheck.Instance);
                return true;
            }
            case ChangeFeedQueries.ChangeFeedStateQueryResponse { Status: QueryResponseStatus.Error } s:
            {
                var errMsg = $"Error: not able to retrieve state for query [{command}] due to [{s.Message}]";
                _log.Error(errMsg);
                command.ReplyTo.Tell(command.ToFailure(errMsg));
                Become(OnReceive);
                return true;
            }
            case PackageScanCommands.CheckPackage:
            {
                Stash.Stash();
                return true;
            }
            default:
                return false;
        }
    };
}

Note: the call to the ChangeFeedActor can’t time out - it’s an in-memory sibling of the PackageDifferenceChecker actor. If the ChangeFeedActor dies, the PackageDifferenceChecker will also be killed by their shared parent.

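To switch behaviors, you just need to call Become(Receive) - and a Receive delegate has the same shape as a Func<object, bool>: it takes the message and returns true if the behavior handled it or false if it didn’t. That is about as simple of a delegate as you can find in .NET.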
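To make the mechanics concrete, here is a minimal sketch of what switching between bool-returning message handlers amounts to. This is a toy model rather than Akka.NET code: ToyReceive, ToyScanner, Deliver, and the string / int message shapes are all hypothetical names invented for illustration, and there is no mailbox, threading, or supervision here.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for an actor, NOT Akka.NET code: it only models how Become
// swaps the delegate that will handle the next message.
public delegate bool ToyReceive(object message); // true = handled, false = unhandled

public sealed class ToyScanner
{
    private ToyReceive _current;

    public List<string> Log { get; } = new List<string>();

    public ToyScanner() => _current = Idle;

    private void Become(ToyReceive next) => _current = next;

    // Deliver one message to whichever behavior is active right now.
    public void Deliver(object message)
    {
        if (!_current(message))
            Log.Add($"unhandled: {message}");
    }

    // Default behavior: wait for a scan request (a package id string).
    private bool Idle(object message)
    {
        if (message is string packageId)
        {
            Log.Add($"start: {packageId}");
            // Close over the request, in the spirit of FetchingPackageState(command).
            Become(Scanning(packageId));
            return true;
        }

        return false;
    }

    // Behavior factory: the returned delegate closes over packageId.
    private ToyReceive Scanning(string packageId) => message =>
    {
        if (message is int recordsProcessed)
        {
            Log.Add($"done: {packageId} ({recordsProcessed} records)");
            Become(Idle); // back to the default behavior
            return true;
        }

        return false;
    };
}
```

If you create a ToyScanner and deliver 42, "Akka", "Akka.Remote", and 3 in that order, the log ends up with four entries: the first int is unhandled because Idle only understands strings, "Akka" starts a scan, "Akka.Remote" is unhandled because the Scanning behavior only understands ints, and 3 completes the scan and switches back to Idle. The same message type gets a different answer depending on which behavior is active, which is the whole point.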

We close over the original PackageScanCommands.CheckPackage message we were sent because that’s now part of our state and we combine it with the results from ChangeFeedQueries.ChangeFeedStateQueryResponse. You can read for yourself what that code is doing - but in essence, we’re gathering the data we need to execute a fast or a slow scan.

private Receive DoFastScan(PackageScanCommands.CheckPackage command, 
		PackageChangeFeedState changeFeedState)
{
    return message =>
    {
        switch (message)
        {
            case DoCheck:
            {
                // 5 minute timeout for the entire job
                Timers.StartSingleTimer(QueryTooLongTimeout.Instance, QueryTooLongTimeout.Instance,
                    TimeSpan.FromMinutes(5));
                RunTask(async () => { await FetchOwners(command); });
                return true;
            }
            case ImmutableHashSet<PackageOwner> owners:
            {
                ProcessOwners(command, changeFeedState, owners);
                return true;
            }
            case DoStage2:
            {
                RunTask(async () =>
                {
                    // need to fetch all the package version metadata and compare to internal state
                    await FetchMostRecentAndNewerVersions(command, changeFeedState);
                });
                return true;
            }
            case QueryCompleted completed:
            {
                _log.Info("Successfully completed query [{0}] with [{1}] events produced", command,
                    completed.RecordsProcessed);
                command.ReplyTo.Tell(command.ToSuccess());
                ResetBehavior();
                return true;
            }
            case Status.Failure failure:
            {
                ProcessFailure(command, failure);
                return true;
            }
            case QueryTooLongTimeout:
            {
                ProcessTimeout(command);
                return true;
            }
            case PackageScanCommands.CheckPackage:
            {
                Stash.Stash();
                return true;
            }
            default:
                return false;
        }
    };
}

The slow scan has a somewhat different implementation, time constraints, and steps than the fast scan. Yet all of that detail is abstracted into its own behavior and, most importantly, made invisible to the caller requesting the scans in the first place.

This is the key benefit of behavior-switching - all of this complexity gets atomized into internal implementation details that callers don’t have to know about. I don’t need to route messages to two separate actors depending on what scan type I need, since both scans incorporate the same domain + actor and message types - all of that decision-making is made transparent, aka “easy mode.”

In addition to keeping the scan states and behaviors separate by encapsulating each into its own Receive method, I can also leverage the IStash to impose flow control on a per-behavior basis.

Deferred Processing of Messages with Stashing

One important detail I want to call attention to is the use of the IStash:

case PackageScanCommands.CheckPackage:
{
    Stash.Stash();
    return true;
}

Stashing is often used in combination with behavior-switching - it allows us to set aside messages the actor has already received but can’t yet process. In this particular instance, we might receive multiple scan requests for the same package (it’s possible both a fast and a slow scan can be requested at the same time) - but in order to keep the actor’s design as simple as possible, we disallow running multiple scans concurrently.

Instead, we call Stash.Stash() to buffer any additional scan requests into an in-memory queue maintained by the PackageDifferenceChecker actor - each time we complete the current scan, the oldest stashed request is unstashed and placed at the front of the actor’s mailbox.

private void ResetBehavior()
{
    Stash.Unstash(); // release the oldest pending query, if we have any
    Become(OnReceive); // shift back to original behavior
    Timers.CancelAll(); // cancel all timers
}

This resets us back to our original state so we can re-run the scan process all over again, in the event that there are additional requests pending inside the IStash.
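The ordering is easier to see in a stripped-down model. The sketch below is not Akka.NET code: ToyStashingScanner, Tell, Drain, and the "scan:<id>" / "done" string messages are hypothetical, and the mailbox and stash are plain collections so you can follow where each message goes. It assumes, as in ResetBehavior above, that only the oldest stashed message is released per completed scan.

```csharp
using System;
using System.Collections.Generic;

// Toy model of stash + behavior switching, NOT Akka.NET code.
public sealed class ToyStashingScanner
{
    private readonly List<string> _mailbox = new List<string>();
    private readonly Queue<string> _stash = new Queue<string>();
    private Func<string, bool> _current;

    public List<string> Log { get; } = new List<string>();

    public ToyStashingScanner() => _current = Idle;

    // Enqueue a message at the back of the mailbox.
    public void Tell(string message) => _mailbox.Add(message);

    // Process messages one at a time until the mailbox is empty.
    public void Drain()
    {
        while (_mailbox.Count > 0)
        {
            var message = _mailbox[0];
            _mailbox.RemoveAt(0);
            if (!_current(message))
                Log.Add($"unhandled {message}");
        }
    }

    // Default behavior: accept a scan request and switch to Busy.
    private bool Idle(string message)
    {
        if (!message.StartsWith("scan:", StringComparison.Ordinal))
            return false;

        var packageId = message.Substring("scan:".Length);
        Log.Add($"start {packageId}");
        _current = Busy(packageId);
        return true;
    }

    // While a scan is running, new scan requests are set aside.
    private Func<string, bool> Busy(string packageId) => message =>
    {
        if (message.StartsWith("scan:", StringComparison.Ordinal))
        {
            _stash.Enqueue(message); // one scan at a time
            return true;
        }

        if (message == "done")
        {
            Log.Add($"finish {packageId}");
            ResetBehavior();
            return true;
        }

        return false;
    };

    private void ResetBehavior()
    {
        // The oldest stashed request jumps to the FRONT of the mailbox.
        if (_stash.Count > 0)
            _mailbox.Insert(0, _stash.Dequeue());
        _current = Idle;
    }
}
```

Tell this toy scanner "scan:A", "scan:B", "done", "scan:C", "done", "done" and then call Drain: the log reads start A, finish A, start B, finish B, start C, finish C. The request for B was stashed while A was running, and because it is put back at the front of the mailbox it still runs before C, which arrived later.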

Stashing is something you can do using just the base Akka package - it’s succinct, powerful, and low-ceremony. If you need to model finite state machines, long-running processes, sagas, or any other stateful workflow then I strongly recommend that you consider behavior-switching as a means of simplifying your work. You’ll be glad you did.

Written by Aaron Stannard on September 17, 2024

 

 
