Potenza e controllo con le Parallel Libraries Twitter: @raffaeler Email: [email protected] Articoli e codice: http://www.iamraf.net Blog: http://blogs.ugidotnet.org/raffaele Profilo MVP: https://mvp.support.microsoft.com/profile/raffaele Massima performance possibile • La legge di Amdahls dice che si può ottenere al massimo un guadagno di – 1/[percentuale spesa nell'esecuzione sequenziale] • Tutto quindi dipende dalla suddivisione in task – In pratica ogni task è l'unità minima di lavoro sequenziale • In assenza di I/O il mapping 1 task per ogni core permette usare al massimo le cpu • Numero ottimale di thread = core / percentuale media di utilizzo per task – Se un task impegna la cpu al 50% – Se ho a disposizione 4 core – Il numero di thread ottimale è 8 Il Parallel Computing firmato Microsoft Data Partitioning Query Managed PLinq Standard Query Operators TPL (Task Parallel Library) Data Merging Algoritmi Strutture dati di coordinazione Nativo Concurrency Runtime PPL (Parallel Patterns Library) Agents Library I Task • Disaccoppia il concetto di esecuzione dal thread – Il Task può essere mappato su Thread o su Fiber – Un Thread può eseguire più Task • Ogni task è esattamente un'istanza della classe Task • Sono in System.Threading.Tasks (mscorlib) • I delegate usati da Task – Se non tornano risultati: Action o Action<object> – Se tornano risultati: Func<T> o Func<object, T> • La proprietà Result fornisce accesso al risultato • Espone le proprietà ID e Status • Espone i metodi WaitAny, WaitAll Lo scheduler • Usano un 'managed user mode scheduler' – Integrato all'interno del ThreadPool standard • Scalabilità automatica – Lo scheduler a runtime decide il numero di core da utilizzare – Il developer ha comunque modo di limitarli • È possibile fornire un custom Scheduler • È possibile avviare un task nel contesto del thread corrente (sincrono) Task using(ManualResetEvent mre1 = new ManualResetEvent(false)) using(ManualResetEvent mre2 = new ManualResetEvent(false)) using(ManualResetEvent mre3 = new ManualResetEvent(false)) { ThreadPool.QueueUserWorkItem(delegate { A(); mre1.Set(); } ); ThreadPool.QueueUserWorkItem(delegate { B(); mre2.Set(); } ); ThreadPool.QueueUserWorkItem(delegate { C(); mre3.Set(); } ); WaitHandle.WaitAll(new WaitHandle[]{mre1, mre2, mre3}); } Task t1 = Task.Factory.StartNew(delegate { A(); }); Task t2 = Task.Factory.StartNew(delegate { B(); }); Task t3 = Task.Factory.StartNew(delegate { C(); }); Task.WaitAll(t1, t2, t3); == Task t1 = Task.Factory.StartNew(() => A()); Task t2 = Task.Factory.StartNew(() => B()); Task t3 = Task.Factory.StartNew(() => C()); Task.WaitAll(t1, t2, t3); Continuations • Permettono di decidere l'ordine temporale di esecuzione di Task e Task<T> – Utile quando una esecuzione necessita il risultato di una esecuzione precedente – Non viene garantito che il thread di esecuzione sia sempre lo stesso var res = Task<int>.Factory.StartNew( () => A(10)) .ContinueWith<int>(a => B(a.Result)) .ContinueWith<int>(b => C(b.Result)); AggregateException • L'esecuzione parallela può implicare eccezioni contemporanee • La TPL aggrega le eccezioni in AggregateException – Questo significa che la migrazione di codice esistente non è trasparente (le catch devono essere rivedute) – L'esecuzione non continua e riporta solo le eccezioni contemporanee – La collection InnerExceptions contiene le eccezioni con lo Stack Trace try { Task.WaitAll(t1, t2); } catch(AggregateException err) { foreach(var e in err.InnerExceptions) { Console.WriteLine("Catched! " + e.Message); } throw err.Flatten(); } try { Task.WaitAll(t1, t2, t3); } catch(AggregateException err) { err.Handle(e => { Console.WriteLine("Catched {0}: {1}", e.GetType().FullName, e.Message); return true; // handled }); } Parallel.For, Parallel.ForEach, Parallel.Invoke for (int i = 0; i < N; i++) { results[i] = Compute(i); } Parallel.For(0, N, i => { results[i] = Compute(i); } ); foreach(MyClass c in data) { Compute(c); } Parallel.ForEach(data, c => { Compute(c); } ); static void WalkTree<T>( Tree<T> tree, Action<T> func) { if (tree == null) return; WalkTree(tree.Left, func); WalkTree(tree.Right, func); func(tree.Data); } static void WalkTree<T>( Tree<T> tree, Action<T> func) { if (tree == null) return; Parallel.Invoke( () => WalkTree(tree.Left, func) , () => WalkTree(tree.Right, func) , () => func(tree.Data) ); } Parallel.For - break e stop • ParallelLoopState permette di invocare Break e Stop – Break permette un'interruzione 'soft' • Le operazioni in esecuzione dal task da cui viene eseguito Break (o i task child) verranno completate • LowestBreakIteration è il minimo indice al di sotto del quale tutti i risultati sono validi – LowestBreakIteration è un nullable type – Stop interrompe totalmente l'esecuzione • A fine loop LowestBreakIteration.HasValue è false Cancellation • CancellationTokenSource e CancellationToken servono a informare i Task che devono interrompere le operazioni • La segnalazione ad un Task parent viene propagata ad i Task Child per tutta la gerarchia • Il Task deve essere collaborativo e controllare periodicamente la richiesta di annullamento PLINQ PARALLEL LINQ PLinq • Permette di eseguire query parallele le query verso oggetti e XML • NON serve a parallelizzare Linq to SQL o l'entity framework – È comunque possibile eseguire query parallele sui risultati ottenuti con queste tecnologie • Il concetto base consiste nel "partizionare" porzioni di una query – Ogni thread calcola un pezzo e lo aggiunge al risultato PLinq • ParallelEnumerable è la classe che contiene gli extension methods con le implementazioni parallele di Linq – Count, First, Last, Distinct, Join, Average, Aggregate, … Operatori specifici di PLinq • AsParallel – • AsSequential – • Per forzare l'esecuzione parallela anche quando verrebbe eseguita sequenzialmente dal runtime ForAll – • Suggerimenti su come eseguire il merge dei risultati paralleli parziali WithExecutionMode – • Numero massimo di Core da usare nella query WithMergeOptions – • Si richiede di monitorare il flag di annullamento WithDegreeOfParallelism – • La query deve rispettare / non rispettare l'ordine della sequenza sorgente WithCancellation – • Il resto della query deve essere eseguita in modo sequenziale invece di parallelo AsOrdered, AsUnordered – • Parallelizzare il resto della query, se possibile Permette di fruire in modo parallelo il risultato senza necessità di eseguire il merge sul thread che ha avviato la query Aggregate – Permette l'aggregazione parziale dei risultati mantenuti in partizioni specifiche al TLS Esecuzione parallela • L'uso di AsParallel non implica che l'esecuzione avvenga in modo parallelo • PLinq analizza la query e decide di parallelizzarla solo se la ritiene "safe" • WithExecutionMode e WithDegreeOfParallelism sono molto utili per testare il guadagno al crescere dei core – In produzione è preferibile lasciare a PLinq la decisione del numero di core da usare Query ordering in PLinq • Le query parallele sono eseguite a pezzi in thread diversi – Ogni volta l'ordine del risultato può essere differente Thread 2 Thread 1 Thread 4 Risultato t Thread 3 linea temporale 3 2 1 4 Query ordering in PLinq • Possibilmente non fare affidamento sull'ordine del risultato – I database fanno la stessa cosa • Se è richiesto un risultato ordinato – Se possibile, ordinarlo dopo la query e non prima Array.Sort(a); var q = from x in a.AsParallel() ... var q = from x in a.AsParallel() ... Array.Sort(q); – Diversamente specificare la clausola AsOrdered Array.Sort(a); var q = from x in a.AsParallel().AsOrdered() ... – Oppure usare la clausola OrderBy Array.Sort(a); var q = from x in a.AsParallel().OrderBy(...) ... Cancellation • Si usa WithCancellation • CancellationToken e CancellationTokenSource funzionano in modo identico a quanto già visto nei Task COLLECTION PARALLELE Collection in PFX System.Collections.Concurrent • PFX offre diverse collection di dati tra cui: – ConcurrentBag<T> • Lista non ordinata di oggetti – ConcurrentDictionary<TKey, TValue> • Dizionario – BlockingCollection<T> • Ottimizzata per gli scenari Producer-Consumer • Implementa IEnumerable<T>, ICollection<T> e IDisposable – ConcurrentQueue<T> (analoga a Queue<T>) • Una lista FIFO concorrente • bool TryDequeue(out T result) e bool TryPeek(out T result) – ConcurrentStack<T> (analoga a Stack<T>) • Una list LIFO concorrente • bool TryPop(out T result) e bool TryPeek(out T result) • In particolare le Queue sono fondamentali per smistare dei lavori su un numero fisso di worker thread Classi 'Slim' • Sono classi 'leggere' cioè che non fanno uso di risorse native, con grande beneficio in termini di performance – ManualResetSlim – SemaphoreSlim – CountDownEvent (usa ManualResetSlim internamente) • In sostanza non devono eseguire la transizione User‒Mode Kernel‒Mode TIPS & TRICKS Task lunghi o corti? • TaskCreationOptions.LongRunning – usato per dire allo scheduler di incrementare il numero di thread (es: i task sono impegnati in I/O) • Se un task dura più di 500ms entra in gioco il meccanismo di "starvation detection" – è una strategia per evitare potenziali deadlock • Difendersi dallo starvation: – Task più piccoli possibile – Custom scheduler – ThreadPool.SetMaxThreads(Environment.ProcessorCount) Parallelizzare: problemi e soluzioni • Per la gran parte dei casi rendere parallelo è problematico (the free lunch ...) • Problema 1: lo stato condiviso tra più thread è un freno alla scalabilità e frutto di errori da incubo • Problema 2: molti algoritmi sono intrinsecamente seriali e la loro versione parallela (se esiste) spesso è radicalmente differente • Problema 3: i tradizionali sistemi di unit testing sono totalmente inefficaci perché le modalità di esecuzione dipendono dall'hardware e da altri fattori PARALLEL PATTERNS Una linea guida • Scomporre i problemi in piccoli task – La dimensione conta! – Se ogni 'task' è troppo piccolo, l'overhead di gestione del task finisce per penalizzare la sua esecuzione • Coordinare l'esecuzione dei task – Strettamente dipendente da come funziona l'algoritmo • Condividere i dati necessari all'esecuzione – Minore dipendenza da dati condivisi implica maggiori performance – La condivisione implica l'accesso in modo serializzato alla risorse condivise – Alcune tecniche minimizzano le implicazioni di performance Parallel Loop Pattern • È applicabile quando ogni step del loop non richiede i dati precedenti • Strumenti: – Parallel.For – Parallel.ForEach – Parallel (PLinq) Parallel Task Pattern • Si possono isolare delle unità di lavoro distinte e con dipendenze ben definite? • Le dipendenze sono limitate? • Se si verificano queste condizioni è possibile risolvere creando tanti task quante sono le unità di lavoro • Strumenti: – Task, Task<…> Parallel Aggregation Pattern • Esiste qualche forma di aggregazione sul risultato dei dati? • Ogni step dell'elaborazione produce una parte del risultato? • Se si verificano queste condizioni si usano gli strumenti di aggregazione di PLinq • Strumenti: – PLinq • (from x in sequence.AsParallel() select f(x) ) .Sum(); Futures Pattern / Task Graph Pattern • L'ordine di esecuzione di ogni step di elaborazione dipende dal flusso dei dati? • Si può immaginare il progresso dell'elaborazione con un grafo dipendente dai dati? • Se si verificano queste condizioni, è opportuno usare le Continuations • Strumenti: – Task e Continuations Dynamic Task Parallelism Pattern • La suddivisione del problema in step è dinamica? • L'algoritmo prevede l'uso di strutture ricorsive durante l'elaborazione (es: grafi)? • Se si tratta di percorrere un grafo, la soluzione si può trovare in Parallel.Invoke e ForEach • Strumenti: – Parallel.* Domande ?