A look at Microsoft Orleans through Erlang-tinted glasses

Some time ago, Microsoft announced Orleans, an implementation of the actor model in .Net which is designed for the cloud environment where instances are ephemeral.

We’re currently working on a number of projects in Erlang and have run into some assumptions in distributed Erlang which don’t hold true in a cloud-hosted environment where nodes are ephemeral and entire topologies are constantly in flux. Also, as most of our backend code for Gamesys Social is in .Net, being able to work with languages that we’re already familiar with is a win for the team (more people being able to work on the codebase, for instance).

As such, I have been taking an interest in Orleans to see if it represents a good fit, and whether or not it holds up to some of its lofty promises around scalability, performance and reliability. Below is an account of my personal views, having read the paper, downloaded the SDK, looked through the samples and followed Richard Astbury’s Pluralsight course.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Update 2014/12/08:

Since I posted this the other day, there has been some great feedback from the Orleans team, which has clarified several places where I had misunderstood things based on the information I had at the time of writing. Some of my other concerns still remain, but at least two of the biggest sticking points – single point of failure and at-least-once message delivery – have been disproved.

As such, I’ve updated this post in several places to incorporate the new information that the Orleans team have provided via the comments.

I’ve left what was previously written untouched, but look out for the impacted sections (marked *1, *2 and so on) throughout the post to see the relevant new information. In these callout sections I have focused on the correct behaviour that you should expect based on the corrections from Sergey and Gabriel; if you’re interested in the background and rationale behind these decisions, please read their comments in full.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

TL;DR

When I first read about Orleans, I was hesitant because of the use of code-gen (reminiscent of WCF there), and because the underlying message passing mechanism is hidden from you, so you end up with an RPC mechanism (again, reminiscent of WCF…).

However, after spending some time with Orleans, I can definitely see its appeal – convenience and productivity. I was able to get something up and running quickly and with ease. My original concerns about code-gen and RPC didn’t get in the way of me getting stuff done.

As I dug deeper into how Orleans works though, a number of more worrying concerns surfaced regarding some of its core design decisions.

For starters, *1 it’s not partition tolerant towards partitions to the data store used for its Silo management. Should the data store be partitioned or suffer an outage, it’ll result in a full outage of your service. These are not the traits of the masterless, partition-tolerant system you want when you have strict uptime requirements.

When everything is working, Orleans guarantees that there is only one instance of a virtual actor running in the system, but when a node is lost the cluster’s knowledge of nodes will diverge, and during this time the single-activation guarantee becomes eventually consistent. However, you can provide stronger guarantees yourself (see the Silo Membership section below).

*2 Orleans uses at-least-once message delivery, which means it’s possible for the same message to be sent twice when the receiving node is under load or simply fails to acknowledge the first message in a timely fashion. This, again, is something that you can mitigate yourself (see the Message Delivery Guarantees section below).

Finally, its task scheduling mechanism appears to be identical to that of a naive event loop and exhibits all the fallacies of an event loop (see Task Scheduling section below).

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*1 As Gabriel and Sergey both explained in the comments, the membership management works quite differently from what I first thought. Once connected, all heartbeats are sent directly between pairs of nodes using a linear algorithm, and the backend data store is only used for reaching agreement on which nodes are dead and to disseminate the new membership view to all other nodes.

In this case, losing connection to the backend data store would not impact an existing, connected cluster, making it partition tolerant. If the backend data store becomes unavailable at the same time as your cluster topology is changing, then it will hinder updates to the membership and stop new nodes from being able to join the cluster.

Hopefully the implementation details of the membership management will be discussed in more detail in the Orleans team’s future posts. Also, since Orleans will be open sourced in early 2015, we’ll be able to get a closer look at exactly how this behaves once the source code is available.

*2 Gabriel pointed out that by default Orleans does not resend messages that have timed out. So by default, it uses at-most-once delivery, but can be configured to automatically resend upon timeout if you want at-least-once delivery instead.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Terminology

In Orleans, a Grain represents an actor, and each node has a Silo which manages the lifetime of the grains running inside it. A Grain is activated when it receives a request, and can be deactivated after it has been idle for a while. The Silo will also remove deactivated Grains from memory to free up resources.

Orleans’ Grains are referred to as virtual actors. They are virtual because the state of a Grain can be persisted to a storage service, and then reinstated when the Grain is reactivated (after being deactivated due to idleness) on another node. This is a nice abstraction from a developer’s point of view, and to enable this level of indirection, Orleans introduces the mechanism of storage providers.

Storage Providers

To use storage providers, you first need to define an interface that represents the state of your Grain, and have it inherit from Orleans’ IGrainState interface. For example:

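A minimal sketch of what such a state interface might look like (the property names are purely illustrative; only ITournamentGrainState and the IGrainState base come from the text here):

    using System;
    using System.Collections.Generic;
    using Orleans;

    // the properties are illustrative - put whatever state you want persisted in here
    public interface ITournamentGrainState : IGrainState
    {
        string Name { get; set; }
        List<Guid> PlayerIds { get; set; }
    }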

Then in your Grain implementation class, you provide this ITournamentGrainState interface as a generic type parameter to the Orleans.Grain base class, as below. You also need to specify the storage provider you want to use via the [StorageProvider] attribute. The ProviderName specified here points to a storage provider you define in the configuration file for Orleans.

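Something along these lines (the ITournamentGrain interface and its GetPlayerCount method are assumptions for illustration, and the grain key interface naming follows the newer SDKs; “DynamoDBStorage” is the provider name referenced below):

    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Providers;

    // the grain's communication interface - method is illustrative
    public interface ITournamentGrain : IGrainWithGuidKey
    {
        Task<int> GetPlayerCount();
    }

    // ProviderName points at a storage provider entry in the Orleans configuration file
    [StorageProvider(ProviderName = "DynamoDBStorage")]
    public class TournamentGrain : Grain<ITournamentGrainState>, ITournamentGrain
    {
        public Task<int> GetPlayerCount()
        {
            return Task.FromResult(State.PlayerIds.Count);
        }
    }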

When a Grain is activated in a Silo, the Orleans runtime will go and fetch the state of the Grain for us and put it in an instance member called State. For instance, when the ActivateAsync method is called, the state of our TournamentGrain would have been populated from the DynamoDBStorage provider we created:

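Inside the TournamentGrain sketched above, that might look something like this (the trace output is just for illustration):

    public override Task ActivateAsync()
    {
        // by the time we get here, the runtime has already populated State from the
        // configured storage provider (DynamoDBStorage in this example)
        Console.WriteLine("Tournament '{0}' activated with {1} players",
                          State.Name, State.PlayerIds.Count);
        return base.ActivateAsync();
    }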

You can modify the state by modifying its members, but the changes will not be persisted to the backend storage service until you call the State.WriteStateAsync method. Un-persisted changes will be lost when the Grain is deactivated by the Silo, or if the node itself is lost.

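For example, a grain method that mutates the state and then persists it straight away (AddPlayer is a made-up method for illustration):

    public async Task AddPlayer(Guid playerId)
    {
        State.PlayerIds.Add(playerId);    // an in-memory change only at this point
        await State.WriteStateAsync();    // persists the state via the storage provider
    }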

Finally, there are a number of built-in storage providers, such as Azure Table Storage, but it’s trivial to implement your own. To implement a custom storage provider, you just need to implement the IStorageProvider interface.

Storage providers make it very easy to create actors that can be easily resumed after deactivation, but you need to be mindful of a number of things:

  • how often you persist state is a trade-off between durability and performance + cost
  • if multiple parts of the state need to be modified in one call, you need to have a rollback strategy in place in case of exceptions or risk leaving dirty writes in your state (see Not letting it crash section below)
  • you need to handle the case where persistence fails – since you’ve mutated the in-memory state, if persistence fails do you roll back, or continue and hope that you get another chance at persisting the state before the actor is deactivated through idleness or the node crashing?

In Erlang, there is no built-in mechanism for storage providers, but there is also nothing stopping you from implementing this yourself. Have a look at Bryan Hunter’s CQRS with Erlang talk at NDC Oslo 2014 for inspiration.

Silo Membership

Silos use a backend store to manage Silo membership; by default this is Azure Table Storage. From the MSR paper, this is what it has to say about Silo membership:

“Servers automatically detect failures via periodic heartbeats and reach an agreement on the membership view. For a short period of time after a failure, membership views on different servers may diverge, but it is guaranteed that eventually all servers will learn about the failed server and have identical membership views….if a server was declared dead by the membership service, it will shut itself down even if the failure was just a temporary network issue.”

Furthermore, on the guarantee that an actor (or Grain with a specific ID) is only activated on one node:

“In failure-free times, Orleans guarantees that an actor only has a single activation. However, when failures occur, this is only guaranteed eventually.”

“Membership is in flux after a server has failed but before its failure has been communicated to all survivors. During this period, a register-activation request may be misrouted if the sender has a stale membership view….However, it may be that two activations of the same actor are registered in two different directory partitions, resulting in two activations of a single-activation actor. In this case, once the membership has settled, one of the activations is dropped from the directory and a message is sent to its server to deactivate it.”

There are a couple of things to note about Silo membership management from the above:

  • *3 the way servers detonate when they lose connectivity to the storage service means it’s not partition-tolerant, because if the storage service is partitioned from the cluster, even for a relatively short amount of time, there’s a chance of every node that is running a Silo self-detonating;
  • *3 there is a single point of failure at the storage service used to track Silo membership; any outage to the storage service results in an outage of your Orleans service too (this happened to Halo 4);
  • it offers strong consistency during the good times, but falls back to eventual consistency during failures;
  • whilst it’s not mentioned, I speculate that depending on the size of the cluster and the time it takes to converge on Silo membership views across the cluster, it’s possible to have more than two activations of the same Grain in the cluster;
  • the conflict resolution approach above suggests that one activation is chosen at random and the rest are discarded, which seems rather naive and means losing intermediate changes recorded on the discarded Grain activations;
  • since each activation can be persisting its state independently, it’s possible for the surviving Grain activation’s internal state to be out-of-sync with what has been persisted; and
  • these failure times can happen a lot more often than you think – nodes can be lost due to failure, but also as a result of planned/automatic scaling down events throughout the day as traffic patterns change (Orleans is designed for the cloud and all its elastic scalability, after all).

During failures, it should be possible to provide a stronger guarantee on single activation yourself, using optimistic concurrency around the Grain’s state. For instance:

1. node A failed, and now the cluster’s view of Silos has diverged

2a. Grain receives a request on node B, and is activated with state v1

2b. Grain receives a request on node C, and is activated with state v1

3a. Grain on node B finishes processing request, and succeeds in saving state v2

3b. Grain on node C finishes processing request, but fails to save state v2 (optimistic concurrency at work here)

4. Grain on node C fails the request and triggers deactivation

5. Cluster only has one activation of the Grain on node B

Enforcing a stronger single-activation guarantee in this fashion should also remove the need for better conflict resolution. For this approach to work, you will need to be able to detect persistence failures caused by optimistic concurrency. In DynamoDB, this can be identified by a conditional check error.
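As a rough sketch of step 4, assuming a custom storage provider that performs a conditional write on a version attribute and lets the AWS SDK’s ConditionalCheckFailedException bubble up to the grain (the AddPlayer method is illustrative; DeactivateOnIdle is how a grain asks the runtime to deactivate it):

    public async Task AddPlayer(Guid playerId)
    {
        State.PlayerIds.Add(playerId);
        try
        {
            // the storage provider does a conditional put: only write v2 if the
            // stored version is still v1
            await State.WriteStateAsync();
        }
        catch (Amazon.DynamoDBv2.Model.ConditionalCheckFailedException)
        {
            // another activation has already written a newer version of the state,
            // so this activation is the stale one: fail the request and bow out
            DeactivateOnIdle();
            throw;
        }
    }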

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*3 Again, as per Gabriel and Sergey’s comments below, this is not true and there’s no single point of failure in this case. See *1 above, or read the comments for more detail.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Distributed Erlang employs a different approach for forming clusters. Nodes form a mesh network where every node is connected to every other node. They use a gossip protocol to inform each other when nodes join or leave the cluster.


This approach has a scalability limitation and doesn’t scale well to thousands, or even hundreds, of nodes depending on the capabilities of each node. This is because the overhead of forming and maintaining the cluster increases quadratically with the number of nodes. The effect is particularly evident if you require frequent inter-node communications, or need to use functions in the global built-in module.

In this particular space, SD (scalable distributed) Erlang is attempting to address this shortcoming by allowing you to create groups of sub-clusters amongst your nodes so that the size of the mesh network is limited to the size of the sub-clusters.

Random Actor Placement

Another interesting choice Orleans made is that, instead of using consistent hashing for actor placement (a technique that has been used successfully in a number of key-value stores such as Couchbase and Riak), Orleans adds another layer of indirection by introducing the Orleans directory.

“The Orleans directory is implemented as a one-hop distributed hash table (DHT). Each server in the cluster holds a partition of the directory, and actors are assigned to the partitions using consistent hashing. Each record in the directory maps an actor id to the location(s) of its activations.”


The rationale for this decision is that *4 random placement of actors helps avoid the creation of hotspots in your cluster, which might result from poorly chosen IDs, or simply bad luck. But it means that, to retain correctness, every request to an actor now requires an additional hop to the directory partition first. To address this performance concern, each node uses a local cache to store where each actor is.

I think this is a well-meaning attempt to solve a problem, but I’m not convinced that it’s a problem that deserves the

  1. additional layer of indirection and
  2. the subsequent problem of performance, and
  3. the subsequent use of local cache and
  4. *5 the problem of cache invalidation that comes with it (which as we know, is one of the two hard problems in CS)

Is it really worth it? Especially when the IDs are GUIDs by default, which hash well. Would it not be better to solve it with a better hashing algorithm?

From my personal experience of working with a number of different key-value stores, actor placement has never been an issue significant enough to deserve the special treatment Orleans has given it. I’d really like to see results of any empirical study that shows this to be a big enough issue in real-world key-value store usage.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*4 As Sergey mentioned in the comments, you can do a few more things, such as using a PreferLocalPlacement strategy to “instruct the runtime to place an activation of a grain of that type local to the first caller (another grain) to it. That is how the app can hint about optimizing placement.” This appears similar to the way you spawn a new process in Erlang. It would require further clarification from Sergey or Gabriel, but I’d imagine the placement strategy probably applies at the type level for each type of grain.

The additional layer of abstraction does buy you some more flexibility, and not having to move grains around when topology changes probably simplifies things (I imagine moving the directory information around is cheaper and easier than the grains themselves) from the implementation point-of-view too.

In the Erlang space, RiakCore provides a set of tools to help you build distributed systems and its approach gives you more control over the behaviour of your system. You do however have to implement a few more things yourself, such as how to move data around when cluster topology changes (though the vnode behaviour gives you the basic template for doing this) and how to deal with collisions etc.

*5 Hitting a stale cache is not much of a problem in this case – Orleans would simply do a new lookup, forward the message to the right destination and update the cache.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As an aside, here’s how Riak does consistent hashing and read/write replications:

Riak NRW

Message Delivery Guarantees

“Orleans provides at-least-once message delivery, by resending messages that were not acknowledged after a configurable timeout. Exactly-once semantics could be added by persisting the identifiers of delivered messages, but we felt that the cost would be prohibitive and most applications do not need it. This can still be implemented at the application level”

The decision to use at-least-once message delivery as default is a contentious one in my view. *6 A slow node will cause messages to be sent twice, and handled twice, which is probably not what you want most of the time.

Whilst the rationale regarding cost is valid, it seems to me that allowing the message delivery to time out and letting the caller handle timeout cases is the more sensible choice here. It’d make the handling of timeouts an explicit decision on the application developer’s part, probably on a per-call basis since some calls are more important than others.
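For example, the caller of an important, repeat-safe operation could opt into a retry explicitly (a sketch; TournamentGrainFactory and RegisterPlayer are hypothetical names following the earlier examples):

    public async Task JoinTournament(Guid tournamentId, Guid playerId)
    {
        var tournament = TournamentGrainFactory.GetGrain(tournamentId);
        try
        {
            await tournament.RegisterPlayer(playerId);
        }
        catch (TimeoutException)
        {
            // this call matters enough to retry once, and the caller knows it's safe
            // to repeat; less important calls could simply surface the timeout instead
            await tournament.RegisterPlayer(playerId);
        }
    }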

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*6 The default behaviour is to not resend on timeout, so by default Orleans actually uses at-most-once delivery. But you can configure it to automatically resend upon timeout, i.e. at-least-once.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Not letting it crash

Erlang’s mantra has always been to Let It Crash (except when you shouldn’t). When a process crashes, it can be restarted by its supervisor. Using techniques such as event sourcing it’s easy to return to the previous state just before the crash.

When an exception is thrown in an Orleans Grain, the exception does not crash the Grain itself and is simply reported back to the caller instead. This simplifies things, but runs the risk of leaving behind dirty writes (hence corrupting the state) in the wake of an exception. For example:

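A sketch of the kind of method that can corrupt the state this way (the Winners and Standings members and the RecalculateStandings helper are made up for illustration):

    public async Task CompleteRound(Guid winnerId)
    {
        State.Winners.Add(winnerId);               // first mutation succeeds
        State.Standings = RecalculateStandings();  // throws, e.g. due to a bug or bad data

        // the exception is reported back to the caller, but this activation keeps running
        // with Winners already mutated - the in-memory state is now inconsistent, and it
        // was never persisted either
        await State.WriteStateAsync();
    }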

The choice of not crashing the grain in the event of exceptions offers convenience at the cost of breaking the atomicity of operations, and personally it’s not a choice that I agree with.

Reentrant Grains

In a discussion on the Actor model with Erik Meijer and Clemens Szyperski, Carl Hewitt (father of the Actor model) said

“Conceptually, messages are processed one at a time, but the implementation can allow for concurrent processing of messages”

In Orleans, grains process messages one at a time normally. To allow concurrent processing of messages, you can mark your grain implementation with the [Reentrant] attribute.

Reentrant grains can be used as an optimization technique to remove bottlenecks in your network of grains.
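Making a grain reentrant is a one-line change (a sketch; in current SDKs the attribute lives under the Orleans.Concurrency namespace):

    using Orleans.Concurrency;

    // applied to the grain implementation class from the earlier examples
    [Reentrant]
    public class TournamentGrain : Grain<ITournamentGrainState>, ITournamentGrain
    {
        // while one of this grain's methods is awaiting, the runtime is now free to start
        // another turn, so turns can interleave on the same state (method bodies omitted)
    }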

“One actor is no actor, they come in sys­tems, and they have to have addresses so that one actor can send mes­sages to another actor.”

– Carl Hewitt

However, using reentrant grains means you lose the guarantee that the state is accessed sequentially, and open yourself up to potential race conditions. You should use reentrant grains with great care and consideration.

In Erlang, concurrent processing of messages is not allowed. But you don’t have to block your actor whilst it waits for some expensive computation to complete. Instead, you can spawn another actor and ask the child actor to carry on with the expensive work whilst the parent actor processes the next message. This is possible because the overhead and cost of spawning a process in Erlang is very low, and the runtime can easily handle tens of thousands of concurrent processes and load balance across the available CPU resources via its schedulers.

If necessary, once the child actor has finished its work it can send a message back to the parent actor, which can then perform any subsequent computation as required.

Using this simple technique, you acquire the same capability that reentrant grains offer. You can also control which messages can be processed concurrently, rather than the all-or-nothing approach that reentrant grains use.

Immutable Messages

*7 Messages sent between Grains are usually serialized and then deserialized, which is an expensive overhead when both Grains are running on the same machine. You can optionally mark the messages as immutable so that they won’t be serialized when passed between Grains on the same machine, but this immutability promise is not enforced at all and it’s entirely down to you to apply the due diligence of not mutating the messages.
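A sketch of how this looks using the Immutable&lt;T&gt; wrapper from the Orleans SDK (the grain interface and payload type are illustrative):

    using System.Threading.Tasks;
    using Orleans;
    using Orleans.Concurrency;

    public interface IScoreboardGrain : IGrainWithGuidKey
    {
        // wrapping the payload in Immutable<T> promises the runtime that it will never
        // be mutated, so copying/serialization can be skipped - but nothing enforces it
        Task Publish(Immutable<byte[]> snapshot);
    }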

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*7 A clarification here: messages are only serialized and deserialized when they are sent across nodes; messages sent between grains on the same node are deep-copied instead, which is cheaper than serialization. Marking the type as immutable skips the deep-copying process too.

But, it’s still your responsibility to enforce the immutability guarantee.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In Erlang, variables are immutable so there is no need to do anything explicit.

Task Scheduling

“Orleans schedules application turns using cooperative multitasking. That means that once started, an application turn runs to completion, without interruption. The Orleans scheduler uses a small number of compute threads that it controls, usually equal to the number of CPU cores, to execute all application actor code.”

This is another point of concern for me.

Here we’re exhibiting the same vulnerabilities of an event-loop system (e.g. Node.js, Tornado) where a single poisoned message can cripple your entire system. Even without poisoned messages, you are still left with the problem of not distributing CPU resources evenly across actors, and allowing slow/misbehaving actors to badly impact your latency for other pending requests.

Even having multiple cores and having one thread per core (which is a sane choice) is not going to save you here. All you need is one slow-running actor on each processor-affined execution thread to halt the entire system.
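To illustrate, here’s the grain-shaped equivalent of the classic event-loop poison pill (a contrived sketch; IBadGrain is made up, and the grain key interface naming follows the newer SDKs):

    using System.Threading.Tasks;
    using Orleans;

    public interface IBadGrain : IGrainWithGuidKey
    {
        Task DoWork();
    }

    public class BadGrain : Grain, IBadGrain
    {
        public Task DoWork()
        {
            // this turn never completes and never yields, so under cooperative scheduling
            // it pins one of the scheduler's per-core threads for good
            while (true) { }
        }
    }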

Erlang’s approach to scheduling makes much more sense – one scheduler per core, and an actor is allowed to execute 2000 reductions (think of one reduction as one function call to do something) before it has to yield the CPU so that another actor can get a slice of the CPU time. The original actor will then wait for its turn to run again.

This CPU-sharing policy is no different to what the OS does with threads, and there’s a good reason for that.

Ease of use

I think this is the big winner for Orleans and the focus of its design goals.

I have to admit, I was pleasantly surprised by how easily and quickly I was able to put a service together and have it running locally. Based on what I have seen of the samples and Richard’s Pluralsight course, deploying to the cloud is pretty straightforward too.

Cloud Friendliness

Another win for Orleans here, as it’s designed from the start to deal with cluster topologies that can change dynamically with ephemeral instances. Distributed Erlang, on the other hand – at least distributed OTP – assumes a fixed topology where nodes have well-defined roles at the start. There are also challenges around getting the built-in distributed database – Mnesia – to work well in a dynamically changing topology.

Conclusion

In many ways, I think Orleans is true to its original design goals of optimizing for developer productivity. But by shielding developers from decisions and considerations that usually come with building distributed systems, it has also deprived them of the opportunity to build systems that are resilient to failures and can meet stringent uptime requirements.

But not every distributed system is critical, and not every distributed system needs five to nine nines of uptime. As long as you’re informed about the trade-offs that Orleans has made and what they mean to you as a developer, you can at least make an informed choice on if and when to adopt Orleans.

I hope this post will help you make those informed decisions. If I have been misinformed or am incorrect about any part of how Orleans works, please do leave a comment below.


25 thoughts on “A look at Microsoft Orleans through Erlang-tinted glasses”

  1. First of all, thanks for a great in-depth analysis! Very good insights!

    Now let me address some misconceptions that crept in.

    1. Membership and storage outage.
    You wrote: “For starters, it’s not partition tolerant towards partitions to the data store used for its Silo management. Should the data store be partitioned or suffer an outage, it’ll result in a full outage of your service.”

    This is simply incorrect. When cluster is stable (no nodes joining or leaving it) membership is completely insensitive to storage unavailability. It will log warnings but continue operating based on the current cluster membership information. It’s only when a new node is trying to join the cluster or a failure of a node is detected that unavailability of storage will hinder updates to membership. Since cluster changes are very infrequent and Azure Table Store (our default membership storage) is highly available, the probability of a storage outage happening at the exact time of a change in the cluster configuration is extremely low.

  2. Hence, the derived claims are also incorrect:

    “the way servers detonate when they lose connectivity to the storage service means it’s not partition-tolerant because if the storage service is partitioned from the cluster, even for a relatively short amount of time, then there’s a chance for every node that are running Silos to self detonate;

    there is a single point of failure at the storage service used to track Silo memberships, any outage to the storage service results in outage to your Orleans service too (this happened to Halo 4); ”
    No, this did not happen to Halo 4. Simply couldn’t, by design.

  3. 2. Actor Placement

    There are several points here.

    a) The additional level of indirection has multiple benefits. Unlike with hash-based partitioning, no repartitioning is required when the cluster changes. It also gives the Orleans runtime flexibility to place grains to certain nodes optimizing for locality with other grain and local resources. It also allows for dynamic migration of grains for locality and performance. We recently got very promising results from testing dynamic grain migration based on communication patterns.

    b) Random is just the default we chose because it demonstrated very good distribution for multiple workloads we tested. However, placement policies are pluggable in the runtime, different strategies can be added and applied to different grain types.

    c) There’s no cache invalidation problem, because the directory cache is just a performance optimization. If a message is routed based on a stale cache record, which is pretty rare, a new lookup is triggered and the message is forwarded to the right destination, and the cache gets updated.

  4. 3. Message serialization and immutability

    You wrote: “Messages sent between Grains are usually serialized and then deserialized, this is an expensive overhead when both Grains are running on the same machine.”
    Actually, messages between grains on the same node are not serialized, they are deep-copied, which is usually much cheaper than serialization.

  5. Maybe we should schedule a conf call to discuss the details. :-) Thanks again for the great post.

  6. Does the deep copying happen automatically? Based on Richard’s Pluralsight course (chapter 4 ‘Introducing Immutable Messages’) he mentioned that you have to mark the message type as immutable for this to happen, and that it’s down to you to make sure that the message is not mutated, hence the second part of the paragraph:

    “You can optionally mark the messages are immutable so that they won’t be serialized when passed between Grains on the same machine, but this immutability promise is not enforced at all and it’s entirely down to you to apply the due diligence of not mutating the messages.”

  7. Sure, that would be useful, let me digest your comments first and if you have some more information (blog posts, papers, etc.) on how the Silo management works you could point me to, that’d be great.

  8. Hey, I have a couple of follow questions regarding a).

    – When a new node joins/leaves the cluster, doesn’t the directory information need to be repartitioned?
    – Could you give me some examples of local resources that can be optimized this way? I understand the benefit of optimizing for locality with other grains, but since the state of grains are stored remotely and that instances are ephemeral, I can’t think of other local resources that I’d optimize for off the top of my head.
    – To optimize for locality with other grains, how does Orleans find these communication patterns (cluster of grains that can benefit from greater locality) at the time of placement and optimize for them? It doesn’t seem like something Orleans would know ahead of time, and I didn’t see examples of how to provide that information to Orleans.

  9. By default, deep-copying of call arguments happens always, as part of the process of invocation of a method on the grain reference, so that they cannot be mutated before the message is sent. For remote messages, deep-copying is later followed by serialization. For local messages, no serialization is done, only deep-copying. Immutable tells the runtime that deep-copying can be skipped, for both remote and local case. Such a promise is indeed not enforced in any way, and is merely a convention between the app and the Orleans runtime. Until .NET adds a built-in support for immutability, that’s all we can do, unfortunately.

  10. You are correct, when the cluster state changes, the directory gets repartitioned, and parts of its data gets moved between nodes. However, grains don’t need to move.
    Examples of local resources we’ve heard of are large file system caches or databases that are expensive to recreate. E.g. grain state may hold metadata pointing to large blobs that reside in a remote blob store but can be cached on cluster nodes for performance.
    A trivial example of locality optimization is the PreferLocalPlacement placement strategy that instructs the runtime to place an activation of a grain of that type local to the first caller (another grain) to it. This is how the app can hint about optimizing placement.
    The testing we’ve done last summer is where the runtime learns which remote grains talk much to each other, and migrates one of them to the silo of the other, so that they communicate via local messages from that point on. This approach doesn’t require any prior knowledge of the communication patterns.

  11. Thank you for your analysis.

    I would like to clarify a couple of additional points that were not mentioned already in Sergey’s response. I would also first like to acknowledge that some of the confusion and misunderstanding came due to the insufficient explanation on our part. Most of the description of the internal runtime mechanisms are in the technical report and at the time of its writing we were limited in space. Thus we had to be brief and omitted a lot of details. The main goal of the technical report was to describe the virtual actor abstraction and its benefits, not to cover all runtime implementation details. But now is the time for us to provide more details, and we will.

    1) Membership: We are going to write a more detailed post just on the membership. Here I will just briefly mention a couple of key features:
    – All I am alive heartbeats are sent directly between silos via TCP.
    – Storage (Azure table or SQL server) is only used to help achieve an agreement (for nodes to actually agree on whom to kill) and also to disseminate the new membership view to all other nodes.
    – As a result, as Sergey mentioned, disconnection from the storage has zero impact on running healthy nodes. It will only impact an ability to declare a really failed node as dead or join new nodes.
    – The algorithm is linear and thus is more scalable than a lot of other, more traditional (Group Communication for example) style algorithms.
    – The implementation can be parameterized to tune failure detection. This allows the system administrator to express the exact tradeoff in failure detection accuracy vs. completeness (how many lost heartbeats qualify for a suspicion, how many suspicions qualify for voting dead, how frequently to send heartbeats…)

    2) Messaging guarantees: I think this is the most confusing point in our technical report. We did not explain it right and I personally apologize for that. What we really have is the following:
    – Every message in Orleans has automatic timeout (the exact timeout can be configured). If the reply does not arrive on time the message promise is broken with timeout exception.
    – Orleans can be configured to do automatic retries upon timeout. By default we do NOT do automatic retries.
    – Application of course can also pick to do retries upon timeouts.
    – If the Orleans system is configured not to do automatic retries (default setting) and application is not resending – Orleans provides at most once message delivery. A message will either be delivered once or not at all. It will never be delivered twice.
    – In the system with retries (either by the runtime or by the application) the message may arrive multiple times. We do nothing currently to persistently store which messages already arrived and suppress second delivery (we believe this would be pretty costly). So in the system with retries we do NOT guarantee at most once.
    – If you keep retrying potentially indefinitely, the message will eventually arrive, thus it is at least once messaging guarantee. Please notice that this “will eventually arrive” is something that the runtime needs to make sure. It does not come for free just by itself. Orleans provides that since grain never goes into any permanent failure state and we will for sure eventually re-activate a failed grain on another silo. In some other systems this may not be the case. Same with overload, we have a lot of protection mechanisms in place that ensure that even under overload any particular grain is not permanently denied service.

    So to summarize: in the system without retries we guarantee at most once message delivery. In the system with infinite retries we guarantee at least once (and do NOT guarantee at most once).

    3) Placement: as Sergey already pointed out, the main reason to go with indirection via distributed directory was to enable placement flexibility and also avoid the need to move grains upon membership changes. We actually believe this is a critical point in our design.

    4) I agree with your point about letting it crash (“The choice of not crashing the grain in the event of exceptions offers convenience at the cost of breaking the atomicity of operations”). We currently don’t do that, but we are thinking about potential ways to enable atomicity that will still be consistent with other parts of our model.

    5) Reentrant grains – “In Erlang … you can control which messages can be processed concurrently, rather than the all-or-nothing approach that reentrant grains uses.” This is actually something that is very easy to achieve in Orleans. We have a simple Task based wrapper class that you have to use in your grain code and allows to control which messages go in sequentially and which interleave. We also used to support more fine-grained attributes per method that allowed to specify which methods can interleave and which not. We currently disabled it, since we felt this extra flexibility was mostly confusing. Instead, we opted for a binary – reentrant or not – model. But this can be changed.

    Lastly, I would like to make one important (in my opinion) comment. Most of the points you have raised are specific tradeoffs w.r.t. performance or consistency/availability. They are NOT an inherent part of our programming model. They were made to better meet the requirements of our first customers. For example, we preferred availability vs. consistency in the eventually single activation, because that is the setting that better fitted our customers at that point. But we are not married to that choice. In fact, we have ideas on how to make the single activation semantics strong. We may even allow in the future to pick among the two (I am not making any promises here, but saying this can be done and makes certain sense).

    Contrast those runtime decisions with the programming model, where we feel much stronger about the choices we made. For example, the virtual actor abstraction is key to our system – we believe that not only you should not know where your actor is, but also not if it is currently active or not. And application should not manage the life cycle of actors. Same with single-threaded execution within the grain, or asynchrony everywhere.

    Gabi Kliot

  12. Thanks for this great analysis.

    Is there any chance you could update the post to reflect the info from the comments? It would be very helpful to have that info inline and in an as easily digestible format as the post.

  13. Hi Gabi,

    Thanks a lot for the detail on the membership management, that’s exactly what I was missing from the whitepaper, and thanks for the clarification on the messaging guarantees as well.

    On your last point, I do agree that the programming model is important, but so too are those trade-offs with regards to consistency and availability and we need to understand those trade-offs and their implications in order to make an informed decision on whether Orleans is the right fit for our projects.

    As for the choice of availability over consistency, I don’t see any problem with that, I’m no stranger to eventually consistent systems (the fact that it’s eventually consistent only during failures is better than some already). But I do still need to understand how the eventually single activation will impact us, under what circumstances, and how I might be able to provide stronger consistency (where it’s needed) at the application level.

    I understand your reasoning for your choices, and although you don’t feel married to those choices please keep in mind that if we were to adopt Orleans in our project we would however be married to your choice. Which is why fully understanding the trade-offs and failure modes are important to us.

    For what it’s worth, now that you and Sergey have clarified how the membership management and message delivery work, it has eased most of my strongest concerns.

  14. We are going to open-source Orleans in early 2015. So you’ll have much better flexibility with choosing tradeoffs that fit your business goals. Obviously, these would be great candidates for contributing back to the codebase.

  15. Took a bit longer than I’d thought, but I’ve added summaries of the information from the comments side-by-side with the original writing.

  16. Pingback: Etcd 2.0 ? CockroachDB — Episode 0026 « DevZen Podcast

  17. Concerning cooperative task scheduling. I’m not that familiar with Orleans, but I get the impression that they are interested in parallel performance and for getting good parallel performance, cooperative scheduling is superior to preemptive scheduling. With cooperative scheduling, you may be able to get by with essentially O(c) space while with preemptive scheduling you might end up using O(n) space, where c is the number of cores and n is the number of tasks and typically n is multiple orders of magnitude higher than c.

    It is also worth pointing out that building on top of .Net, it is not really possible to make a proper preemptive scheduler for user space threads. F# asynchronous workflows use a similar strategy as Erlang in that F# async workflows allow a specific number of async ops to be executed per async thread. However, that only works as long as each async workflow cooperates by actually performing async ops. A workflow such as

    let bad = async {
        do while true do ()
    }

    does not cooperate and the F# async mechanism has no way to preempt it.

  18. yup, both Orleans and Akka.net are applying lessons from Erlang in the CLR space, but there are gaps that you can’t bridge due to differences between the CLR and the Erlang VM – pre-emptive scheduling, for instance.
