Nachdem hier die letzten Tage die Änderungen aus dem Project Coin von Java 7 vorgestellt wurden, möchte ich heute auf das Fork / Join Framework eingehen. Eine Neuerung die das Mutli Threading vereinfachen soll.
Genauer gesagt ist das fork/join Framework eine Implementierung des ExecutorService Interfaces, welches beim Abarbeiten von eines Tasks die Verteilung dessen auf mehrere Prozessoren erleichtern soll. Dabei wird ein aufwendiger Task in immer kleinere Teile aufgebrochen bis die Teile klein genug sind um sie zu verarbeiten. Diese Teile werden dann über einen Thread Pool abgearbeitet und schlussendlich wieder zusammengesetzt zu einem fertigen Ergebnis.
Der oben genannte Vorgang hier noch verpackt in Pseudocode
if (aufgabe klein genug) erledige aufgabe else teile die aufgabe in 2 gleich große teile starte verarbeitungsthread der 2 aufgaben und vereine die ergebnisse
Eigentlich recht simpel, nun wollen wir aber noch auf den Code dazu blicken.
Einfaches Beispiel
Nehmen wir an wir haben folgende Datenklasse
public class user{ String name Map salesPerYear //... }
In unserem System sind über 50.000 Kunden erfasst. Nun wollen wir den maximalen Umsatz den uns ein Kunde im Jahr 2010 beschert hat ermitteln.
Serielle Implementierung
Wenn wir nun ganz simpel jeden Kunden nach einander durchiterieren und deren Umsatz vergleichen schafft man damit einen lang werkenden Task.
User max = userList.get(0); for(User u: userList){ if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){ max = u; } } return max;
Zwar wird der Task fertig, doch je nach Datenanzahl ist die Verarbeitungsgeschwindigkeit nicht gerade berauschend.
Fork/Join Implementierung
Die selbe Aufgabe mit dem Rekursiven Ansatz des Fork/Join Frameworks sieht zwar nach mehr Code aus, doch sollte wesentlich performanter verarbeitet werden.
public class MaxSaleTask extends RecursiveAction { private List<User> userList; public User max; private int sThreshold = 10000; public MaxSaleTask(List<User> userList){ this.userList = userList; this.max = userList.get(0); //vereinfacht } protected void computeDirectly(){ for(User u: userList){ if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){ max = u; } } } protected void compute(){ int listSize = userList.size(); if(listSize < sThreshold){ computeDirectly(); } else { int split = listSize / 2; MaxSaleTask left = new MaxSaleTask(userList.subList(0,split)); MaxSaleTask right = new MaxSaleTask(userList.subList(split,listSize)); invokeAll(left,right); if(left.max.getSalesForYear(2010) > right.max.getSalesForYear(2010)){ max = left.max; } else { max = right.max; } } } }
Damit ein Task im Fork/Join Framework verarbeitet werden kann muss er eine RecursiveAction erweitern. Diese gibt eine compute() Methode zum erweitern vor, in der der Split ausprogrammiert werden muss. Ebenfalls stellt sie eine invokeAll Methode zur Verfügung, die etwaige weitere Tasks startet.
Bei unserem Beispiel mit über 50.000 Kunden gehen wir mal von exakt 53.432 aus und rechnen uns die Taskanzahl aus
1. Task mit 53432 Kunden startet 2 neue Tasks
2.-3. Task mit je 26716 Kunden starten je 2 neue Tasks
4.-7. Task mit je 13358 Kunden starten wiederum je 2 neue Tasks
8.-15. Task mit je 6679 Kunden fallen nun unter die Schranke sThreshold und werden direkt bearbeitet.
Sie geben das Resultat zurück an die Tasks 4-7, diese vereinen das Ergebnis geben es an 2 und 3 zurück, diese wiederum an den 1. Task, der am Schluss das Resultat, den max User zur Verfügung stellt.
Die Wahl der Schranke ist hierbei für die Performance entscheidend. Wählt man die Schranke zu hoch arbeiten zwar weniger Tasks, diese dafür aber länger, ist die Schranke zu niedrig ergeben sich eine enorme Anzahl kleinen Tasks und der Verwaltungsoverhead wird zu groß und Fressen den Vorteil der paralellen Verarbeitung wieder auf. Eine direkte Empfehlung seitens der Entwickler gibt es verständlicher weise nicht, kann sich diese Schranke je nach Art der Aufgabe unterschiedlich auswirken. Zur Wahl der Schanke muss man somit seinen Code profilen. Dabei sollte man nicht zwingend zu penibel sein, es reicht aus eine zu kleine oder zu große Schranke zu vermeiden.
Um diesen Prozess nun überhaupt anstoßen zu können benötigt man einen ForkJoinPool, dem man den initialen Task übergibt und diesen dann anstößt.
ForkJoinPool pool = new ForkJoinPool(); MaxSaleTask mst = new MaxSaleTask(userList); pool.invoke(mst); return mst.max;
Will man im Gegensatz zu dem hier gezeigten Beispiel ein Result per Return Aufruf zurückliefern muss man statt der RecursiveAction einen RecurisveTask erweitern.
Happy Coding
Quellen:
Java Fork Paper http://gee.cs.oswego.edu/dl/papers/fj.pdf
Fork and Join: Java Can Excel at Painless Paralell Programming Too! http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Fork Join Tutorial http://download.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
Schreibe einen Kommentar