Distributing State Reliably with Akka.Cluster.Sharding

A straightforward introduction to Akka.Cluster.Sharding

We’ve had two earlier posts on Akka.Cluster.Sharding on this blog, both written by Bartosz Sypytkowski, who originally contributed those features to Akka.NET.

We’ve had a lot of demand for an updated introduction video that explores the concepts, behavior, code, and configuration of Akka.Cluster.Sharding in more detail than before, so I’ve produced a video doing just that: “Distributing State Reliably with Akka.Cluster.Sharding.”

This is my companion blog post to go along with the video - I’m not going to cover everything in the video, but I’ll provide a high-level synopsis that explains:

  • What Akka.Cluster.Sharding does;
  • When you should use it; and
  • How it works.

Watch “Distributing State Reliably with Akka.Cluster.Sharding” for the complete details, or read on for the high-level overview.

What Akka.Cluster.Sharding Does and When To Use It

One of the primary use cases for the actor model and Akka.NET in general is: “to make it easy to build low latency, stateful applications safely and inexpensively.”

“Statefulness” is an aspect of systems both big and small. Akka.NET is frequently used in scenarios where building a distributed system is not necessary, e.g. running background tasks inside an ASP.NET Core application.

Akka.Cluster.Sharding is designed to handle large, cloud-scale applications where there might be thousands to millions of entities that all need concurrent processing at any given time.

Some examples of domains where this combination of features is helpful:

  • Industrial IOT applications, such as manufacturing / meter data management systems / real-time device tracking / etc;
  • “Bursty” enterprise applications, such as asset line management / payroll processing / retail analytics / etc;
  • Real-time applications such as chat / collaboration / VoIP / conferencing / and more;
  • Real-time financial applications, such as transaction processing / fraud detection / real-time pricing / algorithmic trading / etc;
  • Real-time analytics and marketing automation;
  • Many many others, such as multi-player video games.

Akka.Cluster.Sharding is a tool that allows you to distribute stateful entities transparently across an Akka.NET cluster - it works more or less the same as Microsoft Orleans’ virtual actor model, the major difference being that Akka.NET still lets you use local actors alongside sharded / virtual actors no problem.

The taxonomy of Akka.Cluster.Sharding looks like this:

Akka.NET Cluster.Sharding Taxonomy

  • ShardRegion - represents one type of entity that will get distributed across the cluster; each entity type comes with its own user-defined actor type;
  • Shard - a unit of distribution for allocating entities across the Akka.NET cluster. It’s the built-in parent in the “child per entity” pattern that Akka.NET leverages internally for this; and
  • Entity - your actor types, each instance representing a single, unique business entity within this specific ShardRegion.

The idea behind Akka.Cluster.Sharding is to achieve the following:

  1. Guarantee exactly 1 unique instance of each entity even across network partitions - this makes Akka.Cluster.Sharding the perfect tool to use in combination with something like Akka.Persistence;
  2. Guarantee an even distribution of state across all nodes hosting the same ShardRegion - if you launch 10 instances of your Akka.NET application into a cluster and all of them start the same ShardRegion, then the Shards in that region are algorithmically guaranteed to be distributed evenly across those 10 nodes. We use consistent hashing to guarantee this.
  3. Redistribute state safely when the cluster expands or contracts - Akka.Cluster.Sharding’s rebalancing mechanism is designed to gradually move state from heavily loaded nodes to less-loaded nodes when Akka.Cluster’s membership expands, pausing message traffic to the Shards being rebalanced while this is occurring. Akka.Cluster.Sharding will also quickly rebalance Shards off of Akka.Cluster members that are in the process of leaving the cluster.
  4. Automatically manage the lifecycle of entity actors - entity actors are created on-demand whenever you message the ShardRegion, if that particular entity doesn’t already exist. If an entity has gone 2 minutes without processing a message, the entity actor will be automatically passivated (terminated) in order to keep memory / CPU resources constrained to just “working” actors at any given time.
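To make the fourth point a little more concrete, here is a minimal in-memory sketch of the "create on demand, passivate when idle" lifecycle. It is plain C# and not Akka.NET code: the EntityLifecycleSketch type, its method names, and the way time is passed in are all assumptions made for illustration, and the real Shard actor handles this internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, in-memory stand-in for the entity lifecycle; not Akka.NET code.
public sealed class EntityLifecycleSketch
{
    private readonly TimeSpan _idleTimeout;

    // entity id -> time the entity last processed a message
    private readonly Dictionary<string, DateTime> _lastMessageAt = new();

    public EntityLifecycleSketch(TimeSpan idleTimeout) => _idleTimeout = idleTimeout;

    public int LiveEntities => _lastMessageAt.Count;

    // Delivering a message creates the entity on demand if it is not alive yet.
    // Returns true when this call had to create the entity.
    public bool Deliver(string entityId, DateTime now)
    {
        var created = !_lastMessageAt.ContainsKey(entityId);
        _lastMessageAt[entityId] = now; // record activity, which resets the idle clock
        return created;
    }

    // Stops every entity that has been idle for at least the timeout.
    // Returns the ids of the entities that were passivated.
    public IReadOnlyList<string> PassivateIdle(DateTime now)
    {
        var idle = _lastMessageAt
            .Where(kv => now - kv.Value >= _idleTimeout)
            .Select(kv => kv.Key)
            .ToList();

        foreach (var id in idle)
            _lastMessageAt.Remove(id);

        return idle;
    }
}
```

With a 2 minute timeout, an entity that last received a message 90 seconds ago survives a PassivateIdle call, while one that has been quiet for 2 minutes or more is removed and gets re-created by the next Deliver call for its id.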

How Akka.Cluster.Sharding Works

To get a full picture of how Akka.Cluster.Sharding works, watch “Distributing State Reliably with Akka.Cluster.Sharding”, but I’ll provide a quick overview here using the Akka.Hosting APIs for Akka.Cluster.Sharding.

First, install the Akka.Cluster.Hosting NuGet package:

PS> Install-Package Akka.Cluster.Hosting

Next, we have to create a ShardRegion in order to actually host our entity actors:

builder.Services.AddAkka(ActorSystemConstants.ActorSystemName, configurationBuilder =>
{
	// other config calls
	configurationBuilder.WithShardRegion<IWithItem>("items",
	        s => ItemActor.PropsFor(s, akkaConfiguration.DistributedPubSubOptions.Enabled),
	        new ItemShardExtractor(),
	        new ShardOptions()
	        {
	            RememberEntities = akkaConfiguration.ShardingOptions.RememberEntities,
	            Role = ActorSystemConstants.BackendRole,
	            StateStoreMode = akkaConfiguration.ShardingOptions.UseDData
	                ? StateStoreMode.DData
	                : StateStoreMode.Persistence,
	            PassivateIdleEntityAfter = TimeSpan.FromMinutes(2)
	        });
});

The WithShardRegion<IWithItem> call will insert an IActorRef with the key of IWithItem into the ActorRegistry, which other actors can then look up and use. This IActorRef belongs to the ShardRegion for the items entity type - and this is the handle we’re going to use to send messages to our entities.

public static AkkaConfigurationBuilder WithItemMessagingActor(this AkkaConfigurationBuilder builder)
{
	return builder.StartActors((system, registry) => {
		var shardRegion = registry.Get<IWithItem>();
		system.ActorOf(Props.Create(() => new ItemMessagingActor(shardRegion)), "item-messaging");
	});
}

Sending a message to this ShardRegion IActorRef is how we automatically instantiate and message entity actors regardless of where they might be hosted inside your network.

Creating entity actors via messaging in Akka.Cluster.Sharding

s => ItemActor.PropsFor(s, akkaConfiguration.DistributedPubSubOptions.Enabled) - this function is what we use to instantiate entity actors on this node. The string parameter is the unique id of this specific entity, which we pass into the actor’s Props as an argument.

The ItemShardExtractor is user-defined code, responsible for extracting the unique id of each entity from the content of the messages sent to the ShardRegion:

public class ItemShardExtractor : HashCodeMessageExtractor
{
    // 200 shards total: enough for 20 nodes at 10 shards per node
    public ItemShardExtractor() : base(200)
    {
    }

    public override string? EntityId(object message)
    {
        if (message is IWithItem itemId)
        {
            return itemId.ItemId;
        }
        return null;
    }
}

If you design your message types using the pattern-oriented message design we’ve been recommending for years, then the IMessageExtractor you pass into the Akka.Cluster.Sharding configuration will look as simple as this.
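As a rough sketch of what that message design might look like: every message destined for an item entity implements one small interface that exposes the entity id, so the extractor needs only a single pattern match. The shape of IWithItem below is an assumption inferred from the extractor above (only its ItemId property is implied there), and the AddStock / GetItem messages and the EntityIdSketch helper are hypothetical names invented for this example.

```csharp
#nullable enable

// Assumed shape of the marker interface; only ItemId is implied by the extractor.
public interface IWithItem
{
    string ItemId { get; }
}

// Hypothetical messages, for illustration only.
public sealed record AddStock(string ItemId, int Quantity) : IWithItem;
public sealed record GetItem(string ItemId) : IWithItem;

public static class EntityIdSketch
{
    // Same pattern match the extractor performs: messages that carry an ItemId
    // map to that entity id, and anything else maps to null.
    public static string? EntityIdOf(object message) =>
        message is IWithItem withItem ? withItem.ItemId : null;
}
```

Because both message types share the interface, adding a new message for the same entity type does not require touching the extractor at all.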

This particular message extractor inherits from Akka.Cluster.Sharding’s HashCodeMessageExtractor base class, which uses consistent hashing to map entity ids to shard ids - that’s an important detail, as it’s how we guarantee an even distribution that also scatters widely even for similar entity id values. Watch our video on “Consistent Hash Distributions Explained” for an illustration of this algorithm, if you’re curious.
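To illustrate the general idea of mapping an entity id onto a fixed number of shards, here is a simplified sketch that hashes the id and takes the result modulo the shard count. This is not the library's implementation: the ShardIdSketch class is hypothetical, and 32-bit FNV-1a is used only as a stand-in for whatever hash function HashCodeMessageExtractor actually uses internally.

```csharp
using System;
using System.Text;

// Hypothetical helper; a simplified stand-in, not Akka.NET's own hashing code.
public static class ShardIdSketch
{
    // 32-bit FNV-1a: a simple string hash that returns the same value on every
    // machine and every run, which is what a cluster-wide mapping needs.
    public static uint StableHash(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        var hash = offsetBasis;
        foreach (var b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            hash = unchecked(hash * prime); // wrap around on overflow by design
        }
        return hash;
    }

    // Maps an entity id to one of maxNumberOfShards shard ids ("0" .. "N-1").
    public static string ShardIdFor(string entityId, int maxNumberOfShards)
    {
        if (maxNumberOfShards <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxNumberOfShards));

        return (StableHash(entityId) % (uint)maxNumberOfShards).ToString();
    }
}
```

The two properties that matter are visible in the sketch: the same entity id always lands on the same shard id, and the result always falls within the configured shard count, which is why that count has to be identical on every node.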

Most users opt for the HashCodeMessageExtractor because it’s simple and fast - you specify the maximum number of shards in the constructor and that’s the end of it.

How many shards do you need? 10 per node when your cluster is running at maximum capacity is roughly the “right” amount. If your service is going to be running 400 instances of your ShardRegion-hosting nodes at peak demand, then you’ll want 4000 shards, even if you only run with 20 instances during most hours. This is because the number of shards can’t be changed without restarting your entire cluster - as the shard count is meant to be a stable, agreed-upon value across all running nodes. So just build the headroom in and don’t over-engineer it here.
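The sizing rule is simple arithmetic, sketched here with a hypothetical ShardSizing helper (the class and method names are invented for this example; only the "10 shards per node at peak" figure comes from the text above).

```csharp
using System;

// Hypothetical helper illustrating the sizing rule of thumb from the text.
public static class ShardSizing
{
    public const int ShardsPerNodeAtPeak = 10;

    // Total shard count to configure, based on the largest cluster you expect to run.
    public static int MaxShards(int peakNodeCount)
    {
        if (peakNodeCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(peakNodeCount));

        return checked(peakNodeCount * ShardsPerNodeAtPeak);
    }

    // Average number of shards each node hosts at a given cluster size, rounded up.
    public static int AverageShardsPerNode(int maxShards, int currentNodeCount)
    {
        if (currentNodeCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(currentNodeCount));

        return (maxShards + currentNodeCount - 1) / currentNodeCount;
    }
}
```

For the example in the text, ShardSizing.MaxShards(400) returns 4000, and with only 20 nodes running, ShardSizing.AverageShardsPerNode(4000, 20) returns 200. Each node simply hosts more shards during quiet hours, and the total never has to change.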

Finally, we have the ShardOptions type built into Akka.Hosting:

new ShardOptions()
{
    RememberEntities = akkaConfiguration.ShardingOptions.RememberEntities,
    Role = ActorSystemConstants.BackendRole,
    StateStoreMode = akkaConfiguration.ShardingOptions.UseDData
        ? StateStoreMode.DData
        : StateStoreMode.Persistence,
    PassivateIdleEntityAfter = TimeSpan.FromMinutes(2)
}

The most important setting here is the Role setting, which tells the rest of the cluster “on which Akka.Cluster role can entities belonging to this ShardRegion be found?” This is absolutely essential for heterogeneous Akka.NET clusters.

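The other settings control whether or not entity actors automatically passivate (RememberEntities = true means entity actors are ALWAYS ON until you explicitly kill them), how long their “idle timeout” should be (PassivateIdleEntityAfter), and where Akka.Cluster.Sharding stores its own internal state (StateStoreMode: either Akka.DistributedData or Akka.Persistence).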

Rebalancing of Entities

Akka.Cluster.Sharding aims to maintain an even distribution of Shards across the cluster at all times - therefore, when nodes join the cluster they’re going to initially begin with 0 entities in each of their ShardRegions:

Akka.Cluster.Sharding rebalance - step 1

An internal component, the ShardCoordinator, will notice that we have a new instance of the items ShardRegion that currently has no shards. Therefore, we’ll move a Shard from host1 to host3:

Akka.Cluster.Sharding rebalance - step 2

Shard 4 is now hosted on host3 - none of its entity actors have been started yet; by default the Shard will wait until someone tries to message any of the entities that are assigned to it before instantiating those entity actors. However, if you set RememberEntities = true, all of the entity actors that Shard remembers will be restarted automatically after a rebalance.

An important detail: message traffic to Shard 4 is paused across the entire cluster while the rebalance is occurring - this is done in order to prevent message loss. It’s not 100% bulletproof, but it’s generally pretty solid. You can use Akka.Delivery on top of Akka.Cluster.Sharding to eliminate message loss altogether.

Akka.Cluster.Sharding rebalance - step 3

Finally, we’ll move one more Shard from host2 onto host3 and once that’s complete, we’re done rebalancing: all shards are now evenly distributed throughout this cluster.
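The three steps above can be approximated with a small simulation: keep moving one shard at a time from the most loaded host to the least loaded host until the counts are even. This is a deliberately simplified illustration and not the ShardCoordinator's actual allocation strategy; the RebalanceSketch class is hypothetical, and apart from Shard 4 starting on host1, the shard ids used in the usage note are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation; not the real ShardCoordinator logic.
public static class RebalanceSketch
{
    // Moves one shard at a time from the most loaded host to the least loaded host
    // until no host holds more than one shard above any other host.
    // Mutates shardsByHost and returns the moves in the order they were made.
    public static List<(int Shard, string From, string To)> Rebalance(
        Dictionary<string, List<int>> shardsByHost)
    {
        var moves = new List<(int Shard, string From, string To)>();
        if (shardsByHost.Count == 0)
            return moves;

        while (true)
        {
            // Sort by host name first so that ties are broken deterministically
            // (LINQ ordering is stable).
            var hosts = shardsByHost.OrderBy(kv => kv.Key, StringComparer.Ordinal).ToList();
            var most = hosts.OrderByDescending(kv => kv.Value.Count).First();
            var least = hosts.OrderBy(kv => kv.Value.Count).First();

            if (most.Value.Count - least.Value.Count <= 1)
                break; // evenly distributed, nothing left to move

            // Hand off the last shard in the most loaded host's list.
            var shard = most.Value[^1];
            most.Value.RemoveAt(most.Value.Count - 1);
            least.Value.Add(shard);
            moves.Add((shard, most.Key, least.Key));
        }

        return moves;
    }
}
```

Starting from host1 = [1, 2, 4], host2 = [3, 5, 6] and an empty host3, the sketch moves shard 4 from host1 to host3, then shard 6 from host2 to host3, and stops with two shards on every host.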

If you want to see what happens when nodes hosting Shards leave the Akka.NET cluster, please watch “Distributing State Reliably with Akka.Cluster.Sharding.”

Observability with Akka.Cluster.Sharding

Finally, we have the issue of observability - given that Akka.NET is managing the lifecycle and instances of entity actors on your behalf, wouldn’t it be nice to know how many there are and where they are allocated?

Phobos Akka.Cluster.Sharding dashboard in Grafana with Prometheus

There are two tools for doing so:

Written by Aaron Stannard on March 22, 2024

 

 
