Site icon Java blog

Java #57: egzekutory – pule wątków

java executors

Egzekutory

W poprzednich lekcjach wątki były odpalane za pomocą odpalania za każdym razem osobnej instancji. Nie jest to jednak bardzo wygodny sposób. Wyobraź sobie, że musisz ten sam wątek odpalić tysiąc razy! Wymagałoby to za każdym razem tworzenia nowego obiektu i wywoływania na nim metody start. Jest jednak dużo wygodniejszy sposób, mianowicie skorzystanie z puli wątków.

CounterThread firstCounter = new CounterThread();
firstCounter.start();
CounterThread secondCounter = new CounterThread();
secondCounter.start();
CounterThread thirdCounter = new CounterThread();
thirdCounter.start();

Powyższy kod powinien już być Ci znany z poprzednich lekcji. Teraz postaram się go zmodyfikować, używając do tego dedykowanych struktur.

Interfejs Executor

public class ExecutorRunner {
    public static void main(String[] args) {
    	Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(new CounterThread());
    }
}

W metodzie main zadeklarowałem interfejs Executor, który następnie za pomocą metody execute, przydzielam mu zadanie do wykonania (w tym przypadku jest to odpalenie wątku klasy CounterThread). Zastosowano tutaj egzekutor z pojedynczym wątkiem (newSingleThreadExecutor). Dokładnie omawiam go pod koniec lekcji (rodzaje puli wątków). Taki kod powinien pozwolić Ci poprawnie odpalić Twój wątek, jednak możesz go jeszcze bardziej zoptymalizować.

Interfejs ExecutorService

    private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) {
    	ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            executorService.execute(new CounterThread());
        }
        executorService.shutdown();
    }

Tym razem skorzystałem z rozszerzenia dla interfejsu Executor o nazwie ExecutorService. Poza metodą execute, możliwa jest tutaj obsługa wywołań asynchronicznych a także zamykanie wątków. Kolejnym usprawnieniem jest skorzystanie z puli wątków, pozwalającej na użycie więcej niż jednego wątku jednocześnie. Egzekutory typu newFixedThreadPool w swoim konstruktorze przyjmuje liczbę wątków, które będą wykorzystane do zakończenia zadania. Stała THREAD_POOL_SIZE jest obliczana na podstawie liczby dostępnych procesorów, dzięki czemu jeden procesor może pracować na jednym wątku. Dobrą praktyką jest zamknięcie całego serwisu po wykonaniu wszystkich operacji za pomocą metody shutdown.

Efekt na konsoli:

pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
pool-1-thread-1 : 10
...

Różne pule wątków

Aby zainicjalizować interfejs Executor lub ExecutorService można wykorzystać jedną z kilku dostępnych puli wątków:

Interfejs Callable

Czasami chcesz, aby wiele wątków pracowało nad obliczeniem jednego konkretnego rezultatu. Aby wykonać poprawnie taką operację równolegle, musisz podzielić ją na kilka odrębnych podzadań, a po ich zakończeniu połączyć wyniki wszystkich w jeden rezultat. W przypadku zwykłych zadań Runnable jest to niewykonalne, ponieważ metoda run nie zwraca żadnej wartości. Z pomocą posłuży Ci wtedy interfejs Callable.

public class CallableResult implements Callable<Integer>{

	@Override
	public Integer call() throws Exception {
		Thread.sleep(1000); // only for simulating complex calculation
		return new Random().nextInt(100000); // draw a random number
	}

}

Klasa CallableResult implementuje interfejs Callable, który jest bardzo podobny do interfejsu Runnable. Główną różnicą tutaj jest, że zamiast metody run, jest używana metoda call, która zwraca wartość zadeklarowanego typu. W moim przypadku będzie to wylosowana liczba typu Integer.

Egzekutory korzystające z submit i obiekty Future

    public static void main(String[] args) throws InterruptedException, ExecutionException {
    	ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    	int sum = 0;
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            int partialResult = executorService.submit(new CallableResult()).get();
            System.out.println("Partial result for thread number: " + i  + " is " + partialResult);
            sum += partialResult;
        }
        executorService.shutdown();
        if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
            System.err.println("Error, thread didn't shutdown after " + TIMEOUT + " miliseconds.");
        }
        
        System.out.println("Final result is " + sum);
    }	

Następnie wykorzystuję do uruchomienia zadania metodę submit zamiast poprzedniej metody execute. Metoda submit pozwala na startowanie zadań zarówno używających interfejsu Runnable jak i Callable. Główną różnicą między submit a execute, jest to, że ta pierwsza zwraca obiekt klasy generycznej Future. Zwraca ona wynik obliczeń asynchronicznych. Obiekt Future funkcjonuje jako uchwyt do wyniku zadania asynchronicznego, dzięki czemu np. masz możliwość podejrzenia rezultatu poprzez metodę get.

Rezultat obliczeń (u Ciebie będzie on inny, bo liczby użyte w zadaniu są przecież losowe):

Partial result for thread number: 0 is 97577
Partial result for thread number: 1 is 1154
Partial result for thread number: 2 is 45138
Partial result for thread number: 3 is 15559
Partial result for thread number: 4 is 81590
Partial result for thread number: 5 is 12990
Partial result for thread number: 6 is 46964
Partial result for thread number: 7 is 29054
Final result is 330026

Shutdown vs shutdownNow

Na koniec warto przyjrzeć się sposobom, jakie egzekutory oferują, aby zamykać zadania:

Przykład użycia awaitTermination wraz z shutdown:

executorService.shutdown();
if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
     System.err.println("Error, thread didn't shutdown after " + 10000 + " miliseconds.");
}

Żadna z tych metod nie daje Ci 100% gwarancji, że wątek zakończy swoje działanie. Egzekutory nie różnią się tutaj od klasycznego korzystania z wątków. Jak już wiesz z poprzednich lekcji, programista może jedynie „poprosić” JVM, aby zabił dany wątek, jednak to do samej Javy należy decyzja, czy ta operacja zostanie wykonana czy nie.

*Wytłumaczenie różnic między kolejkami FIFO, LIFO i priorytetową: https://typeofweb.com/struktury-danych-kolejka-fifo-lifo-priorytetowa/

Kod z lekcji: https://github.com/developeronthego/java-advanced/tree/master/src/main/java/advanced/lesson13

Powtórka z podstaw o wielowątkowości: Java #53: wątki – przetwarzanie wielowątkowe

Exit mobile version