Thus far, Akka.CQRS has gotten its work done via clustered consistent hash routing, hand-rolled publish and subscribe, and the child-per-entity pattern. These are all fairly simple building blocks for Akka.Cluster applications.
And yet, it is precisely because these building blocks are so simple that we haven't been able to involve the Akka.CQRS.Pricing or Akka.CQRS.Pricing.Web applications in any of our work thus far. On top of that, we've had trouble cleanly moving our OrderBook state between Akka.CQRS.TradeProcessor service nodes, due to the simplistic nature of clustered routers in Akka.NET.
In order to address some of these problems, we’re going to adopt the tools introduced to Akka.NET developers via the Akka.Cluster.Tools and Akka.Cluster.Sharding NuGet packages.
Getting Started
To start this lesson, we need to check out the lesson4 branch in our git repository:
PS> git checkout lesson4
The first thing we're going to do is open the Akka.CQRS.Infrastructure project and take a look at its Akka.CQRS.Infrastructure.csproj definition:
<Project Sdk="Microsoft.NET.Sdk">
  <Import Project="..\common.props" />
  <PropertyGroup>
    <TargetFramework>netstandard2.0</TargetFramework>
    <Description>Shared, non-domain-specific infrastructure used by various Akka.CQRS services.</Description>
    <Configurations>Debug;Release;Phobos</Configurations>
  </PropertyGroup>
  <ItemGroup>
    <None Remove="Ops\phobos.conf" />
  </ItemGroup>
  <ItemGroup>
    <EmbeddedResource Include="Ops\ops.conf" />
    <EmbeddedResource Include="Ops\phobos.conf" />
  </ItemGroup>
  <ItemGroup>
    <PackageReference Include="Akka.Cluster.Sharding" Version="$(AkkaVersion)-beta*" />
    <PackageReference Include="Akka.Persistence.Extras" Version="$(AkkaPersistenceExtrasVersion)" />
    <PackageReference Include="Akka.Bootstrap.Docker" Version="$(AkkaBootstrapVersion)" />
  </ItemGroup>
  <ItemGroup Condition="'$(Configuration)' == 'Phobos'">
    <!-- Uncomment these to install Phobos binaries -->
    <!--<PackageReference Include="Phobos.Actor.Cluster" Version="$(PhobosVersion)" />
    <PackageReference Include="Phobos.Tracing.Jaeger" Version="$(PhobosTracingVersion)" />
    <PackageReference Include="Phobos.Monitoring.StatsD" Version="$(PhobosMonitoringVersion)" />-->
  </ItemGroup>
  <ItemGroup>
    <ProjectReference Include="..\Akka.CQRS\Akka.CQRS.csproj" />
  </ItemGroup>
You'll notice that we already have the Akka.Cluster.Sharding package referenced here. Akka.Cluster.Sharding depends on Akka.Cluster.Tools, and all of our *.Service and *.Web projects reference Akka.CQRS.Infrastructure, so we're good to go on NuGet packages and there's no need to install any at this point. We just need to rebuild Akka.CQRS.sln, which will automatically restore the packages we need.
NOTE: Akka.Cluster.Sharding depends on Akka.Cluster.Tools because the default “persistent” mode for storing sharding data depends on having access to a Cluster Singleton that can maintain a single, consistent view of the shard allocation data. That’s why this dependency exists.
At the moment, we're not using Akka.Cluster.Sharding or any of the Akka.Cluster.Tools features anywhere in our applications. That's about to change.
The next thing we need to do is open the Akka.CQRS.Infrastructure.AppBootstrap.cs file and uncomment the following lines:
config = config
    .WithFallback(GetOpsConfig())
    .WithFallback(TradeEventSerializer.Config)
    //.WithFallback(ClusterSharding.DefaultConfig())
    //.WithFallback(DistributedData.DistributedData.DefaultConfig()) // needed for DData sharding
    //.WithFallback(ClusterClientReceptionist.DefaultConfig())
    //.WithFallback(DistributedPubSub.DefaultConfig())
    .BootstrapFromDocker();
Uncomment these lines to load the default configuration, including the serializers, for Akka.Cluster.Sharding, Akka.DistributedData, ClusterClient, and DistributedPubSub on all nodes in our cluster.
NOTE: starting with Akka.NET 1.4, this may no longer be necessary, as all built-in HOCON configurations will be loaded automatically by the Akka.NET configuration library.
Now it’s time for us to introduce these tools where they’re needed inside the Akka.CQRS solution.
Working with Akka.Cluster.Sharding
Akka.Cluster.Sharding is a much more sophisticated version of what we previously accomplished using consistent hash routing and the child-per-entity pattern, with some major differences:
- Akka.Cluster.Sharding guarantees that there will be at most one instance of every unique persistent entity in our cluster, even in the event of network partitions;
- Akka.Cluster.Sharding will buffer messages intended for entities that are being moved from one node in the cluster to another, thereby improving the odds that they'll be delivered during shard hand-offs;
- Akka.Cluster.Sharding is designed to keep state evenly distributed across the cluster, rebalancing shards as the cluster scales up and down, so we don't run the risk of having some nodes act as “hot spots” while the rest of the cluster stays idle; and
- Akka.Cluster.Sharding includes some built-in mechanisms for passivating entities: getting them to safely write the rest of their state to a persistent store before being stopped, so they can be recreated later, possibly on a different node. Historically this has usually been done by setting a ReceiveTimeout inside your actors, but it can now be done explicitly when setting up the ShardRegion for your entities.
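The third point is easier to picture with a small model. Unless you pass a different strategy to ClusterSharding.Start, Akka.Cluster.Sharding uses the LeastShardAllocationStrategy. That strategy gives each newly allocated shard to the ShardRegion currently hosting the fewest shards, and later rebalances shards away from overloaded regions. The sketch below models only the allocation half, in plain C#. LeastShardsSketch and the node names are hypothetical, and in a real cluster this logic runs inside the shard coordinator rather than in your own code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, in-process illustration of "least shards" allocation:
// each new shard goes to the region (node) currently hosting the fewest shards.
// Real regions are IActorRefs managed by the shard coordinator; strings stand in here.
public static class LeastShardsSketch
{
    public static Dictionary<string, List<int>> Allocate(IReadOnlyList<string> regions, int shardCount)
    {
        var allocation = regions.ToDictionary(r => r, r => new List<int>());
        for (var shard = 0; shard < shardCount; shard++)
        {
            // OrderBy is stable, so ties go to the region listed first.
            var target = regions.OrderBy(r => allocation[r].Count).First();
            allocation[target].Add(shard);
        }
        return allocation;
    }
}
```

Allocating 30 shards across three regions this way leaves each region with 10 shards.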
The first thing we're going to do is create a ShardRegion inside our Akka.CQRS.TradeProcessor.Service/Program.cs:
static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config).BoostrapApplication(new AppBootstrapConfig(true, true));
    var actorSystem = ActorSystem.Create("AkkaTrader", conf);
    var orderBooks = actorSystem.ActorOf(Props.Create(() => new OrderBookMasterActor()), "orderbooks");
    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    pbm.RegisterCommandPalette(ClusterCommands.Instance);
    pbm.RegisterCommandPalette(ClusterShardingCommands.Instance);
    pbm.RegisterCommandPalette(RemoteCommands.Instance);
    pbm.Start();
    actorSystem.WhenTerminated.Wait();
    return 0;
}
Delete the line where we assign orderBooks and replace it with the following code:
Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
{
    var sharding = ClusterSharding.Get(actorSystem);
    var shardRegion = sharding.Start("orderBook", s => OrderBookActor.PropsFor(s), ClusterShardingSettings.Create(actorSystem),
        new StockShardMsgRouter());
});
What does this code do? First, the Cluster.RegisterOnMemberUp method registers a callback that gets invoked once this node has successfully joined the cluster and been marked as Up. None of the code inside that lambda runs until the cluster is online. You don't have to wait until your node has joined the cluster before starting the sharding system, but this is something we routinely do at Petabridge.
Next, we get a reference to the sharding system itself:
var sharding = ClusterSharding.Get(actorSystem);
We can use the ClusterSharding object to spawn new ShardRegions or ShardRegionProxies, which we will get to in a moment. First, we're going to create an “orderBook” ShardRegion we can use for hosting OrderBookActors:
var shardRegion = sharding.Start("orderBook", s => OrderBookActor.PropsFor(s), ClusterShardingSettings.Create(actorSystem),
    new StockShardMsgRouter());
The shardRegion variable, in this case, is an IActorRef. We can use that shardRegion actor to send and distribute messages to any of the “orderBook” entity actors hosted anywhere in our Akka.Cluster. In order for a node to host “orderBook” entities, it must run the exact same piece of code we've just executed here. Thankfully, every instance of Akka.CQRS.TradeProcessor.Service runs this same Program.cs, so each container we launch with Docker does this automatically.
The second argument we pass into the ClusterSharding.Start method is a Func<string, Props>. It's a factory method that receives the id of the sharded entity as its input and returns a Props that can be used to create the actor that will represent that entity inside the sharding system. “How do we know how to name entities inside Akka.Cluster.Sharding?” you ask?
By using the StockShardMsgRouter defined in the Akka.CQRS.Infrastructure project, which we need to uncomment in order for our solution to compile:
/*
/// <summary>
/// Used to route sharding messages to order book actors hosted via Akka.Cluster.Sharding.
/// </summary>
public sealed class StockShardMsgRouter : HashCodeMessageExtractor
{
    /// <summary>
    /// 3 nodes hosting order books, 10 shards per node.
    /// </summary>
    public const int DefaultShardCount = 30;
    public StockShardMsgRouter() : this(DefaultShardCount)
    {
    }
    public StockShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
    {
    }
    public override string EntityId(object message)
    {
        if (message is IWithStockId stockMsg)
        {
            return stockMsg.StockId;
        }
        if (message is IConfirmableMessageEnvelope<IWithStockId> envelope)
        {
            return envelope.Message.StockId;
        }
        return null;
    }
}
*/
Uncomment this class - we’re going to use it to route messages any time we use the Akka.Cluster.Sharding system in the Akka.CQRS solution.
The StockShardMsgRouter inherits from the HashCodeMessageExtractor base class in the Akka.Cluster.Sharding namespace. Message extractors are used for two things in Akka.Cluster.Sharding:
- Determining which shard an entity belongs to. The HashCodeMessageExtractor hashes the entity's id and maps the result onto a fixed number of shards. We specify that maximum number of shards for the “orderBook” ShardRegion in the HashCodeMessageExtractor base class constructor. If there are 30 shards and the hash of the entity's id, modulo 30, comes out to 21, then that entity will be created as a child of shard 21.
- Determining the name of each entity actor. This is what the EntityId method on the StockShardMsgRouter does. In this instance, we look for the IWithStockId interface on the messages sent through the sharding system, either directly or wrapped in an IConfirmableMessageEnvelope<IWithStockId>. Whenever we find it, we return the StockId and use that as the entity's id. All messages with a StockId of “MSFT” will be routed to the single “MSFT” actor that exists globally inside our cluster.
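To make those two jobs concrete, here is a standalone sketch. It is not Akka.NET's HashCodeMessageExtractor. IWithStockId is a minimal stand-in for the Akka.CQRS interface, while BidStub and StockShardSketch are hypothetical. The sketch uses 32-bit FNV-1a as its hash, which is an assumption chosen for illustration. Whatever hash you use must be stable across processes, because every node has to compute the same shard for “MSFT”; string.GetHashCode() is randomized per process on .NET Core, which is why it is avoided here.

```csharp
using System;

// Minimal stand-in for Akka.CQRS's IWithStockId, declared here so the sketch compiles on its own.
public interface IWithStockId
{
    string StockId { get; }
}

// Hypothetical message type for illustration (not Akka.CQRS's Bid class).
public sealed class BidStub : IWithStockId
{
    public BidStub(string stockId, decimal price)
    {
        StockId = stockId;
        Price = price;
    }

    public string StockId { get; }
    public decimal Price { get; }
}

// Hypothetical extractor mirroring the two jobs described above.
public sealed class StockShardSketch
{
    private readonly int _maxNumberOfShards;

    public StockShardSketch(int maxNumberOfShards)
    {
        _maxNumberOfShards = maxNumberOfShards;
    }

    // Job 1: name the entity. Returns null when the message can't be routed.
    public string EntityId(object message)
    {
        return message is IWithStockId stockMsg ? stockMsg.StockId : null;
    }

    // Job 2: pick the shard by taking a stable hash of the entity id modulo the shard count.
    public string ShardId(object message)
    {
        var entityId = EntityId(message) ?? throw new ArgumentException("Message has no stock id", nameof(message));
        return (StableHash(entityId) % (uint)_maxNumberOfShards).ToString();
    }

    // 32-bit FNV-1a; uint arithmetic wraps on overflow in the default unchecked context.
    private static uint StableHash(string value)
    {
        uint hash = 2166136261u;
        foreach (var c in value)
        {
            hash ^= c;
            hash *= 16777619u;
        }
        return hash;
    }
}
```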
For every ShardRegion there exist 1-N shards, and each of those shards acts as the parent actor of all the individual entity actors underneath it. It's the job of the IMessageExtractor to determine which entity a message belongs to and which shard that entity belongs to. Sharded entity actors are created on demand when they're sent a message for the very first time, either via a ShardRegion or a ShardRegionProxy. That's why the StockShardMsgRouter is such a crucial piece of our Akka.Cluster.Sharding setup.
ShardRegionProxy and the Akka.CQRS.TradePlacers.Service
Now that we're capable of hosting the “orderBook” ShardRegion on all of the Akka.CQRS.TradeProcessor.Service instances, we need to fix the Akka.CQRS.TradePlacers.Service so it can communicate with our sharded actors instead of using the consistent hash router we depended on earlier.
Let's take a look at Akka.CQRS.TradePlacers/Program.cs and make some changes:
static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config).BoostrapApplication(new AppBootstrapConfig(false, true));
    var actorSystem = ActorSystem.Create("AkkaTrader", conf);
    var tradeRouter = actorSystem.ActorOf(
        Props.Empty.WithRouter(new ClusterRouterGroup(
            new ConsistentHashingGroup(new[] {"/user/orderbooks"},
                TradeEventConsistentHashMapping.TradeEventMapping),
            new ClusterRouterGroupSettings(10000, new []{ "/user/orderbooks" }, true, useRole:"trade-processor"))), "tradesRouter");
    Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
    {
        var shardRegionProxy = tradeRouter;
        var subManager = new ActorTradeSubscriptionManager(tradeRouter);
        foreach (var stock in AvailableTickerSymbols.Symbols)
        {
            var max = (decimal)ThreadLocalRandom.Current.Next(20, 45);
            var min = (decimal) ThreadLocalRandom.Current.Next(10, 15);
            var range = new PriceRange(min, 0.0m, max);
            // start bidders
            foreach (var i in Enumerable.Repeat(1, ThreadLocalRandom.Current.Next(1, 6)))
            {
                actorSystem.ActorOf(Props.Create(() => new BidderActor(subManager, stock, range, shardRegionProxy)));
            }
            // start askers
            foreach (var i in Enumerable.Repeat(1, ThreadLocalRandom.Current.Next(1, 6)))
            {
                actorSystem.ActorOf(Props.Create(() => new AskerActor(subManager, stock, range, shardRegionProxy)));
            }
        }
    });
    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    pbm.RegisterCommandPalette(ClusterCommands.Instance);
    pbm.RegisterCommandPalette(ClusterShardingCommands.Instance);
    pbm.RegisterCommandPalette(RemoteCommands.Instance);
    pbm.Start();
    actorSystem.WhenTerminated.Wait();
    return 0;
}
First things first: delete the tradeRouter, since we're not going to need it anymore.
Next, replace the following two lines:
var shardRegionProxy = tradeRouter;
var subManager = new ActorTradeSubscriptionManager(tradeRouter);
With these ones:
var sharding = ClusterSharding.Get(actorSystem);
var shardRegionProxy = sharding.StartProxy("orderBook", "trade-processor", new StockShardMsgRouter());
var subManager = Akka.CQRS.Subscriptions.DistributedPubSub.DistributedPubSubTradeEventSubscriptionManager.For(actorSystem);
No further code changes are required to Program.cs beyond this point. This code does two things:
- It creates a ShardRegionProxy that the BidderActor and AskerActor instances can use to transmit trades to the OrderBookActors hosted inside the “orderBook” ShardRegion on the Akka.CQRS.TradeProcessor.Service nodes. As you may notice, the ShardRegionProxy uses the exact same StockShardMsgRouter to distribute messages to the entity actors. This is no accident: for any node to reliably contact a sharded entity actor, it must have a copy of the same IMessageExtractor algorithm, which is why extractors are typically defined in shared libraries like Akka.CQRS.Infrastructure.
- It switches us to the Akka.Cluster.Tools.DistributedPubSub implementation of the ITradeEventSubscriptionManager, an abstraction we created to make it easier to switch from a hand-rolled pub-sub implementation to DistributedPubSub for the purposes of this workshop.
We'll get back to DistributedPubSub in a moment. For now, let's focus on the ShardRegionProxy:
sharding.StartProxy("orderBook", "trade-processor", new StockShardMsgRouter());
Here's what this call does:
- Creates a ShardRegionProxy for the “orderBook” region;
- Tells the ShardRegionProxy to look for “orderBook” ShardRegions only on Akka.Cluster nodes with the “trade-processor” role; and
- Uses the StockShardMsgRouter to determine the shard number and entity id for all messages we want to distribute to “orderBook” entity actors.
The ShardRegionProxy is essentially a pointer to the sharded entities in the cluster. It gets shard location information from the shard coordinator and then uses that to direct its messages towards the appropriate node in the cluster. Whenever the shape or topology of the Akka.Cluster.Sharding system changes (e.g. when shards are rebalanced or a ShardRegion leaves the cluster), those changes are also propagated to all of the ShardRegionProxy instances, just like their ShardRegion counterparts.
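Here is a simplified, synchronous model of that behavior. The proxy asks the coordinator where a shard lives the first time it needs it, caches the answer, and forgets the cached location when the shard is handed off. ProxyLocationCacheSketch, the resolver delegate, and the node addresses used with it are hypothetical. The real protocol between proxies, regions, and the coordinator is message-based and asynchronous, and it buffers messages while a shard's location is unknown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of how a region proxy tracks shard locations.
// resolveHome stands in for asking the shard coordinator where a shard lives.
public sealed class ProxyLocationCacheSketch
{
    private readonly Func<string, string> _resolveHome;
    private readonly Dictionary<string, string> _shardHomes = new Dictionary<string, string>();

    public ProxyLocationCacheSketch(Func<string, string> resolveHome)
    {
        _resolveHome = resolveHome;
    }

    // Returns the node address hosting the shard, asking the coordinator only on a cache miss.
    public string HomeOf(string shardId)
    {
        if (!_shardHomes.TryGetValue(shardId, out var home))
        {
            home = _resolveHome(shardId);
            _shardHomes[shardId] = home;
        }
        return home;
    }

    // Called when the shard is rebalanced or its region leaves: forget the stale location.
    public void HandOff(string shardId) => _shardHomes.Remove(shardId);
}
```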
Adding Akka.Cluster.Sharding HOCON to All Nodes
One thing is still missing: none of our services have the HOCON necessary to tell Akka.Cluster.Sharding which nodes can host shards and which cannot. We're going to fix that right now.
Configure Akka.CQRS.Pricing.Service
First, open the Akka.CQRS.Pricing.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section (the complete file is shown for context):
akka {
  extensions = ["Akka.Cluster.Tools.Client.ClusterClientReceptionistExtensionProvider, Akka.Cluster.Tools"]
  actor {
    provider = cluster
  }
  remote {
    dot-netty.tcp {
      hostname = "127.0.0.1"
      port = 6055
    }
  }
  cluster {
    #will inject this node as a self-seed node at run-time
    seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:6055"]
    roles = ["pricing-engine" , "trade-events"]
    pub-sub {
      role = "trade-events"
    }
    sharding {
      role = "pricing-engine"
      state-store-mode = ddata
    }
    client.receptionist.role = pricing-engine # stops ClusterClient gossip from going to the worker nodes
  }
  persistence {
    journal {
      plugin = "akka.persistence.journal.mongodb"
      mongodb.class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
      mongodb.collection = "EventJournal"
      mongodb.event-adapters = {
        stock-tagger = "Akka.CQRS.Infrastructure.StockEventTagger, Akka.CQRS.Infrastructure"
      }
      mongodb.event-adapter-bindings = {
        "Akka.CQRS.IWithStockId, Akka.CQRS" = stock-tagger
      }
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.mongodb"
      mongodb.class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
      mongodb.collection = "SnapshotStore"
    }
    query {
      mongodb {
        class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
        refresh-interval = 1s
      }
    }
  }
}
This will set up everything we need for Akka.Cluster.Sharding, ClusterClient, and DistributedPubSub inside the Akka.CQRS.Pricing.Service. Effectively, we're answering the question “which roles can X run on?” for each of these Akka.Cluster components.

In the case of DistributedPubSub, only the nodes with the trade-events role have access to the DistributedPubSubMediator, the actor responsible for circulating topics and the addresses of the nodes subscribed to those topics. The same goes for Akka.Cluster.Tools.ClusterClient: only “pricing-engine” nodes can host the ClusterClientReceptionist, which accepts incoming connections from remote ClusterClient instances.
Notice that we're saving the Akka.Cluster.Sharding state using Akka.DistributedData (state-store-mode = ddata), an in-memory, eventually consistent data replication system for Akka.NET. Akka.DistributedData is still in beta (until Akka.NET v1.4.0 is released), but we're going to use it here so we can avoid having yet another thing depend on MongoDb.
Also notice that the role for Akka.Cluster.Sharding is “pricing-engine” rather than “trade-processor”. This is because the Akka.CQRS.Pricing.Service hosts its own sharded entities, the “priceAggregator”s, and those are kept separate from the “orderBook” entities hosted on the Akka.CQRS.TradeProcessor.Service.
Configure Akka.CQRS.TradePlacers.Service
Next, we need to configure the Akka.CQRS.TradePlacers.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section (the complete file is shown for context):
akka {
  actor {
    provider = cluster
    deployment {
      /tradesRouter {
        router = consistent-hashing-group
        routees.paths = ["/user/orderbooks"]
        cluster {
          enabled = on
          use-role = trade-processor
        }
      }
    }
  }
  remote {
    dot-netty.tcp {
      hostname = "127.0.0.1"
      port = 5054
    }
  }
  cluster {
    #will inject this node as a self-seed node at run-time
    seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:5055"]
    roles = ["trader", "trade-events"]
    pub-sub {
      role = "trade-events"
    }
    sharding {
      role = "trade-processor"
    }
  }
}
Configure Akka.CQRS.TradeProcessor.Service
Finally, we need to configure the Akka.CQRS.TradeProcessor.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section (the complete file is shown for context):
akka {
  actor {
    provider = cluster
  }
  remote {
    dot-netty.tcp {
      hostname = "127.0.0.1"
      port = 5055
    }
  }
  cluster {
    #will inject this node as a self-seed node at run-time
    seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:5055"]
    roles = ["trade-processor" , "trade-events"]
    pub-sub {
      role = "trade-events"
    }
    sharding {
      role = "trade-processor"
      state-store-mode = ddata
    }
  }
  persistence {
    journal {
      plugin = "akka.persistence.journal.mongodb"
      mongodb.class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
      mongodb.collection = "EventJournal"
      #mongodb.stored-as = binary
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.mongodb"
      mongodb.class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
      mongodb.collection = "SnapshotStore"
      #mongodb.stored-as = binary
    }
  }
}
This will ensure that the “orderBook” ShardRegion is hosted correctly once it's started by the Akka.CQRS.TradeProcessor.Service.
Working with DistributedPubSub
Now that we have Akka.Cluster.Sharding up and running, we should talk about the role of DistributedPubSub inside Akka.CQRS.
Prior to the changes we implement in this lesson, all of the publish-subscribe activity between the TradePlacer and TradeProcessor services in the Akka.CQRS solution was done using custom mechanisms like the InMemoryTradeEventPublisher:
/// <summary>
/// Used locally, in-memory by a single order book actor. Belongs to a single ticker symbol.
/// </summary>
public sealed class InMemoryTradeEventPublisher : TradeEventSubscriptionManagerBase, ITradeEventPublisher
{
    private readonly Dictionary<TradeEventType, HashSet<IActorRef>> _subscribers;
    public InMemoryTradeEventPublisher() : this(new Dictionary<TradeEventType, HashSet<IActorRef>>()) { }
    public InMemoryTradeEventPublisher(Dictionary<TradeEventType, HashSet<IActorRef>> subscribers)
    {
        _subscribers = subscribers;
    }
    public void Publish(string tickerSymbol, ITradeEvent @event)
    {
        var eventType = @event.ToTradeEventType();
        EnsureSub(eventType);
        foreach(var sub in _subscribers[eventType])
            sub.Tell(@event);
    }
    public override Task<TradeSubscribeAck> Subscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        foreach (var e in events)
        {
            EnsureSub(e);
            _subscribers[e].Add(subscriber);
        }
        return Task.FromResult(new TradeSubscribeAck(tickerSymbol, events));
    }
    private void EnsureSub(TradeEventType e)
    {
        if (!_subscribers.ContainsKey(e))
        {
            _subscribers[e] = new HashSet<IActorRef>();
        }
    }
    public override Task<TradeUnsubscribeAck> Unsubscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        foreach (var e in events)
        {
            EnsureSub(e);
            _subscribers[e].Remove(subscriber);
        }
        return Task.FromResult(new TradeUnsubscribeAck(tickerSymbol, events));
    }
}
We ran one of these inside each individual OrderBookActor and used it to publish events directly to the AskerActor and BidderActor instances running on each of the Akka.CQRS.TradePlacers.Service nodes. All of the subscription state was, therefore, contained locally inside each OrderBookActor's memory. This approach has a number of important limitations:
- If the OrderBookActor died and had to be recreated, possibly even on a new node, the subscriber actors had to be explicitly programmed to recreate their subscriptions to the new OrderBookActor. This can be difficult to do and can result in pauses in data availability to subscribers.
- All actors who want to subscribe to order book events for a specific ticker symbol need to know how to contact the OrderBookActors directly.
We can resolve these issues by using Akka.Cluster.Tools.DistributedPubSub, a decentralized message/topic broker that allows individual actors to subscribe and publish to topics throughout the cluster without having to directly contact any of the other actors involved.
The DistributedPubSubTradeEventSubscriptionManager is an abstraction we use inside Akka.CQRS to make it easy for actors to subscribe to DistributedPubSub topics published by the OrderBookActors without having to know where those actors are located in the cluster at any given time:
/// <summary>
/// <see cref="ITradeEventSubscriptionManager"/> that uses the <see cref="DistributedPubSub.Mediator"/> under the hood.
/// </summary>
public sealed class DistributedPubSubTradeEventSubscriptionManager : TradeEventSubscriptionManagerBase
{
    private readonly IActorRef _mediator;
    public DistributedPubSubTradeEventSubscriptionManager(IActorRef mediator)
    {
        _mediator = mediator;
    }
    public override async Task<TradeSubscribeAck> Subscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        var tasks = ToTopics(tickerSymbol, events).Select(x =>
            _mediator.Ask<SubscribeAck>(new Subscribe(x, subscriber), TimeSpan.FromSeconds(3)));
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return new TradeSubscribeAck(tickerSymbol, events);
    }
    public override async Task<TradeUnsubscribeAck> Unsubscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        var tasks = ToTopics(tickerSymbol, events).Select(x =>
            _mediator.Ask<UnsubscribeAck>(new Unsubscribe(x, subscriber), TimeSpan.FromSeconds(3)));
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return new TradeUnsubscribeAck(tickerSymbol, events);
    }
    public static DistributedPubSubTradeEventSubscriptionManager For(ActorSystem sys)
    {
        var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
        return new DistributedPubSubTradeEventSubscriptionManager(mediator);
    }
}
DistributedPubSub works through a built-in actor called the “mediator”, which we can access through the DistributedPubSub.Get(sys).Mediator property. The mediator is responsible for:
- Memorizing which actors on the current local node are subscribed to which topics;
- Gossiping to mediators running on other nodes (usually ones running in a specific role) about which topics the actors on the local node are subscribed to; and
- Executing Publish operations out to all of the appropriate local actors AND to the mediators running on other nodes that also have subscribers for the published topic.
To subscribe to a topic in DistributedPubSub, we just need to send a Subscribe(string topicId, IActorRef subscriber) message to the mediator actor, which replies with a SubscribeAck:
var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
var subscribeAck = await mediator.Ask<SubscribeAck>(new Subscribe("MSFT", subscriber), TimeSpan.FromSeconds(3));
To publish to a topic in DistributedPubSub, we need to send a Publish(string topicId, object msg) message to the mediator:
var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
mediator.Tell(new Publish("MSFT", new Bid(...)));
DistributedPubSub is traffic-optimized. Suppose a thousand actors running on node 2 are all subscribed to topic “MSFT”, and an actor on node 1 publishes an event to that topic. The mediator on node 1 will send only one copy of that message to the mediator running on node 2. The mediator on node 2 will then deliver that message locally, in memory, to all of the actors on node 2 that are subscribed to the “MSFT” topic.
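The difference between sending one copy per node and one copy per subscriber can be modeled in a few lines. This is a hypothetical, in-process sketch; MediatorSketch and ClusterSketch are not Akka.NET types. Each mediator keeps its local subscribers per topic, and HasSubscribers stands in for the knowledge real mediators obtain through gossip. A publish delivers to local subscribers and then sends exactly one message to each other node that has subscribers for the topic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical per-node mediator: tracks which local subscribers listen to which topics.
public sealed class MediatorSketch
{
    private readonly Dictionary<string, List<Action<object>>> _local =
        new Dictionary<string, List<Action<object>>>();

    public MediatorSketch(string node)
    {
        Node = node;
    }

    public string Node { get; }

    public void Subscribe(string topic, Action<object> subscriber)
    {
        if (!_local.TryGetValue(topic, out var subs))
        {
            subs = new List<Action<object>>();
            _local[topic] = subs;
        }
        subs.Add(subscriber);
    }

    // Stands in for what remote mediators learn about this node via gossip.
    public bool HasSubscribers(string topic) => _local.ContainsKey(topic);

    // In-memory delivery to every local subscriber of the topic.
    public void DeliverLocally(string topic, object message)
    {
        if (_local.TryGetValue(topic, out var subs))
        {
            foreach (var subscriber in subs) subscriber(message);
        }
    }
}

// Hypothetical cluster of mediators that counts node-to-node messages.
public sealed class ClusterSketch
{
    private readonly List<MediatorSketch> _mediators;

    public ClusterSketch(params MediatorSketch[] mediators)
    {
        _mediators = mediators.ToList();
    }

    public int CrossNodeMessages { get; private set; }

    public void Publish(MediatorSketch origin, string topic, object message)
    {
        origin.DeliverLocally(topic, message);
        foreach (var remote in _mediators.Where(m => m != origin && m.HasSubscribers(topic)))
        {
            CrossNodeMessages++; // one copy per interested node, not per remote subscriber
            remote.DeliverLocally(topic, message);
        }
    }
}
```

Subscribing 1,000 handlers on node 2 and publishing once from node 1 reaches all 1,000 subscribers while only one message crosses between nodes.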
With the code we changed earlier, the OrderBookActor now publishes through DistributedPubSub, because the Cluster.Sharding system starts the actor with the following constructor:
public OrderBookActor(string tickerSymbol)
    : this(tickerSymbol, null,
        DistributedPubSubTradeEventPublisher.For(Context.System),
        NoOpTradeEventSubscriptionManager.Instance, Context.Parent) { }
So all of the OrderBookActor's ITradeEvent instances are now published to topics that can be subscribed to from any other node in the cluster that has access to DistributedPubSub. This makes it possible for us to get the Akka.CQRS.Pricing service up and running for the first time.
Running Akka.CQRS.Pricing
The Pricing service is designed to aggregate the Match events emitted by the OrderBookActors running on the Akka.CQRS.TradeProcessor.Service nodes. It uses them to generate real-time pricing and volume data about the market activity for each ticker symbol. So far the Akka.CQRS.Pricing.Service has been disabled in our solution because we really needed DistributedPubSub to make it feasible to run, and that's exactly what we're going to fix now.
Open the Akka.CQRS.Pricing.Service/Program.cs file and uncomment the main method body:
static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config);
    /*
    var actorSystem = ActorSystem.Create("AkkaTrader", conf.BoostrapApplication(new AppBootstrapConfig(true, true)));
    var sharding = ClusterSharding.Get(actorSystem);
    var shardRegion = sharding.Start("priceAggregator",
        s => Props.Create(() => new MatchAggregator(s)),
        ClusterShardingSettings.Create(actorSystem),
        new StockShardMsgRouter());
    var clientHandler =
        actorSystem.ActorOf(Props.Create(() => new ClientHandlerActor(shardRegion)), "subscriptions");
    // make ourselves available to ClusterClient at /user/subscriptions
    ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);
    Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
    {
        foreach (var ticker in AvailableTickerSymbols.Symbols)
        {
            shardRegion.Tell(new Ping(ticker));
        }
    });
    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    void RegisterPalette(CommandPaletteHandler h)
    {
        if (pbm.RegisterCommandPalette(h))
        {
            Console.WriteLine("Petabridge.Cmd - Registered {0}", h.Palette.ModuleName);
        }
        else
        {
            Console.WriteLine("Petabridge.Cmd - DID NOT REGISTER {0}", h.Palette.ModuleName);
        }
    }
    RegisterPalette(ClusterCommands.Instance);
    RegisterPalette(RemoteCommands.Instance);
    RegisterPalette(ClusterShardingCommands.Instance);
    RegisterPalette(new PriceCommands(shardRegion));
    pbm.Start();
    actorSystem.WhenTerminated.Wait();
    */
    return 0;
}
Remove the block comment from this method body and the Akka.CQRS.Pricing system will be good to go, provided that you also applied the HOCON changes from earlier in this lesson.
The MatchAggregator actor is the key player inside the Akka.CQRS.Pricing service. It's responsible for subscribing to the ITradeEvents published by the OrderBookActor and aggregating those trade events:
Command<Match>(m => TickerSymbol.Equals(m.StockId), m =>
{
    _log.Info("Received MATCH for {0} - price: {1} quantity: {2}", TickerSymbol, m.SettlementPrice, m.Quantity);
    if (_matchAggregate == null)
    {
        _matchAggregate = new MatchAggregate(TickerSymbol, m.SettlementPrice, m.Quantity);
        return;
    }
    if (!_matchAggregate.WithMatch(m)) // pull Match into current price/volume aggregation
    {
        // should never get to here
        _log.Warning("Received Match for ticker symbol [{0}] - but we only accept symbols for [{1}]",
            m.StockId, TickerSymbol);
    }
});
The aggregated Match events are used to produce an estimated, weighted moving average price and per-trade volume, which the pricing service will in turn publish to its own subscribers:
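This lesson doesn't show MatchAggregate's formula, so the following is only an assumption about what a weighted moving average could look like. It computes a volume-weighted average price over the most recent N matches, plus the average quantity per match. WeightedPriceWindowSketch and its window size are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sliding-window aggregate over (price, quantity) pairs taken from Match events.
// Assumes a volume-weighted average; MatchAggregate's real formula may differ.
public sealed class WeightedPriceWindowSketch
{
    private readonly int _capacity;
    private readonly Queue<(decimal Price, decimal Quantity)> _matches =
        new Queue<(decimal Price, decimal Quantity)>();

    public WeightedPriceWindowSketch(int capacity)
    {
        _capacity = capacity;
    }

    public void Add(decimal price, decimal quantity)
    {
        _matches.Enqueue((price, quantity));
        if (_matches.Count > _capacity) _matches.Dequeue(); // drop the oldest match
    }

    // Sum(price * quantity) / Sum(quantity) over the current window.
    public decimal AveragePrice()
    {
        var volume = _matches.Sum(m => m.Quantity);
        return volume == 0m ? 0m : _matches.Sum(m => m.Price * m.Quantity) / volume;
    }

    // Average quantity per match in the current window.
    public decimal AverageVolumePerTrade() =>
        _matches.Count == 0 ? 0m : _matches.Sum(m => m.Quantity) / _matches.Count;
}
```

Take a window of 2 and add the matches (10, 1), (20, 3), and (30, 1). The third match evicts the first, giving (20 × 3 + 30 × 1) / 4 = 22.5 as the average price and 4 / 2 = 2 as the average volume per trade.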
Command<PublishEvents>(p =>
{
    if (_matchAggregate == null)
        return;
    var (latestPrice, latestVolume) = _matchAggregate.FetchMetrics(_timestamper);
    // Need to update pricing records prior to persisting our state, since this data is included in
    // output of SaveAggregateData()
    _priceUpdates.Add(latestPrice);
    _volumeUpdates.Add(latestVolume);
    PersistAsync(SaveAggregateData(), snapshot =>
    {
        _log.Info("Saved latest price {0} and volume {1}", snapshot.RecentAvgPrice, snapshot.RecentAvgVolume);
        if (LastSequenceNr % SnapshotEveryN == 0)
        {
            SaveSnapshot(snapshot);
        }
    });
    // publish updates to price and volume subscribers
    _marketEventPublisher.Publish(TickerSymbol, latestPrice);
    _marketEventPublisher.Publish(TickerSymbol, latestVolume);
});
The price and volume updates will be published as IPriceUpdate and IVolumeUpdate events, which will be consumed by Akka.CQRS.Pricing.Web and by the custom Akka.CQRS.Pricing.Cli Petabridge.Cmd palette we created for this sample.
Before we try to set up the Akka.CQRS.Pricing.Web application, let's see if we can get the Akka.CQRS.Pricing.Service up and running inside our cluster using docker-compose.
Launching Akka.CQRS.Pricing
Go to the docker-compose.yaml file at the root of the solution and add the following entry to the bottom of the file, alongside the other service entries:
  pricing-root:
    image: akka.cqrs.pricing
    hostname: pricing-root
    ports:
      - '0:9110'
    environment:
      CLUSTER_SEEDS: "akka.tcp://AkkaTrader@lighthouse:4053"
      CLUSTER_PORT: 6055
      CLUSTER_IP: "pricing-root"
      MONGO_CONNECTION_STR: "mongodb://mongo:27017/akkaTrader"
      STATSD_PORT: 8125
      STATSD_URL: "graphite"
      JAEGER_AGENT_HOST: "jaeger"
    restart: on-failure
    depends_on:
      - "mongo"
      - "lighthouse"
The MatchAggregator actors are persistent, so they need to be able to connect to MongoDb. That's why this entry depends on the mongo service.
Once this is done, run the Docker build process and then execute the following:
PS> docker-compose up
Once the cluster comes online, locate the port that the pricing-root container exposes to the host network. This port is assigned randomly based on how we configured the container, so it can change between runs. You can find it in Kitematic, or from the Docker command line by running docker-compose port pricing-root 9110, which prints the host address and port mapped to the container’s Petabridge.Cmd port 9110.
In our run, the port number for pbm was 32769. Open a new terminal and type the following command:
PS> pbm 127.0.0.1:{your port number} price track -s MSFT
And you’ll see output similar to the following:
[MSFT][05/28/2019 17:37:25 +00:00] - $[20.106650376251427964807110712]
[MSFT][05/28/2019 17:32:34 +00:00] - $[20.274556689693797146097405325]
[MSFT][05/28/2019 17:32:44 +00:00] - $[19.979082111208260641847389504]
[MSFT][05/28/2019 17:32:54 +00:00] - $[18.880631034012567896143666967]
[MSFT][05/28/2019 17:33:04 +00:00] - $[18.071261664343961427756599508]
[MSFT][05/28/2019 17:33:14 +00:00] - $[17.942004359311407306285646854]
[MSFT][05/28/2019 17:33:24 +00:00] - $[17.910926625654301605621356379]
[MSFT][05/28/2019 17:33:34 +00:00] - $[17.628967535873517027261368426]
[MSFT][05/28/2019 17:33:44 +00:00] - $[17.811762841507259352753432734]
[MSFT][05/28/2019 17:33:54 +00:00] - $[18.008120807019171652575791913]
Every 10 seconds, when the MatchAggregator for a given ticker symbol receives a PublishEvents command, it publishes its most recent IPriceUpdate and IVolumeUpdate events to indicate the new “market price” for that stock. We can follow this process in real time using the custom Petabridge.Cmd palette we added for pricing data inside Akka.CQRS.
Bonus Exercise: Given the design of the Akka.CQRS.Pricing.Cli project, could you replicate the functionality we use for tracking live IPriceUpdate events, but this time for tracking IVolumeUpdate events?
The use of DistributedPubSub here makes it easy to extend Akka.CQRS with all sorts of new functionality. The last piece of functionality we’re going to add in this lesson is a decoupled Web UI that lets us view what’s going on inside the Akka.CQRS Akka.NET cluster without actually being part of the cluster itself. How are we going to do this? Through the use of the ClusterClient!
Working with ClusterClient
Akka.NET clusters are meant for building high-throughput, highly available real-time applications, which typically means they are stateful. We always want to treat business state with care and caution because that’s where the business value of our applications lives. Therefore, it’s often a good idea to decouple ancillary parts of our application from the cluster, and that’s precisely what the Akka.Cluster.Tools.ClusterClient lets us do.
Here’s an analogy to help you understand the role of ClusterClient in the grand scheme of things:
Akka.Cluster is to SQL Server as ClusterClient is to SqlClient.
The ClusterClient allows us to interact with our Akka.NET cluster over Akka.Remote without being part of the cluster ourselves. This is useful because it means that frequently updated pieces of code, such as Web UIs, can be continuously updated without any impact on the membership or operation of the Akka.NET cluster itself. This is how we’re going to introduce the Akka.CQRS.Pricing.Web application to our solution.
Akka.CQRS.Pricing.Web is an ASP.NET Core + SignalR powered solution. It’s designed to stream IPriceUpdate and IVolumeUpdate messages over SignalR to a Web UI, just like we saw with the custom Petabridge.Cmd palette earlier.
NOTE: the Akka.Cluster.Tools.ClusterClient does not need akka.actor.provider = cluster in order to run. In fact, it shouldn’t have that setting. What it does need, however, is akka.actor.provider = remote. Without access to Akka.Remote, the ClusterClient can’t do its job.
In order to do this, however, we need some way of actually receiving those update events and then publishing them into SignalR. So the first thing we’ll need is an actor that can translate these Akka.NET events into SignalR messages, which is precisely what the StockPublisherActor does:
/// <summary>
/// Publishes events directly to the <see cref="StockHubHelper"/>
/// </summary>
public class StockPublisherActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly StockHubHelper _hub;
public StockPublisherActor(StockHubHelper hub)
{
_hub = hub;
ReceiveAsync<IPriceUpdate>(async p =>
{
try
{
_log.Info("Received event {0}", p);
await _hub.WritePriceChanged(p);
}
catch (Exception ex)
{
_log.Error(ex, "Error while writing price update [{0}] to StockHub", p);
}
});
ReceiveAsync<IVolumeUpdate>(async p =>
{
try
{
_log.Info("Received event {0}", p);
await _hub.WriteVolumeChanged(p);
}
catch (Exception ex)
{
_log.Error(ex, "Error while writing volume update [{0}] to StockHub", p);
}
});
}
}
The StockHubHelper object is really a wrapper around an IHubContext<StockHub>, which is responsible for actually transmitting the bytes over SignalR’s WebSocket connection to the web browsers on the other side. We just convert the IPriceUpdate and IVolumeUpdate events to their string representations via the usual object.ToString() method implemented on their concrete classes.
But the StockPublisherActor needs some way of getting those events from the Akka.CQRS.Pricing.Service in the first place, and that’s where the StockEventConfiguratorActor and the ClusterClient come into play:
public class StockEventConfiguratorActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private IActorRef _clusterClient;
private readonly IActorRef _stockPublisher;
private ImmutableHashSet<ActorPath> _initialContacts;
private sealed class Start
{
public static readonly Start Instance = new Start();
private Start() { }
}
public StockEventConfiguratorActor(IActorRef stockPublisher, IReadOnlyList<Address> contactAddresses)
{
_initialContacts = contactAddresses.Select(x => new RootActorPath(x) / "system" / "receptionist")
.ToImmutableHashSet();
_stockPublisher = stockPublisher;
Initializing();
}
private void Initializing()
{
Receive<Start>(s =>
{
_log.Info("Contacting cluster client on addresses [{0}]", string.Join(",", _initialContacts));
_clusterClient.Tell(new ClusterClient.Send("/user/subscriptions", new SubscribeClientAll()));
});
Receive<ReceiveTimeout>(t => { Self.Tell(Start.Instance); });
ReceiveAny(_ =>
{
// connected via ClusterClient now
_stockPublisher.Forward(_);
});
}
protected override void PreStart()
{
Context.SetReceiveTimeout(TimeSpan.FromSeconds(15));
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(2), Self, Start.Instance, ActorRefs.NoSender);
_clusterClient = Context.ActorOf(Akka.Cluster.Tools.Client.ClusterClient.Props(ClusterClientSettings
.Create(Context.System)
.WithInitialContacts(_initialContacts)));
}
}
This actor is responsible for establishing a connection to at least one of the Akka.CQRS.Pricing.Service nodes. That’s the purpose of the _initialContacts field populated in this actor’s constructor: it tells the ClusterClient where to find the ClusterClientReceptionist on each node, at the /system/receptionist path of each contact address.
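To make the shape of those contact paths concrete, here is a plain-string sketch that turns a comma-separated list of addresses (like the CLUSTER_SEEDS value used later in this lesson) into receptionist paths. It mirrors new RootActorPath(x) / "system" / "receptionist". The ReceptionistPaths helper is hypothetical. In the real actor, Akka.NET’s Address and ActorPath types do this work.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: comma-separated addresses -> distinct receptionist actor paths.
IReadOnlyList<string> ReceptionistPaths(string? seeds) =>
    (seeds ?? string.Empty)
        .Split(',')
        .Select(s => s.Trim().TrimEnd('/'))              // tolerate spaces and trailing slashes
        .Where(s => s.Length > 0)                         // skip empty entries
        .Select(address => $"{address}/system/receptionist")
        .Distinct()                                       // like ToImmutableHashSet(): no duplicates
        .ToList();

var paths = ReceptionistPaths(
    "akka.tcp://AkkaTrader@pricing-root:6055, akka.tcp://AkkaTrader@lighthouse:4053");
foreach (var p in paths) Console.WriteLine(p);
```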
The relationship between the ClusterClient and the ClusterClientReceptionist can be defined thusly: the ClusterClient relies on the ClusterClientReceptionist to route its messages to the appropriate destinations inside the cluster and to send replies back upstream to the ClusterClient that made the original request.
The ClusterClient only needs to successfully contact a single ClusterClientReceptionist in order to be effective. Once that first receptionist is contacted, it will publish the addresses of the other receptionists running inside the cluster back to the ClusterClient. That way the client can always maintain at least one point of contact with the cluster over very long periods of time, even if the membership of the cluster churns as a result of deployments, autoscaling, network partitions, and so forth.
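The contact-point behavior described above can be illustrated with a toy model. This is a hypothetical simulation in plain C#, not Akka.NET’s ClusterClient implementation. The node names, AskForContacts and Connect are made up, and there is no real networking, heartbeating or timeout handling.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var alive = new HashSet<string> { "A", "B", "C" };   // receptionists currently up (simulated)
var contacts = new List<string> { "A" };             // the client starts with one initial contact

// Hypothetical: a live receptionist replies with every receptionist it knows about.
IReadOnlyCollection<string> AskForContacts(string node) =>
    alive.Contains(node) ? alive.ToArray() : Array.Empty<string>();

// Try known contacts in order; merge the first successful reply into the contact list.
string? Connect()
{
    foreach (var node in contacts.ToList())
    {
        var reply = AskForContacts(node);
        if (reply.Count == 0) continue;               // node is gone, fail over to the next one
        foreach (var learned in reply.Where(r => !contacts.Contains(r)))
            contacts.Add(learned);
        return node;
    }
    return null;                                      // no receptionist reachable
}

var first = Connect();    // reaches "A" and learns about "B" and "C"
alive.Remove("A");        // churn: node A is redeployed
var second = Connect();   // fails over to a receptionist learned earlier
Console.WriteLine($"{first} -> {second}; known: {string.Join(",", contacts)}");
```

Only "A" was configured up front. The failover succeeds because the client remembered the receptionists it learned from "A" while "A" was still up.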
In the case of the StockEventConfiguratorActor, we create a ClusterClient immediately at startup. Two seconds later we send an Akka.CQRS.Subscriptions.SubscribeClientAll message to the /user/subscriptions actor, and we re-send it whenever 15 seconds pass without any traffic coming back. But that /user/subscriptions actor is running somewhere inside the cluster. How do we ensure that the message gets handled by someone?
That’s the job of the ClusterClientReceptionist running on the Akka.CQRS.Pricing.Service:
var clientHandler =
actorSystem.ActorOf(Props.Create(() => new ClientHandlerActor(shardRegion)), "subscriptions");
// make ourselves available to ClusterClient at /user/subscriptions
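ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);
And here is the ClientHandlerActor it registers. It converts client requests into MarketSubscribe and MarketUnsubscribe messages for the shard region passed into its constructor: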
/// <summary>
/// Responsible for handling inbound requests from the <see cref="Akka.Cluster.Tools.Client.ClusterClient"/>
/// actors running on the Web nodes.
/// </summary>
public sealed class ClientHandlerActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IActorRef _priceRouter;
public ClientHandlerActor(IActorRef priceRouter)
{
_priceRouter = priceRouter;
Receive<SubscribeClient>(s =>
{
_log.Info("Received {0} from {1}", s, Sender);
_priceRouter.Tell(new MarketSubscribe(s.StockId, MarketEventHelpers.AllMarketEventTypes, Sender));
});
Receive<SubscribeClientAll>(a =>
{
_log.Info("Received {0} from {1}", a, Sender);
foreach (var s in AvailableTickerSymbols.Symbols)
{
_priceRouter.Tell(new MarketSubscribe(s, MarketEventHelpers.AllMarketEventTypes, Sender));
}
});
Receive<UnsubscribeClient>(s =>
{
_log.Info("Received {0} from {1}", s, Sender);
_priceRouter.Tell(new MarketUnsubscribe(s.StockId, MarketEventHelpers.AllMarketEventTypes, Sender));
});
}
}
The really important detail, however, is this call to the ClusterClientReceptionist:
// make ourselves available to ClusterClient at /user/subscriptions
ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);
This line enables ClusterClients to use a ClusterClient.Send command to deliver messages to this actor at its local path (/user/subscriptions), thus connecting the dots between the ClusterClient and the rest of the Akka.NET cluster.
When a ClusterClient.Send message arrives at a ClusterClientReceptionist, the receptionist creates a local “response tunnel” actor that acts as a proxy for the original sending actor, which lives on the same side of the network as the ClusterClient. This is done so the rest of the Akka.NET cluster doesn’t accidentally open a bunch of unnecessary additional inbound connections back into the Akka.CQRS.Pricing.Web node.
All replies to the Sender of a ClusterClient.Send message are piped back through the response tunnel. The tunnel then delivers them to the original sender on the node running the ClusterClient itself.
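Here is a toy model of the two ideas above, written with plain C# delegates rather than actors. RegisterService makes a handler reachable by its local path alone. Send hands that handler a tunnel in place of the original Sender, so replies flow back through the receptionist. Everything here is a hypothetical illustration: the services dictionary, RegisterService, Send and the tunnel lambda. In Akka.NET, the tunnel is an actor, and the hop back to the client crosses the network.

```csharp
using System;
using System.Collections.Generic;

// Registry keyed by local path only, e.g. "/user/subscriptions" (no node address).
// A handler receives the message plus a reply callback standing in for Sender.
var services = new Dictionary<string, Action<object, Action<object>>>();

// Analogous to ClusterClientReceptionist.RegisterService.
void RegisterService(string path, Action<object, Action<object>> handler) => services[path] = handler;

// Analogous to ClusterClient.Send arriving at a receptionist.
bool Send(string path, object message, Action<object> clientInbox)
{
    if (!services.TryGetValue(path, out var handler)) return false;   // nothing registered at that path
    // The "tunnel" stands in for the original sender: the service replies to it,
    // and it relays each reply back to the client, so the service never contacts the client directly.
    Action<object> tunnel = reply => clientInbox(reply);
    handler(message, tunnel);
    return true;
}

// Hypothetical "/user/subscriptions" handler that replies twice to whoever subscribed.
RegisterService("/user/subscriptions", (msg, sender) =>
{
    sender("price update for MSFT");
    sender("volume update for MSFT");
});

var received = new List<object>();
var delivered = Send("/user/subscriptions", "SubscribeClientAll", received.Add);
Console.WriteLine($"delivered={delivered}, replies={received.Count}");
```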
If the node hosting the current ClusterClientReceptionist dies, the ClusterClient will attempt to contact one of the receptionists running on a different node. Meanwhile, the StockEventConfiguratorActor’s receive timeout re-sends its subscription once traffic stops, so communications resume. This gives us the level of fault tolerance we need to redeploy all of our services independently without compromising availability.
Running Akka.CQRS.Pricing.Web
No code changes are needed to get Akka.CQRS.Pricing.Web off the ground; we should have everything we need already built into the project. All we need to do is add the Akka.CQRS.Pricing.Web Docker image to the docker-compose.yaml file at the root of the solution:
  pricing-web:
    image: akka.cqrs.pricing.web
    ports:
      - '8999:80'
    environment:
      CLUSTER_SEEDS: "akka.tcp://AkkaTrader@pricing-root:6055"
      CLUSTER_PORT: 16666
      STATSD_PORT: 8125
      STATSD_URL: "graphite"
      JAEGER_AGENT_HOST: "jaeger"
    restart: on-failure
    depends_on:
      - "pricing-root"
Add the above YAML to the end of the docker-compose.yaml file.
If you already built all of the Docker images in the previous step, you should be good to go - otherwise follow the Docker build process again.
Notice that the docker-compose.yaml entry uses the CLUSTER_SEEDS environment variable to tell Akka.CQRS.Pricing.Web where the initial ClusterClientReceptionist nodes are. That value is subsequently read by the Akka.CQRS.Pricing.Web AkkaService when it’s started by the ASP.NET Core bootstrapper:
public ActorSystem Sys { get; private set; }
public Task StartActorSystem(StockHubHelper helper)
{
Console.WriteLine("STARTING AKKA.NET");
var conf = ConfigurationFactory.ParseString(File.ReadAllText("app.conf"))
.BoostrapApplication(new AppBootstrapConfig(false, false));
var actorSystem = Sys = ActorSystem.Create("AkkaCqrsWeb", conf);
var stockPublisherActor =
actorSystem.ActorOf(Props.Create(() => new StockPublisherActor(helper)), "stockPublisher");
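// split CLUSTER_SEEDS on commas, ignoring blank entries and surrounding whitespace
var initialContactAddress = Environment.GetEnvironmentVariable("CLUSTER_SEEDS")?
.Split(',')
.Where(x => !string.IsNullOrWhiteSpace(x))
.Select(x => Address.Parse(x.Trim()))
.ToList();
// a missing CLUSTER_SEEDS variable and an empty one both leave us without contacts
if (initialContactAddress == null || initialContactAddress.Count == 0)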
{
actorSystem.Log.Error("No initial cluster contacts found. Please be sure that the" +
" CLUSTER_SEEDS environment variable is populated with at least one address.");
return Task.FromException(new ConfigurationException(
"No initial cluster contacts found. Please be sure that the" +
" CLUSTER_SEEDS environment variable is populated with at least one address."));
}
var configurator = actorSystem.ActorOf(
Props.Create(() => new StockEventConfiguratorActor(stockPublisherActor, initialContactAddress)),
"configurator");
return Task.CompletedTask;
}
public async Task Stop()
{
await Sys.Terminate();
}
Once this is finished, call docker-compose up and browse to http://localhost:8999. You should see the Web UI streaming price and volume updates.
Every 10 seconds or so there will be a new batch of price and volume updates - so if you don’t see data right away, don’t panic. Wait for the interval to come around.
Next
Now that our Akka.NET cluster is running using the best available tools and practices, it’s time to put everything together: Lesson 5 - Deploying Akka.Cluster inside Kubernetes