Divide et impera: spremere il processore con il framework Fork/Join

Il framework Fork/Join (F/J) è una piccola libreria che permette di parallelizzare gli algoritmi di tipo divide et impera. Introdotta con Java 7, (ma disponibile anche in Java 6) consente di sfruttare l’architettura multicore del processore per scomporre l’esecuzione un algoritmo in più parti che vengono elaborate contemporaneamente (fork) e ricombinarle alla fine (join) ottenendo un aumento significativo nelle prestazioni. Vediamo assieme come trarre vantaggio da questa nuova feature di Java 7.

Divide et impera

Fork/Join nasce per parallelizzare gli algoritmi di tipo divide et impera di cui facciamo un breve richiamo. Questa classe di algoritmi prevede che un problema “complesso” venga suddiviso in due o più sottoproblemi più piccoli (divide). Se anche questi sono ancora troppo grandi vengono suddivisi ulteriormente e il processo viene ripetuto fino ad ottenere un insieme di problemi risolvibili banalmente o per lo meno semplicemente (impera). Le soluzioni vengono poi ricombinate nel verso opposto alla scomposizione aggregando soluzioni via via più complesse fino ad ottenere il risultato del problema iniziale. Due classici esempi sono sicuramente QuickSort e MergeSort.

Un algoritmo divide et impera viene programmato “naturalmente” in maniera ricorsiva (possiamo sempre, poi, realizzare una versione iterativa più efficiente), dove:

  • ogni divisione corrisponde ad un passo di ricorsione;
  • la base di ricorsione corrisponde all’impera;
  • il ritorno dalla ricorsione permette di ricombinare il risultato.

A meno di lanciarsi nell’avventura di una implementazione fatta con i thread, tutto questo lavoro ad oggi viene svolto da un unico processore, lasciando gli altri a sonnecchiare o poco più. Uno spreco bello e buono sapendo che i moderni processori sono dotati di più core (non parleremo di hyper-threading perché per Java non fa alcuna differenza).

Fork/Join

Fork/Join entra in gioco a questo punto, aiutando a suddividere il carico di lavoro su più processori nella fase di divide. Dal punto di vista di chi utilizza F/J (cioè noi) l’idea di base è molto semplice. In F/J ogni problema da risolvere viene chiamato task e viene implementato come sottoclasse di java.util.concurrent.RecursiveTask. Quando il dato che il task deve elaborare è troppo grande, vengono creati nuovi task con un sottoinsieme dei dati iniziali e lanciati tramite il metodo fork. Da questo punto in poi, l’esecuzione diventa parallela. Terminato ogni sottotask, che ovviamente può essere ancora spezzato in unità più piccole, avviene la fase di join, dove il risultato di ogni sottotask è disponibile e viene combinato con gli altri per fornire la soluzione richiesta. Tutto qui.

Passiamo a vedere un po’ di codice che risolve un problema volutamente semplice. Abbiamo una lista di oggetti comparabili, vogliamo trovare quanti di questi sono uguali ad un oggetto di test. La classe Counter svolge questo lavoro senza riservarci sorprese.

package it.cosenonjaviste.testforkjoin;

import java.util.*;

public class Counter<T extends Comparable<T>> {
    private List<T> _list;
    private T _test;

    public Counter(List<T> list, T test) {
        _list = list;
        _test = test;
    }

    public int doWork() {
        int count = 0;
        for (int i = 0; i < _list.size(); i++) {
            if (_list.get(i).compareTo(_test) == 0) {
                count++;
            }
        }
        return count;
    }
}

Passiamo ora ad una versione ricorsiva della classe dove suddividiamo in due parti il problema. Per comodità, passiamo sempre tutta la lista di oggetti ma ne limitiamo l’accesso utilizzando degli indici.

package it.cosenonjaviste.testforkjoin;

import java.util.*;

public class CounterR<T extends Comparable<T>> {
  private List<T> _list;
  private T _test;
  private int _start;
  private int _end;

  public CounterR(List<T> list, T test, int start,
      int end) {
    _list = list;
    _test = test;
    _start = start;
    _end = end;
  }

  public CounterR(List<T> list, T test) {
    this(list, test, 0, list.size());
  }

