Task Parallel Library a RStein. Async 1 - Popis základních tříd a obcházení omezení v TPL
(Obnoveno ze zálohy, omlouvám se za formátování kódu)
V následující sérií článků chci představit některé konstrukce ze své knihovny RStein.Async. Většina popisovaných tříd intenzivně využívá a někdy i s gustem zneužívá Task Parallel library. V článcích se tedy objeví i mnoho informací o samotné knihovně TPL a klíčových slovech async a await v C#. V článcích předpokládám jen základní znalost TPL. Pod základní znalostí si představuju, že víte, jak spustíte nový Task, k čemu se dá Task použít a jak získáte výsledek zpracování Tasku.
Knihovna RStein.Async vznikla jako vedlejší důsledek zkoumání možností Schedulerů v TPL, kdy jsem na projektech zkoušel, co si mohu s TPL dovolit a co je mi v TPL už odepřeno, nebo jsem zjišťoval, jaké je skutečně chování tříd, které jsou v dokumentaci nedostatečně popsány. Články jsou určeny i pro čtenáře, kteří třeba neví, proč by měli používat ConcurrentExclusiveSchedulerPair, protože v jednom díle popíšu nejen to, jaké jsou výhody tohoto scheduleru oproti běžně využívaným a hlavně zneužívaným synchronizačním primitivám (lock-Monitor, Mutex, SpinLock, Condition variable atd), ale napíšeme si i vlastní ConcurrentStrandSchedulerPair a pitváním jeho vnitřností zjistíme, jak se dá napsat ekvivalent třídy ConcurrentExclusiveSchedulerPair. Také chci ukázat, jak je možné napsat jednoduché aktory (a tím skutečně nemyslím ty směšné panďuláky na diagramu případů užití ) s využitím našeho speciálního strand scheduleru a porovnám je s aktory, které lze napsat pomocí samotného TPL Dataflow v .Net Frameworku.
Pro lidi, kteří vyvíjejí v C++ a znají knihovnu BOOST ASIO, může být zajímavé, že se článku objeví názvy tříd, které důvěrně znají - io_service a strand. A dodám, že jsem nepoužil jen názvy, ale i odpovědnosti těchto tříd se shodují s odpovědnostmi tříd v Boostu, i když jsou mé třídy pochopitelně napsány zcela jinak.
Snad se mi vás podařilo navnadit a pro nedočkavé dodám, že si mohou již dnes celou knihovnu stáhnout z Bitbucketu.
git clone git@bitbucket.org:renestein/rstein.async.git
Forky a pull requesty od kohokoli jsou skutečně vítány.
Seriál Task Parallel Library a RStein.Async (předběžná osnova)
Task Parallel Library a RStein. Async 1 z n – Popis základních tříd a obcházení omezení v TPL.
Task Parallel Library a RStein. Async 2 z n – (boost) ASIO v .Net a IoServiceScheduler.
Task Parallel Library a RStein. Async 3 z n – Ukázky použití IoServiceScheduleru. Coroutines.
Task Parallel Library a RStein. Async 4 z n – ThreadPoolScheduler založený na IoServiceScheduleru.
Task Parallel Library a RStein. Async 6 z n – Vytvoření StrandScheduleru.
Task Parallel Library a RStein. Async 7 z n – Náhrada za některé synchronizační promitivy – ConcurrentStrandSchedulerPair.
Task Parallel Library a RStein. Async 8 z n – Jednoduchý “threadless” actor model s využitím StrandScheduleru.
Task Parallel Library a RStein. Async 9 z n – Píšeme aktory I.
Task Parallel Library a RStein. Async 10 z n – Píšeme aktory II.
Task Parallel Library a RStein. Async 11 z n – Píšeme nový synchronizační kontext - IOServiceSynchronizationContext.
Task Parallel Library a RStein. Async 12 z n – Použití IOServiceSynchronizationContextu v konzolové aplikaci a Windows službě.
(bude upřesněno)
Poznámka: V celé sérii článků budu používat slovo Task pro třídu, task pro název proměnné / argumentu metody a ”anglicismy” tásk/tásky místo “úloha/úlohy“ nebo jiného českého patvaru při zmínce o /úlohách-táscích/ v dalším textu. Předpokládám, že pro většinu vývojářů je takový text srozumitelnější.
V průběhu celého seriálu budeme psát nové schedulery. Jak asi víte, TaskScheduler je v TPL nízkoúrovňová třída, která je odpovědná za vyřízení předaných objektů Task. Každý potomek abstraktní třídy TaskScheduler rozhoduje o tom, kolik threadů se použije k vyřízení požadavků, i o tom, v jakém pořadí a kdy přesně budou předané objekty Task spuštěny. V .Net Frameworku jsou dva základní schedulery, které by měly pro většinu běžných scénářů postačovat. Scheduler, který je dostupný ve vlastnosti TaskScheduler.Default, využívá výchozí .Net ThreadPool a scheduler vrácený vlastností TaskScheduler.FromCurrentSynchronizationContext se hodí pro aplikace, ve kterých musí platit, že s ovládacími prvky na formuláři manipuluje jen tzv. UI thread, který ovládací prvek vytvořil (Windows Forms, WPF, Silverlight, Metro - Modern UI), jinak dojde k výjimce.
Chcete-li napsat vlastní TaskScheduler, podědíte z třídy TaskScheduler a přepíšete následující metody:
protected internal abstract void QueueTask(
Task task
)
Metoda QueueTask většinou uloží předaný objekt task do nějaké své interní kolekce k pozdějšímu vyřízení.
protected abstract bool TryExecuteTaskInline(
Task task,
bool taskWasPreviouslyQueued
)
Metoda TryExecuteTaskInline je volána, jestliže infrastruktura TPL rozhodne, že objekt task by měl být spuštěn v aktuálním vlákně. Typicky je tato metoda volána, když čekáte na výsledek zpracování tasku (task.Wait) a thread, ve kterém je metoda Wait přímo či nepřímo zavolána, není blokován, ale využit infrastrukturou TPL ke zpracování tásku. Pravidelně zabíjený nebo i jen blokovaný thread skutečně není ve vícevláknových aplikacích dobrý thread. V druhém argumentu – taskWasPreviouslyQueued – máte příznak, který sděluje, jestli task již byl nebo nebyl předán metodě QueueTask a podle toho lze upravit logiku v Scheduleru. Jak uvidíme později u StrandScheduleru, tento příznak pro nás bude vemi důležitý proto, abychom dostáli všem zárukám při zpracování tásků, které StrandScheduler svým klientům poskytuje.
Jestliže se ve vlastním Scheduleru rozhodneme, že teď je možné task vyřídit, stačí zavolat metodu TryExecuteTask z bázové třídy TaskScheduler a ta se postará o veškeré další záležitosti včetně uložení výsledku zpracování nebo výjimky do objektu task.
Další metodu používá hlavně debugger, který dovede zobrazit frontu tásků čekajících na vyřízení v našem scheduleru.
protected abstract IEnumerable<Task> GetScheduledTasks()
Každý scheduler by také měl být schopen sdělit, kolik tásků dokáže v jednom okamžiku vyřizovat paralelně. Neboli jaký je nejvyšší stupeň konkurence v Scheduleru, což je údaj, který poskytneme zájemcům ve vlastnosti MaximumConcurrencyLevel.
public virtual int MaximumConcurrencyLevel { get; }
Na kód speciálních schedulerů se můžete podívat v Parallel Extension Extras od Microsoftu.
Když začnete psát méně tradiční schedulery, narazíte na jedno zásadní omezení. Nový objekt task je po pokusu o spuštění tásku (Task.Run, TaskFactory.Run, Task.Start) asociován s právě použitým schedulerem a nikdy už nemůže být předán jinému scheduleru. Když se o něco takového pokusíte, metoda TryExecuteTask vyhodí výjimku, ve které vám sdělí, že žonglování s táskem mezi schedulery není povoleno.
To asi nevypadá jako nějaké zásadní omezení, protože proč bychom měli vůbec chtít přehodit tásk z jednoho scheduleru do druhého? Jak uvidíte v dalších dílech serálu, napíšeme si postupně pro své schedulery dekorátory, kteří například zajistí, že po určitou dobu nebudou tásky zpracovávány, ale jen schraňovány v privátní frontě a teprve po splnění dalších podmínek uvolněny k vyřízení. Tedy tásk bude aktivován v nějakém jiném scheduleru, než je ten, který tásk později vyřídí.
Bez přepsání TPL bohužel nelze toto omezení, které by se dalo parafrázovat větou “tásk předán scheduleru, z toho nutně a nepodmíněně plyne, že ten samý scheduler tásk také vyřídí”, jednoduše potlačit.
V knihovně RStein.Async jsem musel tedy zkusit navrhnout rozhraní a třídy tak, aby se “vlčí” knihovna TPL “nažrala” a přitom můj návrh (doufám, že ne “kozí”! ) zůstal celý.
Nejprve tedy musíme uspokojit TPL a přitom musíme být schopni zavolat metodu TryExecuteTask z třídy TaskScheduler odkudkoli z naší knihovny. TPL proto nabídnu speciální scheduler, který je z hlediska TPL plnohodnotným schedulerem. Tento scheduler nebude dělat nic jiného, než delegovat vykonání všech metod na mé vlastní “reálné” schedulery a čekat, až “reálný” scheduler požádá o vykonání Tasku
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public interface IProxyScheduler
{
bool DoTryExecuteTask(Task task);
TaskScheduler AsTplScheduler();
}
}
Rozhraní IProxyScheduler umí jen dvě věci. Metoda AsTplScheduler musí vrátit scheduler, se kterým umí pracovat TPL, a implementace metody DoTryExecuteTask zavolá metodu TryExecuteTask z TPL scheduleru.
Náš konkrétní proxy scheduler vypadá takto:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public class ProxyScheduler : TaskScheduler, IProxyScheduler, IDisposable
{
private readonly ITaskScheduler m_realScheduler;
public ProxyScheduler(ITaskScheduler realScheduler)
{
if (realScheduler == null)
{
throw new ArgumentNullException("realScheduler");
}
m_realScheduler = realScheduler;
m_realScheduler.ProxyScheduler = this;
}
public override int MaximumConcurrencyLevel
{
get
{
return m_realScheduler.MaximumConcurrencyLevel;
}
}
public void Dispose()
{
Dispose(true);
}
public virtual bool DoTryExecuteTask(Task task)
{
if (task == null)
{
throw new ArgumentNullException("task");
}
bool taskExecuted = TryExecuteTask(task);
if (taskExecuted)
{
task.RemoveProxyScheduler();
}
return taskExecuted;
}
public virtual TaskScheduler AsTplScheduler()
{
return this;
}
protected override void QueueTask(Task task)
{
task.SetProxyScheduler(this);
m_realScheduler.QueueTask(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (!taskWasPreviouslyQueued)
{
task.SetProxyScheduler(this);
}
return m_realScheduler.TryExecuteTaskInline(task, taskWasPreviouslyQueued);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return m_realScheduler.GetScheduledTasks();
}
protected void Dispose(bool disposing)
{
if (disposing)
{
m_realScheduler.Dispose();
}
}
}
}
V kódu asi není po přečtení předchozích odstavců moc překvapivých řádků. Tento scheduler bude vydán kdykoli, kde je očekáván Tpl scheduler, a proto:
1) Dědíme z abstraktní třídy TaskScheduler z TPL a podporujeme před chvílí popisované rozhraní IProxyScheduler.
2) Metoda AsTplScheduler vrátí odkaz na samotný objekt “this” – aktuální proxy scheduler.
3) Metody QueueTask, TryExecuteTaskInline, GetScheduledTasks a MaximumConcurrenyLevel jsou implementovány tak, že delegují na nějaký “reálný” scheduler z naší knihovny.
4) Metoda DoTryExecuteTask volá metodu TryExecuteTask.
Metody QueueTask a TryExecuteTaskInline také asociují a deasociují ProxyScheduler s předaným táskem pomocí extenzních metod SetProxyScheduler a RemoveProxyScheduler. Teď nás tolik trápit nemusí, jak jsem tyto metody napsal. Zájemci se ale mohou na kód podívat v předstihu.
Ještě jednou k terminologii schedulerů v knihovně RStein.Async, která může být zpočátku matoucí. ProxyScheduler má ve svém názvu slovo proxy, protože z pohledu všech dalších tříd v knihovně RStein.Async tásky vyřizují jiné (“reálné”) schedulery, kteří nejsou, jak uvidíme za chvíli, potomkem třídy TaskScheduler z TPL a kteří čekají na to, až ProxyScheduler zavolá jejich metody. Při spuštění tásku musí ale i “reálný” scheduler požádat ProxyScheduler, aby metodou TryExecuteTask upozornil infrastrukturu TPL, že je třeba task nyní vyřídit.
Takže knihovna RStein.Async používá ProxyScheduler, který ale knihovna TPL vidí jako jediný pro ni dostupný “reálný” scheduler.
Laboroval jsem s různými názvy pro ProxyScheduler, ale všechny další varianty mi přišly ještě horší.
Rozhraní ITaskScheduler je rozhraní, které podporují všechny "reálné" schedulery v knihovně RStein.Async.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public interface ITaskScheduler : IDisposable
{
int MaximumConcurrencyLevel
{
get;
}
IProxyScheduler ProxyScheduler
{
get;
set;
}
Task Complete
{
get;
}
void QueueTask(Task task);
bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
IEnumerable<Task> GetScheduledTasks();
}
}
Toto rozhraní obsahuje všechny metody a vlastnosti, které jsme popisoval výše u schedulerů v TPL. Na tyto metody deleguje ProxyScheduler, jehož instance je z rozhraní ITaskScheduler také dostupná. V rozhraní naleznete také vlastnost Complete, která vrací Task, jenž by měl být ve stavu “dokončen” v okamžiku, když scheduler již skončil svou práci a nemá být dále používán.
Schedulery v knihovně RStein.Async mají mnoho společných rysů, a proto jsem základní charakteristiky vytáhl do vlastní bázové třídy TaskSchedulerBase, aby všechny Schedulery nemusely reimplementovat celé rozhraní ITaskScheduler.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public abstract class TaskSchedulerBase : ITaskScheduler
{
private const string PROXY_SCHEDULER_ALREADY_SET_EXCEPTION_MESSAGE = "ProxyScheduler is already set and cannot be modified!";
private readonly CancellationTokenSource m_schedulerCancellationTokenSource;
private readonly TaskCompletionSource<object> m_serviceCompleteTcs;
private readonly object m_serviceLockObject;
private bool m_disposed;
private IProxyScheduler m_proxyScheduler;
protected TaskSchedulerBase()
{
m_disposed = false;
m_serviceLockObject = new Object();
m_serviceCompleteTcs = new TaskCompletionSource<object>();
m_schedulerCancellationTokenSource = new CancellationTokenSource();
}
protected object GetServiceLockObject
{
get
{
return m_serviceLockObject;
}
}
protected virtual CancellationToken SchedulerRunCanceledToken
{
get
{
return m_schedulerCancellationTokenSource.Token;
}
}
protected virtual CancellationTokenSource SchedulerRunCancellationTokenSource
{
get
{
return m_schedulerCancellationTokenSource;
}
}
public abstract int MaximumConcurrencyLevel
{
get;
}
public virtual IProxyScheduler ProxyScheduler
{
get
{
return m_proxyScheduler;
}
set
{
lock (GetServiceLockObject)
{
checkIfDisposed();
if (value == null)
{
throw new ArgumentNullException("value");
}
if (m_proxyScheduler != null)
{
throw new InvalidOperationException(PROXY_SCHEDULER_ALREADY_SET_EXCEPTION_MESSAGE);
}
m_proxyScheduler = value;
}
}
}
public virtual Task Complete
{
get
{
return m_serviceCompleteTcs.Task;
}
}
public void Dispose()
{
lock (m_serviceLockObject)
{
if (m_disposed)
{
return;
}
try
{
Dispose(true);
m_disposed = true;
m_serviceCompleteTcs.TrySetResult(null);
SchedulerRunCancellationTokenSource.Cancel();
}
catch (Exception ex)
{
Trace.WriteLine(ex);
m_serviceCompleteTcs.TrySetException(ex);
}
}
}
public abstract void QueueTask(Task task);
public abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
public abstract IEnumerable<Task> GetScheduledTasks();
protected abstract void Dispose(bool disposing);
protected void checkIfDisposed()
{
if (m_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
}
}
}
TaskSchedulerBase ponechá klíčové metody a vlastnosti abstraktní, protože zpracování tásků mohou řešit jen odvozené třídy, ale sama nabídne podporu pro ukončení činnosti scheduleru v mětodě Dispose .
O metodě Dispose v Schedulerech ještě budeme mluvit, protože deterministické ukončení činnosti scheduleru je pro některé scénáře klíčové, ale zde jen shrnu.
TaskSchedulerBase garantuje, že metoda Dispose bude volána jen jednou. Metoda Dispose - jako jedno z mála míst v knihovně – používá kritickou sekci (lock). Metoda převede Task ve vlastnosti Complete do stavu “dokončen” bez ohledu na to, jestli chráněná metoda Dispose v odvozených třídách proběhne bez problémů, nebo jestli dojde k vyvolání výjimky, takže libovolný kód v aplikaci, který závisí na informaci, že nějaký scheduler dokončil svou činnost, může pokračovat, i když došlo k výjimce. Metoda Dispose také stornuje CancellationToken, aby i další kód v odvozených třídách mohl reagovat na ukončení činnosti scheduleru.
Základní rozhraní a třídy máme, obešli jsme i některá striktní omezení v TPL a je načase začít psát specializované schedulery. Dnes si ještě ukážeme jen primitivní CurrentThreadScheduler, u kterého pojmenování naznačuje, že všechny tásky budou vždy vykonány ihned a v aktuálním threadu. Již v dalším díle nás ale čeká zajímavý a užitečný IoServiceScheduler.
CurrentThreadScheduler je třída na pár řádků, ale už alespoň nejde o abstraktní třídu, a my si jejím napsáním ověříme, že naše stávající infrastruktura funguje.
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace RStein.Async.Schedulers
{
public class CurrentThreadScheduler : TaskSchedulerBase
{
private const int MAXIMUM_CONCURRENCY_LEVEL = 1;
public override int MaximumConcurrencyLevel
{
get
{
checkIfDisposed();
return MAXIMUM_CONCURRENCY_LEVEL;
}
}
public override void QueueTask(Task task)
{
checkIfDisposed();
ProxyScheduler.DoTryExecuteTask(task);
}
public override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
checkIfDisposed();
ProxyScheduler.DoTryExecuteTask(task);
return true;
}
public override IEnumerable<Task> GetScheduledTasks()
{
checkIfDisposed();
return Enumerable.Empty<Task>();
}
protected override void Dispose(bool disposing)
{
}
}
}
Náš “reálný” scheduler s názvem CurrentThreadScheduler v metodách TryExecuteTaskInline a QueueTask spustí tásk s využitím ProxyScheduleru.
Metoda GetScheduledTasks vrátí prázdnou kolekci tásků, protože žádné tásky v metodě QueueTask neskladujeme.
I když jde o jednoduchý scheduler, měli bychom mít testy, které ověří, že se scheduler chová podle našich představ.
Nejprve CurrentThreadScheduler instanciujeme a předáme ho ProxyScheduleru.
protected override ITaskScheduler Scheduler
{
get
{
return m_scheduler;
}
}
protected override IProxyScheduler ProxyScheduler
{
get
{
return m_proxyScheduler;
}
}
public override void InitializeTest()
{
m_scheduler = new CurrentThreadScheduler();
m_proxyScheduler = new ProxyScheduler(m_scheduler);
base.InitializeTest();
}
......................
//Base tests
public TaskFactory TestTaskFactory
{
get
{
return m_testTaskFactory;
}
}
public override void InitializeTest()
{
m_testTaskFactory = new TaskFactory(ProxyScheduler.AsTplScheduler());
base.InitializeTest();
}
Metoda ProxyScheduler.GetTplScheduler() je využita k vytvoření instance TaskFactory z TPL, která nyní bude - nepřímo a aniž by si toho byla vědoma - používat ke spuštění tasků náš CurrentThreadScheduler.
A tady jsou testy:
Jestliže vytvoříme jeden tásk, tento tásk musí být vyřízen.
[TestMethod]
public async Task WithTaskFactory_When_One_Task_Is_Queued_Then_Task_is_Executed()
{
bool wasTaskExecuted = false;
await TestTaskFactory.StartNew(() => wasTaskExecuted = true);
Assert.IsTrue(wasTaskExecuted);
}
Když vytvoříme více tásků (v tomto testu jich je 8096), musí být všechny tásky vyřízeny.
[TestMethod]
public async Task WithTaskFactory_When_Tasks_Are_Queued_Then_All_Tasks_Are_Executed()
{
const int NUMBER_OF_TASKS = 8096;
int numberOfTasksExecuted = 0;
var tasks = Enumerable.Range(0, NUMBER_OF_TASKS)
.Select(_ => TestTaskFactory.StartNew(() => Interlocked.Increment(ref numberOfTasksExecuted))).ToArray();
await Task.WhenAll(tasks);
Assert.AreEqual(NUMBER_OF_TASKS, numberOfTasksExecuted);
}
Další testy ověřují charakteristiky, které by měl mít každý náš ITaskScheduler. Jedná se hlavně o ověření, že třída dodržuje doporučení “odlehčeného” Dispose idiomu .
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
public void QueueTask_When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
var dummyTask = new Task(() => {});
Scheduler.Dispose();
Scheduler.QueueTask(dummyTask);
}
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
public void TryExecuteTaskInline_When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
var dummyTask = new Task(() => {});
Scheduler.Dispose();
Scheduler.TryExecuteTaskInline(dummyTask, false);
}
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
public void GetScheduledTasks_When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
Scheduler.Dispose();
Scheduler.GetScheduledTasks();
}
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
public void MaximumConcurrencyLevel_When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
Scheduler.Dispose();
var maximumConcurrencyLevel = Scheduler.MaximumConcurrencyLevel;
}
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
private void SetProxyScheduler__When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
Scheduler.Dispose();
Scheduler.ProxyScheduler = null;
}
[TestMethod]
[ExpectedException(typeof (ObjectDisposedException))]
private void GetProxyScheduler__When_TaskScheduler_Disposed_Then_Throws_ObjectDisposedException()
{
Scheduler.Dispose();
var proxyScheduler = Scheduler.ProxyScheduler;
}
[TestMethod]
public void Dispose_Repeated_Call_Does_Not_Throw()
{
Scheduler.Dispose();
Scheduler.Dispose();
}
[TestMethod]
public void Dispose_Does_Not_Throw()
{
Scheduler.Dispose();
}
}
A to je dnes skutečně vše.
V další části uvidíme nejen slibovaný IoServiceScheduler, ale ukážeme si, že IoServiceScheduler má speciální chování, které musí být pokryto mnohem robustnějšími testy.
Monday, 26 May 2014 09:51:00 (Central Europe Standard Time, UTC+01:00)