Java zaawansowane #13: pule wątków (egzekutory)

W poprzednich lekcjach wątki były odpalane za pomocą odpalania za każdym razem osobnej instancji. Nie jest to jednak bardzo wygodny sposób. Wyobraź sobie, że musisz ten sam wątek odpalić tysiąc razy! Wymagałoby to za każdym razem tworzenia nowego obiektu i wywoływania na nim metody start. Jest jednak dużo wygodniejszy sposób, mianowicie skorzystanie z puli wątków.

CounterThread firstCounter = new CounterThread();
firstCounter.start();
CounterThread secondCounter = new CounterThread();
secondCounter.start();
CounterThread thirdCounter = new CounterThread();
thirdCounter.start();

Powyższy kod powinien już być Ci znany z poprzednich lekcji. Teraz postaram się go zmodyfikować, używając do tego dedykowanych struktur.

Interfejs Executor

public class ExecutorRunner {
    public static void main(String[] args) {
    	Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(new CounterThread());
    }
}

W metodzie main zadeklarowałem interfejs Executor, który następnie za pomocą metody execute, przydzielam mu zadanie do wykonania (w tym przypadku jest to odpalenie wątku klasy CounterThread). Zastosowano tutaj egzekutor z pojedynczym wątkiem (newSingleThreadExecutor). Dokładnie omawiam go pod koniec lekcji (rodzaje puli wątków). Taki kod powinien pozwolić Ci poprawnie odpalić Twój wątek, jednak możesz go jeszcze bardziej zoptymalizować.

Interfejs ExecutorService

    private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) {
    	ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            executorService.execute(new CounterThread());
        }
        executorService.shutdown();
    }

Tym razem skorzystałem z rozszerzenia dla interfejsu Executor o nazwie ExecutorService. Poza metodą execute, możliwa jest tutaj obsługa wywołań asynchronicznych a także zamykanie wątków. Kolejnym usprawnieniem jest skorzystanie z puli wątków, pozwalającej na użycie więcej niż jednego wątku jednocześnie. Egzekutor newFixedThreadPool w swoim konstruktorze przyjmuje liczbę wątków, które będą wykorzystane do zakończenia zadania. Stała THREAD_POOL_SIZE jest obliczana na podstawie liczby dostępnych procesorów, dzięki czemu jeden procesor może pracować na jednym wątku. Dobrą praktyką jest zamknięcie całego serwisu po wykonaniu wszystkich operacji za pomocą metody shutdown.

Efekt na konsoli:

pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
pool-1-thread-1 : 10
...

Różne pule wątków

Aby zainicjalizować interfejs Executor lub ExecutorService można wykorzystać jedną z kilku dostępnych puli wątków:

  • FixedThreadPool – najczęściej używana pula wątków. Stara się zaangażować jak największą liczbę wątków aby ukończyć zadanie, aż do osiągnięcia maksymalnego rozmiaru puli, a następnie będzie utrzymywać jej rozmiar (gdy jeden wątek zakończy swoje działanie, utworzy kolejny).
  • SingleThreadExecutor – egzekutor jednowątkowy tworzy pojedynczego procesu roboczego wątek do przetwarzania zadań, zastępując go, jeśli nieoczekiwanie umiera. Gwarantuje, że zadania będą przetwarzane sekwencyjnie zgodnie z kolejnością ich (FIFO, LIFO, kolejka priorytetowa*).
  • ChachedThreadPool – pula, która ma większą elastyczność w zakresie wykorzystania bezczynnych wątków. Jest szczególnie przydatna, gdy chcesz wykonać rozsądną liczbę krótkich zadań lub z zadaniami, które większość czasu oczekują na rezultat innego. Egzekutor ten stara się przydzielać nowe wątki do zadań, jeśli to jest tylko możliwe, jednocześnie kasując te, które przez długi czas są bezczynne.
  • ScheduledThreadPool – pula wątków o stałym rozmiarze, która obsługuje opóźnione i okresowe wykonywanie zadań.

Interfejs Callable

Czasami chcesz, aby wiele wątków pracowało nad obliczeniem jednego konkretnego rezultatu. Aby wykonać taką operację równolegle, musisz podzielić ją na kilka odrębnych podzadań, a po ich zakończeniu połączyć wyniki wszystkich w jeden rezultat. W przypadku zwykłych zadań Runnable jest to niewykonalne, ponieważ metoda run nie zwraca żadnej wartości. Z pomocą posłuży Ci wtedy interfejs Callable.

