Atomicity from a single compare-and-swap instruction

The scala.concurrent.Future trait defines a wonderful method called onComplete. Here is its declaration:

def onComplete[U](func: Try[T] => U)
              (implicit executor: ExecutionContext): Unit

One key point in the documentation says:

If the future has already been completed, this will either be applied immediately or be scheduled asynchronously.

The reason I call it a wonderful function is its implementation. Consider, roughly speaking, the following:

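import scala.concurrent._
import scala.util.{Failure, Success}
import ExecutionContext.Implicits.global

// Run the body on the global execution context; a Future is returned at once
val future = Future {
  Thread.sleep(1000)
  "Successful"
}
// Register a callback that runs once the Future completes
future onComplete {
  case Success(i) => println(i)
  case Failure(e) => println(e)
}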

The block assigned to future is dispatched to a thread of the execution context, and a Future is returned immediately. The onComplete call then registers an asynchronous callback which says: when the task is completed, print its result, whether it succeeded or failed.

On reading the documentation, one might think that a new java.lang.Thread is created as soon as onComplete is called, and that this thread waits for “future” to complete and then prints the value. Even if the thread came from something like Executors.newCachedThreadPool, this would still mean that every call to onComplete ties up a separate thread just to wait. A disaster!

But its inventors were smart. What they did instead was to keep a list of all the functions to be called once the Future completes. Once it completes, it calls back all those registered functions, either itself or through an executor. This way no extra thread has to be created or blocked. What is interesting is that they did not use a lock-protected mutable collection to store the callbacks. A single volatile reference points to an immutable List, and that reference is replaced atomically with compare-and-swap:


private volatile Object _ref;
final static long _refoffset;

protected final boolean updateState(Object oldState, Object newState) {
    return Unsafe.instance.compareAndSwapObject(this, _refoffset, oldState, newState);
}

Calling onComplete does the following: it replaces the current reference with a new list made of the new callback prepended to the previous list. Each further call prepends to the list again.

updateState(listeners, runnable :: listeners)
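
The same prepend-and-swap idea can be sketched in plain Java with java.util.concurrent.atomic.AtomicReference, which exposes compare-and-swap without reaching for Unsafe. This is only an illustration under my own assumptions, not the Scala library's code: CallbackStack, Node, add, size and concurrentAdds are hypothetical names, and null stands in for the empty list.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; none of these names come from the Scala library.
final class CallbackStack {
    // Immutable node, playing the role of a Scala List cons cell.
    static final class Node {
        final Runnable head;
        final Node tail;
        Node(Runnable head, Node tail) { this.head = head; this.tail = tail; }
    }

    // null stands for the empty list.
    private final AtomicReference<Node> ref = new AtomicReference<>(null);

    // Prepend a callback; retry if another thread swapped the reference first.
    void add(Runnable callback) {
        while (true) {
            Node current = ref.get();
            if (ref.compareAndSet(current, new Node(callback, current))) return;
        }
    }

    // Walk the current snapshot of the list and count its nodes.
    int size() {
        int n = 0;
        for (Node cur = ref.get(); cur != null; cur = cur.tail) n++;
        return n;
    }

    // Registers callbacks from several threads at once and returns the final size.
    static int concurrentAdds(int threads, int perThread) {
        CallbackStack stack = new CallbackStack();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) stack.add(() -> { });
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return stack.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentAdds(4, 1000));
    }
}
```

Because a failed compareAndSet simply re-reads the reference and tries again, no registration should be lost even when several threads add callbacks at the same moment, and no lock is ever taken.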

The tryComplete method of DefaultPromise (the class these excerpts come from) is called to complete the future. This is when all the listeners registered above are executed, each one dispatched to its executor through CallbackRunnable.

def tryComplete(value: Try[T]): Boolean = {
  val resolved = resolveTry(value)
  (try {
    @tailrec
    def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
      getState match {
        case raw: List[_] =>
          val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
          if (updateState(cur, v)) cur else tryComplete(v)
        case _ => null
      }
    }
    tryComplete(resolved)
  } finally {
    synchronized { notifyAll() } //Notify any evil blockers
  }) match {
    case null => false
    case rs if rs.isEmpty => true
    case rs => rs.foreach(r => r.executeWithValue(resolved)); true
  }
}

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  val preparedEC = executor.prepare
  val runnable = new CallbackRunnable[T](preparedEC, func)

  @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
  def dispatchOrAddCallback(): Unit =
    getState match {
      case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
      case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
    }
  dispatchOrAddCallback()
}

What is nice about the above is how they have used a single atomic processor instruction, reached through “Unsafe.instance.compareAndSwapObject”, to achieve synchronization. (Compare-and-swap is one atomic instruction; it does not complete in a single cycle.) In dispatchOrAddCallback, recursive calls are made to perform the swap. If no other thread changed the reference at that exact moment (i.e. added a callback or completed the future), the operation succeeds on the first attempt. If someone did, one more recursive call is made, which reads the fresh state and tries again. This is a superb optimization: without taking any lock, it gets both “Memory Visibility” (from the volatile field) and “Atomicity” (from the compare-and-swap).
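
To make the two retry loops concrete, here is a minimal Java sketch of a promise whose single reference holds either the pending callback list or the final value. It is written under my own assumptions and is not a port of the Scala code: MiniPromise, Node and Done are hypothetical names, AtomicReference replaces Unsafe, and callbacks run on the calling thread instead of being handed to an ExecutionContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch: state is either a Node list (pending) or a Done (completed).
final class MiniPromise<T> {
    private static final class Node<T> {
        final Consumer<T> callback; // null only in the empty-list sentinel
        final Node<T> next;
        Node(Consumer<T> callback, Node<T> next) { this.callback = callback; this.next = next; }
    }

    private static final class Done<T> {
        final T value;
        Done(T value) { this.value = value; }
    }

    private final AtomicReference<Object> state =
        new AtomicReference<>(new Node<T>(null, null));

    // Add the callback, or run it right away if the promise is already completed.
    @SuppressWarnings("unchecked")
    void onComplete(Consumer<T> callback) {
        while (true) {
            Object s = state.get();
            if (s instanceof Done) {
                callback.accept(((Done<T>) s).value);
                return;
            }
            // CAS failed means another thread added a callback or completed: retry.
            if (state.compareAndSet(s, new Node<>(callback, (Node<T>) s))) return;
        }
    }

    // Swap the list for the value; only the thread that wins the CAS runs the callbacks.
    @SuppressWarnings("unchecked")
    boolean tryComplete(T value) {
        while (true) {
            Object s = state.get();
            if (s instanceof Done) return false;
            if (state.compareAndSet(s, new Done<>(value))) {
                for (Node<T> n = (Node<T>) s; n.callback != null; n = n.next) {
                    n.callback.accept(value);
                }
                return true;
            }
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> p = new MiniPromise<>();
        List<String> seen = new ArrayList<>();
        p.onComplete(v -> seen.add("first:" + v));
        p.tryComplete("Successful");
        p.onComplete(v -> seen.add("late:" + v)); // runs immediately
        System.out.println(seen);
    }
}
```

Since callbacks are prepended, they run in reverse registration order in this sketch. A callback registered after completion never touches the list at all, which mirrors the "applied immediately or scheduled asynchronously" wording of the documentation.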

Here is a lazy initialization implementation in Java (github link), inspired by Scala’s lazy val implementation. It uses a similar technique to guarantee atomicity together with memory visibility.
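
For readers who just want the flavour of the idea, below is a minimal sketch of CAS-published lazy initialization. It is not the linked implementation and not how Scala's lazy val works internally. CasLazy is a hypothetical name, and the sketch assumes that the initializer is side-effect free, may run more than once under contention, and never returns null.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: the first value published by CAS wins, later ones are discarded.
final class CasLazy<T> implements Supplier<T> {
    private final Supplier<T> init;
    // null means "not initialized yet", so init must not return null.
    private final AtomicReference<T> ref = new AtomicReference<>(null);

    CasLazy(Supplier<T> init) { this.init = init; }

    @Override
    public T get() {
        T value = ref.get();
        if (value != null) return value;   // fast path: already initialized
        T candidate = init.get();          // may run in several threads at once
        // Only one CAS from null succeeds; losers fall back to the winner's value.
        return ref.compareAndSet(null, candidate) ? candidate : ref.get();
    }

    public static void main(String[] args) {
        CasLazy<String> lazy = new CasLazy<>(() -> "computed on first use");
        System.out.println(lazy.get());
    }
}
```

The trade-off is that two threads racing on the first call may both run the initializer, but every caller still observes the same single published value, and no lock is held at any point.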