Parallel Streams!!

I am always skeptical of using parallel streams in Java 8, for a simple reason:

You have no control over which pool is responsible for executing the tasks. By default, the common ForkJoinPool executes them.

So in short when you do:

stream.parallel().filter(x -> isPrime(x)).count(); // count() is the terminal operation that actually triggers execution

The common pool executes it, and there is no straightforward way to have some other pool execute the `filter`. This gets really dangerous in large codebases where parallel streams are used carelessly: if any computation is long-running, or worse, blocking, it impacts every parallel stream in the application, because they all share the same pool.

There is a way to have your own custom pool execute the tasks, but it is insanely irritating (and fragile).

ForkJoinPool pool = new ForkJoinPool(4);
int[] ans = pool.submit(() -> {
    // the stream pipeline is started from a thread of our pool, not from the common pool
    return IntStream.range(1, 1000000).parallel().filter(x -> isPrime(x)).toArray();
}).get(); // get() can throw InterruptedException/ExecutionException

public static boolean isPrime(int i) {
    System.out.println(Thread.currentThread()); // probe which thread runs the task
    return true; // not a real primality test; we only want to observe the thread
}

The above prints Thread[ForkJoinPool-1-worker-0,5,main] rather than Thread[ForkJoinPool.commonPool-worker-2,5,main], which means the subtasks are executed by our pool rather than the common pool. It seems like magic, because we never asked the parallel stream to use our pool. It happens implicitly because of the internals of ForkJoinTask#doInvoke:

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
        externalAwaitDone(); // else delegate to the common-pool (external) path
}

It checks whether the current thread belongs to a ForkJoinPool; if so, it obtains a reference to that pool and uses it. Otherwise it falls back to the common pool.

I haven't found any documentation that describes the above behavior, so it cannot be fully relied upon.


Constraint Argument to Multiple Types

A lot of times while writing quick scripts (which I haven't thought through well), I wish the compiler were a bit more lenient and allowed an argument to be one of several different types. Like this:

class Animal
def getInt(n: Any) : Int = n match {
    case x: Int => x
    case x: String => Integer.parseInt(x)
    case x: Animal => 42
}

Duck typing looks fine, but it's too lenient for me: I don't like being able to call getInt(new Object). I wish I were allowed to write:

def getInt(n: Int || String || Animal) : Int = n match {
    case x: Int => ....

One rather smart way of doing it is:

sealed trait GetIntType[-T]

object GetIntType {
  implicit object IntType extends GetIntType[Int]
  implicit object StringType extends GetIntType[String]
  implicit object AnimalType extends GetIntType[Animal]
}

class Dog extends Animal

def getInt[T: GetIntType](n: T): Int = n match {
  case x: Int => x
  case x: String => Integer.parseInt(x)
  case x: Animal => 42
}

scala> getInt(1)
res7: Int = 1

scala> getInt("1")
res8: Int = 1

It works because the compiler's constraint comes from the availability of the implicit objects in the `GetIntType` companion object. The context bound `[T : GetIntType]` desugars to an implicit parameter of type `GetIntType[T]`, and implicit search by default also looks inside the companion object of `GetIntType`. So now not only can you do this:

scala> getInt(new Animal)
res5: Int = 42

But also this:

scala> getInt(new Dog)
res6: Int = 42

It works because `GetIntType` is contravariant, which means that if Dog <: Animal then GetIntType[Animal] <: GetIntType[Dog].
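
For completeness, here is roughly what the constraint now rejects. I have not pasted exact REPL output; the comments summarize the expected errors (essentially "could not find implicit value for evidence parameter of type GetIntType[...]"):

getInt(2.5)        // does not compile: no GetIntType[Double] instance is available
getInt(new Object) // does not compile either: Object is not a subtype of Int, String or Animal, so no instance applies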

But I still find the above tedious. And too much baggage code.


Hack Puzzle – 1

I have been thinking for quite some time about building a puzzle. I think I have built an awesome one, and I hope everyone finds it fun to hack :)

You can download the jar from here.

Aim:
  • Make the program terminate gracefully
Terminating gracefully implies that the program should end, and it should not end by throwing an exception or in an abnormal manner. Using `System.exit`, or the program terminating due to an exception, does not count as graceful termination.
To run:
  • java -jar DeadLock2.jar
If run successfully, it will output:

Oops there is a deadlock. Please solve the deadlock to make me move forward.

Rules:
  1. You cannot replace any of the jar contents with your files
  2. You cannot recompile any of the classes in jar and replace them
  3. You are permitted to alter/modify the contents of the jar/class files
  4. You are permitted to add or use any external library to solve it
If successfully solved, the program prints the below:

Puzzle Soved!! Congratulations you have solved the puzzle.

Comment below with the tools you used and the time it took you to solve it. Happy Hacking :-)

Replace View Bounds

Well, the Scala community has decided to deprecate view bounds. I suspect it might have to do with brevity. The full discussion can be viewed here.

It looks like most view bounds (<%) can be converted to context bounds. Let's see the different strategies that can help us do that. Suppose we have:


scala> def foo[T <% Int](x: T):Int = x
foo: [T](x: T)(implicit evidence$1: T => Int)Int

scala> implicit def a[T](n:T) = n match {
 | case x:String => x.toInt
 | }
warning: there were 1 feature warning(s); re-run with -feature for details
a: [T](n: T)Int

scala> foo("23")
res4: Int = 23

To convert it to a context bound, we need an implicit value of type T => Int to be available. We could do:


scala> type L[X] = X => Int
defined type alias L

scala> def goo[T : L](x: T):Int = x
goo: [T](x: T)(implicit evidence$1: T => Int)Int

scala> goo(23)
res2: Int = 23
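
Since `goo` desugars to exactly the same signature as `foo` above (an implicit parameter of type T => Int, as the REPL output shows), the implicit def `a` defined earlier should satisfy it for strings as well. Something like this (not from the original session) should also hold:

goo("23")   // expected: Int = 23, with the implicit def `a` supplying the T => Int evidence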

We can also inline the type alias into the function definition using a type lambda:


scala> def goo[T : ({type L[X] = X => Int})#L](x: T):Int = x
goo: [T](x: T)(implicit evidence$1: T => Int)Int

//quite cryptic though

A more readable version:


scala> def goo2[T](x: T)(implicit ev: T => Int):Int = x
goo2: [T](x: T)(implicit ev: T => Int)Int

scala> goo2("12")
res8: Int = 12

A more generalized version:


scala> def goo[E,T : ({type L[X] = X => E})#L](x: T):E = x
goo: [E, T](x: T)(implicit evidence$1: T => E)E

scala> def goo2[T, E](x: T)(implicit ev: T => E):E = x
goo2: [T, E](x: T)(implicit ev: T => E)E

scala> goo("1000")
res10: String = 1000

scala> goo2("1000")
res11: String = 1000

Asynchronous Distributed Key-Value DB

I have developed a simple beta version of an asynchronous distributed key-value database. It is simple and quite robust, and I plan to make it not-so-simple and open it to other languages (apart from JVM-based ones) in the future :). You can download it and start using it from github.

In short (refer to the wiki for more info):

  1. It is Asynchronous
  2. You can have a single primary node with any number of secondary nodes which can join or leave arbitrarily. 
  3. It is truly distributed: you only need the address of the “Arbiter”, with which any node running on any machine can start acting as a replica.
  4. Inserts (key-value pairs) can only happen at the primary node. Lookups can be done on any node.
  5. Order and consistency are maintained at all nodes, i.e. given enough time, all replicas settle on the same view.
  6. Each node has a Persistence service which backs up every insertion/removal. This is client-dependent and can be implemented as desired, using any SQL/NoSQL database or even a plain file.
  7. Tested rigorously. I believe more effort has gone into testing it than into building it.

The following are the current limitations; I hope to remove them in the next version:

  1. Currently updates are only possible on a dedicated node, the primary replica.
  2. The primary (leader) node cannot fail during the uptime of the system.
  3. Membership is handled reliably by a provided subsystem (see the section “The Arbiter”). New replicas that wish to join send a Join call to the Arbiter. For a replica to leave, however, the current process is quite tedious: a manual Replicas(Set[ActorRef]) call has to be sent to the primary replica (see the sketch after this list).
  4. The update rate is assumed to be low, so that no incoming requests are rejected due to system overload. If an update were rejected, the store could be left in an inconsistent state which might require a subsequent successful write to the same key to repair it.
  5. Clients are expected not to reuse request IDs before the request has been fully processed and responded to.
  6. There is a very small chance of the database ending up in an inconsistent state. Please refer to the section “Consistency in the case of failed replication or persistence” in the overview.
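
To make the membership flow in limitation 3 concrete, here is a minimal, hypothetical sketch of what the Arbiter side might look like with classic Akka actors. The message names (Join, Replicas) follow the description above; everything else is my assumption, and the real definitions live in the project's source and wiki.

import akka.actor.{Actor, ActorRef}

// Hypothetical message shapes, mirroring the description above
case object Join
case class Replicas(replicas: Set[ActorRef])

// The Arbiter: the first node to Join becomes the primary, later ones become secondaries.
// Whenever membership changes, the primary is told the full replica set via Replicas(...).
class Arbiter extends Actor {
  var primary: Option[ActorRef] = None
  var replicas = Set.empty[ActorRef]

  def receive = {
    case Join =>
      if (primary.isEmpty) primary = Some(sender())
      replicas += sender()
      primary.foreach(_ ! Replicas(replicas))
  }
}

On startup a replica would then simply do arbiter ! Join; removing one currently means constructing the updated Replicas(...) set by hand and sending it to the primary yourself, which is exactly the tedious part mentioned in limitation 3.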

But in short “It Works!!”

Technologies:

  1. Scala
  2. Akka
  3. SBT as build tool

I highly recommend that everyone have a look at Akka. There are very few frameworks that scale as beautifully as Akka does.

Special thanks to the Coursera course “Principles of Reactive Programming” for helping me get started.


Atomicity from single-cycle compare & swap

There is a wonderful function called onComplete defined in the scala.concurrent.Future trait. Here is its declaration:

def onComplete[U](func: Try[T] => U)
              (implicit executor: ExecutionContext): Unit

There is one key point in the documentation, which says:

If the future has already been completed, this will either be applied immediately or be scheduled asynchronously.

The reason I call it a wonderful function is its implementation. Roughly speaking, consider:

import scala.concurrent._
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global

val f = Future {
  Thread.sleep(1000)
  "Successful"
}

f onComplete {
  case Success(i) => println(i)
  case Failure(e) => println(e)
}

The Future { ... } block dispatches the computation to a thread and returns a Future. The onComplete call then registers an asynchronous callback which says: when the task is completed, depending on the outcome, print it.

Reading the documentation might suggest that a new java.lang.Thread is created as soon as onComplete is called, one that waits for the future to complete and then prints the value. Even if those threads came from an Executors.newCachedThreadPool, it would still mean that every call to onComplete ties up a separate thread just to wait. A disaster!!

But its inventors were smart. What they did instead was keep an internal list of all the functions to be called once the Future completes. When it completes, the registered callbacks are invoked, each dispatched via the ExecutorService. This way no extra thread has to be created or blocked. What is interesting is how that list is stored: not in a mutable collection behind a lock, but as an immutable list held in a single volatile field that is updated with compare-and-swap:


private volatile Object _ref;
final static long _refoffset;

protected final boolean updateState(Object oldState, Object newState) {
    return Unsafe.instance.compareAndSwapObject(this, _refoffset, oldState, newState);
}

Calling onComplete now does the following: it replaces the previous reference with a new list consisting of the previous list plus the new callback. On being called multiple times, it keeps prepending to that list.

updateState(listeners, runnable :: listeners)

The tryComplete function of DefaultPromise is called once the future completes. This is when all the registered listeners (CallbackRunnables) are executed, each dispatched to the ExecutorService.

def tryComplete(value: Try[T]): Boolean = {
  val resolved = resolveTry(value)
  (try {
    @tailrec
    def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
      getState match {
        case raw: List[_] =>
          val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
          if (updateState(cur, v)) cur else tryComplete(v)
        case _ => null
      }
    }
    tryComplete(resolved)
  } finally {
    synchronized { notifyAll() } // Notify any evil blockers
  }) match {
    case null => false
    case rs if rs.isEmpty => true
    case rs => rs.foreach(r => r.executeWithValue(resolved)); true
  }
}

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  val preparedEC = executor.prepare
  val runnable = new CallbackRunnable[T](preparedEC, func)

  @tailrec // Tries to add the callback; if already completed, it dispatches the callback to be executed
  def dispatchOrAddCallback(): Unit =
    getState match {
      case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
      case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
    }
  dispatchOrAddCallback()
}

What is nice about the above is how a single atomic processor instruction, Unsafe.instance.compareAndSwapObject, is used to achieve synchronization. In dispatchOrAddCallback (and likewise in tryComplete), recursive calls are made until the swap succeeds: if no other thread changed the reference (i.e. added a callback) at that exact moment, the first attempt succeeds; if someone did, one more recursive call is made and it tries again. This is a superb optimization: without taking any locks, it achieves both memory visibility (the field is volatile) and atomicity (the update is a compare-and-swap).
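
To see the trick in isolation, here is a minimal sketch of the same idea using java.util.concurrent.atomic.AtomicReference instead of Unsafe. TinyPromise and its members are made-up names, not the library's API: a single state field holds either the pending callback list or the final result, and every transition is one compare-and-swap.

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Minimal sketch (not the real scala.concurrent code): one field, CAS-only updates.
class TinyPromise[T] {
  private case class Result(value: T)
  // Holds either List[T => Unit] (pending callbacks) or Result (the completed value)
  private val state = new AtomicReference[AnyRef](Nil)

  @tailrec final def onComplete(callback: T => Unit): Unit =
    state.get match {
      case r: Result => callback(r.value)                   // already completed: run immediately
      case cbs: List[_] =>
        val cur = cbs.asInstanceOf[List[T => Unit]]
        // CAS gives atomicity; the volatile read/write gives visibility. Retry on a lost race.
        if (!state.compareAndSet(cur, callback :: cur)) onComplete(callback)
    }

  @tailrec final def tryComplete(value: T): Boolean =
    state.get match {
      case _: Result => false                                // someone else completed it first
      case cbs: List[_] =>
        if (state.compareAndSet(cbs, Result(value))) {
          cbs.asInstanceOf[List[T => Unit]].foreach(_.apply(value))
          true
        } else tryComplete(value)                            // raced with a new callback; retry
    }
}

This mirrors the structure of the DefaultPromise code quoted above, minus resolveTry, the ExecutionContext dispatch and the blocking support.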

Here is a lazy initialization implementation in Java (github link), inspired by Scala's lazy val implementation. It uses a similar technique to guarantee atomicity together with memory visibility.
