Potenza e controllo
con le Parallel Libraries
Twitter: @raffaeler
Email: [email protected]
Articoli e codice: http://www.iamraf.net
Blog: http://blogs.ugidotnet.org/raffaele
Profilo MVP: https://mvp.support.microsoft.com/profile/raffaele
Massima performance possibile
• La legge di Amdahls dice che si può ottenere al massimo un
guadagno di
– 1/[percentuale spesa nell'esecuzione sequenziale]
• Tutto quindi dipende dalla suddivisione in task
– In pratica ogni task è l'unità minima di lavoro sequenziale
• In assenza di I/O il mapping 1 task per ogni core permette usare al
massimo le cpu
• Numero ottimale di thread = core / percentuale media di utilizzo
per task
– Se un task impegna la cpu al 50%
– Se ho a disposizione 4 core
– Il numero di thread ottimale è 8
Il Parallel Computing firmato Microsoft
Data Partitioning
Query
Managed
PLinq
Standard Query
Operators
TPL
(Task Parallel
Library)
Data Merging
Algoritmi
Strutture dati di
coordinazione
Nativo
Concurrency
Runtime
PPL
(Parallel Patterns
Library)
Agents Library
I Task
• Disaccoppia il concetto di esecuzione dal thread
– Il Task può essere mappato su Thread o su Fiber
– Un Thread può eseguire più Task
• Ogni task è esattamente un'istanza della classe Task
• Sono in System.Threading.Tasks (mscorlib)
• I delegate usati da Task
– Se non tornano risultati: Action o Action<object>
– Se tornano risultati: Func<T> o Func<object, T>
• La proprietà Result fornisce accesso al risultato
• Espone le proprietà ID e Status
• Espone i metodi WaitAny, WaitAll
Lo scheduler
• Usano un 'managed user mode scheduler'
– Integrato all'interno del ThreadPool standard
• Scalabilità automatica
– Lo scheduler a runtime decide il numero di core da
utilizzare
– Il developer ha comunque modo di limitarli
• È possibile fornire un custom Scheduler
• È possibile avviare un task nel contesto del thread
corrente (sincrono)
Task
using(ManualResetEvent mre1 = new ManualResetEvent(false))
using(ManualResetEvent mre2 = new ManualResetEvent(false))
using(ManualResetEvent mre3 = new ManualResetEvent(false))
{
ThreadPool.QueueUserWorkItem(delegate { A(); mre1.Set(); } );
ThreadPool.QueueUserWorkItem(delegate { B(); mre2.Set(); } );
ThreadPool.QueueUserWorkItem(delegate { C(); mre3.Set(); } );
WaitHandle.WaitAll(new WaitHandle[]{mre1, mre2, mre3});
}
Task t1 = Task.Factory.StartNew(delegate { A(); });
Task t2 = Task.Factory.StartNew(delegate { B(); });
Task t3 = Task.Factory.StartNew(delegate { C(); });
Task.WaitAll(t1, t2, t3);
==
Task t1 = Task.Factory.StartNew(() => A());
Task t2 = Task.Factory.StartNew(() => B());
Task t3 = Task.Factory.StartNew(() => C());
Task.WaitAll(t1, t2, t3);
Continuations
• Permettono di decidere l'ordine temporale di
esecuzione di Task e Task<T>
– Utile quando una esecuzione necessita il risultato di una
esecuzione precedente
– Non viene garantito che il thread di esecuzione sia sempre
lo stesso
var res = Task<int>.Factory.StartNew(
() => A(10))
.ContinueWith<int>(a => B(a.Result))
.ContinueWith<int>(b => C(b.Result));
AggregateException
• L'esecuzione parallela può implicare eccezioni contemporanee
• La TPL aggrega le eccezioni in AggregateException
– Questo significa che la migrazione di codice esistente non è trasparente
(le catch devono essere rivedute)
– L'esecuzione non continua e riporta solo le eccezioni contemporanee
– La collection InnerExceptions contiene le eccezioni con lo Stack Trace
try
{
Task.WaitAll(t1, t2);
}
catch(AggregateException err)
{
foreach(var e in err.InnerExceptions)
{
Console.WriteLine("Catched! " + e.Message);
}
throw err.Flatten();
}
try
{
Task.WaitAll(t1, t2, t3);
}
catch(AggregateException err)
{
err.Handle(e =>
{
Console.WriteLine("Catched {0}: {1}",
e.GetType().FullName, e.Message);
return true; // handled
});
}
Parallel.For, Parallel.ForEach,
Parallel.Invoke
for (int i = 0; i < N; i++)
{
results[i] = Compute(i);
}
Parallel.For(0, N, i =>
{
results[i] = Compute(i);
} );
foreach(MyClass c in data)
{
Compute(c);
}
Parallel.ForEach(data, c =>
{
Compute(c);
} );
static void WalkTree<T>(
Tree<T> tree, Action<T> func)
{
if (tree == null) return;
WalkTree(tree.Left, func);
WalkTree(tree.Right, func);
func(tree.Data);
}
static void WalkTree<T>(
Tree<T> tree, Action<T> func)
{
if (tree == null) return;
Parallel.Invoke(
() => WalkTree(tree.Left, func) ,
() => WalkTree(tree.Right, func) ,
() => func(tree.Data) );
}
Parallel.For - break e stop
• ParallelLoopState permette di invocare Break e Stop
– Break permette un'interruzione 'soft'
• Le operazioni in esecuzione dal task da cui viene eseguito Break (o
i task child) verranno completate
• LowestBreakIteration è il minimo indice al di sotto del quale tutti i
risultati sono validi
– LowestBreakIteration è un nullable type
– Stop interrompe totalmente l'esecuzione
• A fine loop LowestBreakIteration.HasValue è false
Cancellation
• CancellationTokenSource e CancellationToken
servono a informare i Task che devono interrompere
le operazioni
• La segnalazione ad un Task parent viene propagata
ad i Task Child per tutta la gerarchia
• Il Task deve essere collaborativo e controllare
periodicamente la richiesta di annullamento
PLINQ
PARALLEL LINQ
PLinq
• Permette di eseguire query parallele le query verso
oggetti e XML
• NON serve a parallelizzare Linq to SQL o l'entity
framework
– È comunque possibile eseguire query parallele sui risultati
ottenuti con queste tecnologie
• Il concetto base consiste nel "partizionare" porzioni
di una query
– Ogni thread calcola un pezzo e lo aggiunge al risultato
PLinq
• ParallelEnumerable è la classe che contiene gli
extension methods con le implementazioni parallele
di Linq
– Count, First, Last, Distinct, Join, Average, Aggregate, …
Operatori specifici di PLinq
•
AsParallel
–
•
AsSequential
–
•
Per forzare l'esecuzione parallela anche quando verrebbe eseguita sequenzialmente dal runtime
ForAll
–
•
Suggerimenti su come eseguire il merge dei risultati paralleli parziali
WithExecutionMode
–
•
Numero massimo di Core da usare nella query
WithMergeOptions
–
•
Si richiede di monitorare il flag di annullamento
WithDegreeOfParallelism
–
•
La query deve rispettare / non rispettare l'ordine della sequenza sorgente
WithCancellation
–
•
Il resto della query deve essere eseguita in modo sequenziale invece di parallelo
AsOrdered, AsUnordered
–
•
Parallelizzare il resto della query, se possibile
Permette di fruire in modo parallelo il risultato senza necessità di eseguire il merge sul thread che ha avviato
la query
Aggregate
–
Permette l'aggregazione parziale dei risultati mantenuti in partizioni specifiche al TLS
Esecuzione parallela
• L'uso di AsParallel non implica che l'esecuzione
avvenga in modo parallelo
• PLinq analizza la query e decide di parallelizzarla solo
se la ritiene "safe"
• WithExecutionMode e WithDegreeOfParallelism
sono molto utili per testare il guadagno al crescere
dei core
– In produzione è preferibile lasciare a PLinq la decisione del
numero di core da usare
Query ordering in PLinq
• Le query parallele sono eseguite a pezzi in thread
diversi
– Ogni volta l'ordine del risultato può essere differente
Thread 2
Thread 1
Thread 4
Risultato
t
Thread 3
linea temporale
3
2
1
4
Query ordering in PLinq
• Possibilmente non fare affidamento sull'ordine del
risultato
– I database fanno la stessa cosa
• Se è richiesto un risultato ordinato
– Se possibile, ordinarlo dopo la query e non prima
Array.Sort(a);
var q = from x in a.AsParallel() ...
var q = from x in a.AsParallel() ...
Array.Sort(q);
– Diversamente specificare la clausola AsOrdered
Array.Sort(a);
var q = from x in a.AsParallel().AsOrdered() ...
– Oppure usare la clausola OrderBy
Array.Sort(a);
var q = from x in a.AsParallel().OrderBy(...) ...
Cancellation
• Si usa WithCancellation
• CancellationToken e CancellationTokenSource
funzionano in modo identico a quanto già visto nei
Task
COLLECTION PARALLELE
Collection in PFX
System.Collections.Concurrent
• PFX offre diverse collection di dati tra cui:
– ConcurrentBag<T>
• Lista non ordinata di oggetti
– ConcurrentDictionary<TKey, TValue>
• Dizionario
– BlockingCollection<T>
• Ottimizzata per gli scenari Producer-Consumer
• Implementa IEnumerable<T>, ICollection<T> e IDisposable
– ConcurrentQueue<T> (analoga a Queue<T>)
• Una lista FIFO concorrente
• bool TryDequeue(out T result) e bool TryPeek(out T result)
– ConcurrentStack<T> (analoga a Stack<T>)
• Una list LIFO concorrente
• bool TryPop(out T result) e bool TryPeek(out T result)
• In particolare le Queue sono fondamentali per smistare dei lavori su un
numero fisso di worker thread
Classi 'Slim'
• Sono classi 'leggere' cioè che non fanno uso di risorse
native, con grande beneficio in termini di
performance
– ManualResetSlim
– SemaphoreSlim
– CountDownEvent (usa ManualResetSlim internamente)
• In sostanza non devono eseguire la transizione
User‒Mode  Kernel‒Mode
TIPS & TRICKS
Task lunghi o corti?
• TaskCreationOptions.LongRunning
– usato per dire allo scheduler di incrementare il numero di
thread (es: i task sono impegnati in I/O)
• Se un task dura più di 500ms entra in gioco il
meccanismo di "starvation detection"
– è una strategia per evitare potenziali deadlock
• Difendersi dallo starvation:
– Task più piccoli possibile
– Custom scheduler
– ThreadPool.SetMaxThreads(Environment.ProcessorCount)
Parallelizzare: problemi e soluzioni
• Per la gran parte dei casi rendere parallelo è problematico
(the free lunch ...)
• Problema 1: lo stato condiviso tra più thread è un freno alla
scalabilità e frutto di errori da incubo
• Problema 2: molti algoritmi sono intrinsecamente seriali e la
loro versione parallela (se esiste) spesso è radicalmente
differente
• Problema 3: i tradizionali sistemi di unit testing sono
totalmente inefficaci perché le modalità di esecuzione
dipendono dall'hardware e da altri fattori
PARALLEL PATTERNS
Una linea guida
• Scomporre i problemi in piccoli task
– La dimensione conta!
– Se ogni 'task' è troppo piccolo, l'overhead di gestione del task
finisce per penalizzare la sua esecuzione
• Coordinare l'esecuzione dei task
– Strettamente dipendente da come funziona l'algoritmo
• Condividere i dati necessari all'esecuzione
– Minore dipendenza da dati condivisi implica maggiori
performance
– La condivisione implica l'accesso in modo serializzato alla risorse
condivise
– Alcune tecniche minimizzano le implicazioni di performance
Parallel Loop Pattern
• È applicabile quando ogni step del loop non richiede i
dati precedenti
• Strumenti:
– Parallel.For
– Parallel.ForEach
– Parallel (PLinq)
Parallel Task Pattern
• Si possono isolare delle unità di lavoro distinte e con
dipendenze ben definite?
• Le dipendenze sono limitate?
• Se si verificano queste condizioni è possibile risolvere
creando tanti task quante sono le unità di lavoro
• Strumenti:
– Task, Task<…>
Parallel Aggregation Pattern
• Esiste qualche forma di aggregazione sul risultato dei
dati?
• Ogni step dell'elaborazione produce una parte del
risultato?
• Se si verificano queste condizioni si usano gli
strumenti di aggregazione di PLinq
• Strumenti:
– PLinq
• (from x in sequence.AsParallel() select f(x) )
.Sum();
Futures Pattern / Task Graph Pattern
• L'ordine di esecuzione di ogni step di elaborazione
dipende dal flusso dei dati?
• Si può immaginare il progresso dell'elaborazione con
un grafo dipendente dai dati?
• Se si verificano queste condizioni, è opportuno usare
le Continuations
• Strumenti:
– Task e Continuations
Dynamic Task Parallelism Pattern
• La suddivisione del problema in step è dinamica?
• L'algoritmo prevede l'uso di strutture ricorsive
durante l'elaborazione (es: grafi)?
• Se si tratta di percorrere un grafo, la soluzione si può
trovare in Parallel.Invoke e ForEach
• Strumenti:
– Parallel.*
Domande ?
Scarica

Parallel Patterns