Threads pools with the Executor Framework
Thread pools manage a pool of worker threads. The thread pools contains a work queue which holds tasks waiting to get executed.
A thread pool can be described as a collection of
Runnable objects
(work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new
Runnable object to the work queue.
The Executor framework provides example implementation of the java.util.concurrent.Executor interface, e.g. Executors.newFixedThreadPool(int n) which will create n worker threads. The ExecutorService adds life cycle methods to the Executor, which allows to shutdown the Executor and to wait for termination.
package de.vogella.concurrency.threadpools;
/**
* MyRunnable will count the sum of the number from 1 to the parameter
* countUntil and then write the result to the console.
* <p>
* MyRunnable is the task which will be performed
*
* @author Lars Vogel
*
*/
public class MyRunnable implements Runnable {
private final long countUntil;
MyRunnable(long countUntil) {
this.countUntil = countUntil;
}
@Override
public void run() {
long sum = 0;
for (long i = 1; i < countUntil; i++) {
sum += i;
}
System.out.println(sum);
}
}
Now you run your runnables with the executor framework.
package de.vogella.concurrency.threadpools;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
In case the threads should return some value (result-bearing threads) then you can use thejava.util.concurrent.Callable class.
Futures and Callables
The executor framework presented in the last chapter works withRunnableobjects. Unfortunately aRunnablecannot return a result to the caller.In case you expect your threads to return a computed result you can usejava.util.concurrent.Callable. TheCallableobject allows to return values after completion.TheCallableobject uses generics to define the type of object which is returned.If you submit aCallableobject to anExecutorthe framework returns an object of typejava.util.concurrent.Future.Futureexposes methods allowing a client to monitor the progress of a task being executed by a different thread. Therefore aFutureobject can be used to check the status of aCallableand to retrieve the result from theCallable.On theExecutoryou can use the method submit to submit aCallableand to get a future. To retrieve the result of the future use theget()method.package de.vogella.concurrency.callables; import java.util.concurrent.Callable; public class MyCallable implements Callable<Long> { @Override public Long call() throws Exception { long sum = 0; for (long i = 0; i <= 100; i++) { sum += i; } return sum; } }package de.vogella.concurrency.callables; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableFutures { private static final int NTHREDS = 10; public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); List<Future<Long>> list = new ArrayList<Future<Long>>(); for (int i = 0; i < 20000; i++) { Callable<Long> worker = new MyCallable(); Future<Long> submit = executor.submit(worker); list.add(submit); } long sum = 0; System.out.println(list.size()); // now retrieve the result for (Future<Long> future : list) { try { sum += future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println(sum); executor.shutdown(); } }CompletableFuture
Asynchronous task handling is important for any application which performs time consuming activities, as IO operations. Two basic approaches to asynchronous task handling are available to a Java application: a thread can wait for a task, which is a blocking approach. Or the task can perform an action directly when the event completes, this is called a nonblocking approach.CompletableFutureextends the functionality of theFutureinterface with standard techniques for executing application code when a task completes, including various ways to combine tasks.CompletableFuturesupport both blocking and nonblocking approaches, including regular callbacks.This callback can be executed in another thread as the thread in which theCompletableFutureis executed.The following example demonstrates how to create a basic CompletableFuture.CompletableFuture.supplyAsyncruns the task asynchronously on the default thread pool of Java. It has the option to supply your custom executor to define theThreadPool.package snippet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureSimpleSnippet { public static void main(String[] args) { long started = System.currentTimeMillis(); // configure CompletableFuture CompletableFuture<Integer> futureCount = createCompletableFuture(); // continue to do other work System.out.println("Took " + (started - System.currentTimeMillis()) + " milliseconds" ); // now its time to get the result try { int count = futureCount.get(); System.out.println("CompletableFuture took " + (started - System.currentTimeMillis()) + " milliseconds" ); System.out.println("Result " + count); } catch (InterruptedException | ExecutionException ex) { // Exceptions from the future should be handled here } } private static CompletableFuture<Integer> createCompletableFuture() { CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync( () -> { try { // simulate long running task Thread.sleep(5000); } catch (InterruptedException e) { } return 20; }); return futureCount; } }The usage of thethenApplymethod is demonstrated by the following code snippet.package snippet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableFutureCallback { public static void main(String[] args) { long started = System.currentTimeMillis(); CompletableFuture<String> data = createCompletableFuture() .thenApply((Integer count) -> { int transformedValue = count * 10; return transformedValue; }).thenApply(transformed -> "Finally creates a string: " + transformed); try { System.out.println(data.get()); } catch (InterruptedException | ExecutionException e) { } } public static CompletableFuture<Integer> createCompletableFuture() { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> { try { // simulate long running task Thread.sleep(5000); } catch (InterruptedException e) { } return 20; }); return result; } }
http://www.vogella.com/tutorials/JavaConcurrency/article.html#threads-pools-with-the-executor-framework
No comments:
Post a Comment