In a previous post I discussed using thread partitioning to implement actor based concurrency in Java. A hash function is used to map actors across partitions however as with any hash function there is a possibility that the actors will not be distributed evenly.
In a hash-table this can be alleviated by resizing the table or implementing a different hash function. In a thread partitioning scheme it is harder to resize the thread pool since it is most likely optimised for throughput and latency; so one needs to look instead at the hashing function.
The issue is actually compounded if the amount of work required for a item changes over time. In the example of an auction item, an item can well receive increasingly more bids as the time for the auction to close approaches.
Therefore a fixed allocation to a partition may not be appropriate, one may need to move items between partitions in order to balance the load across all partitions.
Before I look at a dynamic hashing algorithm applicable to this scenario, there are some important issues to consider regarding load balancing partitions.
The main benefit of thread partitioning is to ensure that there is no contention on the actors' data, and hence no need to lock, since all work is performed on a single thread. Therefore the main issue to consider when moving an actor from one partition to another is that one is moving that actor and its data from one thread to another. Therefore one has to ensure that there is no time when the original thread is still acting on the data at the same time as the new thread begins to work on the data; and that the state is correctly published to the new thread.
The basic approach is that if one needs to move actors between two partitions, where one needs to preserve the ordering of incoming messages is:
- stop the queues for both partitions from accepting new messages,
- drain the queues,
- move the actors to the new partitions and ensure publishing of their state,
- implement the new hashing function that reflects the new partitioning,
- allow new queues accept messages again (using the new hashing function).
This is not a trivial operation and like garbage collection can result in a pause, therefore one should only perform this load balancing when one knows that the overhead is acceptable.
If the ordering of the messages is not actually important, then one doesn't need to halt and drain the queues, one can simply apply the new hashing function and when a partition sees a message that is not for its own partition, it can instead re-queue it on the appropriate queue.
Now, back to dynamic hashing functions. A simple approach is to simply manage a look up table of actor to partition. However, this can be resource heavy if one has millions to actors to keep a one to one mapping for; and more importantly the effort and resources required to monitor load at such as fine grained level is likely to be prohibitive.
Instead the approach I implemented was a compromise between a simple modulo hash function and a mapping table. Instead of calculating the modulo directly to the number of threads (T), the modulo function mapped down to N fine grained partitions; where N is one or two orders of magnitude greater than T. Then a mapping table is used to assign each fine grained partition to a thread.
The value N is chosen such that the distribution of actors is fairly smooth - much as one does for determining the number of buckets in a hash table.
Load is monitored at the fine grained partition level and if load balancing is required fine grained partitions are moved between threads and the smaller mapping table is updated.
This approach provides a reasonable balance between the inflexible nature of a simple modulo function and the full flexibility of a one-to-one mapping table.