A look at Microsoft Orleans through Erlang-tinted glasses

Some time ago, Microsoft announced Orleans, an implementation of the actor model in .NET designed for cloud environments where instances are ephemeral.

We’re currently working on a number of projects in Erlang and have run into some assumptions in distributed Erlang which don’t hold true in a cloud-hosted environment where nodes are ephemeral and entire topologies are constantly in flux. Also, as most of our backend code for Gamesys Social is in .NET, being able to work with languages that we’re already familiar with is a win for the team (more people being able to work on the codebase, for instance).

As such I have been taking an interest in Orleans to see if it represents a good fit, and whether or not it holds up to some of its lofty promises around scalability, performance and reliability. Below is an account of my personal views, having read the paper, downloaded the SDK, looked through the samples and followed Richard Astbury’s Pluralsight course.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Update 2014/12/08:

Since I posted this the other day, there has been some great feedback from the Orleans team which has clarified several places where I had previously misunderstood things based on the information I had at the time of writing. Some of my other concerns still remain, but at least two of the biggest sticking points – single point of failure and at-least-once message delivery – have been disproved.

As such, I’ve updated this post in several places to incorporate the new information that the Orleans team has provided via the comments.

I’ve left what was previously written untouched, but look out for the impacted sections (* followed by a paragraph that is underlined) throughout the post to see the relevant new information. In these callout sections I have focused on the correct behaviour that you should expect based on corrections from Sergey and Gabriel; if you’re interested in the background and rationale behind these decisions, please read their comments in full.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

TL;DR

When I first read about Orleans, I was hesitant because of the use of code-gen (reminiscent of WCF there), and because the underlying message passing mechanism is hidden from you, so you end up with an RPC mechanism (again, reminiscent of WCF…).

However, after spending some time with Orleans, I can definitely see its appeal – convenience and productivity. I was able to get something up and running quickly and with ease. My original concerns about code-gen and RPC didn’t get in the way of me getting stuff done.

As I dug deeper into how Orleans works though, a number of more worrying concerns surfaced regarding some of its core design decisions.

For starters, *1 it’s not partition tolerant with respect to the data store used for its Silo management. Should that data store be partitioned or suffer an outage, the result is a full outage of your service. These are not the traits of the masterless, partition-tolerant system you want when you have strict uptime requirements.

When everything is working, Orleans guarantees that there is only one instance of a virtual actor running in the system, but when a node is lost the cluster’s knowledge of nodes will diverge, and during this time the single-activation guarantee becomes eventually consistent. However, you can provide stronger guarantees yourself (see the Silo Membership section below).

*2 Orleans uses at-least-once message delivery, which means it’s possible for the same message to be sent twice when the receiving node is under load or simply fails to acknowledge the first message in a timely fashion. This again, is something that you can mitigate yourself (see Message Delivery Guarantees section below).

Finally, its task scheduling mechanism appears to be identical to that of a naive event loop and exhibits the same vulnerabilities (see the Task Scheduling section below).

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*1 As Gabriel and Sergey both explained in the comments, the membership management works quite a bit differently from what I first thought. Once connected, all heartbeats are sent directly between pairs of nodes using a linear algorithm, and the backend data store is only used for reaching agreement on which nodes are dead and for disseminating the new membership view to all other nodes.

In this case, losing connection to the backend data store would not impact an existing, connected cluster, making it partition tolerant. If the backend data store becomes unavailable at the same time as your cluster topology is changing, then it will hinder updates to the membership and stop new nodes from being able to join the cluster.

Hopefully the implementation details of the membership management will be discussed in more detail in Orleans team’s future posts. Also, since Orleans will be open sourced in early 2015, we’ll be able to get a closer look at exactly how this behaves when the source code is available.

*2 Gabriel pointed out that by default Orleans does not resend messages that have timed out. So by default, it uses at-most-once delivery, but can be configured to automatically resend upon timeout if you want at-least-once delivery instead.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Terminology

In Orleans, a Grain represents an actor, and each node has a Silo which manages the lifetime of the Grains running inside it. A Grain is activated when it receives a request, and can be deactivated after it has been idle for a while. The Silo will also remove deactivated Grains from memory to free up resources.

Orleans’ Grains are referred to as virtual actors. They are virtual because the state of a Grain can be persisted to a storage service and then reinstated when the Grain is reactivated (after being deactivated due to idleness), possibly on another node. This is a nice abstraction from a developer’s point of view, and to enable this level of indirection, Orleans introduces the mechanism of storage providers.

Storage Providers

To use storage providers, you first need to define an interface that represents the state of your Grain, and have it inherit from Orleans’ IGrainState interface. For example:

[code screenshot: an ITournamentGrainState interface inheriting from Orleans’ IGrainState]

Then in your Grain implementation class, you provide this ITournamentGrainState interface as the generic type parameter to the grain base class, as below. You also need to specify the storage provider you want to use via the [StorageProvider] attribute; the ProviderName specified here points to a storage provider you define in the configuration file for Orleans.

[code screenshot: the TournamentGrain class decorated with the [StorageProvider] attribute]
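Since the screenshots aren’t reproduced here, the two snippets would have looked roughly like the sketch below. This is only an approximation based on the description above: the state properties are invented, the base class name follows the preview-era SDK (GrainBase<T>; later versions use Grain<T>), and the corresponding grain interface is omitted for brevity.

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Providers; // namespace of [StorageProvider] may vary across versions

    // Grain state is declared as an interface inheriting IGrainState; the
    // properties here are purely illustrative.
    public interface ITournamentGrainState : IGrainState
    {
        string TournamentName { get; set; }
        List<string> PlayerIds { get; set; }
    }

    // The grain takes the state interface as the generic type parameter to the
    // grain base class, and names the storage provider configured for it.
    [StorageProvider(ProviderName = "DynamoDBStorage")]
    public class TournamentGrain : GrainBase<ITournamentGrainState>
    {
        // grain methods go here (see the activation/persistence sketches below)
    }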

When a Grain is activated in a Silo, the Orleans runtime will go and fetch the state of the Grain for us and put it in an instance member called State. For instance, by the time the ActivateAsync method is called, the state of our TournamentGrain will have been populated from the DynamoDBStorage provider we created:

[code screenshot: the ActivateAsync override reading the already-populated State]

You can modify the state by modifying its members, but the changes will not be persisted to the backend storage service until you call the State.WriteStateAsync method. Un-persisted changes will be lost when the Grain is deactivated by the Silo, or if the node itself is lost.

[code screenshot: mutating the State members and persisting them with State.WriteStateAsync]
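Continuing the hypothetical TournamentGrain sketch from above, the activation and persistence flow described in the last two paragraphs might look something like this (method names are made up; TaskDone.Done is Orleans’ helper for returning a completed Task in older versions):

    public override Task ActivateAsync()
    {
        // by the time we get here, the runtime has already populated State from
        // the configured storage provider (DynamoDBStorage in this example)
        Console.WriteLine("Resuming tournament {0}", State.TournamentName);
        return TaskDone.Done;
    }

    public async Task JoinTournament(string playerId)
    {
        State.PlayerIds.Add(playerId);   // an in-memory change only at this point

        // nothing is persisted until WriteStateAsync is called; if the Silo
        // deactivates the Grain, or the node dies before this, the change is lost
        await State.WriteStateAsync();
    }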

Finally, there are a number of built-in storage providers, such as Azure Table Storage, and it’s trivial to implement your own; you just need to implement the IStorageProvider interface.

Storage providers make it very easy to create actors that can be easily resumed after deactivation, but you need to be mindful of a number of things:

  • how often you persist state is a trade-off between durability on one hand and performance and cost on the other
  • if multiple parts of the state need to be modified in one call, you need to have a rollback strategy in place in case of exceptions, or risk leaving dirty writes in your state (see the Not letting it crash section below)
  • you need to handle the case where persistence fails – since you’ve already mutated the in-memory state, do you roll back, or continue and hope you get another chance at persisting the state before the actor is deactivated through idleness or the node crashing? (see the sketch after this list)
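As a sketch of the kind of defensive code the last two points imply (continuing the hypothetical TournamentGrain from earlier, and certainly not the only way to handle it), you could snapshot the affected parts of the state before mutating them and restore the snapshot if persistence fails:

    public async Task RenameTournament(string newName)
    {
        var oldName = State.TournamentName;   // snapshot before mutating
        State.TournamentName = newName;

        try
        {
            await State.WriteStateAsync();
        }
        catch (Exception)
        {
            // persistence failed: roll the in-memory state back so it doesn't
            // silently drift away from what has actually been stored
            State.TournamentName = oldName;
            throw;
        }
    }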

In Erlang, there is no built-in mechanism for storage providers, but there is also nothing stopping you from implementing this yourself. Have a look at Bryan Hunter’s CQRS with Erlang talk at NDC Oslo 2014 for inspiration.

Silo Membership

Silos use a backend store to manage Silo membership; by default this is Azure Table Storage. Here is what the MSR paper has to say about Silo membership:

“Servers automatically detect failures via periodic heartbeats and reach an agreement on the membership view. For a short period of time after a failure, membership views on different servers may diverge, but it is guaranteed that eventually all servers will learn about the failed server and have identical membership views….if a server was declared dead by the membership service, it will shut itself down even if the failure was just a temporary network issue.”

Furthermore, on the guarantee that an actor (or Grain with a specific ID) is only activated on one node:

“In failure-free times, Orleans guarantees that an actor only has a single activation. However, when failures occur, this is only guaranteed eventually.

Membership is in flux after a server has failed but before its failure has been communicated to all survivors. During this period, a register-activation request may be misrouted if the sender has a stale membership view….However, it may be that two activations of the same actor are registered in two different directory partitions, resulting in two activations of a single-activation actor. In this case, once the membership has settled, one of the activations is dropped from the directory and a message is sent to its server to deactivate it.”

There are a couple of things to note about Silo membership management from the above:

  • *3 the way servers detonate when they lose connectivity to the storage service means it’s not partition-tolerant, because if the storage service is partitioned from the cluster, even for a relatively short amount of time, there’s a chance for every node running a Silo to self-detonate;
  • *3 there is a single point of failure at the storage service used to track Silo memberships; any outage of the storage service results in an outage of your Orleans service too (this happened to Halo 4);
  • it offers strong consistency during the good times, but falls back to eventual consistency during failures;
  • whilst it’s not mentioned, I speculate that depending on the size of the cluster and the time it takes to converge on Silo membership views across the cluster, it’s possible to have more than two activations of the same Grain in the cluster;
  • the conflict resolution approach above suggests that one activation is chosen at random and the rest are discarded; this seems rather naive and means losing intermediate changes recorded on the discarded Grain activations;
  • since each activation can persist its state independently, it’s possible for the surviving Grain activation’s internal state to be out-of-sync with what has been persisted;
  • these failure times can happen a lot more often than you think: nodes can be lost due to failure, but also as a result of planned/automatic scale-down events throughout the day as traffic patterns change (Orleans is designed for the cloud and all its elastic scalability, after all).

During failures, it should be possible to provide a stronger guarantee of single activation by using optimistic concurrency around the Grain’s state. For instance:

1. node A fails, and the cluster’s view of Silos has now diverged

2a. Grain receives a request on node B, and is activated with state v1

2b. Grain receives a request on node C, and is activated with state v1

3a. Grain on node B finishes processing request, and succeeds in saving state v2

3b. Grain on node C finishes processing request, but fails to save state v2 (optimistic concurrency at work here)

4. Grain on node C fails the request and triggers deactivation

5. Cluster only has one activation of the Grain on node B

Enforcing a stronger single-activation guarantee in this fashion should also remove the need for better conflict resolution. For this approach to work, you need to be able to detect persistence failures caused by optimistic concurrency; in DynamoDB, these show up as conditional check errors.
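To make the idea concrete, here is a rough, non-Orleans-specific sketch of such a conditional write against DynamoDB using the AWS SDK for .NET. The table, key and attribute names are invented; the important part is that a ConditionalCheckFailedException tells the losing activation that another activation has already moved the state forward:

    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Amazon.DynamoDBv2;
    using Amazon.DynamoDBv2.Model;

    public class OptimisticGrainStateStore
    {
        private readonly IAmazonDynamoDB dynamoDb;

        public OptimisticGrainStateStore(IAmazonDynamoDB dynamoDb)
        {
            this.dynamoDb = dynamoDb;
        }

        // Returns false if another activation has already written a newer version,
        // in which case this activation should fail the request and deactivate.
        public async Task<bool> TrySaveState(string grainId, string stateJson, long expectedVersion)
        {
            var request = new PutItemRequest
            {
                TableName = "TournamentGrainState", // hypothetical table
                Item = new Dictionary<string, AttributeValue>
                {
                    { "GrainId", new AttributeValue { S = grainId } },
                    { "Version", new AttributeValue { N = (expectedVersion + 1).ToString() } },
                    { "State",   new AttributeValue { S = stateJson } }
                },
                // the write only succeeds if the stored version is still the one we read
                ConditionExpression = "attribute_not_exists(#v) OR #v = :expected",
                ExpressionAttributeNames = new Dictionary<string, string> { { "#v", "Version" } },
                ExpressionAttributeValues = new Dictionary<string, AttributeValue>
                {
                    { ":expected", new AttributeValue { N = expectedVersion.ToString() } }
                }
            };

            try
            {
                await dynamoDb.PutItemAsync(request);
                return true;
            }
            catch (ConditionalCheckFailedException)
            {
                return false; // conditional check failed: another activation won the race
            }
        }
    }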

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*3 Again, as per Gabriel and Sergey’s comments below, this is not true and there’s no single point of failure in this case. See *1 above, or read the comments for more detail.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Distributed Erlang employs a different approach for forming clusters. Nodes form a mesh network where every node is connected to every other node. They use a gossip protocol to inform each other when nodes join or leave the cluster.

[diagram: a fully connected mesh of Erlang nodes]

This approach has a scalability limitation and doesn’t scale well to thousands, or even hundreds, of nodes depending on the capabilities of each node. This is because the overhead of forming and maintaining the cluster increases quadratically with the number of nodes (with N nodes there are N(N-1)/2 connections to maintain). The effect is particularly evident if you require frequent inter-node communication, or need to use functions in the global built-in module.

In this particular space, SD (scalable distributed) Erlang is attempting to address this shortcoming by allowing you to create groups of sub-clusters amongst your nodes so that the size of the mesh network is limited to the size of the sub-clusters.

Random Actor Placement

Another interesting choice Orleans made is that, instead of using consistent hashing for actor placement (a technique that has been used successfully in a number of key-value stores such as CouchBase and Riak), Orleans adds another layer of indirection here by introducing the Orleans directory.

“The Orleans directory is implemented as a one-hop distributed hash table (DHT). Each server in the cluster holds a partition of the directory, and actors are assigned to the partitions using consistent hashing. Each record in the directory maps an actor id to the location(s) of its activations.”

[diagram: the Orleans directory implemented as a one-hop distributed hash table]

The rationale for this decision is that *4 random placement of actors helps avoid the creation of hotspots in your cluster, which might result from poorly chosen IDs or plain bad luck. But it means that, to retain correctness, every request to an actor now requires an additional hop to the directory partition first. To address this performance concern, each node uses a local cache to store where each actor is.

I think this is a well-meaning attempt at solving a problem, but I’m not convinced that it’s a problem that deserves the

  1. additional layer of indirection,
  2. the subsequent problem of performance,
  3. the subsequent use of a local cache, and
  4. *5 the problem of cache invalidation that comes with it (which, as we know, is one of the two hard problems in CS).

Is it really worth it? Especially when the IDs are guids by default, which hash well. Would it not be better to solve it with a better hashing algorithm?

From my personal experience of working with a number of different key-value stores, actor placement has never been an issue significant enough to deserve the special treatment Orleans has given it. I’d really like to see the results of any empirical study that shows this to be a big enough issue in real-world key-value store usage.
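For reference, the alternative I have in mind is plain consistent hashing of the Grain’s GUID straight onto the ring of Silos, along these lines. This is a toy sketch, not how Orleans or Riak actually implement placement:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Security.Cryptography;
    using System.Text;

    public class ConsistentHashRing
    {
        private readonly SortedDictionary<uint, string> ring = new SortedDictionary<uint, string>();

        public ConsistentHashRing(IEnumerable<string> silos, int virtualNodes = 100)
        {
            // each silo claims a number of points on the ring to even out the load
            foreach (var silo in silos)
                for (var i = 0; i < virtualNodes; i++)
                    ring[Hash(silo + ":" + i)] = silo;
        }

        // the silo responsible for a grain is the first point on the ring at or
        // after the hash of the grain's id, wrapping around at the end
        public string GetSilo(Guid grainId)
        {
            var h = Hash(grainId.ToString());
            foreach (var point in ring)
                if (point.Key >= h) return point.Value;
            return ring.First().Value;
        }

        private static uint Hash(string key)
        {
            using (var md5 = MD5.Create())
            {
                var bytes = md5.ComputeHash(Encoding.UTF8.GetBytes(key));
                return BitConverter.ToUInt32(bytes, 0);
            }
        }
    }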

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*4 As Sergey mentioned in the comments, you can do a few more things, such as using a PreferLocalPlacement strategy to "instruct the runtime to place an activation of a grain of that type local to the first caller (another grain) to it. That is how the app can hint about optimizing placement." This appears similar to what happens when you spawn a new process in Erlang. It would require further clarification from Sergey or Gabriel, but I’d imagine the placement strategy probably applies at the type level for each type of grain.

The additional layer of abstraction does buy you some more flexibility, and not having to move grains around when the topology changes probably simplifies things from the implementation point of view too (I imagine moving the directory information around is cheaper and easier than moving the grains themselves).

In the Erlang space, RiakCore provides a set of tools to help you build distributed systems, and its approach gives you more control over the behaviour of your system. You do, however, have to implement a few more things yourself, such as how to move data around when the cluster topology changes (though the vnode behaviour gives you the basic template for doing this) and how to deal with collisions, etc.

*5 Hitting a stale cache is not much of a problem in this case: Orleans will simply do a new lookup, forward the message to the right destination and update the cache.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As an aside, here’s how Riak does consistent hashing and read/write replication:

[diagram: Riak’s consistent hashing ring and N/R/W replication values]

Message Delivery Guarantees

“Orleans provides at-least-once message delivery, by resending messages that were not acknowledged after a configurable timeout. Exactly-once semantics could be added by persisting the identifiers of delivered messages, but we felt that the cost would be prohibitive and most applications do not need it. This can still be implemented at the application level.”

The decision to use at-least-once message delivery as the default is a contentious one in my view. *6 A slow node will cause messages to be sent twice, and handled twice, which is probably not what you want most of the time.

Whilst the rationale regarding cost is valid, it seems to me that allowing the message delivery to time out and letting the caller handle timeout cases is the more sensible choice here. It’d make the handling of timeouts an explicit decision on the application developer’s part, probably on a per-call basis, since some calls are more important than others.
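For example, a caller can impose its own per-call timeout with nothing more than the TPL and no resends at all. The extension method below is purely illustrative and not part of Orleans:

    using System;
    using System.Threading.Tasks;

    public static class GrainCallExtensions
    {
        // fail a grain call explicitly if it hasn't completed within the timeout;
        // whether to retry, ignore or propagate is then up to the caller
        public static async Task<T> WithTimeout<T>(this Task<T> task, TimeSpan timeout)
        {
            var completed = await Task.WhenAny(task, Task.Delay(timeout));
            if (completed != task)
                throw new TimeoutException("The grain call did not complete in time.");

            return await task; // unwrap the result, or rethrow the call's exception
        }
    }

    // usage, assuming a hypothetical tournamentGrain reference:
    // var players = await tournamentGrain.GetPlayers().WithTimeout(TimeSpan.FromSeconds(2));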

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*6 The default behaviour is not to resend on timeout, so by default Orleans actually uses at-most-once delivery. But you can configure it to automatically resend upon timeout, i.e. at-least-once.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Not letting it crash

Erlang’s mantra has always been to Let It Crash (except when you shouldn’t). When a process crashes, it can be restarted by its supervisor. Using techniques such as event sourcing it’s easy to return to the previous state just before the crash.

When an exception is thrown inside an Orleans Grain, it does not crash the Grain itself; the exception is simply reported back to the caller instead. This simplifies things, but runs the risk of leaving behind dirty writes (hence corrupting the state) in the wake of an exception. For example:

[code screenshot: a grain method that throws partway through mutating its State, leaving a dirty write behind]
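The original example isn’t reproduced here, but the failure mode looks something like the sketch below (continuing the hypothetical TournamentGrain, with a couple of extra invented state properties). If the exception is thrown after the first mutation, the caller sees a failed call, yet the grain lives on with half-applied, corrupted in-memory state:

    public async Task CloseTournament(string winnerId)
    {
        State.IsClosed = true;                        // first mutation succeeds

        if (!State.PlayerIds.Contains(winnerId))
            throw new ArgumentException("winner is not a registered player");
            // the exception is returned to the caller, the grain is not crashed,
            // and IsClosed = true is never rolled back: a dirty write

        State.WinnerId = winnerId;                    // never reached
        await State.WriteStateAsync();
    }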

The choice of not crashing the grain in the event of exceptions offers convenience at the cost of breaking the atomicity of operations, and personally it’s not a choice that I agree with.

Reentrant Grains

In a discussion on the Actor model with Erik Meijer and Clemens Szyperski, Carl Hewitt (father of the Actor model) said

“Conceptually, messages are processed one at a time, but the implementation can allow for concurrent processing of messages”

In Orleans, grains process messages one at a time normally. To allow concurrent processing of messages, you can mark your grain implementation with the [Reentrant] attribute.

Reentrant grains can be used as an optimization technique to remove bottlenecks in your network of grains.
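As a hedged sketch of what this looks like (grain and member names are illustrative, the attribute lives under Orleans.Concurrency in the open-source releases, and GrainBase follows the preview-era SDK):

    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Concurrency;

    public interface ILeaderboardGrain : IGrain
    {
        Task<long> GetTopScore();
        Task SubmitScore(long score);
    }

    // [Reentrant] allows the scheduler to interleave turns from different requests
    // on the same activation, instead of processing them strictly one at a time
    [Reentrant]
    public class LeaderboardGrain : GrainBase, ILeaderboardGrain
    {
        private long topScore;

        public Task<long> GetTopScore()
        {
            // with reentrancy, topScore may be changed by another interleaved
            // request between any two awaits in this class
            return Task.FromResult(topScore);
        }

        public Task SubmitScore(long score)
        {
            if (score > topScore) topScore = score;
            return TaskDone.Done;
        }
    }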

“One actor is no actor, they come in sys­tems, and they have to have addresses so that one actor can send mes­sages to another actor.”

– Carl Hewitt

However, using reentrant grains means you lose the guarantee that the state is accessed sequentially, and you open yourself up to potential race conditions. You should use reentrant grains with great care and consideration.

In Erlang, concurrent processing of messages within a single process is not allowed. But you don’t have to block your actor whilst it waits for some expensive computation to complete: instead, you can spawn another actor and ask the child actor to carry on with the expensive work whilst the parent actor processes the next message. This is possible because the overhead and cost of spawning a process in Erlang is very low, and the runtime can easily handle tens of thousands of concurrent processes and load balance across the available CPU resources via its schedulers.

If necessary, once the child actor has finished its work it can send a message back to the parent actor, which can then perform any subsequent computation as required.

Using this simple technique, you acquire the same capability that reentrant grains offer. You can also control which messages can be processed concurrently, rather than the all-or-nothing approach that reentrant grains use.

Immutable Messages

*7 Messages sent between Grains are usually serialized and then deserialized, which is an expensive overhead when both Grains are running on the same machine. You can optionally mark the messages as immutable so that they won’t be serialized when passed between Grains on the same machine, but this immutability promise is not enforced at all; it’s entirely down to you to apply the due diligence of not mutating the messages.
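For example, a message type can be marked with the [Immutable] attribute (found under Orleans.Concurrency in the open-source releases; the type and its properties here are invented). Note that nothing in the code below actually prevents mutation; it is purely a promise to the runtime:

    using Orleans.Concurrency;

    // the runtime takes this as a promise that the object will never be mutated,
    // so it can skip copying (and, per the original text, serialization) for
    // same-silo sends; the promise itself is not checked or enforced
    [Immutable]
    public class ScoreUpdate
    {
        public string PlayerId { get; set; }
        public long NewScore { get; set; }
        // settable properties make it easy to break the promise by accident
    }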

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*7 A clarification here: messages are only serialized and deserialized when they are sent across nodes; messages sent between grains on the same node are deep-copied instead, which is cheaper than serialization. Marking the type as immutable skips the deep-copying process too.

But, it’s still your responsibility to enforce the immutability guarantee.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In Erlang, variables are immutable so there is no need to do anything explicit.

Task Scheduling

“Orleans schedules application turns using cooperative multitasking. That means that once started, an application turn runs to completion, without interruption. The Orleans scheduler uses a small number of compute threads that it controls, usually equal to the number of CPU cores, to execute all application actor code.”

This is another point of concern for me.

Here we see the same vulnerabilities as an event-loop system (e.g. Node.js, Tornado), where a single poisoned message can cripple your entire system. Even without poisoned messages, you are still left with the problem of CPU resources not being distributed evenly across actors, allowing slow or misbehaving actors to badly impact your latency for other pending requests.

Even having multiple cores with one thread per core (which is a sane choice) is not going to save you here. All you need is one slow-running actor on each processor-affined execution thread to halt the entire system.
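To make that concrete, here is a sketch of the kind of grain code that causes the problem (continuing the hypothetical TournamentGrain; the slow work is simulated). Under cooperative multitasking this turn runs to completion, pinning one of the few scheduler threads, and every other grain scheduled onto that thread waits behind it:

    public Task GenerateEndOfDayReport()
    {
        // a long, blocking (or CPU-bound) turn: no other grain's turn can run on
        // this scheduler thread until it finishes
        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(30)); // stand-in for slow work
        return TaskDone.Done;
    }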

Erlang’s approach to scheduling makes much more sense: one scheduler per core, and an actor is allowed to execute 2000 reductions (think of one reduction as one function call to do something) before it has to yield the CPU so that another actor can get a slice of the CPU time. The original actor then waits for its turn to run again.

This CPU-sharing policy is no different to what the OS does with threads, and there’s a good reason for that.

Ease of use

I think this is the big winner for Orleans and the focus of its design goals.

I have to admit, I was pleasantly surprised by how easily and quickly I was able to put a service together and have it running locally. Based on what I have seen of the samples and Richard’s Pluralsight course, deploying to the cloud is pretty straightforward too.

Cloud Friendliness

Another win for Orleans here, as it’s designed from the start to deal with cluster topologies that can change dynamically with ephemeral instances. Distributed Erlang, or at least distributed OTP, assumes a fixed topology where nodes have well-defined roles at the start. There are also challenges around getting the built-in distributed database – Mnesia – to work well in a dynamically changing topology.

Conclusion

In many ways, I think Orleans is true to its original design goal of optimizing for developer productivity. But by shielding developers from the decisions and considerations that usually come with building distributed systems, it also makes it harder for them to build systems that need to be resilient to failures and meet stringent uptime requirements.

But not every distributed system is critical, and not every distributed system needs five nines of uptime. As long as you’re informed about the trade-offs that Orleans has made and what they mean to you as a developer, you can at least make an informed choice about if and when to adopt Orleans.

I hope this post will help you make those informed decisions. If I have been misinformed or am incorrect about any part of how Orleans works, please do leave a comment below.

Links

F# – Imitating Erlang’s bit syntax for easier binary protocol implementation

Bit Syntax in Erlang

One of the often under-appreciated features of Erlang is its Bit Syntax for parsing and pattern matching binary data at a bit level. For instance, to parse TCP segments you can write something along the lines of:

[code screenshot: pattern matching a TCP segment header using Erlang’s bit syntax]

The same capability can be applied to any binary protocol, such as video encoding, image formats, or UDP.

 

Imitating with F# Computation Expressions


 

Whilst this capability is not built into F#, or any other language that I know of for that matter, we do have a very powerful and robust feature in F#: computation expressions.

With computation expressions, I was able to create a small library that allows you to write and read data to and from a stream at a bit level. With the bitWriter and bitReader workflows you will be able to write and parse TCP headers with code like the following:

 

The library is available via Nuget:

Please give it a try, and let me know if you find any bugs here.

 

p.s. there is still much work to be done on the library. For instance, it’s allocating a new buffer array for each workflow rather than using a buffer pool. If you find this library useful and in need of greater performance, please feel free to contribute and help improve it.

Getting started with Rebar on Windows

Whilst the official rebar documentation only covers usage on Unix, it turns out it’s really easy to get going on Windows too, even without resorting to installing and using Cygwin.

Here are some quick steps:

  1. clone the rebar repo
  2. in the root of the repo, run bootstrap.bat (you’ll need Erlang installed for this)
  3. copy the generated rebar and rebar.cmd files to a folder on your PATH, I typically have a C:\tools folder for miscellaneous tools like this

From here on, you can resume the Getting Started guide to create your first rebar project.

 

Enjoy!

Mostly Erlang episode 13

I was recently invited onto the Mostly Erlang podcast to chat about F#, with co-host Bryan Hunter, Fred Hebert (author of the excellent Learn You Some Erlang for Great Good!) and fellow F# addict Phil Trelford (author of many popular F# libraries such as TickSpec, Mini Rx and Foq).

It’s always nice to talk about subjects around F# and Erlang, especially with crazy smart people like Bryan, Fred and Phil!

The show is now available on the Mostly Erlang site here, hope you enjoy it!

A simple finite state machine in Erlang and F#

Consider a very simple finite state machine (FSM) such as a code lock, which requires you to enter a 4-digit numeric code and, upon receiving the correct code, opens the lock for a brief few seconds.

[diagram: the code lock FSM with its Locked and Open states]

Such an FSM has two states: Locked and Open.

Every time a key is pressed, it checks the sequence of digits entered so far; if the sequence does not match the secret code, it stays in the Locked state.

If the input sequence has reached the length of the secret code without matching it, the user is judged to have entered an incorrect code and the input sequence is reset.

If, on the other hand, the user has entered the correct code then the FSM will change to the Open state for a number of seconds before reverting back to the Locked state.

This post is intended to show you how you can implement this FSM easily using F#’s agent and Erlang OTP’s gen_fsm behaviour.

 

 

 

F#

First, let’s define the different states that our FSM can be in:

[code screenshot: the union type representing the FSM’s states]

and then the messages that our agent can receive:

[code screenshot: the message type that the agent can receive]

The actual FSM implementation looks like this:

This implementation is pretty straightforward. I’ve used a similar technique to the one Tomas Petricek used in his BlockingQueueAgent implementation and represented the different states using recursive functions.

One caveat with this FSM is the need to revert back to the Locked state after being in the Open state for a certain amount of time. For simplicity’s sake, I’ve simply specified a timeout (3 seconds) when waiting for messages in the Open state and used the TimeoutException as a cue to go back to the Locked state. The obvious downside to my implementation here is that if a GetState message is received it will reset the timer!

You can fix this easily enough in a number of ways, including using a mutable instance variable to keep track of the current state and letting the async lambdas modify it during state changes and transitions; then the GetState() method would no longer need to trouble the agent to find out what the current state is.

 

Erlang

Erlang’s OTP framework provides a gen_fsm behaviour which you can use to implement a FSM, and which will take care of most of the plumbing and orchestration for you, leaving you free to focus on the important things – handling the different ‘events’ that can be triggered during different states.

This Erlang implementation might look much bigger than its F# counterpart, but keep in mind that I’ve included plenty of comments here and have not been as conservative with whitespace as I was with the F# code. All in all, both versions require roughly the same number of LOC, the main difference being the lack of plumbing in the Erlang version, which, if I’m honest, can be confusing if you don’t have a decent understanding of how the gen_fsm behaviour works!

You can download and play around with the full source code (with corresponding tests for each implementation) from Github here. Enjoy!