  public Integer doWork() {
    if (_start == _end)
      return 0;

    if (_start + 1 == _end) {
      if (_list.get(_start).compareTo(_test) == 0)
        return 1;
      else
        return 0;
    } else {
      int size = (_end - _start) + 1;

      // split initial problem
      CounterR<T> c1 = new CounterR<T>(_list, _test,
          _start, _start + size / 2);
      CounterR<T> c2 = new CounterR<T>(_list, _test,
          _start + size / 2 + 1, _start + size - 1);
      return c1.doWork() + c2.doWork();
    }

  }
}

E’ venuto il momento di applicare F/J. A questo punto faremo in modo che CounterR estenda la classe di framework RecursiveTask: chiamiamo CounterFJ questa estensione e ne implementiamo il metodo astratto compute. In questo metodo, che sostituisce doWork, creeremo nuove istanze di CounterFJ che lavoreranno solo su di una porzione del problema; come prima, passiamo tutta la lista di input ma ne limitiamo l’accesso utilizzando degli indici.

Per effettuare il divide in parallelo, utilizzeremo il metodo fork su ogni istanza di CounterFJ. Una volta che tutti i sotto task sono terminati, usando il metodo join otteremmo il conteggio delle occorrenze che andremo a sommare per dare il risultato finale.

package it.cosenonjaviste.testforkjoin;

import java.util.*;
import java.util.concurrent.*;

public class CounterFJ<T extends Comparable<T>>
    extends RecursiveTask<Integer> {

  private static final int THRESHOLD = 250;
  private static final long serialVersionUID = 5129597438543076860L;
  private List<T> _list;
  private T _test;
  private int _start;
  private int _end;

  public CounterFJ(List<T> list, T test, int start,
      int end) {
    _list = list;
    _test = test;
    _start = start;
    _end = end;
  }

  public CounterFJ(List<T> list, T test) {
    this(list, test, 0, list.size());
  }

  @Override
  protected Integer compute() {

    if (_end - _start < THRESHOLD) {
      int count = 0;
      for (int i = _start; i < _end; i++) {
        if (_list.get(i).compareTo(_test) == 0) {
          count++;
        }
      }
      return new Integer(count);
    }

    int size = (_end - _start) + 1;

    // split initial problem in two
    CounterFJ<T> subTask1 = new CounterFJ<T>(_list,
        _test, _start, _start + size / 2);
    CounterFJ<T> subTask2 = new CounterFJ<T>(_list,
        _test, _start + size / 2 + 1, _start + size
            - 1);

    subTask1.fork();
    subTask2.fork();

    Integer count = subTask1.join() + subTask2.join();

    return count;
  }
}

Come prima, ci siamo limitati a suddividere la lista in due parti soltanto, tuttavia è possibile realizzare una versione un po’ più sofisticata che permetta di parametrizzare anche il fattore di suddivisione. A differenza dell’esempio ricorsivo però, fissiamo una soglia per la dimensione del problema (threshold) sotto la quale non ha senso continuare a generare nuovi task perché il costo del fork supera quello della risoluzione immediata. Se invece la dimensione del problema è abbastanza grande, costruiamo i sotto-task suddividendo l’input. Messi da parte i task, invochiamo il fork su ciascuno e, al termine dell’esecuzione parallela, recuperiamo il risultato tramite join. La chiamata al task iniziale è presentata immediatamente qui sotto:

Long test = 42L;
CounterFJ<Long> counter = new CounterFJ<>(list,test);

ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println("Occurrences:" + counter.join());

La cosa che salta subito all’occhio è l’utilizzo della classe ForkJoinPool che è l’entry point del framework. Tramite il metodo invoke, andiamo ad eseguire il RecursiveTask di partenza che restituirà il risultato dell’esecuzione sempre tramite il metodo join. ForkJoinPool ha anche un costruttore alternativo che permette di specificare il grado di parallelismo che si vuole raggiungere. Qualora si utilizzi il costruttore di default, viene utilizzato il numero di processori che la JVM ha a disposizione.

E’ possibile specificare un grado di parallelismo superiore al numero di processori disponibili, tuttavia questo comporterà un degrado delle prestazioni tale da essere peggiore dell’esecuzione senza Fork/Join.

Le prestazioni

