So far, Akka.CQRS has done its work using clustered consistent hash routing, hand-rolled publish and subscribe, and the child-per-entity pattern. These are all fairly simple building blocks for Akka.Cluster applications.

And yet it is precisely because these building blocks are so simple that we haven’t been able to involve the Akka.CQRS.Pricing or Akka.CQRS.Pricing.Web applications in any of our work so far. We’ve also had trouble cleanly moving our OrderBook state within the Akka.CQRS.TradeProcessor service, due to the simplistic nature of clustered routers in Akka.NET.

In order to address some of these problems, we’re going to adopt the tools introduced to Akka.NET developers via the Akka.Cluster.Tools and Akka.Cluster.Sharding NuGet packages.

Getting Started

To start this lesson, we need to check out the lesson4 branch in our git repository:

PS> git checkout lesson4

First thing we’re going to do is open up the Akka.CQRS.Infrastructure project and take a look at the Akka.CQRS.Infrastructure.csproj definition:

    
<Project Sdk="Microsoft.NET.Sdk">
  <Import Project="..\common.props" />
  <PropertyGroup>
    <TargetFramework>netstandard2.0</TargetFramework>
    <Description>Shared, non-domain-specific infrastructure used by various Akka.CQRS services.</Description>
    <Configurations>Debug;Release;Phobos</Configurations>
  </PropertyGroup>
  <ItemGroup>
    <None Remove="Ops\phobos.conf" />
  </ItemGroup>

  <ItemGroup>
    <EmbeddedResource Include="Ops\ops.conf" />
    <EmbeddedResource Include="Ops\phobos.conf" />
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="Akka.Cluster.Sharding" Version="$(AkkaVersion)-beta*" />
    <PackageReference Include="Akka.Persistence.Extras" Version="$(AkkaPersistenceExtrasVersion)" />
    <PackageReference Include="Akka.Bootstrap.Docker" Version="$(AkkaBootstrapVersion)" />
  </ItemGroup>

  <ItemGroup Condition="'$(Configuration)' == 'Phobos'">
     <!-- Uncomment these to install Phobos binaries -->
    <!--<PackageReference Include="Phobos.Actor.Cluster" Version="$(PhobosVersion)" />
    <PackageReference Include="Phobos.Tracing.Jaeger" Version="$(PhobosTracingVersion)" />
    <PackageReference Include="Phobos.Monitoring.StatsD" Version="$(PhobosMonitoringVersion)" />-->
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\Akka.CQRS\Akka.CQRS.csproj" />
  </ItemGroup>
</Project>

You’ll notice that we already have the Akka.Cluster.Sharding package referenced here. Since Akka.Cluster.Sharding depends on Akka.Cluster.Tools, and since all of our *.Service and *.Web projects reference Akka.CQRS.Infrastructure, we’re good to go on NuGet packages and don’t need to install anything at this point. We just need to rebuild Akka.CQRS.sln, which will automatically restore the packages we need.

NOTE: Akka.Cluster.Sharding depends on Akka.Cluster.Tools because the default “persistent” mode for storing sharding data depends on having access to a Cluster Singleton that can maintain a single, consistent view of the shard allocation data. That’s why this dependency exists.

At the moment, we’re not actually using Akka.Cluster.Sharding or any of the Akka.Cluster.Tools features anywhere in our applications. That’s about to change.

The next thing we need to do is to open the Akka.CQRS.Infrastructure.AppBootstrap.cs file and uncomment the following lines:

config = config
    .WithFallback(GetOpsConfig())
    .WithFallback(TradeEventSerializer.Config)
    //.WithFallback(ClusterSharding.DefaultConfig())
    //.WithFallback(DistributedData.DistributedData.DefaultConfig()) // needed for DData sharding
    //.WithFallback(ClusterClientReceptionist.DefaultConfig())
    //.WithFallback(DistributedPubSub.DefaultConfig())
    .BootstrapFromDocker();

Uncommenting these lines loads the default HOCON for Cluster.Sharding, DistributedData, ClusterClient, and DistributedPubSub (including their serialization settings) on all nodes in our cluster.

NOTE: starting with Akka.NET 1.4, this may no longer be necessary, as all built-in HOCON configurations will be loaded automatically by the Akka.NET configuration library.

Now it’s time for us to introduce these tools where they’re needed inside the Akka.CQRS solution.

Working with Akka.Cluster.Sharding

Akka.Cluster.Sharding is a much more sophisticated version of what we previously accomplished using consistent hash routing and the child-per-entity pattern, but there are some major differences:

  1. Akka.Cluster.Sharding guarantees that there will be at most one instance of every unique persistent entity in our cluster, even in the event of network partitions;
  2. Akka.Cluster.Sharding will buffer messages intended for entities that are being moved from one node in the cluster to another - thereby improving the odds that they’ll be delivered during partition hand-offs;
  3. Akka.Cluster.Sharding is designed to guarantee an even distribution of state across the cluster even as it scales up and down - this is so we don’t run the risk of having some nodes acting as “hot spots” while the rest of the cluster stays idle; and
  4. Akka.Cluster.Sharding includes some built-in mechanisms for passivating entities - getting them to safely write the rest of their state to a persistent store prior to being stopped and moved onto a new node. Historically this has usually been done by just setting a ReceiveTimeout inside your actors, but it can now be done explicitly when setting up the ShardRegion for your entities.
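To make the passivation point concrete, here is a minimal sketch of the ReceiveTimeout approach. The `IdleOrderBookEntity` actor and its two-minute timeout are hypothetical and are not part of Akka.CQRS; `ReceiveTimeout`, `Passivate`, and `PoisonPill` are the standard Akka.NET types.

```csharp
using System;
using Akka.Actor;
using Akka.Cluster.Sharding;

// Hypothetical entity actor, for illustration only (not part of Akka.CQRS).
public sealed class IdleOrderBookEntity : ReceiveActor
{
    public IdleOrderBookEntity(string entityId)
    {
        // Ask Akka.NET to send us a ReceiveTimeout after 2 minutes without messages.
        // The 2-minute value is an arbitrary assumption.
        Context.SetReceiveTimeout(TimeSpan.FromMinutes(2));

        // When idle, ask the parent shard to passivate this entity. The shard
        // sends the wrapped stop message (PoisonPill) back to us once it is safe.
        Receive<ReceiveTimeout>(_ =>
            Context.Parent.Tell(new Passivate(PoisonPill.Instance)));

        // Placeholder for the entity's real message handling.
        ReceiveAny(msg => { });
    }
}
```

Routing the stop request through the parent shard, rather than stopping the actor directly, is what lets the sharding system hold on to messages that arrive while the entity is shutting down.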

The first thing we’re going to do is create a ShardRegion inside our Akka.CQRS.TradeProcessor.Service/Program.cs:

static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config).BoostrapApplication(new AppBootstrapConfig(true, true));

    var actorSystem = ActorSystem.Create("AkkaTrader", conf);

    var orderBooks = actorSystem.ActorOf(Props.Create(() => new OrderBookMasterActor()), "orderbooks");

    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    pbm.RegisterCommandPalette(ClusterCommands.Instance);
    pbm.RegisterCommandPalette(ClusterShardingCommands.Instance);
    pbm.RegisterCommandPalette(RemoteCommands.Instance);
    pbm.Start();

    actorSystem.WhenTerminated.Wait();
    return 0;
}

Delete the line where we assign orderBooks and replace it with the following code:

Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
{
    var sharding = ClusterSharding.Get(actorSystem);


    var shardRegion = sharding.Start("orderBook", s => OrderBookActor.PropsFor(s), ClusterShardingSettings.Create(actorSystem),
        new StockShardMsgRouter());
});

What does this code do? First, the Cluster.RegisterOnMemberUp method creates a callback that gets invoked once this node has successfully joined the cluster, so none of the code inside that lambda method is run until the cluster is online. You don’t have to wait until your node has joined the cluster before starting the sharding system, but this is something we routinely do at Petabridge.

Next, we get a reference to the sharding system itself:

var sharding = ClusterSharding.Get(actorSystem);

We can use the ClusterSharding object to spawn new ShardRegions or ShardRegionProxies, which we will get to in a moment. First, we’re going to create an “orderBook” ShardRegion we can use for hosting OrderBookActors:

var shardRegion = sharding.Start("orderBook", s => OrderBookActor.PropsFor(s), ClusterShardingSettings.Create(actorSystem),
        new StockShardMsgRouter());

The shardRegion variable, in this case, is an IActorRef - and we can use that shardRegion actor to send and distribute messages to any of the “orderBook” entity actors hosted anywhere in our Akka.Cluster. In order for a node to host “orderBook” entities, it must run the exact same piece of code we’ve just executed here - thankfully, Docker containers do this automatically.

The second argument we pass into the ClusterSharding.Start method is a Func<string, Props> - a factory method that receives the name of the sharded entity as its input and returns the Props used to create the actor that will represent that entity inside the sharding system. “But how do we know how to name entities inside Akka.Cluster.Sharding?” you ask.

Using the StockShardMsgRouter defined in the Akka.CQRS.Infrastructure project, which we need to uncomment in order to allow our solution to compile:

/*
/// <summary>
/// Used to route sharding messages to order book actors hosted via Akka.Cluster.Sharding.
/// </summary>
public sealed class StockShardMsgRouter : HashCodeMessageExtractor
{
    /// <summary>
    /// 3 nodes hosting order books, 10 shards per node.
    /// </summary>
    public const int DefaultShardCount = 30;
    public StockShardMsgRouter() : this(DefaultShardCount)
    {
    }
    public StockShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
    {
    }
    public override string EntityId(object message)
    {
        if (message is IWithStockId stockMsg)
        {
            return stockMsg.StockId;
        }
        if (message is IConfirmableMessageEnvelope<IWithStockId> envelope)
        {
            return envelope.Message.StockId;
        }
        return null;
    }
}
*/

Uncomment this class - we’re going to use it to route messages any time we use the Akka.Cluster.Sharding system in the Akka.CQRS solution.

The StockShardMsgRouter extends the HashCodeMessageExtractor base class from the Akka.Cluster.Sharding namespace. Message extractors do two things in Akka.Cluster.Sharding:

  1. Determining which entity belongs to which shard - in this case the extractor hashes the entity’s id and maps that hash onto a fixed number of shards, and in the base class constructor of the HashCodeMessageExtractor we specify the maximum number of shards that can exist for the “orderBook” ShardRegion. If there are 30 shards and the hash of the entity’s id maps to shard 21, then that entity will be created as the child of shard 21.
  2. Determining the names of each entity actor - this is what the EntityId method on the StockShardMsgRouter does. In this instance, we simply look for the IWithStockId interface on the messages sent through the sharding system (either directly or wrapped in an IConfirmableMessageEnvelope<IWithStockId>) and, any time we find it, return the StockId and use that as the entity’s id. All messages with a StockId of “MSFT” will be routed to the single “MSFT” actor that exists globally inside our cluster.
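The entity-to-shard mapping described above can be sketched in plain C#. This is only an illustration of the "hash the entity id, then map it onto N shards" idea: the FNV-1a hash used here is a stand-in chosen because it is deterministic across processes, and is not claimed to be the hash Akka.NET uses internally. The `ShardIdSketch` class is hypothetical.

```csharp
using System;
using System.Text;

// Illustrative only: FNV-1a is a stand-in hash, not Akka.NET's internal hash.
public static class ShardIdSketch
{
    public static uint Fnv1a(string value)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (var b in Encoding.UTF8.GetBytes(value))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps an entity id onto one of maxNumberOfShards shard ids ("0" .. "N-1").
    public static string ShardIdFor(string entityId, int maxNumberOfShards)
        => (Fnv1a(entityId) % (uint)maxNumberOfShards).ToString();
}
```

Because the mapping is a pure function of the entity id and the shard count, every node that uses the same extractor and the same shard count computes the same shard for “MSFT”. Changing the shard count on only some nodes would break that agreement.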

Sharding overview

For every ShardRegion, there exist 1-N shards, and each one of those shards acts as the parent actor of all of the individual entity actors underneath. It’s the role of the IMessageExtractor to determine which entity a message belongs to and which shard that entity belongs to. Sharded entity actors are created on-demand when they’re sent a message for the very first time, either via a ShardRegion or a ShardRegionProxy. That’s why the StockShardMsgRouter is the next crucial piece in our setup for Akka.Cluster.Sharding.

ShardRegionProxy and the Akka.CQRS.TradePlacers.Service

Now that we’re capable of hosting the “orderBook” ShardRegion on all of the Akka.CQRS.TradeProcessor.Service instances, we need to fix the Akka.CQRS.TradePlacers.Service so it can communicate with our sharded actors instead of using the consistent hash router we depended on earlier.

Let’s take a look at Akka.CQRS.TradePlacers.Service/Program.cs and make some changes:

static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config).BoostrapApplication(new AppBootstrapConfig(false, true));


    var actorSystem = ActorSystem.Create("AkkaTrader", conf);


    var tradeRouter = actorSystem.ActorOf(
        Props.Empty.WithRouter(new ClusterRouterGroup(
            new ConsistentHashingGroup(new[] {"/user/orderbooks"},
                TradeEventConsistentHashMapping.TradeEventMapping),
            new ClusterRouterGroupSettings(10000, new []{ "/user/orderbooks" }, true, useRole:"trade-processor"))), "tradesRouter");


    Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
    {
        var shardRegionProxy = tradeRouter;
        var subManager = new ActorTradeSubscriptionManager(tradeRouter);
        foreach (var stock in AvailableTickerSymbols.Symbols)
        {
            var max = (decimal)ThreadLocalRandom.Current.Next(20, 45);
            var min = (decimal) ThreadLocalRandom.Current.Next(10, 15);
            var range = new PriceRange(min, 0.0m, max);


            // start bidders
            foreach (var i in Enumerable.Repeat(1, ThreadLocalRandom.Current.Next(1, 6)))
            {
                actorSystem.ActorOf(Props.Create(() => new BidderActor(subManager, stock, range, shardRegionProxy)));
            }


            // start askers
            foreach (var i in Enumerable.Repeat(1, ThreadLocalRandom.Current.Next(1, 6)))
            {
                actorSystem.ActorOf(Props.Create(() => new AskerActor(subManager, stock, range, shardRegionProxy)));
            }
        }
    });


    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    pbm.RegisterCommandPalette(ClusterCommands.Instance);
    pbm.RegisterCommandPalette(ClusterShardingCommands.Instance);
    pbm.RegisterCommandPalette(RemoteCommands.Instance);
    pbm.Start();


    actorSystem.WhenTerminated.Wait();
    return 0;
}

First things first, delete the tradeRouter - we’re not going to need it anymore.

Next, replace the following two lines:

var shardRegionProxy = tradeRouter;
var subManager = new ActorTradeSubscriptionManager(tradeRouter);

With these ones:

var sharding = ClusterSharding.Get(actorSystem);
var shardRegionProxy = sharding.StartProxy("orderBook", "trade-processor", new StockShardMsgRouter());
var subManager = Akka.CQRS.Subscriptions.DistributedPubSub.DistributedPubSubTradeEventSubscriptionManager.For(actorSystem);

No further code changes are required to Program.cs beyond this point. What this code does is two things:

  1. Creates a ShardRegionProxy that the BidderActor and AskerActor instances can use to transmit trades to the OrderBookActors hosted inside the “orderBook” ShardRegion on the Akka.CQRS.TradeProcessor.Service nodes. The ShardRegionProxy, as you may notice, uses the exact same StockShardMsgRouter to distribute messages to the entity actors - this is no accident. In order for any node to reliably contact a sharded entity actor, it must use the same IMessageExtractor algorithm, which is why extractors are typically defined in shared libraries like Akka.CQRS.Infrastructure.
  2. Switches to the Akka.Cluster.Tools DistributedPubSub implementation of the ITradeEventSubscriptionManager - an abstraction we created to make it easier to move from a hand-rolled pub-sub implementation to DistributedPubSub for the purposes of this workshop.

We’ll get back to DistributedPubSub in a moment, but for the time being - ShardRegionProxy.

sharding.StartProxy("orderBook", "trade-processor", new StockShardMsgRouter());

Here’s what this call does:

  • Creates a ShardRegionProxy for region “orderBook”;
  • Tells the ShardRegionProxy to look for “orderBook” ShardRegions only on Akka.Cluster nodes with a defined “trade-processor” role; and
  • Uses the StockShardMsgRouter to determine the shard number and entity id for all messages we want to distribute to “orderBook” entity actors.

ShardRegionProxy routing

The ShardRegionProxy is essentially a pointer to the sharded entities in the cluster - it gets its information from the ShardingCoordinator and then uses that to direct its messages towards the appropriate node in the cluster. Whenever the shape or topology of the Akka.Cluster.Sharding system changes (i.e. when shards are rebalanced or a ShardRegion leaves the cluster) those changes will also be propagated to all of the ShardRegionProxy instances - just like their ShardRegion counterparts.

Adding Akka.Cluster.Sharding HOCON to All Nodes

One thing still missing is the HOCON that tells Akka.Cluster.Sharding which nodes can host shards and which cannot - none of our services have it yet. We’re going to fix that right now.

Configure Akka.CQRS.Pricing.Service

First, open the Akka.CQRS.Pricing.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section:

akka {
	extensions = ["Akka.Cluster.Tools.Client.ClusterClientReceptionistExtensionProvider, Akka.Cluster.Tools"]
	actor {
		provider = cluster
	}
						
	remote {
		dot-netty.tcp {
				hostname = "127.0.0.1"
				port = 6055
			}
	}			

	cluster {
		#will inject this node as a self-seed node at run-time
		seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:6055"]
		roles = ["pricing-engine" , "trade-events"]
		pub-sub{
			role = "trade-events"
		}
		sharding{
			role = "pricing-engine"
			state-store-mode = ddata
		}
		client.receptionist.role = pricing-engine # stops ClusterClient gossip from going to the worker nodes
	}

	persistence{
		journal {
		    plugin = "akka.persistence.journal.mongodb"
			mongodb.class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
			mongodb.collection = "EventJournal"
			mongodb.event-adapters = {
				stock-tagger = "Akka.CQRS.Infrastructure.StockEventTagger, Akka.CQRS.Infrastructure"
			}
			mongodb.event-adapter-bindings = {
				"Akka.CQRS.IWithStockId, Akka.CQRS" = stock-tagger
			}
		}

		snapshot-store {
		    plugin = "akka.persistence.snapshot-store.mongodb"
			mongodb.class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
			mongodb.collection = "SnapshotStore"
		}

		query {
            mongodb {
                class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"
                refresh-interval = 1s
            }
        }
	}
}

This will set up everything we need for Akka.Cluster.Sharding, ClusterClient, and DistributedPubSub inside the Akka.CQRS.Pricing.Service. Effectively, what we’re doing is telling these different Akka.Cluster components “which roles can X run on?” - in the case of DistributedPubSub, it’s only the nodes with the trade-events role that have access to the DistributedPubSubMediator - the actor responsible for circulating topics and the addresses of nodes that are subscribed to those topics. Ditto for Akka.Cluster.Tools.ClusterClient - only “pricing-engine” nodes can host the ClusterClientReceptionist, which accepts incoming connections from remote ClusterClient instances.

Notice that we’re saving the Akka.Cluster.Sharding state using Akka.DistributedData, an in-memory, eventually consistent replication system for Akka.NET. Akka.DistributedData is still in beta (until Akka.NET v1.4.0 is released), but we’re going to use it here so we can avoid having yet another thing depend on MongoDb.

Also, notice that the role for Akka.Cluster.Sharding is “pricing-engine” rather than “trade-processor” - this is because the Akka.CQRS.Pricing.Service hosts its own sharded entities, “priceAggregator”s, and those are kept separate from the “orderBook” entities hosted on the Akka.CQRS.TradeProcessor.Service.
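One way to sanity-check these role settings is to parse the HOCON and compare the node's own roles against the sharding role. The sketch below assumes only the `akka.cluster.roles` and `akka.cluster.sharding.role` keys shown above; the `ShardingRoleCheck` helper is hypothetical and not part of Akka.CQRS or Akka.NET.

```csharp
using System;
using Akka.Configuration;

// Hypothetical helper, for illustration only.
public static class ShardingRoleCheck
{
    // True when this node's roles allow it to host shards.
    public static bool CanHostShards(Config config)
    {
        var roles = config.GetStringList("akka.cluster.roles");
        var shardingRole = config.GetString("akka.cluster.sharding.role");

        // An empty sharding role means any node in the cluster may host shards.
        return string.IsNullOrEmpty(shardingRole) || roles.Contains(shardingRole);
    }

    public static void Main()
    {
        var config = ConfigurationFactory.ParseString(@"
            akka.cluster {
                roles = [""pricing-engine"", ""trade-events""]
                sharding.role = ""pricing-engine""
            }");

        Console.WriteLine(CanHostShards(config));
    }
}
```

Run against a node whose roles do not include the sharding role, such as one that only uses a ShardRegionProxy, the same check returns false. That matches the intent: the node can send messages to sharded entities but does not host them.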

Configure Akka.CQRS.TradePlacers.Service

Next, we need to configure the Akka.CQRS.TradePlacers.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section:

akka {
	actor {
		provider = cluster

		deployment{
			/tradesRouter {
				router = consistent-hashing-group
				routees.paths = ["/user/orderbooks"]
				cluster {
						enabled = on
						use-role = trade-processor
				}
			}        
		}
	}
						
	remote {
		dot-netty.tcp {
				hostname = "127.0.0.1"
				port = 5054
			}
	}			

	cluster {
		#will inject this node as a self-seed node at run-time
		seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:5055"]
		roles = ["trader", "trade-events"]
		pub-sub{
			role = "trade-events"
		}
		sharding{
			role = "trade-processor"
		}
	}
}

Configure Akka.CQRS.TradeProcessor.Service

Finally, we need to configure the Akka.CQRS.TradeProcessor.Service/app.conf file and insert the following content INSIDE the akka.cluster HOCON section:

akka {
	actor {
		provider = cluster
	}
						
	remote {
		dot-netty.tcp {
				hostname = "127.0.0.1"
				port = 5055
			}
	}			

	cluster {
		#will inject this node as a self-seed node at run-time
		seed-nodes = ["akka.tcp://AkkaTrader@127.0.0.1:5055"]
		roles = ["trade-processor" , "trade-events"]
		pub-sub{
			role = "trade-events"
		}
		sharding{
			role = "trade-processor"
			state-store-mode = ddata
		}
	}

	persistence{
		journal {
		    plugin = "akka.persistence.journal.mongodb"
			mongodb.class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
			mongodb.collection = "EventJournal"
			#mongodb.stored-as = binary
		}

		snapshot-store {
		    plugin = "akka.persistence.snapshot-store.mongodb"
			mongodb.class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
			mongodb.collection = "SnapshotStore"
			#mongodb.stored-as = binary
		}
	}
}

This will ensure that the “orderBook” ShardRegion will be hosted correctly once it’s started by the Akka.CQRS.TradeProcessor.Service.

Working with DistributedPubSub

Now that we have Akka.Cluster.Sharding up and running, we should talk about the role of DistributedPubSub inside Akka.CQRS.

Prior to the changes we implement in this lesson, all of the publish-subscribe activity inside the Akka.CQRS solution between the TradePlacer and TradeProcessor services was done using custom mechanisms like the InMemoryTradeEventPublisher:

/// <summary>
/// Used locally, in-memory by a single order book actor. Belongs to a single ticker symbol.
/// </summary>
public sealed class InMemoryTradeEventPublisher : TradeEventSubscriptionManagerBase, ITradeEventPublisher
{
    private readonly Dictionary<TradeEventType, HashSet<IActorRef>> _subscribers;

    public InMemoryTradeEventPublisher() : this(new Dictionary<TradeEventType, HashSet<IActorRef>>()) { }

    public InMemoryTradeEventPublisher(Dictionary<TradeEventType, HashSet<IActorRef>> subscribers)
    {
        _subscribers = subscribers;
    }

    public void Publish(string tickerSymbol, ITradeEvent @event)
    {
        var eventType = @event.ToTradeEventType();
        EnsureSub(eventType);

        foreach(var sub in _subscribers[eventType])
            sub.Tell(@event);
    }

    public override Task<TradeSubscribeAck> Subscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        foreach (var e in events)
        {
            EnsureSub(e);
            _subscribers[e].Add(subscriber);
        }

        return Task.FromResult(new TradeSubscribeAck(tickerSymbol, events));
    }

    private void EnsureSub(TradeEventType e)
    {
        if (!_subscribers.ContainsKey(e))
        {
            _subscribers[e] = new HashSet<IActorRef>();
        }
    }

    public override Task<TradeUnsubscribeAck> Unsubscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        foreach (var e in events)
        {
            EnsureSub(e);
            _subscribers[e].Remove(subscriber);
        }

        return Task.FromResult(new TradeUnsubscribeAck(tickerSymbol, events));
    }
}

We ran one of these inside each individual OrderBookActor and used that to publish back and forth directly to the AskerActor and BidderActor instances running on each of the Akka.CQRS.TradePlacers.Service nodes - all of the subscription state was, therefore, contained locally inside the OrderBookActor’s memory. This approach has a number of important limitations:

  • In the event that the OrderBookActor died and had to be recreated, possibly even on a new node, the subscriber actors had to be actively programmed to recreate their subscription to the new OrderBookActor once it was recreated. This can be difficult to do and can result in pauses in data availability to subscribers.
  • All actors who want to subscribe to order book events for a specific ticker symbol need to know how to contact the OrderBookActors directly.

We’re able to resolve these issues through the use of Akka.Cluster.Tools.DistributedPubSub, a decentralized message/topic broker that allows individual actors to subscribe to and publish to topics throughout the cluster without having to directly contact any of the other actors involved.

The DistributedPubSubTradeEventSubscriptionManager is an abstraction we use inside Akka.CQRS to make it easy for actors to subscribe to DistributedPubSub topics published by the OrderBookActors without having to know where they are located in the cluster at any given time:

/// <summary>
/// <see cref="ITradeEventSubscriptionManager"/> that uses the <see cref="DistributedPubSub.Mediator"/> under the hood.
/// </summary>
public sealed class DistributedPubSubTradeEventSubscriptionManager : TradeEventSubscriptionManagerBase
{
    private readonly IActorRef _mediator;

    public DistributedPubSubTradeEventSubscriptionManager(IActorRef mediator)
    {
        _mediator = mediator;
    }

    public override async Task<TradeSubscribeAck> Subscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        var tasks = ToTopics(tickerSymbol, events).Select(x =>
            _mediator.Ask<SubscribeAck>(new Subscribe(x, subscriber), TimeSpan.FromSeconds(3)));

        await Task.WhenAll(tasks).ConfigureAwait(false);

        return new TradeSubscribeAck(tickerSymbol, events);
    }

    public override async Task<TradeUnsubscribeAck> Unsubscribe(string tickerSymbol, TradeEventType[] events, IActorRef subscriber)
    {
        var tasks = ToTopics(tickerSymbol, events).Select(x =>
            _mediator.Ask<UnsubscribeAck>(new Unsubscribe(x, subscriber), TimeSpan.FromSeconds(3)));

        await Task.WhenAll(tasks).ConfigureAwait(false);

        return new TradeUnsubscribeAck(tickerSymbol, events);
    }


    public static DistributedPubSubTradeEventSubscriptionManager For(ActorSystem sys)
    {
        var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
        return new DistributedPubSubTradeEventSubscriptionManager(mediator);
    }
}

DistributedPubSub works through a built-in actor called the “mediator”, which we can access through the DistributedPubSub.Get(sys).Mediator property. The mediator is responsible for the following:

  1. Memorizing which actors are subscribed to which topics on the current local node;
  2. Gossiping to mediators running on other nodes (usually ones running in a specific role) about which topics the actors on the current local node are subscribed to; and
  3. Executing Publish operations out to all of the appropriate local actors AND to the mediators running on other nodes which also have subscribers for the published topic.

To subscribe to a topic in DistributedPubSub, we just need to send a Subscribe(string topicId, IActorRef subscriber) message to the mediator actor:

var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
var subscribeAck = await mediator.Ask<SubscribeAck>(new Subscribe("MSFT", subscriber), TimeSpan.FromSeconds(3));

To publish to a topic in DistributedPubSub, we need to send a Publish(string topicId, object msg) to the mediator:

var mediator = Cluster.Tools.PublishSubscribe.DistributedPubSub.Get(sys).Mediator;
mediator.Tell(new Publish("MSFT", new Bid(...)));
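For the receiving side, here is a minimal sketch of an actor that subscribes itself to a topic and logs whatever is published to it. The `TopicListener` actor is hypothetical and not part of Akka.CQRS; `Subscribe`, `SubscribeAck`, and the mediator are the standard Akka.Cluster.Tools types. Inside the Akka.CQRS solution the `DistributedPubSub` name may need to be fully qualified, as the snippets above do.

```csharp
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;

// Hypothetical subscriber actor, for illustration only (not part of Akka.CQRS).
public sealed class TopicListener : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public TopicListener(string topic)
    {
        // Subscribe this actor to the topic; the mediator replies to the
        // sender (this actor) with a SubscribeAck.
        var mediator = DistributedPubSub.Get(Context.System).Mediator;
        mediator.Tell(new Subscribe(topic, Self));

        Receive<SubscribeAck>(ack =>
            _log.Info("Subscribed to topic [{0}]", ack.Subscribe.Topic));

        // Anything else that arrives was published to the topic.
        ReceiveAny(msg =>
            _log.Info("Received [{0}] on topic [{1}]", msg, topic));
    }
}
```

The listener never needs an address for the publisher. It only needs the topic name and access to a mediator on its own node.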

DistributedPubSub in action

DistributedPubSub is traffic-optimized. If there are a thousand actors running on node 2 that are all subscribed to topic “MSFT” and an actor on node 1 publishes an event to the topic “MSFT”, then the mediator on node 1 will only publish 1 copy of that message to the mediator running on node 2. The mediator on node 2 will then publish that message for topic “MSFT” locally, in-memory, to all of the actors running on node 2 that are subscribed to the “MSFT” topic.

In the course of the code that we changed earlier, we ensured that the OrderBookActor is now running using DistributedPubSub because the Cluster.Sharding system is starting the actor with the following constructor:

public OrderBookActor(string tickerSymbol) 
: this(tickerSymbol, null, 
DistributedPubSubTradeEventPublisher.For(Context.System), 
NoOpTradeEventSubscriptionManager.Instance, Context.Parent) { }

So all of the ITradeEvent instances produced by the OrderBookActor are now published to topics that can be subscribed to from any other node in the cluster that has access to DistributedPubSub. This makes it possible for us to get the Akka.CQRS.Pricing service up and running for the first time.

Running Akka.CQRS.Pricing

The Pricing service is designed to aggregate Match events emitted by the OrderBookActors running on the Akka.CQRS.TradeProcessor.Service nodes and use that to generate real-time pricing and volume data about the market activity made against each trade symbol. So far the Akka.CQRS.Pricing.Service has been disabled in our solution because we really needed DistributedPubSub in order to make it feasible to run, and that’s exactly what we’re going to fix now.

Open the Akka.CQRS.Pricing.Service/Program.cs file and uncomment the body of the Main method:

static int Main(string[] args)
{
    var config = File.ReadAllText("app.conf");
    var conf = ConfigurationFactory.ParseString(config);
    /*
    var actorSystem = ActorSystem.Create("AkkaTrader", conf.BoostrapApplication(new AppBootstrapConfig(true, true)));
    
    var sharding = ClusterSharding.Get(actorSystem);
    
    var shardRegion = sharding.Start("priceAggregator",
        s => Props.Create(() => new MatchAggregator(s)),
        ClusterShardingSettings.Create(actorSystem),
        new StockShardMsgRouter());
    var clientHandler =
        actorSystem.ActorOf(Props.Create(() => new ClientHandlerActor(shardRegion)), "subscriptions");
    // make ourselves available to ClusterClient at /user/subscriptions
    ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);
    
    Cluster.Cluster.Get(actorSystem).RegisterOnMemberUp(() =>
    {
        foreach (var ticker in AvailableTickerSymbols.Symbols)
        {
            shardRegion.Tell(new Ping(ticker));
        }
    });
    // start Petabridge.Cmd (for external monitoring / supervision)
    var pbm = PetabridgeCmd.Get(actorSystem);
    void RegisterPalette(CommandPaletteHandler h)
    {
        if (pbm.RegisterCommandPalette(h))
        {
            Console.WriteLine("Petabridge.Cmd - Registered {0}", h.Palette.ModuleName);
        }
        else
        {
            Console.WriteLine("Petabridge.Cmd - DID NOT REGISTER {0}", h.Palette.ModuleName);
        }
    }
    RegisterPalette(ClusterCommands.Instance);
    RegisterPalette(RemoteCommands.Instance);
    RegisterPalette(ClusterShardingCommands.Instance);
    RegisterPalette(new PriceCommands(shardRegion));
    pbm.Start();
    actorSystem.WhenTerminated.Wait();
    */
    return 0;
}

Remove the block comment from this method body and the Akka.CQRS.Pricing system will be good to go, provided that you also applied the HOCON changes from earlier in this lesson.

The MatchAggregator actor is the key player inside the Akka.CQRS.Pricing service - it’s responsible for subscribing to the ITradeEvents published by the OrderBookActor and aggregating those trade events:

Command<Match>(m => TickerSymbol.Equals(m.StockId), m =>
{
    _log.Info("Received MATCH for {0} - price: {1} quantity: {2}", TickerSymbol, m.SettlementPrice, m.Quantity);
    if (_matchAggregate == null)
    {
        _matchAggregate = new MatchAggregate(TickerSymbol, m.SettlementPrice, m.Quantity);
        return;
    } 


    if (!_matchAggregate.WithMatch(m)) // pull Match into current price/volume aggregation
    {
        // should never get to here
        _log.Warning("Received Match for ticker symbol [{0}] - but we only accept symbols for [{1}]", 
			m.StockId, TickerSymbol);
    }
});

The aggregated Match events are used to produce an estimated, weighted moving average price and per-trade volume which the pricing service will, in turn, publish to its own subscribers:

Command<PublishEvents>(p =>
{
    if (_matchAggregate == null)
        return;


    var (latestPrice, latestVolume) = _matchAggregate.FetchMetrics(_timestamper);


    // Need to update pricing records prior to persisting our state, since this data is included in
    // output of SaveAggregateData()
    _priceUpdates.Add(latestPrice);
    _volumeUpdates.Add(latestVolume);


    PersistAsync(SaveAggregateData(), snapshot =>
    {
        _log.Info("Saved latest price {0} and volume {1}", snapshot.RecentAvgPrice, snapshot.RecentAvgVolume);
        if (LastSequenceNr % SnapshotEveryN == 0)
        {
            SaveSnapshot(snapshot);
        }
    });


    // publish updates to price and volume subscribers
    _marketEventPublisher.Publish(TickerSymbol, latestPrice);
    _marketEventPublisher.Publish(TickerSymbol, latestVolume);
});

The price and volume updates will be published as IPriceUpdate and IVolumeUpdate events, which will be consumed by the Akka.CQRS.Pricing.Web and the custom Akka.CQRS.Pricing.Cli Petabridge.Cmd palette we created for this sample.

Before we try to set up the Akka.CQRS.Pricing.Web application, let’s see if we can get the Akka.CQRS.Pricing.Service up and running inside our cluster using docker-compose.

Launching Akka.CQRS.Pricing

Go to the docker-compose.yaml file at the root of the solution and add the following entry to the bottom of the file:

  pricing-root:
    image: akka.cqrs.pricing
    hostname: pricing-root
    ports:
      - '0:9110'
    environment:
      CLUSTER_SEEDS: "akka.tcp://AkkaTrader@lighthouse:4053"
      CLUSTER_PORT: 6055
      CLUSTER_IP: "pricing-root"
      MONGO_CONNECTION_STR: "mongodb://mongo:27017/akkaTrader"
      STATSD_PORT: 8125
      STATSD_URL: "graphite"
      JAEGER_AGENT_HOST: "jaeger"
    restart: on-failure
    depends_on:
      - "mongo"
      - "lighthouse"

The MatchAggregator actors are persistent, thus they need to be able to connect to MongoDb.

Once this is done, run the Docker build process and then execute the following:

PS> docker-compose up

Once the cluster comes online, locate the port that the pricing-root container exposes to the host network. Docker assigns this host port randomly because the ports entry above maps host port 0 to container port 9110. In Kitematic, you’ll see that information here - but this can also be done from the Docker command line:

Kitematic pricing-root Petabridge.Cmd port
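
From the command line, one way to look up the mapping is docker-compose port, which prints the host address bound to a service’s container port. This is a sketch that assumes the pricing-root service name and container port 9110 from the compose entry above, and that it is run from the folder containing docker-compose.yaml:

```shell
docker-compose port pricing-root 9110
```

The number after the colon in the printed address is the host port to hand to pbm.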

So in this case, the port number for pbm is 32769. Open a new terminal and type the following command:

PS> pbm 127.0.0.1:{your port number} price track -s MSFT

And you’ll see output similar to the following:

[MSFT][05/28/2019 17:37:25 +00:00] - $[20.106650376251427964807110712]
[MSFT][05/28/2019 17:32:34 +00:00] - $[20.274556689693797146097405325]
[MSFT][05/28/2019 17:32:44 +00:00] - $[19.979082111208260641847389504]
[MSFT][05/28/2019 17:32:54 +00:00] - $[18.880631034012567896143666967]
[MSFT][05/28/2019 17:33:04 +00:00] - $[18.071261664343961427756599508]
[MSFT][05/28/2019 17:33:14 +00:00] - $[17.942004359311407306285646854]
[MSFT][05/28/2019 17:33:24 +00:00] - $[17.910926625654301605621356379]
[MSFT][05/28/2019 17:33:34 +00:00] - $[17.628967535873517027261368426]
[MSFT][05/28/2019 17:33:44 +00:00] - $[17.811762841507259352753432734]
[MSFT][05/28/2019 17:33:54 +00:00] - $[18.008120807019171652575791913]

Every 10 seconds, the MatchAggregator for each ticker symbol receives a PublishEvents command and publishes its most recent IPriceUpdate and IVolumeUpdate events to indicate what the new “market price” is for that stock. We can follow this process in real time using the custom Petabridge.Cmd palette we added for pricing data inside Akka.CQRS.

Bonus Exercise: Given the design of the Akka.CQRS.Pricing.Cli project, could you replicate the functionality we use for tracking live IPriceUpdate events, but this time for tracking IVolumeUpdate events?

The usage of DistributedPubSub here makes it really easy for us to extend the capabilities of Akka.CQRS to support all sorts of new functionality - and in this case, the last piece of functionality we’re going to add to this lesson is a decoupled Web UI that allows us to view what’s going on inside the Akka.CQRS Akka.NET cluster without actually being a part of the cluster itself. How are we going to do this? Through the use of the ClusterClient!

Working with ClusterClient

Akka.NET clusters are meant for building high-throughput, highly available real-time applications - which implies that they are stateful. We always want to treat business state with care and caution because that’s where the business value of our applications lives. Therefore, it’s often a good idea to decouple ancillary parts of our application from the cluster - and that’s precisely what the Akka.Cluster.Tools.ClusterClient does.

Here’s an analogy to help you understand the role of ClusterClient in the grand scheme of things:

Akka.Cluster is to SQL Server as ClusterClient is to SqlClient.

The ClusterClient allows us to interact with our Akka.NET cluster over Akka.Remote without being part of the cluster ourselves. This is useful because it means that frequently updated pieces of code, such as Web UIs, can be continuously updated without any impact on the membership or operation of the Akka.NET cluster itself. This is how we’re going to introduce the Akka.CQRS.Pricing.Web application to our solution.

Akka.CQRS.Pricing.Web is an ASP.NET Core + SignalR powered solution - it’s designed to stream IPriceUpdate and IVolumeUpdate messages over SignalR to a Web UI just like how we saw with the custom Petabridge.Cmd palette from earlier.

NOTE: the application hosting the Akka.Cluster.Tools.ClusterClient does not need to have akka.actor.provider = cluster in order to run. In fact, it shouldn’t have that setting - what it does need, however, is akka.actor.provider = remote. Without access to Akka.Remote the ClusterClient can’t do its job.
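
For reference, a minimal HOCON fragment for a ClusterClient host might look like the following. This is only a sketch: the hostname and port values are placeholder assumptions for illustration, not the contents of the sample’s actual app.conf.

```hocon
akka {
  actor {
    # ClusterClient hosts only need Akka.Remote; they never join the cluster
    provider = remote
  }
  remote {
    dot-netty.tcp {
      hostname = "127.0.0.1" # assumed value, for illustration only
      port = 0               # assumed value; 0 asks the OS for any free port
    }
  }
}
```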

In order to do this, however, we need to have some way of actually receiving those update events and then publishing them into SignalR. So the first thing we’ll need is an actor that can translate these Akka.NET events into SignalR, which is precisely what the StockPublisherActor does:

/// <summary>
/// Publishes events directly to the <see cref="StockHubHelper"/>
/// </summary>
public class StockPublisherActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly StockHubHelper _hub;

    public StockPublisherActor(StockHubHelper hub)
    {
        _hub = hub;

        ReceiveAsync<IPriceUpdate>(async p =>
        {
            try
            {
                _log.Info("Received event {0}", p);
                await hub.WritePriceChanged(p);
            }
            catch (Exception ex)
            {
                _log.Error(ex, "Error while writing price update [{0}] to StockHub", p);
            }
        });

        ReceiveAsync <IVolumeUpdate>(async p =>
        {
            try
            {
                _log.Info("Received event {0}", p);
                await hub.WriteVolumeChanged(p);
            }
            catch (Exception ex)
            {
                _log.Error(ex, "Error while writing volume update [{0}] to StockHub", p);
            }
        });
    }
}

The StockHubHelper object is really a wrapper around an IHubContext<StockHub>, which is what’s responsible for actually transmitting the bytes over SignalR’s WebSocket connection to the web browsers sitting on the other side. We just convert the IPriceUpdate and IVolumeUpdate events to their string representations via the usual object.ToString() method implemented on their concrete classes.

But the StockPublisherActor needs some way of getting those events from the Akka.CQRS.Pricing.Service in the first place - and that’s where the StockEventConfiguratorActor and the ClusterClient come into play:

public class StockEventConfiguratorActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private IActorRef _clusterClient;
    private readonly IActorRef _stockPublisher;
    private ImmutableHashSet<ActorPath> _initialContacts;


    private sealed class Start
    {
        public static readonly Start Instance = new Start();
        private Start() { }
    }


    public StockEventConfiguratorActor(IActorRef stockPublisher, IReadOnlyList<Address> contactAddresses)
    {
        _initialContacts = contactAddresses.Select(x => new RootActorPath(x) / "system" / "receptionist")
			.ToImmutableHashSet();
        _stockPublisher = stockPublisher;


        Initializing();
    }


    private void Initializing()
    {
        Receive<Start>(s =>
        {
            _log.Info("Contacting cluster client on addresses [{0}]", string.Join(",", _initialContacts));
            _clusterClient.Tell(new ClusterClient.Send("/user/subscriptions", new SubscribeClientAll()));
        });


        Receive<ReceiveTimeout>(t => { Self.Tell(Start.Instance); });


        ReceiveAny(_ =>
        {
            // connected via ClusterClient now
            _stockPublisher.Forward(_);
        });
    }


    protected override void PreStart()
    {
        Context.SetReceiveTimeout(TimeSpan.FromSeconds(15));
        Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(2), Self, Start.Instance, ActorRefs.NoSender);
        _clusterClient = Context.ActorOf(Akka.Cluster.Tools.Client.ClusterClient.Props(ClusterClientSettings
            .Create(Context.System)
            .WithInitialContacts(_initialContacts)));
    }
}

This actor is responsible for establishing a connection to at least one of the Akka.CQRS.Pricing.Service nodes - that’s the purpose of populating the _initialContacts field inside this actor’s constructor: it tells the ClusterClient where to find the ClusterClientReceptionist on each node.

Akka.NET ClusterClient and ClusterClientReceptionist setup

The relationship between the ClusterClient and the ClusterClientReceptionist can be defined thusly: the ClusterClient relies on the ClusterClientReceptionist to route its messages to the appropriate destinations inside the cluster and to send replies back upstream to the ClusterClient who made the original request.

The ClusterClient only needs to be able to successfully contact a single ClusterClientReceptionist in order to be effective. Once that first receptionist is successfully contacted, it will publish the addresses of the other receptionists running inside the cluster back to the ClusterClient. That way the client can always maintain at least one point of contact with the cluster over very long periods of time - even if the membership of the cluster churns as a result of deployments, autoscaling, network partitions, and so forth.

In the case of the StockEventConfiguratorActor, we create a ClusterClient immediately at startup and repeatedly send an Akka.CQRS.Subscriptions.SubscribeClientAll message to the /user/subscriptions actor until we start receiving a stream of traffic back. That /user/subscriptions actor is running somewhere inside the cluster, but how do we ensure that the message actually gets handled by someone?

That’s the job of the ClusterClientReceptionist running on the Akka.CQRS.Pricing.Service:

var clientHandler =
    actorSystem.ActorOf(Props.Create(() => new ClientHandlerActor(shardRegion)), "subscriptions");

// make ourselves available to ClusterClient at /user/subscriptions
ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);

The ClientHandlerActor inside the Akka.CQRS.Pricing.Service is a basic actor responsible for managing subscriptions to IPriceUpdate and IVolumeUpdate events on behalf of remote clients:

/// <summary>
/// Responsible for handling inbound requests from the <see cref="Akka.Cluster.Tools.Client.ClusterClient"/>
/// actors running on the Web nodes.
/// </summary>
public sealed class ClientHandlerActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly IActorRef _priceRouter;


    public ClientHandlerActor(IActorRef priceRouter)
    {
        _priceRouter = priceRouter;


        Receive<SubscribeClient>(s =>
        {
            _log.Info("Received {0} from {1}", s, Sender);
            _priceRouter.Tell(new MarketSubscribe(s.StockId, MarketEventHelpers.AllMarketEventTypes, Sender));
        });


        Receive<SubscribeClientAll>(a =>
        {
            _log.Info("Received {0} from {1}", a, Sender);
            foreach (var s in AvailableTickerSymbols.Symbols)
            {
                _priceRouter.Tell(new MarketSubscribe(s, MarketEventHelpers.AllMarketEventTypes, Sender));
            }
        });


        Receive<UnsubscribeClient>(s =>
        {
            _log.Info("Received {0} from {1}", s, Sender);
            _priceRouter.Tell(new MarketUnsubscribe(s.StockId, MarketEventHelpers.AllMarketEventTypes, Sender));
        });
    }
}

The really important detail, however, is this call to the ClusterClientReceptionist:

// make ourselves available to ClusterClient at /user/subscriptions
ClusterClientReceptionist.Get(actorSystem).RegisterService(clientHandler);

What this line does: it enables ClusterClients to use a ClusterClient.Send command to deliver messages to this actor at its local path (/user/subscriptions), thus connecting the dots between the ClusterClient and the rest of the Akka.NET cluster.

ClusterClient messaging

When a ClusterClient.Send message arrives at a ClusterClientReceptionist, the receptionist creates a local “response tunnel” actor that acts as a proxy back to the original sending actor on the ClusterClient’s side of the network - this is done so the rest of the Akka.NET cluster doesn’t accidentally open a bunch of unnecessary additional inbound connections back into the Akka.CQRS.Pricing.Web node.

All replies to the Sender of a ClusterClient.Send message are piped back through the response tunnel, which then forwards them to the original sender of the ClusterClient.Send message on the node running the ClusterClient itself.

In the event that the current ClusterClientReceptionist node dies, the StockEventConfiguratorActor will use the ClusterClient to attempt to contact one of the receptionists running on a different node in order to resume communications - which gives us the level of fault tolerance we need to redeploy all of our services independently without compromising availability.

Running Akka.CQRS.Pricing.Web

No code changes are needed to get Akka.CQRS.Pricing.Web off the ground - we should have everything we need already built into the project. All we need to do is add the Akka.CQRS.Pricing.Web Docker image output to the docker-compose.yaml file at the root of the solution:

  pricing-web:
    image: akka.cqrs.pricing.web
    ports:
      - '8999:80'
    environment:
      CLUSTER_SEEDS: "akka.tcp://AkkaTrader@pricing-root:6055"
      CLUSTER_PORT: 16666
      STATSD_PORT: 8125
      STATSD_URL: "graphite"
      JAEGER_AGENT_HOST: "jaeger"
    restart: on-failure
    depends_on:
      - "pricing-root"

Add the above YAML to the end of the docker-compose.yaml file.

If you already built all of the Docker images in the previous step, you should be good to go - otherwise follow the Docker build process again.

Notice: we’re using the CLUSTER_SEEDS environment variable to tell Akka.CQRS.Pricing.Web what the initial ClusterClientReceptionist nodes are inside our docker-compose.yaml file - that information is subsequently read by the Akka.CQRS.Pricing.Web AkkaService when it’s started by the ASP.NET Core bootstrapper:

public ActorSystem Sys { get; private set; }


public Task StartActorSystem(StockHubHelper helper)
{
    Console.WriteLine("STARTING AKKA.NET");


    var conf = ConfigurationFactory.ParseString(File.ReadAllText("app.conf"))
        .BoostrapApplication(new AppBootstrapConfig(false, false));


    var actorSystem = Sys = ActorSystem.Create("AkkaCqrsWeb", conf);
    var stockPublisherActor =
        actorSystem.ActorOf(Props.Create(() => new StockPublisherActor(helper)), "stockPublisher");


    var initialContactAddress = Environment.GetEnvironmentVariable("CLUSTER_SEEDS")?.Trim().Split(",")
        .Select(x => Address.Parse(x)).ToList();


    if (initialContactAddress == null)
    {
        actorSystem.Log.Error("No initial cluster contacts found. Please be sure that the" + 
				" CLUSTER_SEEDS environment variable is populated with at least one address.");
        return Task.FromException(new ConfigurationException(
            "No initial cluster contacts found. Please be sure that the" +
			" CLUSTER_SEEDS environment variable is populated with at least one address."));
    }


    var configurator = actorSystem.ActorOf(
        Props.Create(() => new StockEventConfiguratorActor(stockPublisherActor, initialContactAddress)),
        "configurator");


    return Task.CompletedTask;
}


public async Task Stop()
{
    await Sys.Terminate();
}
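
As an optional aside: because StartActorSystem splits CLUSTER_SEEDS on commas, more than one receptionist address can be supplied as initial contacts. The fragment below is a sketch only. The pricing-2 host is a hypothetical second Akka.CQRS.Pricing node that does not exist in this lesson’s docker-compose.yaml, and the list is kept free of spaces because the code above only trims the string as a whole.

```yaml
  pricing-web:
    environment:
      # "pricing-2" is a hypothetical second pricing node, shown only for illustration
      CLUSTER_SEEDS: "akka.tcp://AkkaTrader@pricing-root:6055,akka.tcp://AkkaTrader@pricing-2:6055"
```

The single-address entry shown earlier is all this lesson requires.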

Once this is finished, call docker-compose up and browse to http://localhost:8999. You should see output similar to the following:

Akka.CQRS.Pricing.Web live stock data

Every 10 seconds or so there will be a new batch of price and volume updates - so if you don’t see data right away, don’t panic. Wait for the interval to come around.

Next

Now that our Akka.NET cluster is running using the best available tools and practices, it’s time to put everything together: Lesson 5 - Deploying Akka.Cluster inside Kubernetes