public class CallableResult implements Callable<Integer>{

	@Override
	public Integer call() throws Exception {
		Thread.sleep(1000); // only for simulating complex calculation
		return new Random().nextInt(100000); // draw a random number
	}

}

Klasa CallableResult implementuje interfejs Callable, który jest bardzo podobny do interfejsu Runnable. Główną różnicą tutaj jest, że zamiast metody run, jest używana metoda call, która zwraca wartość zadeklarowanego typu. W moim przypadku będzie to wylosowana liczba typu Integer.

Metoda submit i obiekty Future

    public static void main(String[] args) throws InterruptedException, ExecutionException {
    	ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    	int sum = 0;
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            int partialResult = executorService.submit(new CallableResult()).get();
            System.out.println("Partial result for thread number: " + i  + " is " + partialResult);
            sum += partialResult;
        }
        executorService.shutdown();
        if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
            System.err.println("Error, thread didn't shutdown after " + TIMEOUT + " miliseconds.");
        }
        
        System.out.println("Final result is " + sum);
    }	

Następnie wykorzystuję do uruchomienia zadania metodę submit zamiast poprzedniej metody execute. Metoda submit pozwala na startowanie zadań zarówno używających interfejsu Runnable jak i Callable. Główną różnicą między submit a execute, jest to, że ta pierwsza zwraca obiekt klasy generycznej Future. Zwraca ona wynik obliczeń asynchronicznych. Obiekt Future funkcjonuje jako uchwyt do wyniku zadania asynchronicznego, dzięki czemu np. masz możliwość podejrzenia rezultatu poprzez metodę get.

Rezultat obliczeń (u Ciebie będzie on inny, bo liczby użyte w zadaniu są przecież losowe):

Partial result for thread number: 0 is 97577
Partial result for thread number: 1 is 1154
Partial result for thread number: 2 is 45138
Partial result for thread number: 3 is 15559
Partial result for thread number: 4 is 81590
Partial result for thread number: 5 is 12990
Partial result for thread number: 6 is 46964
Partial result for thread number: 7 is 29054
Final result is 330026

Shutdown vs shutdownNow

Na koniec warto przyjrzeć się sposobom, jakie pule wątków oferują, aby zamykać zadania:

  • shutdown – inicjuje uporządkowane zamknięcie wcześniej przesłanych zadań (ang. submitted), zgodnie z momentem, kiedy zostały uruchomione. Gwarantuje także, że żadne nowe zadania nie będą już przyjmowane.
  • shutdownNow – próbuje zatrzymać wszystkie aktywnie wykonywane zadania, zatrzymuje przetwarzanie oczekujących zadań i zwraca listę zadań oczekujących na wykonanie. Ta implementacja anuluje zadania używając Thread.interrupt, więc każde zadanie, które nie reaguje na próbę przerwania, może nigdy nie zostać zakończone.
  • awaitTermination – jest to czasowa blokada, która przez określony czas czeka na zakończenie pracy już wystartowanych zadań. Używasz jej po wywołaniu metody shutdown. Po przekroczenia limitu czasu lub przerwania bieżącego wątku, w zależności od tego, co nastąpi wcześniej, blokada jest zwalniana.

Przykład użycia awaitTermination wraz z shutdown:

executorService.shutdown();
if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
     System.err.println("Error, thread didn't shutdown after " + 10000 + " miliseconds.");
}

Żadna z tych metod nie daje Ci 100% gwarancji, że wątek zakończy swoje działanie. Jak już wiesz z poprzednich lekcji, programista może jedynie „poprosić” JVM, aby zabił dany wątek, jednak to do samej Javy należy decyzja, czy ta operacja zostanie wykonana czy nie.

*Wytłumaczenie różnic między kolejkami FIFO, LIFO i priorytetową: https://typeofweb.com/struktury-danych-kolejka-fifo-lifo-priorytetowa/

Kod z lekcji: https://github.com/developeronthego/java-advanced/tree/master/src/main/java/advanced/lesson13

Może Ci się również spodoba

1 Odpowiedź

  1. Caleb pisze:

    Całkiem spoko wyjaśnione.. dzięki

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *