Wednesday, August 31, 2016

Thread pools with the Executor Framework


Thread pools manage a pool of worker threads. A thread pool contains a work queue which holds tasks waiting to be executed.
A thread pool can be described as a collection of Runnable objects (the work queue) and a collection of running threads. These threads run constantly and check the work queue for new work. If there is new work to be done, they execute this Runnable. The thread pool itself (not the Thread class) provides a method, e.g. execute(Runnable r), to add a new Runnable object to the work queue.
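The description above can be sketched as a tiny hand-written pool. This is only an illustration of the idea, not how the JDK implements its pools; the class SimpleThreadPool and its methods execute, shutdownNow and runTasks are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical teaching class, not part of the JDK.
class SimpleThreadPool {
    // The work queue holds tasks waiting to be executed
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    SimpleThreadPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread worker = new Thread(this::workLoop, "worker-" + i);
            worker.setDaemon(true);
            workers.add(worker);
            worker.start();
        }
    }

    // Adds a task to the work queue; the next free worker picks it up
    void execute(Runnable task) {
        workQueue.add(task);
    }

    // Each worker repeatedly takes a task from the queue and runs it.
    // Simplification: a task that throws would end its worker thread.
    private void workLoop() {
        try {
            while (true) {
                Runnable task = workQueue.take(); // blocks until work is available
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted: leave the loop
        }
    }

    // Interrupts all workers; tasks still waiting in the queue are dropped
    void shutdownNow() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    // Runs taskCount trivial tasks and returns how many of them completed
    static int runTasks(int nThreads, int taskCount) {
        SimpleThreadPool pool = new SimpleThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(4, 100));
    }
}
```

In real code the ready-made pools of the Executor framework are the better choice, since they also handle task failures, orderly shutdown and thread reuse.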
The Executor framework provides example implementations of the java.util.concurrent.Executor interface, e.g. Executors.newFixedThreadPool(int n), which creates a pool with n worker threads. The ExecutorService adds life cycle methods to the Executor, which allow you to shut down the Executor and to wait for termination.
package de.vogella.concurrency.threadpools;

/**
 * MyRunnable computes the sum of the numbers from 1 to the parameter
 * countUntil and then writes the result to the console.
 * <p>
 * MyRunnable is the task which will be performed
 *
 * @author Lars Vogel
 *
 */
public class MyRunnable implements Runnable {
        private final long countUntil;

        MyRunnable(long countUntil) {
                this.countUntil = countUntil;
        }

        @Override
        public void run() {
                long sum = 0;
                // use <= so that countUntil itself is included, as documented above
                for (long i = 1; i <= countUntil; i++) {
                        sum += i;
                }
                System.out.println(sum);
        }
}
Now you run your runnables with the executor framework.
package de.vogella.concurrency.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
        private static final int NTHREDS = 10;

        public static void main(String[] args) throws InterruptedException {
                ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
                for (int i = 0; i < 500; i++) {
                        Runnable worker = new MyRunnable(10000000L + i);
                        executor.execute(worker);
                }
                // The executor accepts no new tasks after this call
                // but still finishes all tasks already in the queue
                executor.shutdown();
                // Wait until all tasks are finished; awaitTermination requires
                // a timeout, the one hour used here is an arbitrary upper bound
                if (executor.awaitTermination(1, TimeUnit.HOURS)) {
                        System.out.println("Finished all tasks");
                } else {
                        System.err.println("Timed out before all tasks finished");
                }
        }
}
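The life cycle methods mentioned earlier are often combined into a two-step shutdown: first ask politely with shutdown(), then fall back to shutdownNow() if the tasks do not finish in time. The following is a minimal sketch of that pattern; the class ShutdownSketch, its helper methods and the timeout values are assumptions made for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class
class ShutdownSketch {

    // Returns true if the pool terminated within the given timeouts
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // no new tasks are accepted, queued tasks still run
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt running tasks, drop queued ones
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits taskCount counting tasks, shuts the pool down and
    // returns how many tasks actually ran
    static int runAndShutdown(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(counter::incrementAndGet);
        }
        if (!shutdownAndAwait(pool, 10)) {
            throw new IllegalStateException("Pool did not terminate in time");
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks run before shutdown: " + runAndShutdown(50));
    }
}
```

After shutdown() has been called, isShutdown() returns true, and isTerminated() returns true once all tasks have completed.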
In case the tasks should return some value (result-bearing tasks), you can use the java.util.concurrent.Callable interface.

Futures and Callables

The executor framework presented in the previous section works with Runnable objects. Unfortunately, a Runnable cannot return a result to the caller.
If you expect your tasks to return a computed result, you can use java.util.concurrent.Callable. A Callable returns a value after completion.
The Callable interface uses generics to define the type of object which is returned.
If you submit a Callable object to an ExecutorService, the framework returns an object of type java.util.concurrent.Future. Future exposes methods allowing a client to monitor the progress of a task being executed by a different thread. Therefore, a Future object can be used to check the status of a Callable and to retrieve the result from the Callable.
On the ExecutorService you can use the submit method to submit a Callable and to get a Future. To retrieve the result of the Future, use the get() method, which blocks until the result is available.
package de.vogella.concurrency.callables;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Long> {
        @Override
        public Long call() throws Exception {
                long sum = 0;
                for (long i = 0; i <= 100; i++) {
                        sum += i;
                }
                return sum;
        }

}
package de.vogella.concurrency.callables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutures {
        private static final int NTHREDS = 10;

        public static void main(String[] args) {

                ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
                List<Future<Long>> list = new ArrayList<Future<Long>>();
                for (int i = 0; i < 20000; i++) {
                        Callable<Long> worker = new MyCallable();
                        Future<Long> submit = executor.submit(worker);
                        list.add(submit);
                }
                long sum = 0;
                System.out.println(list.size());
                // now retrieve the result
                for (Future<Long> future : list) {
                        try {
                                sum += future.get();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        } catch (ExecutionException e) {
                                e.printStackTrace();
                        }
                }
                System.out.println(sum);
                executor.shutdown();
        }
}
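Besides the blocking get(), a Future can also be queried for its status, as mentioned above. The following sketch shows isDone() and the get variant with a timeout; the class FutureStatusSketch, the method sumViaFuture and the five second timeout are assumptions made for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical example class
class FutureStatusSketch {

    // Sums the numbers from 0 to limit in a pool thread and returns the result
    static long sumViaFuture(long limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> task = () -> {
                long sum = 0;
                for (long i = 0; i <= limit; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Long> future = executor.submit(task);

            // isDone() never blocks; it only reports the current status
            System.out.println("Done right after submit? " + future.isDone());

            // get with a timeout blocks for at most five seconds
            long result = future.get(5, TimeUnit.SECONDS);
            System.out.println("Done after get? " + future.isDone());
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            // ExecutionException wraps an exception thrown by the Callable
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaFuture(100)); // prints 5050
    }
}
```

A task that is no longer needed can be abandoned with future.cancel(true), which attempts to interrupt the thread running it.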

CompletableFuture

Asynchronous task handling is important for any application which performs time-consuming activities, such as I/O operations. Two basic approaches to asynchronous task handling are available to a Java application: a thread can wait for a task to complete, which is a blocking approach, or the task can trigger an action directly when it completes, which is a nonblocking approach.
CompletableFuture extends the functionality of the Future interface with standard techniques for executing application code when a task completes, including various ways to combine tasks. CompletableFuture supports both blocking and nonblocking approaches, including regular callbacks.
Such a callback can be executed in a different thread than the one in which the CompletableFuture is executed.
The following example demonstrates how to create a basic CompletableFuture. CompletableFuture.supplyAsync runs the task asynchronously on the default thread pool of Java, normally ForkJoinPool.commonPool(). An overloaded variant accepts a custom Executor to define the thread pool.
package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSimpleSnippet {
        public static void main(String[] args) {
                long started = System.currentTimeMillis();

                // configure CompletableFuture
                CompletableFuture<Integer> futureCount = createCompletableFuture();

                // continue to do other work
                System.out.println("Took " + (System.currentTimeMillis() - started) + " milliseconds");

                // now it's time to get the result
                try {
                        int count = futureCount.get();
                        System.out.println("CompletableFuture took " + (System.currentTimeMillis() - started) + " milliseconds");

                        System.out.println("Result " + count);
                } catch (InterruptedException | ExecutionException ex) {
                        // Exceptions from the future should be handled here
                }
        }

        private static CompletableFuture<Integer> createCompletableFuture() {
                CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
                        () -> {
                                try {
                                        // simulate long running task
                                        Thread.sleep(5000);
                                } catch (InterruptedException e) {
                                        // restore the interrupt status instead of swallowing it
                                        Thread.currentThread().interrupt();
                                }
                                return 20;
                        });
                return futureCount;
        }

}
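The variant of supplyAsync that takes a custom executor can be sketched as follows. The class CustomExecutorSketch, the method runOnCustomPool and the thread name custom-pool-thread are assumptions made for this example; the thread name is only set to make visible which pool ran the task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class
class CustomExecutorSketch {

    // Runs a supplier on a custom pool and returns the name of the thread
    // that executed it
    static String runOnCustomPool() {
        // The ThreadFactory gives the pool threads a recognizable name
        ExecutorService pool = Executors.newFixedThreadPool(2,
                runnable -> new Thread(runnable, "custom-pool-thread"));
        try {
            CompletableFuture<String> threadName = CompletableFuture.supplyAsync(
                    () -> Thread.currentThread().getName(),
                    pool); // second argument: the executor to run the task on
            // join() blocks like get() but throws only unchecked exceptions
            return threadName.join();
        } finally {
            pool.shutdown(); // a custom pool must be shut down by its owner
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran on: " + runOnCustomPool());
    }
}
```

Unlike the default pool, a pool created this way uses non-daemon threads, so forgetting the shutdown() call would keep the JVM alive.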
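The thenApply method registers a function which transforms the result once the CompletableFuture completes and returns a new CompletableFuture for the transformed value. Its usage is demonstrated by the following code snippet.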
package snippet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCallback {
        public static void main(String[] args) {
                CompletableFuture<String> data = createCompletableFuture()
                        .thenApply((Integer count) -> {
                                int transformedValue = count * 10;
                                return transformedValue;
                        }).thenApply(transformed -> "Finally creates a string: " + transformed);

                try {
                        System.out.println(data.get());
                } catch (InterruptedException | ExecutionException e) {
                        // Exceptions from the future should be handled here
                        e.printStackTrace();
                }
        }

        public static CompletableFuture<Integer> createCompletableFuture() {
                CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
                        try {
                                // simulate long running task
                                Thread.sleep(5000);
                        } catch (InterruptedException e) {
                                // restore the interrupt status instead of swallowing it
                                Thread.currentThread().interrupt();
                        }
                        return 20;
                });
                return result;
        }

}
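The text above also mentions combining tasks and nonblocking callbacks. A minimal sketch of both, using thenCombine and thenAccept, might look as follows; the class CombineSketch and the method combine are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class
class CombineSketch {

    // Computes a and b in two independent asynchronous tasks and
    // combines their results once both have completed
    static int combine(int a, int b) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> b);

        // thenCombine runs the function when both futures are complete
        CompletableFuture<Integer> sum = first.thenCombine(second, Integer::sum);

        // thenAccept registers a callback; the caller is not blocked by it
        CompletableFuture<Void> printed =
                sum.thenAccept(value -> System.out.println("Callback received " + value));

        // Block here only so that the example does not exit before the callback ran
        printed.join();
        return sum.join();
    }

    public static void main(String[] args) {
        System.out.println("Combined result " + combine(20, 22)); // Combined result 42
    }
}
```

The callback passed to thenAccept may run on a pool thread or on the calling thread, depending on whether the future is already complete when the callback is registered.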



