Creating Persistent Actors in Akka.NET with Akka.Persistence

How to Create Akka.NET Actors with Durable State

One of the most frequently asked questions I get is about how to create stateful actors that are also durable by default, meaning that the actor can recover its state from some sort of storage engine in the event that the entire Akka.NET process needs to be restarted.

Enter Akka.Persistence - an entire framework built into Akka.NET that’s designed to allow you to create actors with durable state that can be persisted on any database or storage system you want.

In this post we’re going to explore the concepts behind Akka.Persistence, how it works, and what some of the available storage options are.

NOTE: Akka.Persistence is still in beta at the time of publishing this post, so your mileage may vary.

Actors and State

Akka.Persistence is designed to deal with the following concerns Akka and Akka.NET users have had over the years:

  1. How do I guarantee delivery of messages between restarts of a process?
  2. How do I preserve the state of actors between restarts of a process?

A restart of a process can be as simple as shutting down or restarting a console app, ASP.NET process, or a Windows Service - it can also mean redeploying a new version of an app during a planned upgrade, or recovering after an unplanned hardware failure. Ensuring that your actor’s data and any critical messages are persisted and recovered between those events is paramount to building a reliable system. This is where Akka.Persistence comes in.

In a normal Akka.NET actor, all of an actor’s state exists as internal fields within an actor class declaration.

using System.Collections.Generic;
using Akka.Actor;

public class MyActor : ReceiveActor{
    public class GetMessages{}

    private List<string> _msgs = new List<string>(); //INTERNAL STATE

    public MyActor(){
        Receive<string>(str => _msgs.Add(str));
        // reply with a copy so the mutable list never leaves the actor
        Receive<GetMessages>(get => Sender.Tell(_msgs.ToArray()));
    }

}

In a persistent Akka.NET actor, we still have our application state in-memory - but our actors recover it on restart from a durable store by replaying messages and aggregates of messages from our persistent store.

Here’s what a persistent version of the previous MyActor implementation might look like:

using System.Collections.Generic;
using Akka.Actor;
using Akka.Persistence;

public class MyActor : ReceivePersistentActor{
    public class GetMessages{}

    private List<string> _msgs = new List<string>(); //INTERNAL STATE

    public override string PersistenceId
    {
        get
        {
            return "HardCoded";
        }
    }

    public MyActor(){
        // recover
        Recover<string>(str => _msgs.Add(str));

        // commands
        Command<string>(str => Persist(str, s => {
            _msgs.Add(s); //add msg to in-memory event store after persisting
        }));
        // reply with a copy so the mutable list never leaves the actor
        Command<GetMessages>(get => Sender.Tell(_msgs.ToArray()));
    }

}

The first thing you may notice about the persistent implementation of this actor is that there’s no actual persistence code anywhere. No IRepository or DbConnection. Just a call to the Persist method inherited from the ReceivePersistentActor base class.

That’s no accident - Akka.Persistence is driven heavily by configuration (more on that in a moment) and abstracts away the details of which database it’s storing messages to and how those messages are represented in the store.

The Persist method call is what causes the persistent actor to journal each new string message into the configured store. However, the ReceivePersistentActor also differs from a typical ReceiveActor in that we don’t declare any Receive<T> handlers in our constructor. Instead we have two different types of handlers:

  1. Recover<T> - this method handles a message when a persistent actor is going through its “recovery” phase, where it replays stored messages from the database. All of the messages received by any Recover<T> handler are previous messages that were saved to the journal at some point in time.
  2. Command<T> - once the actor has finished recovering its state from the database, it can begin handling normal messages from other actors again. That’s what Command<T> is for - it serves the same purpose as a Receive<T> handler in a typical ReceiveActor.

Snapshots and Journals

In Akka.Persistence there are two different ways we can store events and actor state:

  1. Journals - these are event stores that write individual messages into an immutable, append-only log. All but one of the typed Recover<T> handlers (the exception being Recover<SnapshotOffer>) are designed to handle stored messages replayed from the journal. This is really where we apply “event sourcing.”
  2. Snapshot Store - rather than storing and replaying each individual message (which can get very expensive once the journal becomes sufficiently large), you can save a snapshot of the entire state of the actor and restore that all at once. A snapshot is really just an optimization for what would otherwise be really large journals.

Both of these concepts are part of a design pattern called Event Sourcing, which Akka.Persistence uses to recover and persist internal state from a durable store. Event sourcing is ideal for message-oriented systems like Akka.NET actors for a variety of reasons, not the least of which is that it plays nicely with other core Akka.NET concepts such as immutability and reactive messaging.
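To make the event sourcing idea concrete, here is a minimal sketch in plain C# with no Akka.NET dependency. The EventSourcedList class and its Handle and Apply methods are hypothetical names invented for this illustration; they only mimic what the journal and the Recover<T> / Command<T> handlers do and are not part of the Akka.Persistence API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a persistent actor: state is only ever
// changed by applying events, and every event is appended to a log first.
public class EventSourcedList
{
    private readonly List<string> _journal;                    // append-only event log
    private readonly List<string> _state = new List<string>(); // in-memory state

    public EventSourcedList(List<string> journal)
    {
        _journal = journal;
        // "Recovery": rebuild in-memory state by replaying every stored event.
        foreach (var evt in _journal)
            Apply(evt);
    }

    public IReadOnlyList<string> State { get { return _state; } }

    // "Command": append to the journal first, then update in-memory state.
    public void Handle(string message)
    {
        _journal.Add(message);
        Apply(message);
    }

    // The single place where state changes, shared by recovery and commands.
    private void Apply(string evt)
    {
        _state.Add(evt);
    }
}

public static class EventSourcingDemo
{
    public static void Main()
    {
        var journal = new List<string>(); // pretend this lives in a database

        var first = new EventSourcedList(journal);
        first.Handle("hello");
        first.Handle("world");

        // Simulate a process restart: a new instance replays the same journal.
        var second = new EventSourcedList(journal);
        Console.WriteLine(string.Join(",", second.State)); // prints "hello,world"
    }
}
```

The point of the sketch is that the log, not the in-memory list, is the durable record: any new instance handed the same journal ends up with the same state.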

Working with the Journal

Here’s an example of a persistent actor from Petabridge’s Akka.NET Design Patterns training - a chat room actor responsible for persisting the state of a chat room to disk, much like the Akka.NET Gitter chat:

[Figure: Akka.NET Event-Sourced Chatroom Actor]

Each time a user sends a message to the chat room, a ChatMessage object is produced that contains the following data:

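// IRouteToRoom is defined elsewhere in the training sample and is not shown here.
public class ChatMessage : IComparable<ChatMessage>, IRouteToRoom
{
    public ChatMessage(string id, string message, DateTime when, string roomName, string userName)
    {
        UserName = userName;
        RoomName = roomName;
        When = when;
        Message = message;
        Id = id;
    }

    public string Id { get; private set; }

    public string UserName { get; private set; }

    public string RoomName { get; private set; }

    public string Message { get; private set; }

    public DateTime When { get; private set; }

    // Assumed ordering (not part of the original excerpt): sort by send time.
    public int CompareTo(ChatMessage other)
    {
        return other == null ? 1 : When.CompareTo(other.When);
    }
}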

In order for our chat room to be able to provide users with a history of messages that were written while they were away, we need to have a persistent record of the messages written into the chatroom over time. Using the Journaling capability of the Akka.Persistence library is a perfect fit for this - it allows us to build a history of the chatroom based on the time each message was sent.

The Akka.Persistence journal is the append-only log that constitutes the “single source of truth” for what actually happened in this chatroom, and after three messages are written to the chatroom the journal will look like this:

[Figure: Journaled messages over time]

Each of these messages will be replayed and added to our actor’s in-memory event store, which is what the actor still uses when interacting with other actors inside your ActorSystem.

There’s no round-tripping back and forth to the database with Akka.Persistence - persistent actors asynchronously flush writes to the database as they receive messages worth saving, but they only read from the database at startup. Otherwise, persistent actors are designed to behave just like any other stateful actor - they respond to requests using data they retain in-memory, the only difference being that the in-memory data is backed up to the database and can be recovered in the event of a crash or restart.

Working with Snapshots

Snapshots are typically used in combination with the journal - you want to prevent the journal from growing excessively large, otherwise your actors start to take forever to recover. Therefore it’s a best practice to “compress” the state of your actor into periodic snapshots and delete the journal entries that each snapshot already covers.

Here’s what our persistent actor implementation might look like if we add snapshots to the mix:

using System.Collections.Generic;
using Akka.Actor;
using Akka.Persistence;

public class MyActor : ReceivePersistentActor{
    public class GetMessages{}

    private List<string> _msgs = new List<string>(); //INTERNAL STATE
    private int _msgsSinceLastSnapshot = 0;

    public override string PersistenceId
    {
        get
        {
            return "HardCoded";
        }
    }

    public MyActor(){
        // recover
        Recover<string>(str => _msgs.Add(str)); // from the journal
        Recover<SnapshotOffer>(offer => {
            var messages = offer.Snapshot as List<string>;
            if(messages != null) // null check
                _msgs = new List<string>(messages); // the snapshot holds the full state
        });

        // commands
        Command<string>(str => Persist(str, s => {
            _msgs.Add(s); //add msg to in-memory event store after persisting
            if(++_msgsSinceLastSnapshot % 100 == 0){
                //time to save a snapshot (of a copy, so later Adds can't alter it)
                SaveSnapshot(new List<string>(_msgs));
            }
        }));
        Command<SaveSnapshotSuccess>(success => {
            // soft-delete the journal up until the sequence # at
            // which the snapshot was taken (two-argument overload from the
            // beta API; newer releases may only accept the sequence #)
            DeleteMessages(success.Metadata.SequenceNr, false);
        });
        Command<SaveSnapshotFailure>(failure => {
            // handle snapshot save failure...
        });
        // reply with a copy so the mutable list never leaves the actor
        Command<GetMessages>(get => Sender.Tell(_msgs.ToArray()));
    }

}

There’s more going on inside this actor, but it’s not fundamentally much more code than what we had when we were relying solely on the Akka.Persistence journal. The snapshots in this actor will help us drastically speed up recovery time - specifically, it’s the combination of these two handlers that ensures that:

Command<string>(str => Persist(str, s => {
    _msgs.Add(s); //add msg to in-memory event store after persisting
    if(++_msgsSinceLastSnapshot % 100 == 0){
        //time to save a snapshot (of a copy, so later Adds can't alter it)
        SaveSnapshot(new List<string>(_msgs));
    }
}));
Command<SaveSnapshotSuccess>(success => {
    // soft-delete the journal up until the sequence # at
    // which the snapshot was taken
    DeleteMessages(success.Metadata.SequenceNr, false);
});

The Command<string> handler will persist the message to the journal, but for every 100 messages we’ve journaled we’ll compress the entire state of the actor into a single snapshot via the SaveSnapshot method. That snapshot will be loaded in a single read from the snapshot store by the Recover<SnapshotOffer> handler the next time a persistent actor with this PersistenceId recovers.

By default, persistent actors will only recover their most recent snapshot - but it is also possible to load earlier snapshot versions instead.

The Command<SaveSnapshotSuccess> lets us know that we’ve successfully saved our snapshot and provides us with the sequence number of the snapshot. Using that sequence number we can tell Akka.Persistence to (soft) delete all of the messages in the journal up to that sequence number, which eliminates them as loading overhead.
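The interplay between snapshots and the journal can be sketched without Akka.NET at all. In the rough model below, SnapshotRecoveryDemo and all of its members are hypothetical names, the stores are plain in-memory collections, and the snapshot and journal cleanup happen synchronously, whereas a real persistent actor saves the snapshot asynchronously and only deletes messages once SaveSnapshotSuccess arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-ins for a journal and a snapshot store.
public class SnapshotRecoveryDemo
{
    // Journal entries keyed by sequence number (1, 2, 3, ...).
    private readonly SortedDictionary<long, string> _journal =
        new SortedDictionary<long, string>();
    private List<string> _snapshot = null; // latest snapshot of the full state
    private long _snapshotSeqNr = 0;       // sequence number the snapshot covers
    private long _lastSeqNr = 0;

    private const int SnapshotInterval = 100;

    public int JournalSize { get { return _journal.Count; } }

    // Journal the event, update state, and snapshot every 100 events.
    public void Persist(string evt, List<string> state)
    {
        _journal[++_lastSeqNr] = evt;
        state.Add(evt);
        if (_lastSeqNr % SnapshotInterval == 0)
        {
            _snapshot = new List<string>(state); // copy of the full state
            _snapshotSeqNr = _lastSeqNr;
            // Drop journal entries that the snapshot already covers.
            foreach (var seqNr in _journal.Keys.Where(k => k <= _snapshotSeqNr).ToList())
                _journal.Remove(seqNr);
        }
    }

    // Recovery: start from the snapshot, then replay only the newer events.
    public List<string> Recover()
    {
        var state = _snapshot != null ? new List<string>(_snapshot) : new List<string>();
        foreach (var entry in _journal.Where(e => e.Key > _snapshotSeqNr))
            state.Add(entry.Value);
        return state;
    }

    public static void Main()
    {
        var store = new SnapshotRecoveryDemo();
        var state = new List<string>();
        for (var i = 1; i <= 250; i++)
            store.Persist("msg" + i, state);

        var recovered = store.Recover();
        Console.WriteLine(recovered.Count);   // prints 250
        Console.WriteLine(store.JournalSize); // prints 50
    }
}
```

After 250 messages the snapshot taken at sequence number 200 carries most of the state, so recovery only has to replay the 50 journal entries written since then.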

Now that we’ve covered a little bit about how the persistence mechanisms work, let’s talk about how Akka.Persistence actually works with a physical database store like SQL Server.

How Akka.Persistence Works with Databases

For this section, I’ll use the Akka.Persistence.SqlServer implementation as an example.

One of the key traits of all persistent actors that we have not covered is this abstract property that you have to implement on every ReceivePersistentActor:

public override string PersistenceId
{
    get
    {
        return "HardCoded";
    }
}

The PersistenceId property is important - it uniquely identifies an entity that is persisting its state using Akka.Persistence, and there should be exactly one persistent actor at any given time for a single PersistenceId.

Think of it this way - imagine you’re using the Child-per-Entity Pattern and have lots of different stateful entities that all need to persist their state to a durable store. The PersistenceId is what allows Akka.Persistence to distinguish which saved events belong to which entity.

In the layout of the SQL Server tables used by Akka.Persistence, here’s what the journal looks like:

[Figure: SQL Server Akka.Persistence Journal]

The PersistenceId and the SequenceNr, together, form the primary key. The sequence number is a value that monotonically increases in-memory inside the persistent actor - so imagine what happens if you have two actors with the same PersistenceId, each tracking its own sequence number, writing to the same store. It will be chaos and will inevitably error out - that’s why it’s crucial that every PersistenceId be globally unique within your ActorSystem (or at least across all actors writing to that store).
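Here is a small sketch of why a shared PersistenceId causes trouble, using a dictionary to imitate the composite primary key. KeyedJournal and TryWrite are hypothetical names for this illustration only; a real journal plugin would surface the conflict as a database key violation rather than a false return value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory journal that enforces the same uniqueness rule
// as a (PersistenceId, SequenceNr) primary key.
public class KeyedJournal
{
    private readonly Dictionary<Tuple<string, long>, string> _rows =
        new Dictionary<Tuple<string, long>, string>();

    // Returns false when a row with the same key already exists,
    // which is where a real database would raise a key violation.
    public bool TryWrite(string persistenceId, long sequenceNr, string payload)
    {
        var key = Tuple.Create(persistenceId, sequenceNr);
        if (_rows.ContainsKey(key))
            return false;
        _rows.Add(key, payload);
        return true;
    }
}

public static class KeyedJournalDemo
{
    public static void Main()
    {
        var journal = new KeyedJournal();

        // Two different entities can both use sequence number 1.
        Console.WriteLine(journal.TryWrite("room-a", 1, "hi"));    // prints True
        Console.WriteLine(journal.TryWrite("room-b", 1, "hello")); // prints True

        // Two live actors sharing "room-a" each count in memory, so sooner
        // or later both try to write the same sequence number.
        Console.WriteLine(journal.TryWrite("room-a", 1, "oops"));  // prints False
    }
}
```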

You might also notice that the Payload field consists of bytes and the PayloadType includes the .NET fully-qualified name of the type of message - that’s how Akka.Persistence is able to be so transparent: it serializes messages and writes the binary output into the store.
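As a rough illustration of that idea, the sketch below stores a message as bytes next to its type name and uses the type name to decide how to read the bytes back. JournalRow, PayloadDemo, ToRow and FromRow are hypothetical names, and UTF-8 encoding of strings is a deliberately simple stand-in for the serializers Akka.NET actually uses.

```csharp
using System;
using System.Text;

// Hypothetical shape of one journal row: raw bytes plus the type name
// needed to turn those bytes back into a message.
public class JournalRow
{
    public string PayloadType { get; set; }
    public byte[] Payload { get; set; }
}

public static class PayloadDemo
{
    // Only strings are handled here; UTF-8 is a stand-in for a real serializer.
    public static JournalRow ToRow(string message)
    {
        return new JournalRow
        {
            PayloadType = typeof(string).FullName,
            Payload = Encoding.UTF8.GetBytes(message)
        };
    }

    // The stored type name tells us how to interpret the bytes.
    public static object FromRow(JournalRow row)
    {
        if (row.PayloadType == typeof(string).FullName)
            return Encoding.UTF8.GetString(row.Payload);
        throw new NotSupportedException("No deserializer for " + row.PayloadType);
    }

    public static void Main()
    {
        var row = ToRow("hello");
        Console.WriteLine(row.PayloadType);    // prints "System.String"
        Console.WriteLine(row.Payload.Length); // prints 5
        Console.WriteLine(FromRow(row));       // prints "hello"
    }
}
```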

The SnapshotStore schema in SQL Server is virtually identical:

[Figure: SQL Server Akka.Persistence SnapshotStore]

Available Akka.Persistence Providers

There are lots of Akka.Persistence options already available for you to use as of writing this.

The default, built-in providers are the InMemoryJournal, which journals events in-memory, and the LocalSnapshotStore, which saves snapshots to local disk.

However, we have lots of implementations available from the Akka.NET project itself as well as members of the community:

  1. Akka.Persistence.Sqlite
  2. Akka.Persistence.SqlServer
  3. Akka.Persistence.PostgreSql
  4. Akka.Persistence.MongoDb
  5. Akka.Persistence.EventStore
  6. Akka.Persistence.Azure
  7. Akka.Persistence.Redis

I’ve even seen MySQL and RavenDB implementations running around in the wild, but I don’t know if that code is published.

Configuring Your Akka.Persistence Providers

Within Akka.Persistence, you can configure your snapshot store and your journal independently from each other - you could journal messages to MongoDb and save snapshots to Azure Blob Storage if you wanted to. Akka.Persistence will allow that - all you have to do is specify the providers you want inside the akka.persistence HOCON section and provide the plugin-specific settings required for each plugin.

Here’s an example using Akka.Persistence.SqlServer:

akka.persistence{
  journal {
    plugin = "akka.persistence.journal.sql-server"
    sql-server {
        class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
        schema-name = dbo
        auto-initialize = on
        connection-string = "Data Source=(LocalDB)\\v11.0;AttachDbFilename=|DataDirectory|\\AkkaChat.mdf;Integrated Security=True"
    }
  } 
  snapshot-store{
    plugin = "akka.persistence.snapshot-store.sql-server"
    sql-server {
        class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
        schema-name = dbo
        auto-initialize = on
        connection-string = "Data Source=(LocalDB)\\v11.0;AttachDbFilename=|DataDirectory|\\AkkaChat.mdf;Integrated Security=True"
    }
  }
}

And that’s it - see each individual plugin for reference on what the required settings are.

If you’re interested in learning more about Akka.Persistence, consider signing up for our Akka.NET Design Patterns Virtual Training. We cover it in more depth there.

Otherwise, ask any questions you might have in the comments!

UPDATE: If you’re curious to learn more about Akka.Persistence, check out our post on “Guaranteeing Delivery of Messages with Akka.Persistence.AtLeastOnceDeliveryActor.”

Written by Aaron Stannard on January 7, 2016

 

 
