About the upcoming Java 9 release
As you may already know, Java 9 will be released soon (likely in October 2017). It has been talked about for several years now, mainly because of the Jigsaw project, which has generated a lot of buzz. In my opinion, Jigsaw is clearly not the most interesting feature of Java 9, but I won't troll about that here! Instead, I will discuss a smaller API improvement: the updated Process API.
A nice API
This work is part of JEP 102 (Process API Updates) and aims to provide a better interface for managing native processes. The main drawback of the API up to Java 8 was that it was blocking: at some point, you could not escape the p.waitFor() call. Those days are over: calling p.onExit() now returns a CompletableFuture<Process>.
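A minimal sketch of the new style, assuming a Java 9+ runtime. It launches the running JVM's own `java -version` (chosen only because it is always available) and reacts to its termination without blocking the calling thread. `OnExitDemo` and `runAsync` are hypothetical names used for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class OnExitDemo {

    /** Starts `java -version` and returns a future for its exit code (Java 9+). */
    static CompletableFuture<Integer> runAsync() throws IOException {
        // Use the running JVM's own launcher so the example needs no external command.
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        Process p = new ProcessBuilder(javaBin, "-version")
                .redirectErrorStream(true)                        // merge stderr into stdout...
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)  // ...and discard it (Java 9+)
                .start();
        // onExit() returns at once; the caller's thread never blocks in waitFor().
        return p.onExit().thenApply(Process::exitValue);
    }

    public static void main(String[] args) throws IOException {
        CompletableFuture<Integer> exitCode = runAsync();
        System.out.println("Doing other work while the process runs...");
        System.out.println("Exit code: " + exitCode.join());
    }
}
```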
There is always a but...
Unfortunately, there is always something to watch out for before using an API naively. First, let's read the documentation. At the end of the documentation for the onExit method, it states:
This implementation executes waitFor() in a separate thread repeatedly until it returns successfully. If the execution of waitFor is interrupted, the thread's interrupt status is preserved.
Hmm... that is annoying! The waiting is done in an arbitrary thread. Let's dive into the implementation to see how it works under the hood.
Back to the sources
As you may already know, the JDK sources can be a bit daunting when browsed for the first time. So, to keep things short, here are the links to the Process shared implementation and to the specific Unix implementation.
In the Unix implementation, here is the code that is relevant for us:
public CompletableFuture<Process> onExit() {
    return ProcessHandleImpl.completion(pid, false)
            .handleAsync((unusedExitStatus, unusedThrowable) -> {
                boolean interrupted = false;
                while (true) {
                    // Ensure that the concurrent task setting the exit status has completed
                    try {
                        waitFor();
                        break;
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                return this;
            });
}
Here we see that the machinery involved is in fact that of ProcessHandle, a new interface introduced alongside Process. The static method completion lives in the shared implementation, ProcessHandleImpl, and it is this method that builds the initial CompletableFuture.
To better understand this code, remember that in the call we are interested in, the shouldReap flag is set to false. Here is a slightly modified version of the code:
static CompletableFuture<Integer> completion(long pid, boolean shouldReap) {
    // check canonicalizing cache 1st
    ExitCompletion completion = completions.get(pid);
    // re-try until we get a completion that shouldReap => isReaping
    while (completion == null || (shouldReap && !completion.isReaping)) {
        ExitCompletion newCompletion = new ExitCompletion(shouldReap);
        if (completion == null) {
            completion = completions.putIfAbsent(pid, newCompletion);
        } else {
            completion = completions.replace(pid, completion, newCompletion)
                ? null : completions.get(pid);
        }
        if (completion == null) {
            // newCompletion has just been installed successfully
            completion = newCompletion;
            // spawn a thread to wait for and deliver the exit value
            processReaperExecutor.execute(() -> {
                int exitValue = waitForProcessExit0(pid, shouldReap);
                if (exitValue == NOT_A_CHILD) {
                    // pid not alive or not a child of this process
                    // If it is alive wait for it to terminate
                    long sleep = 300;     // initial milliseconds to sleep
                    int incr = 30;        // increment to the sleep time
                    long startTime = isAlive0(pid);
                    long origStart = startTime;
                    while (startTime >= 0) {
                        try {
                            Thread.sleep(Math.min(sleep, 5000L)); // no more than 5 sec
                            sleep += incr;
                        } catch (InterruptedException ie) {
                            // ignore and retry
                        }
                        startTime = isAlive0(pid);  // recheck if is alive
                        if (origStart > 0 && startTime != origStart) {
                            // start time changed, pid is not the same process
                            break;
                        }
                    }
                    exitValue = 0;
                }
                newCompletion.complete(exitValue);
                // remove from cache afterwards
                completions.remove(pid, newCompletion);
            });
        }
    }
    return completion;
}
We can see that the completion implementation used is ExitCompletion, and that a task is submitted to processReaperExecutor. This executor is created as an unbounded thread pool, so each process being watched (for instance through onExit) ties up one of its threads for as long as the process runs. This may not be what you want, especially if you run several long processes at the same time.
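To make the pattern in completion more concrete, here is a stripped-down sketch of the same idea; it is not the JDK code. A ConcurrentHashMap serves as a canonicalizing cache, so concurrent callers for the same key share one CompletableFuture, and only the first caller submits a blocking wait to an unbounded cached thread pool. `ReaperSketch`, `watch` and the `Callable` standing in for the native `waitForProcessExit0` are all hypothetical, and the shouldReap retry loop is left out.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReaperSketch {
    // Canonicalizing cache: at most one pending future per key (the pid in the JDK).
    private final Map<Long, CompletableFuture<Integer>> completions = new ConcurrentHashMap<>();
    // Unbounded pool: every in-flight blocking wait occupies one thread.
    private final ExecutorService reaper = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "reaper-sketch");
        t.setDaemon(true);
        return t;
    });

    /** Returns the shared future for {@code key}; only the first caller starts a wait. */
    CompletableFuture<Integer> watch(long key, Callable<Integer> blockingWait) {
        CompletableFuture<Integer> fresh = new CompletableFuture<>();
        CompletableFuture<Integer> existing = completions.putIfAbsent(key, fresh);
        if (existing != null) {
            return existing; // someone is already waiting for this key
        }
        reaper.execute(() -> {
            try {
                fresh.complete(blockingWait.call()); // blocks this pool thread
            } catch (Exception e) {
                fresh.completeExceptionally(e);
            } finally {
                completions.remove(key, fresh); // drop from the cache once done
            }
        });
        return fresh;
    }
}
```

Each in-flight call to `blockingWait` pins one pool thread until it returns, which is the cost discussed above when many long processes are watched at once.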
Trade-off
In the end, this nice, easy-to-use API that fits well with the JDK's asynchronous tools does not come without a price. It is fine for small programs that do not spawn too many native processes, but beware when you launch tons of them: long-running processes each hold a thread for their whole lifetime, and bursts of short-lived ones still cost a thread each while they run (and creating threads has a price).
The best advice I can give you is to roll your own watcher. There are several ways to achieve this; for instance, use a ScheduledExecutorService backed by a single thread.
An implementation for my use case
The main reason I looked into this API was a goal in a real-world application. My use case is the following: calling gdal_translate on large rasters. If you are not familiar with GDAL, let's just say that it is the tool for GIS in the FLOSS world. Calls to gdal_translate take time, and I need to make them often, sometimes in parallel.
Inside Java 9
The CompletableFuture class has only existed since Java 8 and is not yet widely used in the java.util.concurrent package, but Java 9 adds some valuable methods in that regard. While searching for a solution to my problem, I found out that, although Java 9 leaves the various Executor implementations free of any reference to CompletableFuture, it does add some methods to the CompletableFuture class itself. In Java 9, there is a delayedExecutor method, a factory for an executor that runs tasks after a given delay. It is a bit confusing to find this method in CompletableFuture, and the connection may be hard to see, but one of the reasons for it is hidden in the implementation of the orTimeout method. There, we can see that the internal mechanism uses a ScheduledThreadPoolExecutor with one single (daemon) thread. This executor is a singleton, so its thread is shared by all CompletableFutures.
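The Java 9 additions mentioned above can be tried in a few lines. This sketch assumes a Java 9+ runtime; `Java9TimeoutDemo` and `slowTask` are hypothetical names, with `slowTask` simulating a slow computation through delayedExecutor. Both orTimeout and completeOnTimeout rely on the shared single-thread delay scheduler described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Java9TimeoutDemo {

    /** A future completed with "done" after the given delay, via delayedExecutor (Java 9+). */
    static CompletableFuture<String> slowTask(long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "done", delayed);
    }

    public static void main(String[] args) {
        // orTimeout: complete exceptionally with a TimeoutException if not done in time.
        CompletableFuture<String> tooSlow = slowTask(1_000).orTimeout(100, TimeUnit.MILLISECONDS);
        try {
            tooSlow.join();
        } catch (CompletionException e) {
            System.out.println("Timed out: " + (e.getCause() instanceof TimeoutException));
        }

        // completeOnTimeout: fall back to a default value instead of failing.
        String value = slowTask(1_000)
                .completeOnTimeout("fallback", 100, TimeUnit.MILLISECONDS)
                .join();
        System.out.println(value);

        // A task that finishes in time is unaffected by the timeout.
        System.out.println(slowTask(10).orTimeout(1, TimeUnit.SECONDS).join());
    }
}
```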
Backporting to Java 8
Guess what? My project uses Java 8 and is, of course, already in production! So I need to stick with Java 8. Let's see how we can write a simple implementation for my use case, inspired by the Java 9 API.
Let's define a class ProcessRunner, which internally uses a ScheduledExecutorService. This class provides a single public method to run processes asynchronously:
public CompletableFuture<Integer> runProcessAsync(ProcessBuilder process, long timeout, TimeUnit timeUnit) {
    CompletableFuture<Integer> handle = new CompletableFuture<>();
    underlying.execute(() -> {
        try {
            Process p = process.start();
            // Schedule the timeout: fail the handle once the delay has elapsed
            underlying.schedule(() -> handle.completeExceptionally(new TimeoutException()), timeout, timeUnit);
            // Schedule the first check of the process state; the watcher reschedules itself
            underlying.schedule(new ProcessWatcher(p, handle), this.throttle, this.timeUnit);
        } catch (IOException e) {
            handle.completeExceptionally(e);
        }
    });
    return handle;
}
This code is simple. It uses the ScheduledExecutorService held in the underlying field to start the process and schedule two tasks: one is in charge of the timeout, the other periodically checks whether the process is still running. Then we return the handle to the caller. The Java API is weak here, as it makes no distinction between the promise and the future. In other languages, such as Scala, only the holder of the promise can fulfill it; the caller who gets the Future has no equivalent of the complete method. Anyway, I introduced an inner class in this code, ProcessWatcher, which implements Runnable. Again, it is very simple:
private class ProcessWatcher implements Runnable {
    final Process toWatch;
    final CompletableFuture<Integer> handle;

    ProcessWatcher(Process toWatch, CompletableFuture<Integer> handle) {
        this.toWatch = toWatch;
        this.handle = handle;
    }

    @Override
    public void run() {
        if (toWatch.isAlive()) {
            if (!handle.isDone()) {
                // Still running and not timed out: schedule a new check
                underlying.schedule(new ProcessWatcher(toWatch, handle), throttle, timeUnit);
            } else {
                // The handle was already failed by the timeout task: kill the process
                toWatch.destroy();
            }
        } else {
            handle.complete(toWatch.exitValue());
        }
    }
}
As in the CompletableFuture implementation, I used the same pattern as its TaskSubmitter runnable. The only goal here is to check whether the process has finished, or whether we need to check again later. And that's it for a first draft.
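For readers who want to try the Java 8 approach, here is a self-contained sketch assembling the two fragments above. The constructor, the daemon thread factory, `close()` and the class name `ProcessRunner` are assumptions filled in for the sketch; the enclosing class cannot be called `ProcessWatcher`, because Java forbids a nested class from sharing its enclosing class's name.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Java 8 sketch: one scheduler thread polls every running process. */
public class ProcessRunner implements AutoCloseable {
    private final ScheduledExecutorService underlying;
    private final long throttle;     // polling period (assumed constructor parameter)
    private final TimeUnit timeUnit; // unit of the polling period

    public ProcessRunner(long throttle, TimeUnit timeUnit) {
        this.underlying = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "process-runner");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });
        this.throttle = throttle;
        this.timeUnit = timeUnit;
    }

    public CompletableFuture<Integer> runProcessAsync(ProcessBuilder process, long timeout, TimeUnit timeUnit) {
        CompletableFuture<Integer> handle = new CompletableFuture<>();
        underlying.execute(() -> {
            try {
                Process p = process.start();
                // Fail the handle after the timeout; the watcher then destroys the process.
                underlying.schedule(() -> handle.completeExceptionally(new TimeoutException()), timeout, timeUnit);
                underlying.schedule(new ProcessWatcher(p, handle), this.throttle, this.timeUnit);
            } catch (IOException e) {
                handle.completeExceptionally(e);
            }
        });
        return handle;
    }

    private class ProcessWatcher implements Runnable {
        final Process toWatch;
        final CompletableFuture<Integer> handle;

        ProcessWatcher(Process toWatch, CompletableFuture<Integer> handle) {
            this.toWatch = toWatch;
            this.handle = handle;
        }

        @Override
        public void run() {
            if (toWatch.isAlive()) {
                if (!handle.isDone()) {
                    underlying.schedule(this, throttle, timeUnit); // poll again later
                } else {
                    toWatch.destroy(); // handle already failed by the timeout
                }
            } else {
                handle.complete(toWatch.exitValue());
            }
        }
    }

    /** Stops polling; futures of processes still running are left incomplete. */
    @Override
    public void close() {
        underlying.shutdownNow();
    }
}
```

For my use case, the ProcessBuilder would carry the gdal_translate command line, for example `new ProcessBuilder("gdal_translate", "-of", "GTiff", "in.tif", "out.tif")` with placeholder file names. Since a single scheduler thread polls all processes, the thread count stays constant however many processes run; the price is a detection latency of up to one polling period. One caveat: a child that writes a lot to stdout or stderr can block once the pipe buffer is full, so either consume those streams or redirect them (for instance to a file with `redirectOutput`).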
But wait...
To be honest, there is a Java wrapper that calls the GDAL C API under the hood. I have not played with it yet, but I am pretty sure it offers better performance than spawning a separate process to do the job (though I'm also sure it is not an async implementation!).
Conclusion
At first, the new Process API looked really nice and useful, especially if you have been waiting for non-blocking, async APIs in the JDK. CompletableFutures are fairly new in Java, and it will take time for the concept to spread across the JDK. Operating systems may one day offer native notification of process termination, and maybe then we won't need a watcher task to handle process completion. Anyway, as with anything that seems magic, a look under the hood explains a lot. This investigation led me to the new methods in CompletableFuture, and a bunch of them will make your life easier, for sure!
For my real-world issue, a good option would also be to remember that the machine running my program has physical constraints and limits. Limiting the pressure from the time- and CPU-consuming calls to GDAL would help, and this could be done with reactive streams. But that could be the topic of (several) future posts!
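Reactive streams are one way to apply that back-pressure. A simpler technique, named plainly here as a swap-in rather than the reactive approach, is to cap the number of in-flight calls with a `java.util.concurrent.Semaphore`. This is a minimal sketch under stated assumptions: `BoundedLauncher` and `submit` are hypothetical names, and a job is any `Supplier` that starts some work and returns its CompletableFuture (for instance a call to runProcessAsync).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Caps the number of expensive jobs in flight with a plain Semaphore (not reactive streams). */
public class BoundedLauncher {
    private final Semaphore permits;
    // A single daemon thread waits for permits, so callers of submit() never block.
    private final ExecutorService acquirer = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "bounded-launcher");
        t.setDaemon(true);
        return t;
    });

    public BoundedLauncher(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Starts the job once a permit is free; releases the permit when the job's future completes. */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> job) {
        CompletableFuture<T> result = new CompletableFuture<>();
        acquirer.execute(() -> {
            permits.acquireUninterruptibly(); // jobs start in submission order
            CompletableFuture<T> running;
            try {
                running = job.get();
            } catch (RuntimeException e) {
                permits.release(); // the job never started
                result.completeExceptionally(e);
                return;
            }
            running.whenComplete((value, error) -> {
                permits.release();
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
        });
        return result;
    }
}
```

With the permit count set to, say, the number of available cores, extra gdal_translate calls simply wait in line instead of all competing for the CPU at once.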
On a personal note, this was the first time I dug this deep into the JDK sources. I really enjoyed it, but as with every first time, I may have missed something, so do not hesitate to send me your remarks and comments on Twitter!