Distributing State Reliably with Akka.Cluster.Sharding
A straightforward introduction to Akka.Cluster.Sharding
We’ve had a number of posts on Akka.Cluster.Sharding on this blog:
- “Introduction to Akka.Cluster.Sharding in Akka.NET”
- “Technical Overview of Akka.Cluster.Sharding in Akka.NET”
Both of those were written by Bartosz Sypytkowski, who originally contributed those features to Akka.NET. We’ve had a lot of demand for an updated introduction video to Akka.Cluster.Sharding that explores the concepts, behavior, code, and configuration of Akka.Cluster.Sharding in more detail than previously - and so I’ve produced a video doing just that: “Distributing State Reliably with Akka.Cluster.Sharding”
This is my companion blog post to go along with the video - I’m not going to cover everything in the video, but I’ll provide a high-level synopsis that explains:
- What Akka.Cluster.Sharding does;
- When you should use it; and
- How it works.
Watch “Distributing State Reliably with Akka.Cluster.Sharding” for the complete details, or read on for the high-level overview.
What Akka.Cluster.Sharding Does and When To Use It
One of the primary use cases for the actor model and Akka.NET in general is: “to make it easy to build low latency, stateful applications safely and inexpensively.”
“Statefulness” is an aspect of systems both big and small. Akka.NET is frequently used in scenarios where building a distributed system is not necessary, e.g. running background tasks inside an ASP.NET Core application.
Akka.Cluster.Sharding is designed to handle large, cloud-scale applications where there might be thousands to millions of entities that all need concurrent processing at any given time.
Some examples of domains where this combination of features is helpful:
- Industrial IOT applications, such as manufacturing / meter data management systems / real-time device tracking / etc;
- “Bursty” enterprise applications, such as asset line management / payroll processing / retail analytics / etc;
- Real-time applications such as chat / collaboration / VoIP / conferencing / and more;
- Real-time financial applications, such as transaction processing / fraud detection / real-time pricing / algorithmic trading / etc;
- Real-time analytics and marketing automation;
- Many many others, such as multi-player video games.
Akka.Cluster.Sharding is a tool that allows you to distribute stateful entities transparently across an Akka.NET cluster - it works more or less the same as Microsoft Orleans’ virtual actor model, the major difference being that Akka.NET still lets you use local actors alongside sharded / virtual actors no problem.
The taxonomy of Akka.Cluster.Sharding looks like this:
- ShardRegion - a type of entity that will get distributed across the cluster; each entity comes with its own user-defined unique actor type;
- Shard - a unit of distribution for allocating entities across the Akka.NET cluster. It’s the built-in parent in the “child per entity” pattern that Akka.NET leverages internally for this; and
- Entity - your actor types, each instance representing a single, unique business entity within this specific ShardRegion.
The idea behind Akka.Cluster.Sharding is to achieve the following:
- Guarantee exactly 1 unique instance of each entity even across network partitions - this makes Akka.Cluster.Sharding the perfect tool to use in combination with something like Akka.Persistence;
- Guarantee an even distribution of state across all nodes hosting the same ShardRegion - if you launch 10 instances of your Akka.NET application into a cluster and all of them start the same ShardRegion, then the Shards in that region are algorithmically guaranteed to be distributed evenly across those 10 nodes. We use consistent hashing to guarantee this.
- Redistribute state safely when the cluster expands or contracts - Akka.Cluster.Sharding’s rebalancing mechanism is designed to gradually move state from heavily loaded nodes to less-loaded nodes when Akka.Cluster’s membership expands, pausing message traffic to rebalancing Shards while this is occurring. Akka.Cluster.Sharding will also quickly rebalance Shards off of Akka.Cluster members that are in the process of leaving the cluster.
- Automatically manage the lifecycle of entity actors - entity actors are created on-demand whenever you message the ShardRegion, if that particular entity doesn’t already exist, and if an entity has gone 2 minutes without processing a message the entity actor will be automatically passivated (terminated) in order to keep memory / CPU resources constrained to just “working” actors at any given time.
How Akka.Cluster.Sharding Works
To get a full picture of how Akka.Cluster.Sharding works, watch “Distributing State Reliably with Akka.Cluster.Sharding”, but I’ll provide a quick overview here using the Akka.Hosting APIs for Akka.Cluster.Sharding.
First, install the Akka.Cluster.Hosting NuGet package:
PS> Install-Package Akka.Cluster.Hosting
Next, we have to create a ShardRegion in order to actually host our entity actors:
builder.Services.AddAkka(ActorSystemConstants.ActorSystemName, configurationBuilder =>
{
    // other config calls
    configurationBuilder.WithShardRegion<IWithItem>("items",
        s => ItemActor.PropsFor(s, akkaConfiguration.DistributedPubSubOptions.Enabled),
        new ItemShardExtractor(),
        new ShardOptions()
        {
            RememberEntities = akkaConfiguration.ShardingOptions.RememberEntities,
            Role = ActorSystemConstants.BackendRole,
            StateStoreMode = akkaConfiguration.ShardingOptions.UseDData
                ? StateStoreMode.DData
                : StateStoreMode.Persistence,
            PassivateIdleEntityAfter = TimeSpan.FromMinutes(2)
        });
});
The WithShardRegion<IWithItem> call will insert an IActorRef with the key of IWithItem into the ActorRegistry, where other actors can look it up. This IActorRef belongs to the ShardRegion for the items entity type - and this is the handle we’re going to use to send messages to our entities.
public static AkkaConfigurationBuilder WithItemMessagingActor(this AkkaConfigurationBuilder builder)
{
    return builder.StartActors((system, registry) =>
    {
        // Look up the ShardRegion handle registered by WithShardRegion<IWithItem>
        var shardRegion = registry.Get<IWithItem>();
        system.ActorOf(Props.Create(() => new ItemMessagingActor(shardRegion)), "item-messaging");
    });
}
Sending a message to this ShardRegion IActorRef is how we automatically instantiate and message entity actors regardless of where they might be hosted inside your network.
s => ItemActor.PropsFor(s, akkaConfiguration.DistributedPubSubOptions.Enabled) - this function is what we use to instantiate entity actors on this node. The string parameter is the unique id of this specific entity, which we pass into the actor’s Props as an argument.
The ItemShardExtractor is user-defined code, responsible for extracting the unique id of each entity from the content of the messages sent to the ShardRegion:
public class ItemShardExtractor : HashCodeMessageExtractor
{
    // 200 shards total: 20 nodes, 10 shards per node
    public ItemShardExtractor() : base(200)
    {
    }

    public override string? EntityId(object message)
    {
        if (message is IWithItem itemId)
        {
            return itemId.ItemId;
        }

        // Not a message addressed to an item entity
        return null;
    }
}
If you design your message types using the pattern-oriented message design we’ve been recommending for years, then the IMessageExtractor you pass into the Akka.Cluster.Sharding configuration will look as simple as this.
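For reference, here is a minimal sketch of what message types following that design might look like. The IWithItem interface with a string ItemId property is implied by the extractor above; the AddStock and GetItem records and the ItemRouting helper are hypothetical names invented for this illustration, and the helper only mirrors the EntityId logic so the sketch runs without referencing Akka.NET.

```csharp
#nullable enable

// Implied by ItemShardExtractor: every message aimed at an item entity
// carries the id of the entity it is addressed to.
public interface IWithItem
{
    string ItemId { get; }
}

// Hypothetical messages, invented for this sketch.
public sealed record AddStock(string ItemId, int Quantity) : IWithItem;
public sealed record GetItem(string ItemId) : IWithItem;

public static class ItemRouting
{
    // Same pattern match as ItemShardExtractor.EntityId:
    // returns the entity id, or null for messages that are not item messages.
    public static string? EntityIdOf(object message) =>
        message is IWithItem withItem ? withItem.ItemId : null;
}
```

Because every message shares one interface, the extractor needs a single pattern match no matter how many message types the entity handles.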
This particular message extractor inherits from Akka.Cluster.Sharding’s HashCodeMessageExtractor base class, which uses consistent hashing to map entity ids to shard ids - that’s an important detail, as it’s how we guarantee an even distribution that also scatters widely even for similar entity id values. Watch our video on “Consistent Hash Distributions Explained” for an illustration of this algorithm, if you’re curious.
The reason why most users opt to use the HashCodeMessageExtractor is because it’s simple and fast - you specify the maximum number of shards in the constructor and that’s the end of it.
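The general idea of mapping an entity id onto a fixed number of shards can be sketched roughly as follows: hash the entity id with a stable hash function, then reduce the result modulo the maximum number of shards. This is an illustration only, not Akka.NET’s implementation: the FNV-1a hash is a stand-in chosen for this sketch, and ShardIdSketch, StableHash and ShardIdFor are hypothetical names.

```csharp
using System;
using System.Globalization;
using System.Text;

public static class ShardIdSketch
{
    // FNV-1a (32-bit): a stand-in stable hash for this sketch,
    // NOT the hash function Akka.NET uses internally.
    public static uint StableHash(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Maps an entity id to a shard id in the range [0, maxNumberOfShards).
    public static string ShardIdFor(string entityId, int maxNumberOfShards)
    {
        if (maxNumberOfShards <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxNumberOfShards));

        uint shard = StableHash(entityId) % (uint)maxNumberOfShards;
        return shard.ToString(CultureInfo.InvariantCulture);
    }
}
```

The property that matters is determinism: every node computes the same shard id for the same entity id, which only holds as long as all nodes agree on the maximum number of shards.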
How many shards do you need? 10 per node when your cluster is running at maximum capacity is roughly the “right” amount. If your service is going to be running 400 instances of your ShardRegion-hosting nodes at peak demand, then you’ll want 4000 shards, even if you only run with 20 instances during most hours. This is because the number of shards can’t be changed without restarting your entire cluster - as the shard count is meant to be a stable, agreed-upon value across all running nodes. So just build the headroom in and don’t over-engineer it here.
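The sizing rule of thumb above is simple arithmetic, sketched here with hypothetical helper names (ShardSizing, MaxNumberOfShards and ShardsPerNode are not part of Akka.NET):

```csharp
using System;

public static class ShardSizing
{
    // Rule of thumb from the text: roughly 10 shards per node at PEAK cluster size.
    public static int MaxNumberOfShards(int peakNodeCount, int shardsPerNode = 10)
    {
        if (peakNodeCount <= 0) throw new ArgumentOutOfRangeException(nameof(peakNodeCount));
        if (shardsPerNode <= 0) throw new ArgumentOutOfRangeException(nameof(shardsPerNode));
        return checked(peakNodeCount * shardsPerNode);
    }

    // Average number of shards each node hosts at a given cluster size.
    public static double ShardsPerNode(int totalShards, int currentNodeCount)
    {
        if (currentNodeCount <= 0) throw new ArgumentOutOfRangeException(nameof(currentNodeCount));
        return (double)totalShards / currentNodeCount;
    }
}
```

With 400 nodes at peak, MaxNumberOfShards(400) gives 4000; during quieter hours with 20 nodes, ShardsPerNode(4000, 20) works out to 200 shards per node.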
Finally, we have the ShardOptions type built into Akka.Hosting:
new ShardOptions()
{
    RememberEntities = akkaConfiguration.ShardingOptions.RememberEntities,
    Role = ActorSystemConstants.BackendRole,
    StateStoreMode = akkaConfiguration.ShardingOptions.UseDData
        ? StateStoreMode.DData
        : StateStoreMode.Persistence,
    PassivateIdleEntityAfter = TimeSpan.FromMinutes(2)
}
The most important setting here is the Role setting, which tells the rest of the cluster “on which Akka.Cluster role can entities belonging to this ShardRegion be found?” This is absolutely essential for heterogeneous Akka.NET clusters.
The other settings control whether or not actors automatically passivate (RememberEntities = true means entity actors are ALWAYS ON until you explicitly kill them) and how long their “idle timeout” should be (PassivateIdleEntityAfter).
Rebalancing of Entities
Akka.Cluster.Sharding aims to maintain an even distribution of Shards across the cluster at all times - therefore, when nodes join the cluster they’re going to initially begin with 0 entities in each of their ShardRegions.
An internal component, the ShardCoordinator, will notice that we have a new instance of the items ShardRegion that currently has no shards. Therefore, we’ll move a Shard from host1 to host3.
Shard 4 is now hosted on host3 - none of its entity actors have been started yet; by default the Shard will wait until someone tries to message any of the entities that are assigned to it before instantiating those entity actors. However, if you set RememberEntities = true, all of the Shard’s remembered entities will be restarted automatically after a rebalance.
An important detail: message traffic to Shard 4 is paused across the entire cluster while the rebalance is occurring - this is done in order to prevent message loss. It’s not 100% bulletproof, but it’s generally pretty solid. You can use Akka.Delivery on top of Akka.Cluster.Sharding to eliminate message loss altogether.
Finally, we’ll move one more Shard from host2 onto host3 and once that’s complete, we’re done rebalancing: all shards are now evenly distributed throughout this cluster.
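The walkthrough above can be mimicked with a toy model. This is only a sketch of the general idea (move one shard at a time from the most-loaded host to the least-loaded host until the counts are level) and not the allocation strategy the ShardCoordinator actually runs; RebalanceSketch is a hypothetical name, and the starting layout in the usage note is assumed for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RebalanceSketch
{
    // Toy model: repeatedly hand one shard from the most-loaded host to the
    // least-loaded host until no host holds more than one shard above any other.
    // Ties are broken by host name so the result is deterministic.
    public static List<(int Shard, string From, string To)> Rebalance(
        Dictionary<string, List<int>> shardsByHost)
    {
        var moves = new List<(int Shard, string From, string To)>();
        if (shardsByHost.Count == 0) return moves;

        while (true)
        {
            var most = shardsByHost
                .OrderByDescending(kv => kv.Value.Count)
                .ThenBy(kv => kv.Key, StringComparer.Ordinal)
                .First();
            var least = shardsByHost
                .OrderBy(kv => kv.Value.Count)
                .ThenBy(kv => kv.Key, StringComparer.Ordinal)
                .First();

            if (most.Value.Count - least.Value.Count <= 1) break;

            // Hand off the last shard in the most-loaded host's list.
            int shard = most.Value[most.Value.Count - 1];
            most.Value.RemoveAt(most.Value.Count - 1);
            least.Value.Add(shard);
            moves.Add((shard, most.Key, least.Key));
        }

        return moves;
    }
}
```

Starting from an assumed layout of host1 = [1, 2, 4], host2 = [3, 5, 6] and a freshly joined host3 = [], this produces two moves: shard 4 from host1 to host3, then shard 6 from host2 to host3, leaving two shards on every host.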
If you want to see what happens when nodes hosting Shards leave the Akka.NET cluster, please watch “Distributing State Reliably with Akka.Cluster.Sharding.”
Observability with Akka.Cluster.Sharding
Finally, we have the issue of observability - given that Akka.NET is managing the lifecycle and instances of entity actors on your behalf, wouldn’t it be nice to know how many there are and where they are allocated?
There are two tools for doing so:
- Phobos - Akka.NET OpenTelemetry - comes with built-in metrics and dashboards for tracking each ShardRegion automatically.
- Petabridge.Cmd’s cluster-sharding commands - these will allow you to get live views of your ShardRegion distributions and can even locate, in real time, individual entity actors within them.