Non è interesse di questo post fare uno studio approfondito delle prestazioni di F/J, anche perché i lettori più curiosi possono trovare molte più informazioni qui, piuttosto vogliamo dare un’idea qualitativa del miglioramento di prestazioni con il grafico presentato qui sotto. Sull’asse Y si trova il tempo di esecuzione della ricerca in millisecondi, sull’asse X il numero di elementi della lista. Sono presentati tre grafici: senza F/J (verde), usando due core (rosso) e utilizzando quattro core (blu). L’esecuzione è stata ripetuta più volte sullo stesso set di dati generato casualmente all’inizio della batteria di test, incrementando gradualmente la dimensione del problema.
Prestazioni Fork/Join

Alcune avvertenze

Fork/Join è molto semplice da usare, ma bisogna avere prestare attenzione ad alcuni aspetti a cui abbiamo già accennato ma che qui riepiloghiamo:

  • non specificare un grado di parallelismo maggiore del numero di processori disponibili, altrimenti vengono vanificati i vantaggi del framework;
  • prevedere una soglia sotto la quale non generare più task, altrimenti l’overhead di gestione del parallelismo è più grande del benificio acquisito;
  • i task non devono essere bloccati su operazioni di I/O, Fork/Join è stato pensato per algoritmi che fanno pesante uso della CPU, non per sostituirsi alla gestione multithreaded di cui costituiscono piuttosto un complemento.

Dietro le quinte

Ma come funziona internamente F/J? Diamo una spiegazione semplificata lasciando ai più volenterosi l’approfondimento. Ogni thread allocato dal ForkJoinPool dispone di una coda doppia cioè che supporta sia operazioni FIFO che LIFO. In questa coda, detta deque, vengono accodati i task da eseguire e che sono estratti con politica LIFO. All’inizio dell’esecuzione, un solo thread accoda il primo task e lo esegue. Se il thread incontra un fork durante l’esecuzione, accoda i nuovi task nella propria coda e li processa estraendo i più recenti dalla coda. Se invece viene incontrata un’operazione di join, il thread processa gli altri task, se ce ne sono, fino a che è possibile continuare l’operazione di join. Qualora un thread non abbia task da eseguire, prende un task dalla coda di un altro thread estraendolo dal fondo della coda (FIFO). Quindi il procedimento si ripete fino a che tutti i thread non abbiano svuotato la propria coda. A seguito di questa strategia si afferma che F/J è basato sul work-stealing, concetto mutuato direttamente da un altro framework chiamato Cilk. E’ possibile dimostrare facilmente che in F/J i task si spostano di continuo da un thread all’altro ma per rendere più evidente questo aspetto abbiamo preparato un piccolo programma che ne da dimostrazione. Due screenshot estratti da una esecuzione e qui sotto illustrati permettono di interpretare più agevolmente il video.

Screenshot 1
fj-passo1

Nell’istante rappresentato nel primo screenshot, un solo thread è in esecuzione e il suo deque contiene tre task, i numeri tra parentesi rappresentano la porzione di array che viene esaminata. Si deduce quindi che l’array iniziale è stato diviso in due parti.

Screenshot 2
fj-passo2

Successivamente (secondo screenshot) altri tre thread nel pool “rubano” i task dal deque del primo. Il task spostato viene identificato con un rettangolino sul lato destro dello stesso colore del thread che lo ha preso. Il rettangolino a sinistra rappresenta invece il thread di partenza. Un task di colore “pieno” indica che il task non è ancora stato completato, un colore più chiaro indica nuovamente che il task è stato spostato, mentre il task viene colorato di bianco se la sua elaborazione è stata completata.

Per coloro che volessero vedere un esempio di esecuzione è disponibile un piccolo video dove avvengono diversi “furti” di task tra un thread e l’altro. Quelli che invece vogliono smanettare sui parametri e vedere il comportamento del framework possono scaricare il sorgente della applicazione.

Conclusioni

Fork/Join è un framework molto semplice ma contemporaneamente molto potente. Sicuramente sarà di aiuto per ottimizzare quelle operazioni che stressano la CPU e ridurne il tempo di esecuzione. Tuttavia, anche se non avete l’esigenza di andare a fare ottimizzazioni così specifiche del codice, F/J lavorerà comunque per voi in futuro perché l’esecuzione parallela delle lambda expression di cui abbiamo parlato tempo fa utilizza proprio questo framework.

Per approfondire

Giampaolo Trapasso

Sono laureato in Informatica e attualmente lavoro come Software Engineer in Databiz Srl. Mi diverto a programmare usando Java e Scala, Akka, RxJava e Cassandra. Qui mio modesto contributo su StackOverflow e il mio account su GitHub