.NET Heisenbug Mystery Theater: How Did an Exception Escape its Catch Block?
A painful lesson on atomicity and the assignment of structs.
Over the past several months the Akka.NET team has received reports of an ArgumentException ("Cancellation cause must not be null") popping up unexpectedly in many of our plugins and end-user applications that use the Akka.Streams[1] SelectAsync stage, such as Akka.Streams.Kafka and Akka.Persistence.Sql.
That error message seems simple enough - it comes from here inside GraphStage.cs:
[InternalApi]
public void InternalOnDownstreamFinish(Exception cause)
{
    try
    {
        if (cause == null)
            throw new ArgumentException("Cancellation cause must not be null", nameof(cause));
        // ...
In Akka.Streams parlance, a stream gets cancelled when an unhandled Exception is thrown, and that error should be propagated all the way down to this GraphStage.InternalOnDownstreamFinish method so we can log why the stream is being cancelled / terminated.
Here’s the mystery - this is the code that “threw” the Exception inside Akka.Persistence.Sql, for instance:
.SelectAsync(
    JournalConfig.DaoConfig.Parallelism,
    async promisesAndRows =>
    {
        try
        {
            await WriteJournalRows(promisesAndRows.Rows);
            foreach (var taskCompletionSource in promisesAndRows.Tcs)
                taskCompletionSource.TrySetResult(NotUsed.Instance);
        }
        catch (Exception e)
        {
            foreach (var taskCompletionSource in promisesAndRows.Tcs)
                taskCompletionSource.TrySetException(e);
        }
        return NotUsed.Instance;
    })
How on Earth did the Exception escape the catch block? And why do we have no record of it by the time the stream completes? That is the mystery we are going to explore today.
SelectAsync Exceptions
Over several months we attempted to trap this issue so we could accurately locate where it was happening inside our code. The original ArgumentException was very broad, so we needed better data.
Eventually, the changes we made to capture it produced logs that looked like this:
[ERROR][03/14/2025 06:26:33.434Z][Thread 0029][FlowShape`2([SelectAsync.in] [SelectAsync.out])(akka://KafkaUnexpectedRecordsConsumer2/user/kafka-unexpected-records-100-no-PollKafka-actor/worker-0/StreamSupervisor-2)] An exception occured inside SelectAsync while processing message [Akka.Streams.Kafka.Messages.CommittableMessage`2[Confluent.Kafka.Ignore,System.Byte[]]]. Supervision strategy: Stop
Different users using different plugins, both of which use SelectAsync internally, reported similar versions of this new error too:
- Kafka Producer - Exception occured inside SelectAsync - Cancellation cause must not be null
- BaseByteArrayJournalDao lacks resiliency against stream failure
Ok, so now we know where the Exception is coming from - SelectAsync. That narrows our search area by a significant amount. Now the next question is - how can an Exception escape this code inside SelectAsync?
The Plumbing
If we peel the covers back on SelectAsync we have the following:
public SelectAsync(int parallelism, Func<TIn, Task<TOut>> mapFunc)
{
    _parallelism = parallelism;
    _mapFunc = mapFunc;
    Shape = new FlowShape<TIn, TOut>(In, Out);
}
Akka.Streams’ syntax looks a lot like LINQ, but the differences are:
- It’s asynchronous, all run by Akka.NET actors under the covers and
- It’s back-pressure aware - if a stage is blocking as a result of an I/O or compute constraint, we block the upstream stages from emitting new events.
The int parallelism is one of the tools that SelectAsync uses to create backpressure - by limiting the number of in-flight tasks that can be executing at any given time. The mapFunc is what emits the Task<TOut> that we are going to await.
The other thing that is special about SelectAsync is that it preserves invocation order - even if the parallel Task<TOut>s complete in arbitrary order (which they almost certainly will), we will always deliver the results of SelectAsync in the original invocation order of the TIn inputs sent to us[2].
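To make the ordering guarantee concrete, here is a minimal sketch that is not the Akka.Streams implementation: it starts every task up front, parks them in a FIFO queue, and awaits them in the order they were enqueued. The names OrderedCompletionDemo and SquareAfterDelayAsync are hypothetical, and the sketch deliberately ignores the parallelism limit and back-pressure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedCompletionDemo
{
    // Starts one task per input, then drains them in invocation order.
    public static async Task<List<int>> RunAsync(IEnumerable<int> inputs)
    {
        var pending = new Queue<Task<int>>();
        foreach (var input in inputs)
            pending.Enqueue(SquareAfterDelayAsync(input)); // task starts running here

        var outputs = new List<int>();
        while (pending.Count > 0)
            outputs.Add(await pending.Dequeue()); // head of the line goes first, even if later tasks finished earlier

        return outputs;
    }

    // Hypothetical workload: larger inputs finish sooner, so completion order
    // is roughly the reverse of invocation order.
    private static async Task<int> SquareAfterDelayAsync(int x)
    {
        await Task.Delay(Math.Max(0, 40 - x * 10));
        return x * x;
    }
}
```

Calling OrderedCompletionDemo.RunAsync(new[] { 1, 2, 3 }) yields the squares in input order regardless of which delay elapses first, which is the same property the buffer of holders provides inside the stage.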
Therefore, SelectAsync has to have a data structure for storing the pending Task<TOut>’s eventual output for this purpose:
private class Holder<T>
{
    private readonly Action<Holder<T>> _callback;

    public Holder(object message, Result<T> element, Action<Holder<T>> callback)
    {
        _callback = callback;
        Message = message;
        Element = element;
    }

    public Result<T> Element { get; private set; }
    public object Message { get; }

    public void SetElement(Result<T> result)
    {
        Element = result.IsSuccess && result.Value == null
            ? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
            : result;
    }

    public void Invoke(Result<T> result)
    {
        SetElement(result);
        _callback(this);
    }
}
That’s what the Holder<T> does, and the Holder<T> will eventually accept a Result<T> once the Task<T> completes:
public struct Result<T> : IEquatable<Result<T>>
{
    public readonly bool IsSuccess;
    public readonly T Value;
    public readonly Exception Exception;

    public Result(T value) : this()
    {
        IsSuccess = true;
        Value = value;
    }

    public Result(Exception exception) : this()
    {
        IsSuccess = false;
        Exception = exception;
    }
}
And this is where all of the magic happens inside SelectAsync:
public override void OnPush()
{
    var message = Grab(_stage.In);
    try
    {
        var task = _stage._mapFunc(message);
        var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
        _buffer.Enqueue(holder);

        // We dispatch the task if it's ready to optimize away
        // scheduling it to an execution context
        if (task.IsCompleted)
        {
            holder.SetElement(Result.FromTask(task));
            HolderCompleted(holder);
        }
        else
            task.ContinueWith(t => holder.Invoke(Result.FromTask(t)),
                TaskContinuationOptions.ExecuteSynchronously);
    }
    catch (Exception e)
    {
        // ... error handling
    }
}
- Once we get a new TIn “pushed” to us from upstream, we will invoke the user-defined function and receive a new Task<TOut>;
- We will check to see if this Task completed quickly and, if it has, we will immediately call Holder<TOut>.SetElement without using a continuation Task; and
- If the Task is still running, we will schedule a continuation Task and have that call Holder<TOut>.Invoke.
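The completed-versus-pending branch described in the list above can be sketched in isolation with plain Task APIs. This is a simplified illustration, not the stage's code: CompletionDispatchDemo and its Dispatch method are hypothetical names, and the sketch only handles tasks that complete successfully.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionDispatchDemo
{
    // Runs onDone right away when the task has already finished; otherwise
    // attaches a continuation that runs it later. The returned Task completes
    // once onDone has run.
    public static Task Dispatch(Task<int> task, Action<int> onDone, out bool ranInline)
    {
        if (task.IsCompleted)
        {
            // Fast path: no continuation is scheduled at all.
            ranInline = true;
            onDone(task.GetAwaiter().GetResult());
            return Task.CompletedTask;
        }

        // Slow path: the continuation runs on whichever thread completes the
        // task, which is generally NOT the thread that called Dispatch.
        ranInline = false;
        return task.ContinueWith(
            t => onDone(t.GetAwaiter().GetResult()),
            TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

The slow path is the one to watch: whatever onDone touches is being touched from a thread the caller does not control.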
Holder<T>.Invoke ended up being the key detail in this whole piece - remember, the Tasks that SelectAsync produces are run as detached Tasks. So when they complete, we’re relying on a little bit of Akka.Streams infrastructure to marshal the results back into our actors in a thread-safe way:
Action<Holder<TOut>> _taskCallback = GetAsyncCallback<Holder<TOut>>(HolderCompleted);

private void HolderCompleted(Holder<TOut> holder)
{
    var element = holder.Element;
    if (element.IsSuccess)
    {
        if (IsAvailable(_stage.Out))
            PushOne();
        return;
    }

    var exception = element.Exception;
    var strategy = _decider(exception);
    Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
    switch (strategy)
    {
        case Directive.Stop:
            FailStage(exception); // THIS IS WHERE THE PROBLEMS OCCURRED
            break;
        case Directive.Resume:
        case Directive.Restart:
            if (IsAvailable(_stage.Out))
                PushOne();
            break;
        default:
            throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
    }
}
The GetAsyncCallback method decorates our Action<Holder<TOut>> to transform it into an actor message, which gets processed normally inside the thread-safe context of the Akka.Streams actors. This is necessary in order to make sure that the callback function can safely access the Akka.Streams state without needing locks or any other synchronization mechanisms.
That callback function is where the exception was getting detected - but here’s the catch: the Exception was null here. This is what created our original ArgumentException error at the start of the post. But how could this occur?
Tasks, “Atomic” struct Assignments, and Lies
There are two critical details at work here:
- There is something very fishy about the Holder<T>.Invoke method.
- Result<T> is a user-defined struct.
First, let’s take a look at Holder<T>.Invoke with some extra comments:
public void Invoke(Result<T> result)
{
    SetElement(result); // happens inside the `ContinueWith` function's context
    _callback(this); // happens inside the SelectAsync `StageActor` context
    // VERY, VERY LIKELY THAT THESE ARE TWO DIFFERENT THREADS
}
This call is actually split across two different threads of execution - as my comments highlight. This is a potentially unsafe assignment. A no-no.
private static readonly Result<TOut> NotYetThere = Result.Failure<TOut>(new Exception());
var holder = new Holder<TOut>(message, NotYetThere, _taskCallback);
When we create a brand new Holder<TOut> we initialize its result with a placeholder NotYetThere value, which we will check for whenever the SelectAsync stage gets pulled for output by the stages below it:
else if (_buffer.Peek().Element == NotYetThere)
{
    if (Todo < _stage._parallelism && !HasBeenPulled(inlet))
        TryPull(inlet);
}
else
{
    var holder = _buffer.Dequeue();
    var result = holder.Element;
    if (!result.IsSuccess)
    {
        // this could happen if we are looping in PushOne and end up on a failed Task before the
        // HolderCompleted callback has run
        var strategy = _decider(result.Exception);
        Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
        switch (strategy)
        {
            case Directive.Stop:
                FailStage(result.Exception); // FAILURE CAN HAPPEN HERE TOO
                return;
The reason I point this out: we can’t pull an element whose Holder<TOut>.Element has not been assigned yet - the NotYetThere check ensures this.
HOWEVER: struct assignment does not work the same way class or “heap” assignment does.
Here’s the rub: remember how I said Result<T> is a struct? Well, when you assign a user-defined struct, that operation is not atomic! In the words of the great Eric Lippert:
I also talked a bit about how making fields of a struct readonly has no effect on atomicity; when the struct is copied around, it may be copied around four bytes at a time regardless of whether its fields are marked as readonly or not.
There is a larger problem though with reasoning about readonly fields in a struct beyond their non-atomicity. Yes, when you read from readonly fields in a struct on multiple threads without any locking, you can get inconsistent results due to race conditions. But the situation is actually worse than that; readonly fields need not give you results that you think are consistent even on one thread! Basically, readonly fields in a struct are the moral equivalent of the struct author writing a cheque without having the funds to back it[3].
So here’s how we ended up with a null Exception that escaped the catch block - the default value of a Result<T> looks like this:
public struct Result<T> : IEquatable<Result<T>>
{
    public readonly bool IsSuccess; // false
    public readonly T Value; // null
    public readonly Exception Exception; // null

    public Result()
    {
    }
}
In some instances, while the Result<T> value was being assigned on one thread, it was being dirtily / unsafely read on another as its default(Result<T>) value - which gives us a null Exception with IsSuccess == false. That set of conditions would transport us exactly to this error state here:
if (!result.IsSuccess) // we pass here because `IsSuccess == false`
{
    var strategy = _decider(result.Exception); // decider does not care if Exception is `null`
    Log.Error(result.Exception, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", holder.Message, strategy);
    switch (strategy)
    {
        case Directive.Stop:
            // `Exception` is null and gets passed all the way down to InternalOnDownstreamFinish, which blows up
            FailStage(result.Exception);
Here’s the rub: this “exception” can AND DID occur without a real Exception ever being thrown inside SelectAsync the entire time. It was entirely the dirty assignment + read of the Result<T> struct that created this problem.
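For readers who want to see a torn struct read outside of Akka.Streams, the following is a hedged, standalone sketch. Outcome is a hypothetical stand-in for Result<T>, and TornReadDemo is not part of any library. One thread flips a shared field between two internally consistent values while another copies it without synchronization and counts copies where IsSuccess and Exception disagree, such as an old IsSuccess == false paired with a new null Exception. Whether any torn reads show up on a given run depends on the runtime, the JIT, and the hardware, so a count of zero proves nothing.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;

public static class TornReadDemo
{
    // Hypothetical stand-in for Result<T>: three fields, so a copy is wider than one machine word.
    public readonly struct Outcome
    {
        public readonly bool IsSuccess;
        public readonly string Value;
        public readonly Exception Exception;

        public Outcome(bool isSuccess, string value, Exception exception)
        {
            IsSuccess = isSuccess;
            Value = value;
            Exception = exception;
        }
    }

    private sealed class Box
    {
        public Outcome Current; // shared and deliberately unsynchronized
    }

    // Two internally consistent states, mirroring "not yet there" and "succeeded".
    private static readonly Outcome Pending = new Outcome(false, null, new Exception("pending"));
    private static readonly Outcome Done = new Outcome(true, "ok", null);

    // NoInlining discourages the JIT from hoisting the field access out of the loops.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static Outcome Read(Box box) => box.Current;

    [MethodImpl(MethodImplOptions.NoInlining)]
    private static void Write(Box box, Outcome value) => box.Current = value;

    public static long CountTornReads(TimeSpan duration)
    {
        var box = new Box { Current = Pending };
        var stop = 0;

        var writer = new Thread(() =>
        {
            while (Volatile.Read(ref stop) == 0)
            {
                Write(box, Pending);
                Write(box, Done);
            }
        });
        writer.Start();

        long torn = 0;
        var deadline = DateTime.UtcNow + duration;
        while (DateTime.UtcNow < deadline)
        {
            var copy = Read(box);
            // Consistent means: success has no exception, failure has one.
            var consistent = copy.IsSuccess ? copy.Exception == null : copy.Exception != null;
            if (!consistent) torn++;
        }

        Volatile.Write(ref stop, 1);
        writer.Join();
        return torn;
    }
}
```

A torn copy with IsSuccess == false and a null Exception is the same shape of value that ends up being passed to FailStage in the snippet above.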
Fixing the Heisenbug
This bug was a true Heisenbug in the sense that it was elusive to observe via traditional debugging. The debugger created just enough lag that the two threads would both read the same assigned value for the struct Result<T>.
However, fixing it was quite simple: just do everything inside the AsyncCallback:
private sealed class Holder<T>(object? message, Result<T> element)
{
    public object? Message { get; private set; } = message;
    public Result<T> Element { get; private set; } = element;

    public void SetElement(Result<T> result)
    {
        Element = result.IsSuccess && result.Value == null
            ? Result.Failure<T>(ReactiveStreamsCompliance.ElementMustNotBeNullException)
            : result;
    }
}

private readonly Action<(Holder<TOut>, Result<TOut>)> _taskCallback;

_taskCallback = GetAsyncCallback<(Holder<TOut> holder, Result<TOut> result)>
    (t => HolderCompleted(t.holder, t.result));

private void HolderCompleted(Holder<TOut> holder, Result<TOut> result)
{
    // we may not be at the front of the line right now, so save the result for later
    holder.SetElement(result);
    if (result.IsSuccess)
    {
        if (IsAvailable(_stage.Out))
            PushOne();
        return;
    }
    // rest of method
}
If your first impulse upon seeing an issue like this is to build some sort of Rube-Goldberg machine out of synchronization code: stop!
In many cases these types of problems can be solved via simplification - in this instance, by moving both the assignment and the read of the struct onto the same thread.
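The same idea can be sketched without Akka.NET. In the hedged example below, a System.Threading.Channels mailbox stands in for GetAsyncCallback (that substitution is an assumption for illustration, as are the names SingleWriterDemo and Slot): worker tasks never touch the shared slots, they only post a (slot, value) pair, and a single consumer performs every assignment.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SingleWriterDemo
{
    // Hypothetical stand-in for Holder<T>; Value is only ever written by the consumer loop.
    public sealed class Slot
    {
        public Slot(int index) => Index = index;
        public int Index { get; }
        public string Value { get; set; }
    }

    public static async Task<IReadOnlyList<string>> RunAsync(int count)
    {
        var mailbox = Channel.CreateUnbounded<(Slot slot, string value)>(
            new UnboundedChannelOptions { SingleReader = true });

        var slots = new List<Slot>();
        var producers = new List<Task>();
        for (var i = 0; i < count; i++)
        {
            var slot = new Slot(i);
            slots.Add(slot);
            producers.Add(Task.Run(async () =>
            {
                await Task.Delay((count - slot.Index) % 5);
                // Post the result as a message instead of assigning it from this thread.
                await mailbox.Writer.WriteAsync((slot, $"item-{slot.Index}"));
            }));
        }

        // The single consumer is the only code that writes Slot.Value.
        var consumer = Task.Run(async () =>
        {
            await foreach (var (slot, value) in mailbox.Reader.ReadAllAsync())
                slot.Value = value;
        });

        await Task.WhenAll(producers);
        mailbox.Writer.Complete();
        await consumer;

        var results = new List<string>();
        foreach (var slot in slots)
            results.Add(slot.Value);
        return results;
    }
}
```

Because one loop performs every write, and the caller only reads after that loop has completed, no lock is needed around Slot.Value.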
[1] If you want to learn more about Akka.NET Streams, check out “Why and When to Use Akka.Streams”.
[2] If you don’t care about preserving invocation order and instead would prefer higher throughput, we have a SelectAsyncUnordered stage that does exactly this in Akka.NET streams.
[3] If this sort of low-level CLI/CLR stuff fascinates you, all three of Eric’s posts in his “Atomicity, volatility and immutability are different” series are excellent. Part 1. Part 2. Part 3.