Some time ago, Microsoft announced Orleans, an implementation of the actor model in .Net which is designed for the cloud environment where instances are ephemeral.
We’re currently working on a number of projects in Erlang and have run into some assumptions in distributed Erlang which don’t hold true in a cloud-hosted environment where nodes are ephemeral and entire topologies are constantly in flux. Also, as most of our backend code for Gamesys Social is in .Net, being able to work with languages that we’re already familiar with is a win for the team (more people being able to work on the codebase, for instance).
As such I have been taking an interest in Orleans to see if it represents a good fit, and whether or not it holds up to some of its lofty promises around scalability, performance and reliability. Below is an account of my personal views after reading the paper, downloading the SDK, looking through the samples and following Richard Astbury’s Pluralsight course.
Since I posted this the other day, there has been some great feedback from the Orleans team, which clarified several places where I had misunderstood things based on the information I had at the time of writing. Some of my other concerns still remain, but at least two of the biggest sticking points – single point of failure and at-least-once message delivery – have been disproved.
As such, I’ve updated this post in several places to incorporate the new information that the Orleans team have provided via the comments.
I’ve left what was previously written untouched, but look out for the impacted sections (* followed by a paragraph that is underlined) throughout the post to see the relevant new information. In these callout sections I have focused on the correct behaviour that you should expect based on corrections from Sergey and Gabriel. If you’re interested in the background and rationale behind these decisions, please read their comments in full.
When I first read about Orleans, I was hesitant because of the use of code-gen (reminiscent of WCF there), and because the underlying message passing mechanism is hidden from you, so you end up with an RPC mechanism (again, reminiscent of WCF…).
However, after spending some time with Orleans, I can definitely see its appeal – convenience and productivity. I was able to get something up and running quickly and with ease. My original concerns about code-gen and RPC didn’t get in the way of me getting stuff done.
As I dug deeper into how Orleans works though, a number of more worrying concerns surfaced regarding some of its core design decisions.
For starters, *1 it’s not tolerant of partitions between the cluster and the data store used for its Silo management. Should the data store be partitioned or suffer an outage, it’ll result in a full outage of your service. These are not the traits of a masterless, partition-tolerant system, which is what you want when you have strict uptime requirements.
When everything is working, Orleans guarantees that there is only one instance of a virtual actor running in the system, but when a node is lost the cluster’s knowledge of nodes will diverge and during this time the single-activation guarantee becomes eventually consistent. However, you can provide stronger guarantees yourself (see Silo Management section below).
*2 Orleans uses at-least-once message delivery, which means it’s possible for the same message to be sent twice when the receiving node is under load or simply fails to acknowledge the first message in a timely fashion. This again, is something that you can mitigate yourself (see Message Delivery Guarantees section below).
Finally, its task scheduling mechanism appears to be identical to that of a naive event loop and exhibits all the fallacies of an event loop (see Task Scheduling section below).
*1 As Gabriel and Sergey both explained in the comments, the membership management works quite a bit differently to what I first thought. Once connected, all heartbeats are sent between pairs of nodes using a linear algorithm, and the backend data store is only used for reaching agreements on what nodes are dead and to disseminate the new membership view to all other nodes.
In this case, losing connection to the backend data store would not impact existing, connected clusters, making it partition tolerant. If the backend data store becomes unavailable at the same time as your cluster topology is changing, then it will hinder updates to the membership and stop new nodes from being able to join the cluster.
Hopefully the implementation details of the membership management will be discussed in more detail in Orleans team’s future posts. Also, since Orleans will be open sourced in early 2015, we’ll be able to get a closer look at exactly how this behaves when the source code is available.
*2 Gabriel pointed out that by default Orleans does not resend messages that have timed out. So by default, it uses at-most-once delivery, but can be configured to automatically resend upon timeout if you want at-least-once delivery instead.
In Orleans, a Grain represents an actor, and each node has a Silo which manages the lifetime of the grains running inside the Silo. A Grain is activated when it receives a request, and can be deactivated after it becomes idle for a while. The Silo will also remove the deactivated Grains from memory to free up resources.
Orleans’ Grains are referred to as virtual actors. They are virtual because the state of a Grain can be persisted to a storage service, and then reinstated as the Grain is reactivated (after being deactivated due to idleness) on another node. This is a nice abstraction from a developer’s point of view, and to enable this level of indirection, Orleans introduces the mechanism of storage providers.
To use storage providers, you first need to define an interface that represents the state of your Grain, and have it inherit from Orleans’ IGrainState interface. For example:
Then in your Grain implementation class, you provide this ITournamentGrainState interface as generic type parameter to the Orleans.Grains base class, as below. You also need to specify the storage provider you want to use via the [StorageProvider] attribute. The ProviderName specified here points to a storage provider you define in the configuration file for Orleans.
When a Grain is activated in a Silo, the Orleans runtime will go and fetch the state of the Grain for us and put it in an instance member called State. For instance, when the ActivateAsync method is called, the state of our TournamentGrain would have been populated from the DynamoDBStorage provider we created:
You can modify the state by modifying its members, but the changes will not be persisted to the backend storage service until you call the State.WriteStateAsync method. Un-persisted changes will be lost when the Grain is deactivated by the Silo, or if the node itself is lost.
Finally, there are a number of built-in storage providers, such as the Azure Table Storage, but it’s trivial to implement your own. To implement a custom storage provider, you just need to implement the IStorageProvider interface.
Storage providers make it very easy to create actors that can be easily resumed after deactivation, but you need to be mindful of a number of things:
- how often you persist state is a trade-off between durability and performance + cost
- if multiple parts of the state need to be modified in one call, you need to have a rollback strategy in place in case of exceptions or risk leaving dirty writes in your state (see Not letting it crash section below)
- you need to handle the case when persistence fails – since you’ve already mutated the in-memory state, do you roll back, or do you continue and hope that you get another chance to persist the state before the actor is deactivated through idleness or the node crashes?
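To make the rollback point concrete, here is a minimal sketch in Python rather than .Net. Everything in it (`InMemoryStore`, `TournamentActor`, the `fail_next` switch) is hypothetical and only models the idea; it is not the Orleans API. The actor snapshots its state before mutating it, and restores the snapshot if the handler or the write to storage fails.

```python
import copy


class StorageError(Exception):
    """Raised by the (hypothetical) storage provider when a write fails."""


class InMemoryStore:
    """Stand-in for a storage provider; `fail_next` simulates an outage."""

    def __init__(self):
        self.saved = {}
        self.fail_next = False

    def write(self, grain_id, state):
        if self.fail_next:
            self.fail_next = False
            raise StorageError("write failed")
        self.saved[grain_id] = copy.deepcopy(state)


class TournamentActor:
    def __init__(self, grain_id, store):
        self.grain_id = grain_id
        self.store = store
        self.state = {"players": [], "prize_pool": 0}

    def join(self, player, entry_fee):
        # Snapshot first, so a failed write or an exception cannot leave
        # a half-applied change in memory.
        snapshot = copy.deepcopy(self.state)
        try:
            self.state["players"].append(player)
            self.state["prize_pool"] += entry_fee
            self.store.write(self.grain_id, self.state)
        except Exception:
            self.state = snapshot
            raise


store = InMemoryStore()
actor = TournamentActor("t-1", store)
actor.join("alice", 10)

store.fail_next = True  # the next write to storage will fail
try:
    actor.join("bob", 10)
except StorageError:
    pass  # in-memory state has been rolled back to match storage
```

Persisting on every call, as this sketch does, sits at the durable and expensive end of the trade-off in the first bullet; persisting less often makes the rollback decision harder, because memory and storage are allowed to drift apart.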
In Erlang, there is no built-in mechanism for storage providers, but there is also nothing stopping you from implementing this yourself. Have a look at Bryan Hunter’s CQRS with Erlang talk at NDC Oslo 2014 for inspiration.
Silos use a backend store to manage Silo memberships, which is Azure Table Storage by default. This is what the MSR paper has to say about Silo memberships:
“Servers automatically detect failures via periodic heartbeats and reach an agreement on the membership view. For a short period of time after a failure, membership views on different servers may diverge, but it is guaranteed that eventually all servers will learn about the failed server and have identical membership views….if a server was declared dead by the membership service, it will shut itself down even if the failure was just a temporary network issue.”
Furthermore, on the guarantee that an actor (or Grain with a specific ID) is only activated on one node:
“In failure-free times, Orleans guarantees that an actor only has a single activation. However, when failures occur, this is only guaranteed eventually.
Membership is in flux after a server has failed but before its failure has been communicated to all survivors. During this period, a register-activation request may be misrouted if the sender has a stale membership view….However, it may be that two activations of the same actor are registered in two different directory partitions, resulting in two activations of a single-activation actor. In this case, once the membership has settled, one of the activations is dropped from the directory and a message is sent to its server to deactivate it.”
There are a couple of things to note about Silo membership management from the above:
- *3 the way servers detonate when they lose connectivity to the storage service means it’s not partition-tolerant, because if the storage service is partitioned from the cluster, even for a relatively short amount of time, then there’s a chance that every node running a Silo will self-detonate;
- *3 there is a single point of failure at the storage service used to track Silo memberships, any outage to the storage service results in outage to your Orleans service too (this happened to Halo 4);
- it offers strong consistency during the good times, but falls back to eventual consistency during failures;
- whilst it’s not mentioned, I speculate that depending on the size of the cluster and the time it takes to converge on Silo membership views across the cluster, it’s possible to have more than two activations of the same Grain in the cluster;
- the conflict resolution approach above suggests that one activation is chosen at random and the rest are discarded; this seems rather naive and means losing intermediate changes recorded on the discarded Grain activations, and
- since each activation can persist its state independently, it’s possible for the surviving Grain activation’s internal state to be out-of-sync with what has been persisted;
- these failure times can happen a lot more often than you think, nodes can be lost due to failure, but also as a result of planned/automatic scaling down events throughout the day as traffic patterns change (Orleans is designed for the cloud and all its elastic scalability after all).
During failures, it should be possible to provide a stronger guarantee on single activation using optimistic concurrency around the Grain’s state. For instance,
1. node A failed, now the cluster’s view of Silos has diverged
2a. Grain receives a request on node B, and is activated with state v1
2b. Grain receives a request on node C, and is activated with state v1
3a. Grain on node B finishes processing request, and succeeds in saving state v2
3b. Grain on node C finishes processing request, but fails to save state v2 (optimistic concurrency at work here)
4. Grain on node C fails the request and triggers deactivation
5. Cluster only has one activation of the Grain on node B
Enforcing a stronger single activation guarantee in this fashion should also remove the need for better conflict resolution. For this approach to work, you will need to be able to detect persistence failures due to optimistic concurrency. In DynamoDB, this can be identified by a conditional check error.
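The sequence above can be sketched with a versioned, in-memory store standing in for DynamoDB. This is a Python model of the idea under my own assumptions, not Orleans or DynamoDB code: `VersionedStore`, `Activation` and `ConditionalCheckFailed` are hypothetical names, and the version number plays the role of the condition on a conditional write.

```python
class ConditionalCheckFailed(Exception):
    """The stored version no longer matches the version the writer read."""


class VersionedStore:
    def __init__(self):
        self.items = {}  # grain_id -> (version, state)

    def read(self, grain_id):
        return self.items.get(grain_id, (0, {}))

    def write(self, grain_id, expected_version, state):
        current_version, _ = self.read(grain_id)
        if current_version != expected_version:
            raise ConditionalCheckFailed(grain_id)
        self.items[grain_id] = (expected_version + 1, dict(state))


class Activation:
    def __init__(self, node, grain_id, store):
        self.node = node
        self.grain_id = grain_id
        self.store = store
        self.version, state = store.read(grain_id)
        self.state = dict(state)
        self.active = True

    def handle(self, key, value):
        self.state[key] = value
        try:
            self.store.write(self.grain_id, self.version, self.state)
            self.version += 1
            return True
        except ConditionalCheckFailed:
            # Another activation got there first: fail the request and
            # deactivate, leaving a single activation in the cluster.
            self.active = False
            return False


store = VersionedStore()
store.write("grain-42", 0, {"score": 1})    # persisted state v1

on_b = Activation("B", "grain-42", store)   # step 2a
on_c = Activation("C", "grain-42", store)   # step 2b

b_ok = on_b.handle("score", 2)              # step 3a: saves v2
c_ok = on_c.handle("score", 3)              # step 3b: conditional check fails
```

After this runs, only the activation on node B is still active and the store holds the state it wrote, which is the outcome described in steps 4 and 5.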
*3 Again, as per Gabriel and Sergey’s comments below, this is not true and there’s no single point of failure in this case. See *1 above, or read the comments for more detail.
Distributed Erlang employs a different approach for forming clusters. Nodes form a mesh network where every node is connected to every other node. They use a gossip protocol to inform each other when nodes join or leave the cluster.
This approach has a scalability limitation and doesn’t scale well to thousands, or even hundreds of nodes, depending on the capabilities of each node. This is because the overhead of forming and maintaining the cluster increases quadratically with the number of nodes. The effect is particularly evident if you require frequent inter-node communications, or need to use functions in the global built-in module.
In this particular space, SD (scalable distributed) Erlang is attempting to address this shortcoming by allowing you to create groups of sub-clusters amongst your nodes so that the size of the mesh network is limited to the size of the sub-clusters.
Random Actor Placement
Another interesting choice Orleans made is that, instead of using consistent hashing for actor placement (a technique that has been successfully used in a number of Key-Value Stores such as CouchBase and Riak), Orleans introduces another layer of indirection here by introducing the Orleans directory.
“The Orleans directory is implemented as a one-hop distributed hash table (DHT). Each server in the cluster holds a partition of the directory, and actors are assigned to the partitions using consistent hashing. Each record in the directory maps an actor id to the location( s) of its activations.”
The rationale for this decision is that *4 random placement of actors helps avoid the creation of hotspots in your cluster which might result from poorly chosen IDs, or bad luck. But it means that to retain correctness, every request to an actor now requires an additional hop to the directory partition first. To address this performance concern, each node uses a local cache to store where each actor is.
I think this is a well-meaning attempt to solve a problem, but I’m not convinced that it’s a problem that deserves the
- additional layer of indirection and
- the subsequent problem of performance, and
- the subsequent use of local cache and
- *5 the problem of cache invalidation that comes with it (which as we know, is one of the two hard problems in CS)
Is it really worth it? Especially when the IDs are guids by default, which hash well. Would it not be better to solve it with a better hashing algorithm?
From my personal experience of working with a number of different Key-Value stores, actor placement has never been an issue that is significant enough to deserve the special treatment Orleans has given it. I’d really like to see the results of any empirical study that shows this to be a big enough issue in real-world key-value store usage.
*4 As Sergey mentioned in the comments, you can do a few more things such as using a PreferLocalPlacement strategy to “instruct the runtime to place an activation of a grain of that type local to the first caller (another grain) to it. That is how the app can hint about optimizing placement.”. This appears to be the same as when you spawn a new process in Erlang. It would require further clarification from Sergey or Gabriel but I’d imagine the placement strategy probably applies at the type level for each type of grain.
The additional layer of abstraction does buy you some more flexibility, and not having to move grains around when topology changes probably simplifies things (I imagine moving the directory information around is cheaper and easier than the grains themselves) from the implementation point-of-view too.
In the Erlang space, RiakCore provides a set of tools to help you build distributed systems and its approach gives you more control over the behaviour of your system. You do however have to implement a few more things yourself, such as how to move data around when cluster topology changes (though the vnode behaviour gives you the basic template for doing this) and how to deal with collisions etc.
*5 Hitting stale cache is not much of a problem in this case, where Orleans would do a new lookup, forward the message to the right destination and update the cache.
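A rough Python model of that behaviour, as I understand it from the comments: the sender trusts its local cache, and only goes back to the directory when the cached node turns out not to host the actor. `Cluster` and `Sender` are hypothetical names, and the real runtime forwards the message from the stale node rather than having the sender retry, so treat this as a simplification.

```python
class Cluster:
    def __init__(self):
        self.directory = {}  # actor_id -> node (authoritative mapping)
        self.hosted = {}     # node -> set of actor ids activated there
        self.lookups = 0     # how many times the directory was consulted

    def place(self, actor_id, node):
        for actors in self.hosted.values():
            actors.discard(actor_id)
        self.hosted.setdefault(node, set()).add(actor_id)
        self.directory[actor_id] = node

    def lookup(self, actor_id):
        self.lookups += 1
        return self.directory[actor_id]

    def hosts(self, node, actor_id):
        return actor_id in self.hosted.get(node, set())


class Sender:
    def __init__(self, cluster):
        self.cluster = cluster
        self.cache = {}      # local cache: actor_id -> last known node

    def send(self, actor_id):
        node = self.cache.get(actor_id)
        if node is None or not self.cluster.hosts(node, actor_id):
            # Cache miss or stale entry: do a fresh lookup and update the cache.
            node = self.cluster.lookup(actor_id)
            self.cache[actor_id] = node
        return node


cluster = Cluster()
cluster.place("player-1", "A")
sender = Sender(cluster)

first = sender.send("player-1")    # miss: one directory lookup
second = sender.send("player-1")   # hit: no lookup needed
lookups_after_hit = cluster.lookups

cluster.place("player-1", "B")     # the activation moves, the cache is now stale
third = sender.send("player-1")    # stale entry detected, re-resolved to B
```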
As an aside, here’s how Riak does consistent hashing and read/write replication:
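In code, the idea looks roughly like the sketch below. It is a simplified Python model and not Riak’s implementation: keys are hashed with SHA-1 onto a 160-bit ring that is split into equal partitions, each partition is owned by a node, and a key is replicated to N consecutive partitions. The round-robin ownership in `build_ring` and the small partition count are my own assumptions to keep the example short.

```python
import hashlib

RING_SIZE = 2 ** 160  # SHA-1 produces 160-bit hashes


def build_ring(nodes, num_partitions=8):
    """Split the ring into equal partitions and assign owners round-robin."""
    return [nodes[i % len(nodes)] for i in range(num_partitions)]


def preference_list(ring, key, n=3):
    """Return the owners of the n consecutive partitions that hold the key."""
    digest = int.from_bytes(hashlib.sha1(key.encode()).digest(), "big")
    partition_size = RING_SIZE // len(ring)
    first = digest // partition_size
    return [ring[(first + i) % len(ring)] for i in range(n)]


ring = build_ring(["node-a", "node-b", "node-c", "node-d"])
replicas = preference_list(ring, "players/alice")
```

Because ownership depends only on the hash of the key and the ring layout, any node can work out where a key lives without the extra hop to a directory, which is the trade-off being discussed in this section.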
Message Delivery Guarantees
“Orleans provides at-least-once message delivery, by resending messages that were not acknowledged after a configurable timeout. Exactly-once semantics could be added by persisting the identifiers of delivered messages, but we felt that the cost would be prohibitive and most applications do not need it. This can still be implemented at the application level”
The decision to use at-least-once message delivery as default is a contentious one in my view. *6 A slow node will cause messages to be sent twice, and handled twice, which is probably not what you want most of the time.
Whilst the rationale regarding cost is valid, it seems to me that allowing the message delivery to time out and letting the caller handle timeout cases is the more sensible choice here. It’d make the handling of timeouts an explicit decision on the application developer’s part, probably on a per-call basis since some calls are more important than others.
*6 The default behaviour is to not resend on timeout, so by default Orleans actually uses at-most-once delivery. But you can configure it to automatically resend upon timeout, i.e. at-least-once.
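The difference between the two modes, and why at-least-once pushes work onto the receiver, can be shown with a small Python sketch. `FlakyReceiver` and `send` are hypothetical and model the semantics only, not how Orleans is configured: the receiver processes every message, but its first acknowledgement is lost, which is what a slow node looks like to the sender.

```python
class FlakyReceiver:
    """Handles every message but loses the first `lost_acks` acknowledgements."""

    def __init__(self, lost_acks, dedupe=False):
        self.lost_acks = lost_acks
        self.dedupe = dedupe
        self.seen = set()    # ids of messages already handled
        self.handled = []    # side effects, in the order they were applied

    def receive(self, msg_id, body):
        if not (self.dedupe and msg_id in self.seen):
            self.seen.add(msg_id)
            self.handled.append(body)
        if self.lost_acks > 0:
            self.lost_acks -= 1
            return False     # the ack never reaches the sender in time
        return True


def send(receiver, msg_id, body, max_attempts=1):
    """max_attempts=1 is at-most-once; anything higher resends on timeout."""
    for _ in range(max_attempts):
        if receiver.receive(msg_id, body):
            return True
    return False


at_most_once = FlakyReceiver(lost_acks=1)
timed_out = not send(at_most_once, "m1", "credit 10")

at_least_once = FlakyReceiver(lost_acks=1)
delivered = send(at_least_once, "m1", "credit 10", max_attempts=3)

idempotent = FlakyReceiver(lost_acks=1, dedupe=True)
delivered_once = send(idempotent, "m1", "credit 10", max_attempts=3)
```

With a single attempt the caller sees a timeout even though the message was handled; with resends the plain receiver applies the credit twice, and only the receiver that remembers message ids applies it once. That last variant is the application-level deduplication the paper alludes to.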
Not letting it crash
Erlang’s mantra has always been to Let It Crash (except when you shouldn’t). When a process crashes, it can be restarted by its supervisor. Using techniques such as event sourcing it’s easy to return to the previous state just before the crash.
When an exception is thrown in an Orleans Grain, it does not crash the Grain itself and is simply reported back to the caller instead. This simplifies things but runs the risk of leaving behind dirty writes (hence corrupting the state) in the wake of an exception. For example:
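Here is a minimal illustration of the problem, written in Python with a hypothetical `Wallet` actor rather than a real Grain. The handler mutates one part of the state before the check that throws, and because the actor survives the exception, the half-applied change stays in memory.

```python
class Wallet:
    def __init__(self):
        self.balance = 100
        self.history = []

    def spend(self, amount):
        # The first mutation happens before the validation that can fail.
        self.history.append(("spend", amount))
        if amount > self.balance:
            raise ValueError("insufficient funds")
        self.balance -= amount


wallet = Wallet()
try:
    wallet.spend(500)
except ValueError as error:
    reported = str(error)  # the caller is told about the failure...

# ...but the actor keeps running with a history entry for a spend that
# never happened, so history and balance no longer agree.
```

Had the actor crashed and been restarted from its last persisted state, the stray history entry would have been discarded along with it.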
The choice of not crashing the grain in the event of exceptions offers convenience at the cost of breaking the atomicity of operations and personally it’s not a choice that I agree with.
In a discussion on the Actor model with Erik Meijer and Clemens Szyperski, Carl Hewitt (father of the Actor model) said
“Conceptually, messages are processed one at a time, but the implementation can allow for concurrent processing of messages”
In Orleans, grains process messages one at a time normally. To allow concurrent processing of messages, you can mark your grain implementation with the [Reentrant] attribute.
Reentrant grains can be used as an optimization technique to remove bottlenecks in your network of grains.
“One actor is no actor, they come in systems, and they have to have addresses so that one actor can send messages to another actor.”
– Carl Hewitt
However, using reentrant grains means you lose the guarantee that the state is accessed sequentially and opens yourself up to potential race-conditions. You should use reentrant grains with great care and consideration.
In Erlang, concurrent processing of messages is not allowed. But, you don’t have to block your actor whilst it waits for some expensive computation to complete. Instead, you can spawn another actor and ask the child actor to carry on with the expensive work whilst the parent actor processes the next message. This is possible because the overhead and cost of spawning a process in Erlang is very low and the runtime can easily handle tens of thousands of concurrent processes and load balance across the available CPU resources via its schedulers.
If necessary, once the child actor has finished its work it can send a message back to the parent actor, which can then perform any subsequent computation as required.
Using this simple technique, you acquire the same capability that reentrant grains offer. You can also control which messages can be processed concurrently, rather than the all-or-nothing approach that reentrant grains use.
*7 Messages sent between Grains are usually serialized and then deserialized, which is an expensive overhead when both Grains are running on the same machine. You can optionally mark the messages as immutable so that they won’t be serialized when passed between Grains on the same machine, but this immutability promise is not enforced at all and it’s entirely down to you to apply the due diligence of not mutating the messages.
*7 A clarification here: messages are only serialized and deserialized when they are sent across nodes. Messages sent between grains on the same node are deep-copied instead, which is cheaper than serialization. Marking the type as immutable skips the deep-copying process too.
But, it’s still your responsibility to enforce the immutability guarantee.
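What that responsibility means in practice can be seen in a few lines of Python. `deliver_locally` is a hypothetical stand-in for same-node delivery, not an Orleans function: by default the receiver gets a deep copy, and when the message is marked immutable it gets the very same object, so any later mutation by the sender leaks through.

```python
import copy


def deliver_locally(message, marked_immutable=False):
    """Same-node delivery: deep copy unless the sender promised immutability."""
    return message if marked_immutable else copy.deepcopy(message)


original = {"items": ["sword"]}

copied = deliver_locally(original)
shared = deliver_locally(original, marked_immutable=True)

# The sender breaks its promise after the message has been delivered.
original["items"].append("shield")
```

The receiver holding `copied` still sees only the sword, whereas the receiver holding `shared` now sees the shield as well, without any message having been sent.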
In Erlang, variables are immutable so there is no need to do anything explicit.
“Orleans schedules application turns using cooperative multitasking. That means that once started, an application turn runs to completion, without interruption. The Orleans scheduler uses a small number of compute threads that it controls, usually equal to the number of CPU cores, to execute all application actor code.”
This is another point of concern for me.
Here we’re exhibiting the same vulnerabilities of an event-loop system (e.g. Node.js, Tornado) where a single poisoned message can cripple your entire system. Even without poisoned messages, you are still left with the problem of not distributing CPU resources evenly across actors, and allowing slow/misbehaving actors to badly impact your latency for other pending requests.
Even having multiple cores and having one thread per core (which is a sane choice) is not going to save you here. All you need is one slow-running actor on each processor-affined execution thread to halt the entire system.
Erlang’s approach to scheduling makes much more sense – one scheduler per core, and an actor is allowed to execute 2000 reductions (think of one reduction as one function call to do something) before it has to yield the CPU so that another actor can get a slice of the CPU time. The original actor will then wait for its turn to run again.
This CPU-sharing policy is no different to what the OS does with threads, and there’s a good reason for that.
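The difference between the two policies can be mimicked with Python generators. This is only a toy model under my own assumptions: each `yield` stands for one reduction, and `run` is a hypothetical round-robin scheduler that takes the CPU away after `budget` reductions. The Erlang VM counts reductions itself and preempts the process, whereas generators have to yield voluntarily.

```python
from collections import deque


def actor(name, work_units, log):
    """An actor as a generator: each `yield` marks one reduction."""
    for _ in range(work_units):
        log.append(name)
        yield


def run(actors, budget):
    """Round-robin scheduler; an actor is preempted after `budget` reductions."""
    queue = deque(actors)
    while queue:
        current = queue.popleft()
        for _ in range(budget):
            try:
                next(current)
            except StopIteration:
                break                  # the actor finished its work
        else:
            queue.append(current)      # budget used up: back of the queue


# With a reduction budget, the fast actor runs after two slow reductions.
fair_log = []
run([actor("slow", 6, fair_log), actor("fast", 1, fair_log)], budget=2)

# A budget larger than any actor's work models run-to-completion turns:
# the fast actor has to wait for the slow one to finish entirely.
greedy_log = []
run([actor("slow", 6, greedy_log), actor("fast", 1, greedy_log)], budget=1000)
```

In the first run the fast actor gets the CPU after two reductions of the slow one; in the second it waits behind all six, which is the latency problem described above.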
Ease of use
I think this is the big winner for Orleans and the focus of its design goals.
I have to admit, I was pleasantly surprised by how easily and quickly I was able to put a service together and have it running locally. Based on what I have seen of the samples and Richard’s Pluralsight course, deploying to the cloud is pretty straightforward too.
Another win for Orleans here, as it’s designed from the start to deal with cluster topologies that can change dynamically with ephemeral instances. Whereas distributed Erlang, at least distributed OTP, assumes a fixed topology where nodes have well defined roles at start. There are also challenges around getting the built-in distributed database – Mnesia – to work well in a dynamically changing topology.
In many ways, I think Orleans is true to its original design goals of optimizing for developer productivity. But by shielding developers from decisions and considerations that usually come with building distributed systems, it has also deprived them of the opportunity to build systems that need to be resilient to failures and meet stringent uptime requirements.
But, not every distributed system is critical, and not every distributed system needs five to nine nines uptime. As long as you’re informed about the trade-offs that Orleans has made and what they mean to you as a developer, you can at least make informed choices of if and when to adopt Orleans.
I hope this post will help you make those informed decisions. If I have been misinformed or incorrect about any part of how Orleans works, please do leave a comment below.
- Channel9 – using Orleans to build Halo 4’s Distributed Cloud Services in Azure
- Microsoft Research’s paper on Orleans
- Orleans codeplex page
- Steve Vinoski – Let It Crash…Except When You Shouldn’t
- Natalia Chechina – RELEASE Scalable Erlang