Why and When to Use Akka.Streams

A recording of our recent webinar 'Introducing Akka.NET Streams' and more.

We’ve previously covered a high-level introduction to Akka.Streams for Akka.NET on this blog, but in this post we go into more detail on:

  1. What Akka.Streams is;
  2. The motivations for using Akka.Streams; and
  3. When you should use Akka.Streams.

“Introduction to Akka.Streams”

I covered all of this material over the course of 90 minutes in our recent webinar, “Introduction to Akka.NET Streams,” the recording for which is embedded below:

You can explore the code samples I produced for this webinar here: https://github.com/Aaronontheweb/intro-to-akka.net-streams - those samples are largely executable via .NET Interactive, which you can run directly in the browser via Binder or in Visual Studio Code.

However, in this post I will also break down some of the finer points in writing so you don’t have to watch the entire 90-minute webinar.

Motivations for Akka.Streams

There are three major motivations for Akka.Streams:

  1. Architectural: provide a robust reference implementation for Reactive Streams in .NET.
  2. Reuse: repeatable low-code patterns for solving common asynchronous, event-driven problems (e.g. batching, debouncing, etc.)
  3. Integrations: streamline integration between other technologies and Akka.NET.

Akka.Streams Stabilizes Asynchronous Producers and Consumers

At its core, Akka.Streams is an implementation of the Reactive Streams protocol - which is designed to address a problem that is conceptually simple but nasty in practice.

Reactive Streams use case: asynchronous stream where producer overwhelms a consumer

In this scenario we have a web API that produces 10,000 asynchronous web requests per second, all routed towards an internal application.

That internal application relies on a database, and when the database occasionally becomes busy, throughput can drop as low as 1,000 requests per second.

What will happen in this scenario? Most likely: failure. In an asynchronous producer-consumer system, or really any type of asynchronous streaming system, it’s possible for a fast-moving producer to overwhelm and topple a slower-moving consumer.

So what’s needed to resolve this issue? The answer is a non-blocking backpressure system that allows the consumer to tell the producer to back off.

Reactive Streams backoff

The producer, once it receives this backoff signal, can change strategies on how it manages its workload until the consumer is ready again. This is the entire purpose of Reactive Streams!

These strategy changes can include:

  • Pausing production of new events (e.g. stop reading from Kafka until the consumer is ready);
  • Aggregating events together (e.g. rolling up totals into larger values);
  • Buffering events;
  • Dropping older events (e.g. a circular buffer);
  • Batching (e.g. combining byte arrays together) and more.

Akka.Streams is purpose-built to facilitate these types of designs using an expressive, LINQ-like syntax that uses a minimal amount of code.
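As a rough sketch of how some of the strategies listed above map onto built-in stages, here is a small notebook-style example. It assumes the same Akka.Streams 1.4.24 package used elsewhere in this post; the buffer sizes, the "StrategySketch" system name, and the variable names are illustrative choices of mine, not recommendations.

```csharp
#r "nuget: Akka.Streams, 1.4.24"

using System;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// "StrategySketch" and all variable names below are illustrative only.
var system = ActorSystem.Create("StrategySketch");
var materializer = system.Materializer();

Source<int, NotUsed> events = Source.From(Enumerable.Range(1, 1000));

// Buffering: hold up to 64 elements, then backpressure the producer.
Source<int, NotUsed> buffered = events.Buffer(64, OverflowStrategy.Backpressure);

// Dropping older events: when the buffer is full, evict the oldest element.
Source<int, NotUsed> dropOldest = events.Buffer(64, OverflowStrategy.DropHead);

// Aggregating: while downstream is busy, roll pending values into one total.
Source<int, NotUsed> rolledUp = events.Conflate((total, next) => total + next);

// Batching: while downstream is busy, combine up to 100 elements into one sum.
Source<int, NotUsed> batched = events.Batch(100, first => first, (sum, next) => sum + next);

// The Backpressure strategy never discards anything, so every element arrives.
var all = await buffered.RunWith(Sink.Seq<int>(), materializer);
Console.WriteLine(all.Count);

// Conflate may merge elements, but the grand total is preserved.
var totals = await rolledUp.RunWith(Sink.Seq<int>(), materializer);
Console.WriteLine(totals.Sum());

await system.Terminate();
```

Each variant is just a different Source built from the same upstream, so swapping strategies is a one-line change. How many elements DropHead discards, or how many elements Conflate and Batch merge, depends on how quickly the downstream happens to pull, so those numbers will vary from run to run.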

From “Akka.Streams and Backoff” - which you should read for a full explanation:

#r "nuget: Akka.Streams, 1.4.24"
#r "nuget: Akka.Streams.IAsyncEnumerable, 0.1.0"

using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Immutable;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

ActorSystem actorSystem = ActorSystem.Create("StreamsExample2");
IMaterializer materializer = actorSystem.Materializer();

// Fast upstream producer - instantly produces 10,000 requests
Source<int,NotUsed> source = Source.From(Enumerable.Range(0, 10000));

// Group into batches of 100
Source<IEnumerable<int>,NotUsed> batchingSource = source.Via(Flow.Create<int>().Grouped(100)); 

var start = DateTime.UtcNow;

// simulate a slower downstream consumer - each batch is delayed by 1 second, and upstream is backpressured when the delay buffer is full
var slowSink = batchingSource.Via(Flow.Create<IEnumerable<int>>().Delay(TimeSpan.FromMilliseconds(1000), DelayOverflowStrategy.Backpressure)
    .Select(x => (x.Sum(), DateTime.UtcNow - start))); // sum each group;

var output = slowSink.RunAsAsyncEnumerable(materializer);

await foreach (var i in output)
{
    Console.WriteLine($"{i}");
}

This will produce an output that looks like this:


(4950, 00:00:01.0206637)
(14950, 00:00:01.0214608)
(24950, 00:00:01.0214684)
(34950, 00:00:01.0214717)
(44950, 00:00:01.0214746)
(54950, 00:00:01.0214826)
(64950, 00:00:01.0215108)
(74950, 00:00:01.0215165)
(84950, 00:00:01.0215372)
(94950, 00:00:01.0215412)
(104950, 00:00:01.0215499)
(114950, 00:00:01.0217026)
(124950, 00:00:01.0217156)
(134950, 00:00:01.0217258)
(144950, 00:00:01.0218766)
(154950, 00:00:01.0218855)
(164950, 00:00:02.0341112)
(174950, 00:00:02.0346757)
(184950, 00:00:02.0346826)
(194950, 00:00:02.0346855)

The IAsyncEnumerable that we materialize the stream into can buffer up to 16 messages at any given time (an internal implementation detail, but it’s configurable). Each time we reach that 16-message threshold, the Flow.Delay kicks in and backpressures the upstream for up to 1 second. That’s why you see the 1-second pause after each group of 16 elements in the .NET Interactive notebook output.

Akka.Streams backoff implementation

Akka.Streams’ backpressure system is implemented using a Dead man’s switch:

  • Downstream consumers demand up to N items worth of output from upstream;
  • Producers keep track of this demand value, N, and will send up to N items to the consumer;
  • In the happy path scenario, each time a consumer successfully processes an item it will demand 1 additional item from upstream;
  • In the unhappy path, the consumer will be too busy to request more output from upstream and the producer will notice that it has 0 demand from downstream;
  • When demand is 0, the producer executes its backoff strategy - in the earlier code sample it was to use the Delay for up to 1 second to pause traffic from upstream to downstream.
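To make the demand bookkeeping in the list above concrete, here is a minimal, single-threaded sketch in plain C#. It is not how Akka.Streams is implemented internally: DemandTrackingProducer and its members (Offer, Request, TryEmit, BackoffCount) are hypothetical names invented for this illustration, and the "backoff strategy" is reduced to simply refusing to emit.

```csharp
using System;
using System.Collections.Generic;

var producer = new DemandTrackingProducer();
for (var i = 0; i < 5; i++) producer.Offer(i);

var received = new List<int>();

producer.Request(2);                          // downstream demands N = 2 items
while (producer.TryEmit(received.Add)) { }    // emits 0 and 1, then demand hits 0

producer.Request(1);                          // one item processed, one more demanded
producer.TryEmit(received.Add);               // emits 2

Console.WriteLine(string.Join(", ", received));   // prints "0, 1, 2"
Console.WriteLine(producer.BackoffCount);         // prints "1"

// Hypothetical type for illustration only; not part of Akka.Streams.
public sealed class DemandTrackingProducer
{
    private readonly Queue<int> _pending = new Queue<int>();
    private long _demand; // outstanding demand signalled by downstream

    // Number of times the producer wanted to emit but had zero demand.
    public int BackoffCount { get; private set; }

    public int PendingCount => _pending.Count;

    // New work arrives at the producer.
    public void Offer(int item) => _pending.Enqueue(item);

    // Downstream calls this to demand up to n more items.
    public void Request(long n) => _demand += n;

    // Emits one item if there is both work and demand; otherwise returns false.
    public bool TryEmit(Action<int> onNext)
    {
        if (_pending.Count == 0) return false;

        if (_demand == 0)
        {
            BackoffCount++; // zero demand: the backoff strategy would run here
            return false;
        }

        _demand--;
        onNext(_pending.Dequeue());
        return true;
    }
}
```

The point of the sketch is that the producer never pushes more than the consumer has asked for. Items 3 and 4 stay queued on the producer side until more demand arrives, which is where a real stage would buffer, drop, or aggregate.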

If you are building asynchronous streaming applications, you will constantly be faced with these types of architectural dilemmas - Akka.Streams gives you a simple but proven protocol for managing this problem, along with a set of highly configurable strategies for implementing it.

Akka.Streams Provides Repeatable, Low-Code Patterns for Event-Driven Programming

In my opinion, the biggest benefit of Akka.Streams is that it offers a set of low-code, highly repeatable, composable, and performant patterns for event-driven programming.

While we have an entire list of built-in Akka.Streams stages and their semantics (and even that list isn’t 100% complete), here’s a set of some of the stages I’ve frequently found to be useful:

  • Combine<T> - merge several sources together
  • GroupedWithin<T> - debounce events in groups of N or within T milliseconds
  • Batch<T> - group elements together when downstream is too busy
  • Select<TIn,TOut> - project from one type to another
  • Where<T> - filter out elements based on predicate
  • Broadcast<T> - broadcast stream elements to multiple consumers
  • Throttle<T> - delay the rate at which elements can be passed downstream
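As a small, hedged illustration of how a few of these stages chain together, the sketch below filters, projects, and groups a short in-memory source. It assumes the Akka.Streams 1.4.24 package; the "StagesSketch" name, the numbers, and the group size are arbitrary choices for the example.

```csharp
#r "nuget: Akka.Streams, 1.4.24"

using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// "StagesSketch" and the values below are illustrative only.
var system = ActorSystem.Create("StagesSketch");
var materializer = system.Materializer();

var groupSums = await Source.From(Enumerable.Range(1, 10))
    .Where(x => x % 2 == 0)                     // keep the even numbers: 2, 4, 6, 8, 10
    .Select(x => x * 10)                        // project: 20, 40, 60, 80, 100
    .GroupedWithin(2, TimeSpan.FromSeconds(1))  // groups of up to 2, or whatever arrived within 1 second
    .Select(group => group.Sum())               // one total per group
    .RunWith(Sink.Seq<int>(), materializer);    // collect the results into a list

foreach (var sum in groupSums)
{
    Console.WriteLine(sum);
}

await system.Terminate();
```

Because GroupedWithin is time-sensitive, the exact group boundaries can differ between runs, but the totals across all groups always add up to 300. Each stage is a reusable building block, so adding a Throttle or replacing GroupedWithin with Batch is a matter of changing one line in the chain.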

Just to give you an idea of how powerful composition with these Akka.Streams stages is, let’s consider the example of my Akka.Streams networking benchmark that uses Akka.Streams.IO to perform duplex TCP streaming complete with message framing and some light serialization:

Server

var actorSystem = ActorSystem.Create("AkkaStreams", @"akka.loglevel = DEBUG");
var materializer = actorSystem.Materializer();

// create server
Source<Dsl.Tcp.IncomingConnection, Task<Dsl.Tcp.ServerBinding>> connections = actorSystem.TcpStream()
    .Bind("127.0.0.1", 8888);

var (serverBind, source) = connections.PreMaterialize(materializer);

// server event handler - per connection
source.RunForeach(conn =>
{
    var echo = Flow.Create<ByteString>()
        .Via(Decoder)
        .Select(c => c.ToString())
        .Select(c =>
        {
            return c.ToUpperInvariant();
        })
        .Select(ByteString.FromString)
        .Via(Encoder)
        .GroupedWithin(100, TimeSpan.FromMilliseconds(20))
        .Select(x => x.Aggregate(ByteString.Empty, (s, byteString) => s.Concat(byteString)));

    conn.HandleWith(echo, materializer);
}, materializer);

await serverBind;

Client

// start client to connect to server
var clientTask = actorSystem.TcpStream().OutgoingConnection(serverBind.Result.LocalAddress);

// generate repeating loop of data
var repeater = ByteString.FromString("A");
var dataGenerator = Source.Repeat(repeater)
    .Via(Encoder);

// compute rate at which data is sent client --> server --> client per second
var bytesPerSecondFlow = Flow.Create<ByteString>()
    .Via(Decoder)
    .GroupedWithin(1000, TimeSpan.FromMilliseconds(1000))
    .Select(bytes => bytes.Sum(x => x.Count))
    .Aggregate((0L, DateTime.UtcNow), (l, s) =>
    {
        var (accum, time) = l;
        accum += s;

        var timeSpan = (DateTime.UtcNow - time);
        if (timeSpan.TotalSeconds >= 1.0d)
        {
            Console.WriteLine($"{accum} byte/{timeSpan.TotalSeconds}");
            return (0L, DateTime.UtcNow); // reset
        }

        return (accum, time);
    })
    .To(Sink.Ignore<(long, DateTime)>());

// run BiDi flow
dataGenerator.Via(clientTask).To(bytesPerSecondFlow).Run(materializer);

So this is a duplex client + server application, performing interactive TCP messaging, in fewer than 100 lines of code - and I’m sure I could compress its footprint even more without altering its behavior significantly.

However, this code sample has one small problem, as I noted in “Introduction to Akka.Streams” at the 23-minute mark: the performance is awful. About 20,000 messages per second. What gives?

The problem here is that we’re flushing the socket for every single message - which is non-performant. So what if we aggregated as many outbound messages together as we could between flushes, which is what we do inside Akka.Remote? How could we accomplish that with Akka.Streams?

This fix can be implemented using a single method call in Akka.Streams:

// generate repeating loop of data
var repeater = ByteString.FromString("A");
var dataGenerator = Source.Repeat(repeater)
    .Via(Encoder)

    // batch up to 100 messages at a time
    .Batch(100, s => s, (s, byteString) => s.Concat(byteString));

We added a Batch<T> Akka.Streams stage on the client side and it improves the performance from roughly 20,000 messages per second to 360,000 messages per second. Akka.Streams gives you the ability to make high-impact changes to the performance and behavior of your applications with a very small amount of code!

Akka.Streams Simplifies Integration Between Akka.NET and Other Technologies

Through the Alpakka project, Akka.Streams.Kafka, Akka.Persistence.Query, and other libraries, it’s very easy to streamline integration between those technologies and Akka.NET actors themselves.

Examples of some of the integrations that exist for Akka.Streams today:

  • Akka.Persistence.Query
  • Azure EventHubs, Service Bus, Queues
  • Kafka
  • AMQP 0.9 / 1.0 (RabbitMQ)
  • SignalR
  • AWS Kinesis, SQS, SNS
  • CSV / XML Files
  • Petabridge.Cmd

Akka.Streams makes it really simple to combine these technologies with your Akka.NET actors - and you can see a stand-alone example from “Introduction to Akka.Streams” here that uses Petabridge.Cmd and Akka.Streams.SignalR to create a simple Akka.Cluster visualizer.

Conclusion

Akka.Streams is a high-level abstraction built on top of Akka.NET actors that creates powerful streaming workflows that are capable of self-balancing, are highly composable and expressive, and simplify integration with other non-Akka.NET technologies.

I highly recommend taking a look at the code samples mentioned earlier in this post and watching the recording of “Introduction to Akka.Streams” for yourself!

Written by Aaron Stannard on August 31, 2021