Ivan Voroshilin’s Blog.

Algorithmic contests, distributed systems and software architecture

Concurrent Work Scheduling in Java 8: ForkJoinPool, Dispatching and Caching - All in 1 Bottle


Been too long since I last blogged…

Starting today, I am changing the format of this blog for two reasons:

  1. The blog has moved to a new platform.

  2. I want to make it livelier and more engaging - not just a problem statement with solutions, but a description of the challenges and of how I arrived at things. So the text is going to be more entertaining and easier to follow. We'll see... ;-)

Ok, let's catch up. I've been wrestling with concurrent algorithms and scalability lately. For example, last month I was researching the design of Akka dispatchers and advanced caching in Java - in particular «Caffeine», a Java 8 rewrite of Guava's cache/ConcurrentLinkedHashMap (kudos to Ben Manes for his constant help; he designed both of them, by the way) - and ran some microbenchmarks.

TL;DR

My recent goal was to implement a scalable dispatcher for better throughput in a highly concurrent environment. A dispatcher should be able to asynchronously schedule millions of short-lived Runnable tasks coming in from many threads simultaneously. Related tasks should go into the same FIFO submission queue (very much like actors in Scala and their mailboxes); the FIFO property ensures that tasks from the same queue are executed one after another, sequentially. Unrelated tasks are executed asynchronously.

This post came about as a small research into dispatching in tandem with caching and performance trade-offs. We're going to touch on some caching techniques for a HashMap<dispatchId, Runnable task> - in particular WeakReference values (I'll explain below why the weak references go on the values; odd, isn't it?) - and some ForkJoinPool magic, ending up with a comparison of performance benchmarks.

Intro

As mentioned, there is a Runnable task that needs to be executed. Each such task has a dispatchId.

I originally started by exploring task dispatching pinned to a thread, but then struggled with an overall performance bottleneck. The first attempt was a hashing dispatcher: there is an array of threads, and hash(dispatchId) mod threadsNumber determines the index of the thread in that array responsible for the task's execution. Each such thread owns a ConcurrentLinkedQueue as a FIFO queue, from which it takes tasks and executes them. See the picture for clarity.
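A minimal sketch of this first attempt (class and method names are mine, not the actual code):

import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the hashing dispatcher: each worker thread owns a FIFO queue,
// and a task's dispatchId pins it to exactly one worker.
class HashingDispatcher {
    private final Worker[] workers;

    HashingDispatcher(int threads) {
        workers = new Worker[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Worker();
            new Thread(workers[i]).start();
        }
    }

    void dispatch(String dispatchId, Runnable task) {
        // Same dispatchId -> same worker -> FIFO execution order.
        int index = Math.abs(dispatchId.hashCode() % workers.length);
        workers[index].queue.add(task);
    }

    private static class Worker implements Runnable {
        final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                Runnable task = queue.poll();
                if (task != null) task.run();  // busy-spin kept naive for the sketch
            }
        }
    }
}

A real implementation needs a shutdown path and should park idle workers instead of spinning, but this conveys the scheme.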

As it turned out, the algorithm degrades heavily as the number of tasks increases, because some threads may be idle while others are busy. The only advantage of this scheme is low latency rather than overall throughput: if we set up CPU affinity (no context switches, good locality of CPU caches), and provided that tasks are equal in size and more or less uniformly spread among the thread buckets, we might benefit from it. But for most applications this is not the case. Thus we need another solution.

Unbalanced work

What if tasks differ in execution time? Some threads can be busy while others are free. In the above model this stalls some threads, which is very inefficient, causing unbalanced execution and performance degradation. And even if tasks were equal in execution time, the scheme would still scale poorly.

Redesign

Some dispatchId queues may be more active than others and unfairly balanced among workers. Thus, we need to somehow decouple a queue from its corresponding thread while maintaining FIFO order for the same dispatchId. By separating the queue from the worker thread, we retain the FIFO property and spread the work out more evenly - better throughput!

Let's apply a chained CompletableFuture as the FIFO-queuing mechanism, kept as the value of a ConcurrentMap<dispatchId, CompletableFuture>. A CompletableFuture object can hold a reference to the next future to complete, thereby preserving the FIFO property. This is like a linked list that forms an execution pipeline.
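In code, the chaining itself is just this (a toy two-task pipeline, assuming nothing beyond the JDK):

import java.util.concurrent.CompletableFuture;

public class ChainDemo {
    public static void main(String[] args) {
        // The second task is guaranteed to run only after the first completes:
        // a FIFO chain of two. The map would store only the tail of the chain.
        CompletableFuture<Void> head = CompletableFuture.runAsync(() -> System.out.println("task 1"));
        CompletableFuture<Void> tail = head.thenRunAsync(() -> System.out.println("task 2"));
        tail.join();
    }
}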

So, all the work happens in a ConcurrentHashMap, which acts as a cache of tasks awaiting completion. A bit later I'll show how to integrate this cache with a dispatcher responsible for scheduling and running tasks from it. For now, let's see how we can efficiently manage eviction of the cache's entries.

Pruning the cache

In some use cases, each Runnable task has a unique dispatchId (a global counter is a good example). Adding a new task to the dispatcher for execution generates a corresponding key for the cache (yes, it is the dispatchId).

If a dispatcher schedules a huge number of incoming tasks, we need to prune the cache (our ConcurrentMap) to avoid an OutOfMemoryError: completed tasks no longer need their entries, so those can be evicted.

WeakReference values can evict a CompletableFuture automatically, based on the observation that the execution chain provides the strong reference, and completed futures become garbage. Why the value and not the key? The key isn't appropriate because it doesn't tell us when its chained CompletableFuture is done. Weak-referenced values do! The value is strongly reachable through the executor's chaining, down to the last enqueued future. When the last future completes and the chain is idle, it becomes eligible for garbage collection and the map may evict the entry.
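One way to get such a map is Guava's MapMaker (a sketch of the mechanics only; whether you build the map this way or by hand is an implementation detail):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import com.google.common.collect.MapMaker;

public class WeakValueCacheDemo {
    public static void main(String[] args) {
        // Values are held via WeakReference: once a chain's tail future is
        // completed and no longer strongly reachable, the GC clears the value
        // and the map can evict the stale entry.
        ConcurrentMap<String, CompletableFuture<Void>> cache =
            new MapMaker().weakValues().makeMap();

        cache.put("dispatchId-42", CompletableFuture.completedFuture(null));
        System.out.println(cache.size()); // 1 now; may drop to 0 after a GC cycle
    }
}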

I borrowed from Guava the idea of pruning the map in a separate thread right after the cache capacity is reached. This is done with an exclusive tryLock, so that we don't block the progress of other threads - neat! A thread that cannot capture the lock immediately jumps over to other work. This reduces the total cost of cache maintenance during dispatching.
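The maintenance trick looks roughly like this (a sketch; evictStaleEntries is a hypothetical helper):

import java.util.concurrent.locks.ReentrantLock;

// Non-blocking cache maintenance in the spirit of Guava:
// whoever wins tryLock() cleans up; everyone else keeps dispatching.
class CacheMaintenance {
    private final ReentrantLock evictionLock = new ReentrantLock();

    void maybePrune() {
        if (evictionLock.tryLock()) {   // never blocks
            try {
                evictStaleEntries();    // hypothetical: drop entries whose weak values were cleared
            } finally {
                evictionLock.unlock();
            }
        }
        // Lost the race? Fine - another thread is already pruning.
    }

    private void evictStaleEntries() { /* walk the map, remove cleared entries */ }
}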

On the cost of Weak-References

Object allocation is very cheap (~10 cycles, compared to ~30 cycles for malloc on most modern hardware), and reclaiming short-lived objects is also very cheap.

Tasks are ephemeral by nature, so this shouldn't be an issue: they die in the young generation and are never promoted to the old one. The reason behind this cheapness is that the GC only visits live objects.

The churn rate of tasks is low in most cases. There is a small GC penalty for delegating the tracking to WeakReference, but it can be paid during a minor GC (yes, a copying young-generation GC), where references are aggressively cleared. For most applications the length of minor GC pauses is negligible - as long as most of the objects in Eden are garbage and are never copied to the Survivor/Old spaces. If the opposite is true and most newborn objects are not eligible for collection, minor GC pauses start taking considerably more time. In that case, tune your GC appropriately.

Soft-References - be careful

Why not SoftReference values? Soft references require two major GCs in order to be collected, are costly to track, and, if abused, can fill up the heap and cause GC thrashing. So they are not appropriate for dispatching.

Integrating cache with a CompletableFuture

Here's a simple mechanism that allows tasks to be run asynchronously via a ConcurrentMap. Please note that ConcurrentHashMap's new (in JDK 8) compute method is atomic but deadlock-prone - the remapping function must not touch the same map - so it should be handled with care:

MyConcurrentCache.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
});
That's the whole gist!
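Putting the pieces together, a minimal dispatcher might look like this (a sketch under the assumptions above; class and method names are mine, not the exact code from the repo):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

class WorkStealingDispatcher {
    private final ExecutorService executor = ForkJoinPool.commonPool();
    private final ConcurrentMap<String, CompletableFuture<Void>> queues =
        new ConcurrentHashMap<>();

    // Tasks with the same dispatchId are chained (FIFO);
    // different dispatchIds run concurrently on the shared pool.
    CompletableFuture<Void> dispatch(String dispatchId, Runnable task) {
        return queues.compute(dispatchId, (id, queue) ->
            (queue == null)
                ? CompletableFuture.runAsync(task, executor)
                : queue.thenRunAsync(task, executor));
    }
}

For example, dispatcher.dispatch("user-42", () -> handle(message)) keeps all of user-42's messages strictly ordered, while unrelated keys run in parallel.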

Thread Pool - a big deal

A few words about ForkJoinPool and non-recursive tasks. Many people still tend to think that ForkJoinPool is efficient only for recursive tasks. I asked this question a while ago on Stack Overflow and even had to ask Alexey Shipilev for help. Why does Akka use ForkJoinPool as the main engine of task execution? Why does JDK 8 take advantage of commonPool() in its streams and CompletableFuture implementations? Fork/Join is the default parallel computation framework since Java 8. For short tasks that don't involve heavy I/O, ForkJoinPool is way more scalable than, say, a FixedThreadPool! JMH benchmarks show this very clearly. The tasks are already small and don't need recursive decomposition. Work-stealing works (read up on the design of ForkJoinPool if confused) regardless of whether a task is big or small: tasks can be grabbed by a free worker from the tail of a busy worker's deque, drastically reducing contention.
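ForkJoinPool can even be created in async (FIFO) mode, which favors exactly this style of event-like, never-joined tasks (a sketch; whether async mode pays off depends on the workload):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class AsyncPoolDemo {
    public static void main(String[] args) throws Exception {
        // asyncMode = true: local FIFO scheduling for tasks that are never joined,
        // which is the shape of dispatcher/actor-style workloads.
        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors(),
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,   // no custom uncaught-exception handler
            true);  // asyncMode

        pool.submit(() -> System.out.println("a plain, non-recursive task"));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}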

Don't be surprised that the JDK 7 and JDK 8 results in the graphs below are very different: these changes were targeted at JDK 8, hence the difference. Both the ForkJoinPool improvements (up to a 60x speedup!) and cache-oriented enhancements to ConcurrentHashMap made this possible. The idea is to treat external submitters in a way similar to workers - using randomized queuing and stealing. This also greatly improves throughput when all tasks are async and submitted to the pool rather than forked, which becomes a reasonable way to structure actor frameworks, as well as many plain services for which you might otherwise use a ThreadPoolExecutor. This was not true in JDK 7, where I observed a lot of overhead with FJ in many cases.

As a result, ForkJoinPool matches the dispatcher's requirements perfectly, accepting asynchronous tasks and returning you a Future object. Moreover, JSR-166 says that new features include support for completion-based designs that are often most appropriate for I/O-bound usages, among others. But I didn't check the I/O part.

Akka Actors and dispatching framework : Differences in the design approach

There are 4 different dispatchers in the Akka framework.

Our WorkStealingDispatcher is similar to Akka's BalancingDispatcher in:

  • A shared queue (our caching CHM; bear in mind, though, that Akka's queue is a different data structure)
  • The concept of work-stealing.

But there is a difference: my framework doesn't know anything about actors (thus no concept of mailboxes and other related stuff). Moreover, the caching techniques described here are an extra requirement that is absent from the standard Akka implementation, for obvious reasons.

Akka invariant:

  • All actors can process all messages that have been sent to one of the actors via “work stealing” (to be more specific “work donating”).

Our invariant:

  • All threads can process all messages that have been sent to one of the dispatchId via “work stealing”.

Their PinnedDispatcher resembles our ThreadBoundHashDispatcher in concept, but the design differs. :-) It dedicates a single unique thread to each actor passed in by reference.
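Conceptually, pinning boils down to a dedicated single-threaded executor per key (a toy sketch, not the actual ThreadBoundHashDispatcher or Akka code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PinnedDispatcherSketch {
    private final ConcurrentMap<String, ExecutorService> pinned = new ConcurrentHashMap<>();

    // Every key gets its own dedicated thread: strict FIFO, no stealing.
    // Beware: one thread per live key - fine for a few actors, fatal for millions.
    void dispatch(String key, Runnable task) {
        pinned.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor())
              .execute(task);
    }
}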

To prevent visibility and reordering problems on actors, Akka guarantees the following two “happens before” rules:

  1. The actor send rule: the send of the message to an actor happens before the receive of that message by the same actor.
  2. The actor subsequent processing rule: processing of one message happens before processing of the next message by the same actor.

My framework holds these guarantees too:

  1. The dispatchId send rule: the send of the message to a dispatchId happens before the receive of that message by the same dispatchId. Proof: in WorkStealingDispatcher, CHM hash-bucket locks and volatile reads ensure safe publication and happens-before guarantees; in ThreadBoundHashDispatcher, ConcurrentLinkedQueue's memory-consistency guarantees ensure safe publication and happens-before;
  2. The dispatchId subsequent processing rule: processing of one message happens before processing of the next message by the same dispatchId. Proof: in WorkStealingDispatcher, ConcurrentHashMap.compute is atomic by contract and the CompletableFuture's ordered chain ensures this; in ThreadBoundHashDispatcher, the per-thread FIFO ConcurrentLinkedQueue ensures this;

At last, the microbenchmarks

The benchmarks were written with the JMH framework, separately for JDK 7 and JDK 8, and run on an iMac, Core i5 CPU @ 2.50GHz (4 cores), 8 GB RAM, OS X Yosemite. All benchmarks use an empty synthetic Runnable task to mitigate side effects.

I've attached two graphs comparing the performance of the JDK 7 and JDK 8 algorithms, respectively, based on random dispatchIds over a finite set. For a more thorough analysis, see the Dispatch Benchmarks on GitHub. The trends don't differ much from test to test within a single Java version, which shows that the eviction overhead (bounded caching) of weak values is negligible for these tests.

Tests for Benchmarking:

  1. A single dispatch queue: new tasks always go to the same dispatchId.
  2. Counting dispatchId: a one-off queue of size 1 per task; that is, the dispatchId is incremented by 1 for each new task.
  3. A randomly filled set of queues of size 32768.

Benchmarking details:

  • Measuring throughput (ops/s)
  • 4 user threads for all 3 tests;
    • Purpose: analyze contention impact on concurrent data-structures.
  • 2 types of ExecutorService { ThreadPoolExecutor, ForkJoinPool };
    • Purpose: analyze the impact of 2 different executors on throughput.
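
For reference, this is roughly the shape of such a JMH test (a simplified sketch reusing the WorkStealingDispatcher outline from above, not the exact benchmark code from the repo):

import java.util.concurrent.ThreadLocalRandom;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

@State(Scope.Benchmark)
public class DispatchBenchmark {
    private static final int QUEUES = 32768;
    private final WorkStealingDispatcher dispatcher = new WorkStealingDispatcher();

    @Benchmark
    @Threads(4)   // 4 user threads, as in all three tests
    public void randomDispatch() {
        // Test 3: random dispatchIds over a finite set; the task itself is empty.
        String id = "q-" + ThreadLocalRandom.current().nextInt(QUEUES);
        dispatcher.dispatch(id, () -> { });
    }
}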

The Caffeine JDK 8 benchmarks include an additional test with a Caffeinated cache - a robust data structure that won my benchmarking tests and can be used in the dispatcher as well.

1. JDK 8 (Java version 1.8.0_45)

2. JDK 7 (Java version 1.7.0_71)

Note: Caffeine requires JDK 8+, so there is no Caffeine benchmark for JDK 7.

Results:

As can be seen, after the significant updates in Java 8 - including the ConcurrentHashMap changes - ForkJoinPool is way more scalable than in JDK 7. In JDK 8, in addition to the main submission queue, each worker has its own one. The thing is that a single submission queue was identified as a serious performance bottleneck, so Doug Lea et al. striped it. Workers that have no work to do first look into the submission queue associated with them, and then wander around, looking into the submission queues of others. The 60x improvement comes from eliminating a single contended lock, plus good, fast randomized queue selection that keeps contention low without locking.

Conclusion

I studied some parts of Caffeine's cache design under the hood. There are advanced techniques in there (e.g. eventually consistent multithreaded data structures and Striped64; the full rewrite of ConcurrentHashMap proves to be very efficient)! It was fun to do this research; I believe it definitely deserves another post.

Caffeine showed very good results in the majority of the JMH performance benchmarks compared to my implementations under similar parameters, weak values included. I decided not to remove ThreadBoundHashDispatcher from the code, though, as it might still have benefits in some rare cases.

See the code on Github for more research, if interested.

By the way! Here's a backport of the Dispatcher to JDK 7, translated with Guava's ListenableFuture. I needed it for my project, even though JDK 7 is not officially supported anymore.

Hope this helps,

Ivan

Toughest Backtracking Problems in Algorithmic Competitions



TL;DR

In algorithmic competitions there are frequently problems that can be attacked with recursive backtracking - a well-known approach to traversing a search tree. Usually it's a good sign if the goal is to analyze all existing combinations of a problem. And, of course, you need the right strategy to meet the time limits (e.g. prune the tree). Here I've decided to talk about a few very interesting backtracking problems I came across. I touch on the backtracking mindset competitors should develop, but bear in mind that looking at the answer up front, without trying to solve a problem yourself, is a waste of time. Furthermore, this is advanced-level material: if you haven't practiced recursive backtracking or DFS, please spend some time on basic backtracking problems first and come back.

1. Chess Puzzler


This is one of the most interesting problems I've ever come across; solving it makes you consider some very important use cases, like memory limits, recursion, combinatorics and optimization techniques. I'd seen a chess problem in Skiena's algorithms book some time ago but, as it turned out, this one is very different.

Problem Statement:

The problem is to find all distinct layouts of a set of normal chess pieces on a chess board with dimensions MxN where none of the pieces is in a position to take any of the others. Assume the color of the piece does not matter, and that there are no pawns among the pieces.

Write a program which takes as input:

  • The dimensions of the board: M, N.

  • The number of pieces of each type (King, Queen, Bishop, Rook and Knight) to try and place on the board.

As output, the program should yield the number of distinct layouts for which all of the pieces can be placed on the board without threatening each other.

Solution:

We use the following notation:

  • “K” - King, “Q” - Queen, “B” - Bishop, “R” - Rook, “N” - Knight
  • M - horizontal size of the board
  • N - vertical size of the board
  • S - the set of remaining pieces

For example: a 3×3 board containing 2 Kings and 1 Rook, that is S = [K,K,R]. Answer: 4 layouts.

(figure: the four valid layouts)

Since we need to find all possible layouts of the chessboard, the problem can be solved with recursive backtracking as follows. We take the next piece from S and calculate all possible freeSquares for it on the chess board. Then, iterating over freeSquares, we try to put the piece on each of them in turn. Each loop step is a potential partial solution (layout): the function calls itself recursively, trying to place the next piece on the current board, and so forth, until there are no pieces left or freeSquares is empty. Once a piece is placed on the board, we update the set of free squares by subtracting the squares threatened by this piece.

If the set of free squares becomes empty while pieces remain unplaced, this branch has no solution, and the recursive function backtracks to the upper level of the recursion tree, trying the next loop step. Thereby we loop over all steps and stop traversing by pruning impossible configurations in advance - as simple as that. There could be a further arithmetic optimization: for each piece type, take into account the number of squares threatened by all remaining pieces versus the number of free squares, calculated in one go. Since the time limit for this problem was 20 minutes, I ditched that optimization. Undoubtedly, my solution can be drastically improved by cutting the search tree even more, so I leave this to the reader. You might also want to parallelize this recursive task.
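Here is a condensed Java sketch of this recursion (my actual solution was in Groovy, so the names and representation are illustrative; duplicates are handled here by forcing identical pieces into increasing square order, a simpler stand-in for the combinations trick described in the next paragraph):

import java.util.BitSet;

// Squares are indexed 0..M*N-1; attacksOf() marks every square a piece
// threatens; pieces must be sorted so identical pieces are adjacent.
public class ChessLayouts {
    final int m, n;   // horizontal and vertical board size
    long layouts = 0;

    ChessLayouts(int m, int n) { this.m = m; this.n = n; }

    void solve(char[] pieces, int next, BitSet occupied, BitSet threatened, int lastSq) {
        if (next == pieces.length) { layouts++; return; }   // all pieces placed
        char p = pieces[next];
        // Runs of identical pieces go in increasing square order,
        // so interchangeable duplicates are not counted twice.
        int from = (next > 0 && pieces[next - 1] == p) ? lastSq + 1 : 0;
        for (int sq = from; sq < m * n; sq++) {
            if (occupied.get(sq) || threatened.get(sq)) continue;   // not a free square
            BitSet attacks = attacksOf(p, sq);
            if (attacks.intersects(occupied)) continue;   // would attack a placed piece: prune
            BitSet occ = (BitSet) occupied.clone();
            BitSet thr = (BitSet) threatened.clone();
            occ.set(sq);
            thr.or(attacks);
            solve(pieces, next + 1, occ, thr, sq);   // recurse; clones make backtracking implicit
        }
    }

    BitSet attacksOf(char piece, int sq) {
        BitSet a = new BitSet(m * n);
        int x = sq % m, y = sq / m;
        int[][] king   = {{1,0},{-1,0},{0,1},{0,-1},{1,1},{1,-1},{-1,1},{-1,-1}};
        int[][] knight = {{1,2},{2,1},{-1,2},{-2,1},{1,-2},{2,-1},{-1,-2},{-2,-1}};
        int[][] rook   = {{1,0},{-1,0},{0,1},{0,-1}};
        int[][] bishop = {{1,1},{1,-1},{-1,1},{-1,-1}};
        switch (piece) {
            case 'K': mark(a, x, y, king, 1); break;
            case 'N': mark(a, x, y, knight, 1); break;
            case 'R': mark(a, x, y, rook, Integer.MAX_VALUE); break;
            case 'B': mark(a, x, y, bishop, Integer.MAX_VALUE); break;
            case 'Q': mark(a, x, y, rook, Integer.MAX_VALUE);
                      mark(a, x, y, bishop, Integer.MAX_VALUE); break;
        }
        return a;
    }

    private void mark(BitSet a, int x, int y, int[][] dirs, int maxSteps) {
        for (int[] d : dirs)
            for (int s = 1, cx = x + d[0], cy = y + d[1];
                 s <= maxSteps && cx >= 0 && cx < m && cy >= 0 && cy < n;
                 s++, cx += d[0], cy += d[1])
                a.set(cy * m + cx);
    }

    public static void main(String[] args) {
        ChessLayouts board = new ChessLayouts(3, 3);
        board.solve(new char[]{'K', 'K', 'R'}, 0, new BitSet(), new BitSet(), -1);
        System.out.println(board.layouts);   // 4, matching the example above
    }
}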

The finishing touch: what to do about duplicated pieces, like 3 queens or 2 knights? Honestly, I spent a great deal of time on this part. The thing is that duplicates are interchangeable in terms of combinations on the chessboard. For instance, on a 1x2 board with free squares [x:1,y:1][x:1,y:2], 2 queens can be placed as [Q1][Q2] or [Q2][Q1], yielding 2 seemingly different combinations that are really the same layout. A simple solution is to place all pieces of one type at once inside a loop step. From combinatorics, we can enumerate all C(n, k) unique combinations (aka "n choose k") in a single loop. Because we recurse, I created a utility function wrapped in a standard Java iterator which doesn't calculate all combinations up front; rather, it traverses them lazily, computing the next one on the fly. The reason was the memory needed on each level of the recursion stack to keep an array of all combinations: e.g. C(1000, 5) results in 8,250,291,250,200 elements. There were also some minor issues with Groovy not being able to correctly calculate the difference between two lists of coordinate pairs; thanks to the guys on Stack Overflow who quickly replied with a workaround. The full working code is now available on GitHub. If any of you has an idea how to optimize it further, please leave a comment at the end of this post!
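Conceptually, the lazy iterator looks like this (a Java sketch; my actual utility was written for the Groovy solution):

import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Enumerates all C(n, k) index combinations lazily: only the current
// k-element array lives in memory, instead of all combinations up front.
class Combinations implements Iterator<int[]> {
    private final int n, k;
    private final int[] current;   // current combination: strictly increasing indices
    private boolean done;

    Combinations(int n, int k) {
        this.n = n; this.k = k;
        current = new int[k];
        for (int i = 0; i < k; i++) current[i] = i;   // first combination: 0,1,...,k-1
        done = k > n;
    }

    public boolean hasNext() { return !done; }

    public int[] next() {
        if (done) throw new NoSuchElementException();
        int[] result = current.clone();
        // Advance to the successor: find the rightmost index that can still grow.
        int i = k - 1;
        while (i >= 0 && current[i] == n - k + i) i--;
        if (i < 0) { done = true; return result; }   // that was the last one
        current[i]++;
        for (int j = i + 1; j < k; j++) current[j] = current[j - 1] + 1;
        return result;
    }

    public static void main(String[] args) {
        Combinations c = new Combinations(4, 2);
        while (c.hasNext()) System.out.println(Arrays.toString(c.next()));
        // prints [0, 1], [0, 2], [0, 3], [1, 2], [1, 3], [2, 3] - C(4,2) = 6
    }
}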

2. To backtrack or not, that’s the question: Meet “Mine Sweeper Master” from Google code jam 2014


A problem both tricky and simple at the same time was posed last year in the Google Code Jam qualification round - the famous Minesweeper Master. Yes, the game that comes with the Windows operating system - I bet most of you know it! It's well known that solving Minesweeper is NP-complete, but the conditions of the problem don't require you to do that (please read the problem statement before proceeding).

Solving it with backtracking is the wrong way, as you are not required to analyze all configurations. The catch is that any correct result is a solution (read the problem statement carefully)! Thus you don't have to attack it with backtracking: that pattern is quite costly and aimed at enumerating all possible solutions. It is possible, but you most likely won't pass the large input set. Hence, the simplest idea is to start at (0,0) - the upper-left corner - and fill an area of N cells with non-mine space from left to right and top to bottom, line by line. Then fill the rest with mines. Clicking the (0,0) cell reveals whether this is a good solution: if (0,0) is not a mine and the click reveals every non-mine cell (each cell containing a 0 recursively reveals all its surrounding cells), we have won.

There are also a number of important corner cases to consider for this approach:

Single non-mine

If N=1, any configuration is a correct solution.

Single row or single column

If R=1, simply fill in the N non-mines from left to right. If C=1, fill N rows with a (single) non-mine each.

Too few non-mines

If N is even, it must be >= 4.
If N is odd, it must be >= 9. Also, R and C must be >= 3.

Otherwise there's no solution.

Can’t fill first two rows

If N is even and you can't fill at least two rows with non-mines, then fill the first two rows with N / 2 non-mines.
If N is odd and you can't fill at least two rows with non-mines and a third row with 3 non-mines, then fill the first two rows with (N - 3) / 2 non-mines and the third row with 3 non-mines.

Single non-mine in the last row

If N % C = 1, move the final non-mine from the last full row to the next row.

I was too lazy to illustrate each one. As you can see, there is a bunch of special cases to handle to make this solution pass.
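The final click-and-reveal check from the description above is a simple flood fill. Here's a sketch (my own illustration, not contest code; 'M' marks a mine, '.' a non-mine):

import java.util.ArrayDeque;
import java.util.Deque;

// Verifies a candidate board by simulating the single click at (0,0):
// reveal (0,0); whenever a revealed cell has zero adjacent mines, reveal
// all its neighbors. The board wins iff every non-mine cell gets revealed.
public class MinesweeperCheck {

    static boolean winsInOneClick(char[][] g) {
        int rows = g.length, cols = g[0].length;
        if (g[0][0] == 'M') return false;
        boolean[][] revealed = new boolean[rows][cols];
        Deque<int[]> stack = new ArrayDeque<>();
        revealed[0][0] = true;
        stack.push(new int[]{0, 0});
        while (!stack.isEmpty()) {
            int[] cell = stack.pop();
            if (adjacentMines(g, cell[0], cell[1]) != 0) continue;   // numbered cell: stop expanding
            for (int dr = -1; dr <= 1; dr++)
                for (int dc = -1; dc <= 1; dc++) {
                    int r = cell[0] + dr, c = cell[1] + dc;
                    if (r >= 0 && r < rows && c >= 0 && c < cols
                            && !revealed[r][c] && g[r][c] != 'M') {
                        revealed[r][c] = true;
                        stack.push(new int[]{r, c});
                    }
                }
        }
        for (int r = 0; r < rows; r++)
            for (int c = 0; c < cols; c++)
                if (g[r][c] != 'M' && !revealed[r][c]) return false;   // a non-mine stayed hidden
        return true;
    }

    static int adjacentMines(char[][] g, int r, int c) {
        int count = 0;
        for (int dr = -1; dr <= 1; dr++)
            for (int dc = -1; dc <= 1; dc++) {
                int rr = r + dr, cc = c + dc;
                if ((dr != 0 || dc != 0) && rr >= 0 && rr < g.length
                        && cc >= 0 && cc < g[0].length && g[rr][cc] == 'M') count++;
            }
        return count;
    }

    public static void main(String[] args) {
        char[][] board = { "...".toCharArray(), "...".toCharArray(), "..M".toCharArray() };
        System.out.println(winsInOneClick(board));   // true: one click reveals all 8 blanks
    }
}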

3. Another Chess Board Puzzler: “King” from Google Code Jam 2008

This is one of the toughest problems from Google Code Jam: notably, no one solved it during the round in which it was posed. Algorithmic competition is like sport: if you feel you can solve easier problems faster - go for them. Otherwise you risk losing the competition. Some day I will try to attack it too; for now, I say goodbye to all of you.

Dockerizing Spray HTTP Server



This is the continuation of the previous article. This series shows how simple it is to create a lightweight HTTP server based on the Spray framework, put it into a Docker image and run multiple instances on any single machine, requiring no dependencies.

Implementing a lightweight RESTful HTTP Service

The whole project can be found on GitHub. For the impatient: git pull it and jump right to the next section. We're going to use Scala and the Akka framework along with the SBT build tool. From Spray we will use the spray-routing module, which provides a simple routing DSL for elegantly defining RESTful web services; it works on top of the spray-can HTTP server.

Ok, let’s get started.

import akka.actor.{ActorSystem}
import spray.routing._
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object HttpServer extends App with SimpleRoutingApp {

  implicit val actorSystem = ActorSystem()
  implicit val timeout = Timeout(1.second)
  import actorSystem.dispatcher

  startServer(interface = "localhost", port = 8080) {

    // GET /welcome --> "Welcome!" response
    get {
      path("welcome") {
        complete {
          <html>
            <h1>Welcome!</h1>
            <p><a href="/terminate?method=post">Stop the server</a></p>
          </html>
        }
      }
    } ~
      // POST /terminate --> "The server is stopping" response
      (post | parameter('method ! "post")) {
        path("terminate") {
          complete {
            actorSystem.scheduler.scheduleOnce(1.second)(actorSystem.shutdown())(actorSystem.dispatcher)
            "The server is stopping"
          }
        }
      }
  }
    .onComplete {
    case Success(b) =>
      println("Successfully started")
    case Failure(ex) =>
      println(ex.getMessage)
      actorSystem.shutdown()
  }
}

The REST API is as follows:

  • GET /welcome –> responds with “Welcome!” and a POST hyperlink.

  • POST /terminate –> stops the server.

The DSL describing this API lives inside the startServer method. And that's it!

I didn’t want to show the full power of Spray because this article is solely about Docker.

Let’s run it and check:

curl http://localhost:8080/welcome

Dockerizing the server

Because the Docker Engine uses Linux-specific kernel features, I'm going to use boot2docker, a lightweight virtual machine, to run it on OS X. If you are on OS X too, just download it, install and run it - easy peasy. Before dockerizing, just make sure you've set the following 3 environment variables for connecting your client to the VM:

export DOCKER_HOST=tcp://192.168.59.103:2376
export DOCKER_CERT_PATH=/Users/ivan/.boot2docker/certs/boot2docker-vm
export DOCKER_TLS_VERIFY=1

The remaining stuff doesn’t differ much.

We use a trusted automated Java build (the OpenJDK Java 7 JRE Dockerfile) as the base of our image.

First, you will need to create a Dockerfile for the image in your project:

# Our base image
FROM dockerfile/java

WORKDIR /

USER daemon

# Here the stuff that we're going to place into the image
ADD target/scala-2.11/docker-spray-http-server-assembly-1.0.jar /app/server.jar

# entry jar to be run in a container
ENTRYPOINT [ "java", "-jar", "/app/server.jar" ]

# HTTP port
EXPOSE 8080

Build your project as a single jar file:

DockerSprayHttpServer$ sbt assembly

And now navigate to the project’s folder and run:

DockerSprayHttpServer$ docker build .

This sends the build context to the Docker daemon, which builds the image.

Running multiple instances on a single machine

Run the following command to see the available Docker images:

DockerSprayHttpServer$ docker images

(screenshot: output of docker images)

a83cda03f529 - the image we've just created - has no repository name yet. We're going to run multiple instances from it.

First, run the 1st instance:

DockerSprayHttpServer$ docker run -d -p 10001:8080 a83cda03f529

Note that we have mapped container port 8080 to host port 10001.

Now, let’s verify the container is running:

DockerSprayHttpServer$ docker ps

(screenshot: output of docker ps)

We exposed port 8080 for the HTTP server; when run in a Docker container, it is mapped onto another port. On top of that, I am on OS X. Remember the 3 env vars we set at the beginning? One of them is DOCKER_HOST.

You can actually check this IP as follows:

DockerSprayHttpServer$ boot2docker ip

We need to use this IP address (for OS X, as we are not on Linux).

Let’s test it:

DockerSprayHttpServer$ curl $(boot2docker ip):10001/welcome

Great!

You can run as many containers as you want - they are completely isolated. By the way, for Scala developers I've found a nice contribution that lets you dockerize your artifacts right from SBT.