\


 Tuesday, 03 June 2014
Task Parallel Library a RStein. Async 2 – (boost) ASIO v .Net a IoServiceScheduler.
(Článek obnoven ze zálohy 21. 1. 2020, omlouvám se za formátování kódu.)

V dnešním příspěvku o TPL a knihovně RStein.Async napíšeme slibovaný IoServiceScheduler. Jestliže jste si ještě neprošli  první díl seriálu a nemáte přehled o tom, k čemu slouží “proxy” schedulery a “reálné” schedulery, jaká omezení z TPL jsem obešel a proč jsem zavedl tuto na první pohled podivnou terminologii, bude lepší, když si první díl přečtete dříve, než se začtete do dalších částí seriálu.
Knihovna RStein.Async je dostupná  na Bitbucketu.
git clone git@bitbucket.org:renestein/rstein.async.git

Také si hned v úvodu dovolím podotknout, že tento díl je velmi dlouhý, a optimisticky dodám, že ostatní díly seriálu by měly být o dost kratší.Veselý obličej


Seriál  Task Parallel Library a RStein.Async  (předběžná osnova)

Task Parallel Library a RStein. Async 1 z n –  Popis základních tříd a obcházení omezení v TPL.

Task Parallel Library a RStein. Async 2 z n –  (boost) ASIO v .Net a IoServiceScheduler.

Task Parallel Library a RStein. Async 3 z n – Ukázky použití IoServiceScheduleru. Coroutines.

Task Parallel Library a RStein. Async 4 z n  – ThreadPoolScheduler založený na IoServiceScheduleru.

Task Parallel Library a RStein. Async 6 z n – Vytvoření StrandScheduleru.

Task Parallel Library a RStein. Async 7 z n – Náhrada za některé synchronizační promitivy – ConcurrentStrandSchedulerPair.

Task Parallel Library a RStein. Async 8 z n – Jednoduchý “threadless” actor model s využitím StrandScheduleru.

Task Parallel Library a RStein. Async 9 z n – Píšeme aktory I.

Task Parallel Library a RStein. Async 10 z n – Píšeme aktory II.

Task Parallel Library a RStein. Async 11 z n – Píšeme nový synchronizační kontext  - IOServiceSynchronizationContext.

Task Parallel Library a RStein. Async 12 z n – Použití IOServiceSynchronizationContextu v konzolové aplikaci a Windows službě.

(bude upřesněno)


Poznámka: V celé sérii článků budu používat slovo Task pro třídu, task pro název proměnné / argumentu metody a ”anglicismy” tásk/tásky místo “úloha/úlohy“ nebo jiného českého patvaru při zmínce o /úlohách-táscích/ v dalším textu. Předpokládám, že pro většinu vývojářů je takový text srozumitelnější.

IoServiceScheduler byl pojmenován na počest své starší příbuzné io_service v  knihovně Boost.ASIO. I když převezmeme mnoho rysů z io_service, nebudeme otrocky kopírovat všechny její vlastnosti. Pro vývojáře, kteří io_service znají, podotknu, že náš IoServiceScheduler nelze považovat za plnohodnotnou implementaci vzoru Proactor. Bylo by sice snadné zavést v .Net Frameworku novou konvenci pro zpracování asynchronních IO operací i registrovat nové asynchronní poskytovatele a imitovat tak většinu rysů z Boost.Asio, ale protože v .Net Frameworku máme jiné idiomy, šlo by o zbytečné nošení cizorodého kódu do hájemství Microsoftu. Mrkající veselý obličej

IoServiceScheduleru charakterizuje to, že máme dokonale pod kontrolou, které thready zpracují vytvořené tásky. Dokud IoScheduleru nepropůjčíme thread tím, že zavoláme jednu z jeho metod Run, RunOne, Poll nebo PollOne, žádné tásky zpracovávány nebudou. Řečeno mírně jinak, použití IoServiceScheduleru v aplikaci zaručuje, že tásky nebudou vyřízeny jiným threadem, než tím, který IoServiceScheduleru výslovně a na dobu určitou propůjčíme. Samo o sobě nevypadá takové chování jako žádný zázrak, ale v průběhu celého seriálu zjistíme, jak na správném chování IoServiceScheduleru závisí další třídy .

Nejdříve se podíváme na rozhraní IoServiceScheduleru, které vychází z io_service, a já se pokusím stručně popsat, jak jednotlivé metody pracují. Poté se podíváme na testy, které ověřují korektní chování metod v IoServiceScheduleru, a napíšeme samotné metody IoServiceScheduleru.
Metody odpovědné za vyřizování tásků v IoServiceScheduleru:

Metoda int Run()
Po zavolání metody Run IoServiceScheduler začne v aktuálním threadu  vyřizovat tásky. Metoda Run skončí teprve tehdy, když začne platit jedna z uvedených podmínek:
1) IoServiceScheduler již neobsahuje žádné další tásky ke zpracování a současně jsme IoServiceScheduleru nepředali žádný (viz níže v článku) objekt “Work”, kterým sdělujeme, že metoda Run má čekat na další tásky do té doby, dokud objekt “Work” nezlikvidujeme. Objekt Work nyní  zjednodušeně berme jako vytížení IoServiceScheduleru nějakým táskem – předstíranou prací, která udrží metodu Run v zápřahu a nedovolí jí skončit.
2) IoServiceScheduler již neobsahuje žádné další tásky ke zpracování a dříve předaný objekt Work jsme zlikvidovali (=zavolali jsme jeho metodu Dispose).
3) Zavolali jsme metodu Dispose, kterou si vynutíme ukončení činnosti IoScheduleru.
Metoda vrátí počet zpracovaných tásků.
Počet threadů, které mohou v jednom okamžiku zavolat metodu Run, není omezen.

Metoda int RunOne()
Metoda RunOne vyřídí v aktuálním threadu právě jeden tásk. Jestliže IoServiceScheduler žádný tásk neobsahuje, metoda RunOne zablokuje aktuální vlákno do té doby, dokud nějaký tásk není IoServiceScheduleru předán.
Metoda RunOne tedy skončí svou činnost, když nastane jedna z těchto podmínek.
1) Metoda vyřídila právě jeden tásk.
2) Zavolali jsme metodu Dispose, kterou si vynutíme ukončení činnosti IoScheduleru.
Metoda vrátí počet zpracovaných tásků , návratová hodnota by měla být vždy rovna jedné.
Počet threadů, které mohou v jednom okamžiku zavolat metodu RunOne, není omezen.

Metoda int Poll()
Po zavolání metody Poll IoServiceScheduler začne v aktuálním threadu  vyřizovat tásky.
Metoda Poll skončí  tehdy, když začne platit jedna z uvedených podmínek:
1) IoServiceScheduler již neobsahuje žádné další tásky ke zpracování.
2) Zavolali jsme metodu Dispose, kterou si vynutíme ukončení činnosti IoScheduleru.
Na rozdíl od metody Run, metoda Poll skončí ihned poté, co zjistí, že již žádné další tásky v IoServiceScheduleru nejsou. Existence/absence objektu Work nemá na činnost metody Poll žádný vliv.
Metoda vrátí počet zpracovaných tásků .
Počet threadů, které mohou v jednom okamžiku zavolat metodu Poll, není omezen.

Metoda int PollOne()
Metoda PollOne vyřídí v aktuálním threadu maximálně jeden tásk. Jestliže IoServiceScheduler žádný tásk neobsahuje, metoda PollOne ihned ukončí svou činnost a vrátí řízení volajícímu kódu.
Metoda PollOne skončí svou činnost, když nastane jedna z těchto podmínek.
1) Metoda vyřídila právě jeden tásk.
2) Scheduler v době volání metody PollOne neobsahuje žádný tásk.
2) Zavolali jsme metodu Dispose, kterou si vynutíme ukončení činnosti IoScheduleru.
Na rozdíl od metody RunOne metoda PollOne nikdy neblokuje aktuální vlákno tím, že by čekala na zařazení nového tásku ke zpracování v IoServiceScheduleru.
Metoda vrátí počet zpracovaných tásků.  Návratová hodnota by měla být vždy 0 (tásků), nebo 1 (tásk).
Počet threadů, které mohou v jednom okamžiku zavolat metodu PollOne, není omezen.

IoServiceScheduler je potomkem TaskSchedulerBase a k zařazení i zpracování tásků v Scheduleru nabízí nám dobře známé metody.
public override void QueueTask(Task task){…};

public override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {…}

Bez problémů tedy můžeme IoServiceScheduler předat do TaskFactory z TPL, podobně jako jsme si to již předvedli minule u CurrentThreadScheduleru.

IoServiceScheduler navíc nabízí k vytváření tásků  a jejich zařazení ke zpracování i alternativní rozhraní svého předka z Boost.Asio.

Metoda Task Dispatch(Action action);

Metoda z delegáta v argumentu action vytvoří nový tásk a připraví ho ke zpracování.

Jestliže je volána metoda Dispatch ve stejném threadu, ve kterém je nyní aktivní metoda Run, RunOne, Poll, nebo PollOne, tak metoda Dispatch může tásk vykonat ihned (“inline”).

Jestliže tedy stávající tásk vyřizovaný IoServiceSchedulerem zavolá metodu Dispatch, může být delegát action zavolán ihned, protože si můžeme být jisti, že tásk vykonáme ve “správném” threadu, který byl propůjčen IoServiceScheduleru.
Metoda vrátí  tásk, který je dokončen, když svou činnost skončí delegát v argumentu action.

Metoda Task Post(Action action);
Stejně jako metoda Dispatch, i metoda Post z delegáta v argumentu action vytvoří nový tásk a připraví ho ke zpracování.
Na rozdíl od metody Dispatch a bez ohledu na to, ve kterém threadu je metoda Post aktivována, metoda Post nesmí nikdy delegáta action vykonat ihned (“inline”), ale musí jen vytvořit nový tásk, zařadit ho ke zpracování a vrátit řízení.

Metoda vrátí tásk, který je dokončen, když svou činnost skončí delegát v argumentu action.

Metoda Action Wrap(Action action);
Metoda Wrap přebírá i vrací argument typu Action. Argument action je “zabalen” do delegáta, který po svém vyvolání argument action předá metodě Dispatch, o níž už víme, že z delegáta action vytvoří nový tásk ke zpracování.
Metodě Wrap  předáte kdykoli v delegátu action odkaz na  kód, u kterého požadujete, abyste ho mohli sami později zařadit ke zpracování v tomto IoServiceScheduleru ve formě tásku, a ona vám vrátí delegáta se stejnou signaturou, jakou má argument action , a který po svém vyvolání přesně toto zvládne.

Metody Dispatch, Post a Wrap mají přetížené varianty, které místo delegáta typu Action přijímají odkaz na delegáta typu Func<Task>. Tyto varianty existují proto, abychom se částečně zbavili  některých nepříjemných problémů s async lambda výrazy, které skvěle popsal Stephen Toub na MSDN blogu.

Jak jsem již poznamenal, metody Dispatch, Post a Wrap představují alternativní rozhraní pro vytváření tásků, a protože toto rozhraní bude mít odlišné klienty, než rozhraní známé z TPL, vzpomeneme si na princip “Interface Seggregation“ a zmíněné metody extrahujeme do samostatného rozhraní s názvem IAsioTaskService.

using System;

using System.Threading.Tasks;

namespace RStein.Async.Schedulers

{

public interface IAsioTaskService : IDisposable

{

Task Dispatch(Action action);

Task Dispatch(Func<Task> function);

Task Post(Action action);

Task Post(Func<Task> function);

Action Wrap(Action action);

Action Wrap(Func<Task> function);

}

}

Pro ty, kdo mají raději obrázky, zde je rozhraní třídy IoServiceScheduler znovu.

image

Po nezbytném úvodu bychom měli mít mnohem lepší představu o odpovědnostech IoServiceScheduleru a nyní si zkusíme IoService Scheduler napsat.

public class IoServiceScheduler : TaskSchedulerBase, IAsioTaskService

{

public const int REQUIRED_WORK_CANCEL_TOKEN_VALUE = 1;

public const int POLLONE_RUNONE_MAX_TASKS = 1;

public const int UNLIMITED_MAX_TASKS = -1;

private readonly ThreadLocal<IoSchedulerThreadServiceFlags> m_isServiceThreadFlags;

private readonly CancellationTokenSource m_stopCancelTokenSource;

private readonly BlockingCollection<Task> m_tasks;

private readonly object m_workLockObject;

private CancellationTokenSource m_workCancelTokenSource;

private volatile int m_workCounter;

public IoServiceScheduler()

{

m_tasks = new BlockingCollection<Task>();

m_isServiceThreadFlags = new ThreadLocal<IoSchedulerThreadServiceFlags>(() => new IoSchedulerThreadServiceFlags());

m_stopCancelTokenSource = new CancellationTokenSource();

m_workLockObject = new object();

m_workCounter = 0;

}

.....

}

IoServiceScheduler je potomkem naší bázové třídy TaskSchedulerBase a podporuje rozhraní IAsioTaskService. Po předchozích odstavcích určitě nejste překvapeni. Veselý obličej

V konstruktoru inicializujeme několik důležitých proměnných. V threadově bezpečné kolekci m_tasks typu BlockingTaskCollection budeme držet tásky zařazené ke zpracování. V threadově lokální proměnné  m_isServiceThreadFlags uložíme pro každý thread, který bude IoServiceScheduleru propůjčen, informaci, že jde o thread, který IoServiceScheduler po volání metod Poll, PollOne, Run a RunOne nyní vlastní, dále informaci o tom, kolik tásků můžeme v tomto threadu nyní vyřídit a kolik již jich bylo vyřízeno. Celá třída IoServiceSchedulerThreadFlags vypadá takto.

namespace RStein.Async.Schedulers

{

public class IoSchedulerThreadServiceFlags

{

public IoSchedulerThreadServiceFlags()

{

ResetData();

}

public bool IsServiceThread

{

get;

set;

}

public int MaxOperationsAllowed

{

get;

set;

}

public int ExecutedOperationsCount

{

get;

set;

}

public void ResetData()

{

IsServiceThread = false;

MaxOperationsAllowed = ExecutedOperationsCount = 0;

}

}

}

Vraťme se zpátky ke konstruktoru IoServiceScheduleru. Proměnná m_stopCancelTokenSource je instance CancellationTokenSource, která je stornována ihned poté, co je činnost IoServiceScheduleru voláním metody Dispose ukončena.

Proměnné m_workLockObject a m_workCounter se týkají objektů Work, které jsem letmo popisoval výše u metody Run. Zopakujme, že objekt Work představuje “práci”, která udrží metodu Run IoServiceScheduleru v chodu, i když IoServiceScheduler neobsahuje žádné tásky, a zprostředkovaně tak dosáhne toho,že si IoServiceScheduler ponechá jednou propůjčený thread i pro tásky, které mohou být do IoServiceScheduleru přidány “později”.

Ještě lepší asi bude, když se podíváme, jak je objekt Work udělán.

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Threading;

namespace RStein.Async.Schedulers

{

public sealed class Work : IDisposable

{

private readonly CancellationTokenSource m_cancelTokenSource;

public Work(IoServiceScheduler scheduler)

{

m_cancelTokenSource = new CancellationTokenSource();

scheduler.AddWork(this);

}

internal CancellationToken CancelToken

{

get

{

return m_cancelTokenSource.Token;

}

}

internal void RegisterWorkDisposedHandler(Action action)

{

m_cancelTokenSource.Token.Register(action);

}

public void Dispose()

{

Dispose(true);

}

private void Dispose(bool disposing)

{

if (disposing)

{

m_cancelTokenSource.Cancel();

}

}

}

}

Kdo zná Boost.Asio, musí mu být zřejmé, jak jsem se snažil zachovat styl práce s objektem Work.
Ve stručnosti a aniž byste viděli kód v IoServiceScheduleru:

  1. Když vytvoříte objekt Work, předáte mu odkaz na IoServiceScheduler, jehož později volaná metoda Run nemá skončit.
  2. Objekt Work notifikuje IoServiceScheduler o své existenci tím, že volá internal metodu AddWork scheduleru.
  3. IoServiceScheduler si poznamená, že existuje nový objekt Work. K tomu využije proměnné, které jsme viděli v jeho konstruktoru. Také si IoServiceScheduler u objektu Work ihned předplatí přes metodu RegisterWorkDisposedHandler informaci o tom, že byl objekt Work zničen. Zničením míníme vyvolání metody Dispose.
  4. IoServiceScheduleru dokáže pracovat s neomezeným počtem objektů Work, i když platí, že k tomu, aby metoda Run IoServiceScheduleru nevrátila řízení po zpracování všech tásků, stačí, aby existoval jeden objekt Work. Přidání dalších a dalších objektů Work nemá již na činnost IoServiceScheduleru vliv.

Na to, jak přesně IoServiceScheduler spravuje objekty Work, se můžete podívat sami. My se teď soustředíme na metody Run, RunOne, Poll, PollOne, bez kterých by IoServiceScheduler byl jen skládkou depresivních tásků, které nebudou nikdy zpracovány.

Z předchozího popisu metod Run, RunOne, Poll, PollOne vyplynulo, že mají podobné odpovědnosti a liší se hlavně v tom, za jakých podmínek přestanou zpracovávat  tásky a vrátí  dočasně propůjčený thread.
Jako první si na paškál vezmeme metodu Run a podíváme na důležité testy, kterými musí metoda Run projít.
Každý test používá IoServiceScheduler, který vytvoříme takto.

private ProxyScheduler m_proxyScheduler;

private IoServiceScheduler m_scheduler;

protected override ITaskScheduler Scheduler

{

get

{

return m_scheduler;

}

}

public override void InitializeTest()

{

m_scheduler = new IoServiceScheduler();

m_proxyScheduler = new ProxyScheduler(m_scheduler);

base.InitializeTest();

}

public override void CleanupTest()

{

m_scheduler.Dispose();

m_proxyScheduler.Dispose();

base.CleanupTest();

}

Kdyby vás překvapilo, proč používáme i ProxyScheduler, znovu vás odkážu na první díl seriálu.
Ale nyní už opravdu pojďme k metodě Run.

První test ověří, že když nemáme žádné tásky ke zpracování, metoda Run ihned vrátí 0 – žádný tásk nebyl zpracován.

[TestMethod]

public void Run_When_Zero_Tasks_Added_Then_Returns_Zero()

{

var result = m_scheduler.Run();

Assert.AreEqual(0, result);

}

V dalších testech ověříme, že když předáme jeden tásk, tak metoda Run tento tásk vyřídí a vrátí hodnotu 1.

[TestMethod]

public void Run_When_One_Task_Added_Then_Returns_One()

{

const int NUMBER_OF_SCHEDULED_TASKS = 1;

m_scheduler.Dispatch(() =>

{

});

var result = m_scheduler.Run();

Assert.AreEqual(NUMBER_OF_SCHEDULED_TASKS, result);

}

[TestMethod]

public void Run_When_One_Task_Added_Then_Task_Is_Executed()

{

bool wasTaskCalled = false;

m_scheduler.Dispatch(() =>

{

wasTaskCalled = true;

});

m_scheduler.Run();

Assert.IsTrue(wasTaskCalled);

}

Také bychom měli ověřit, že když scheduler obsahuje více tásků, tak jsou všechny vyřízeny a metoda Run stále vrací správný počet vyřízených tásků.

[TestMethod]

public void Run_When_More_Tasks_Added_Then_All_Tasks_Are_Executed()

{

bool wasTask1Called = false;

bool wasTask2Called = false;

m_scheduler.Dispatch(() =>

{

wasTask1Called = true;

});

m_scheduler.Dispatch(() =>

{

wasTask2Called = true;

});

m_scheduler.Run();

Assert.IsTrue(wasTask1Called && wasTask2Called);

}

[TestMethod]

public void Run_When_Two_Tasks_Added_Then_Returns_Two()

{

const int NUMBER_OF_SCHEDULED_TASKS = 2;

Enumerable.Range(0, NUMBER_OF_SCHEDULED_TASKS)

.Select(_ => m_scheduler.Dispatch(() =>

{

})).ToArray();

var executedTasksCount = m_scheduler.Run();

Assert.AreEqual(NUMBER_OF_SCHEDULED_TASKS, executedTasksCount);

}

Napíšeme další testy, které otestují, že se metoda Run chová správně při použití objektu Work. První test ověří, že metoda Run zpracuje jeden tásk, a poté, co je na objektu Work zavolána metoda Dispose, tak metoda Run vrátí řízení a návratovou hodnotou je 1 - jeden vyřízený tásk. V metodě cancelWorkAfterTimeout si můžete všimnout, jak je objekt Work vytvářen. Nepříjemné na tomto testu je, že když bude v metodě Run chyba a metoda Run po odstranění objektu Work nevrátí řízení, test poběží tak dlouho, dokud nevyprší přidělený maximální časový interval pro provedení samotného testu.

[TestMethod]

public void Run_When_One_Task_Added_And_Cancel_Work_Then_Returns_One()

{

m_scheduler.Dispatch(() =>

{

});

cancelWorkAfterTimeout();

var result = m_scheduler.Run();

Assert.AreEqual(1, result);

}

private void cancelWorkAfterTimeout(int? sleepMs = null)

{

const int DEFAULT_SLEEP = 1000;

var sleepTime = sleepMs ?? DEFAULT_SLEEP;

var work = new Work(m_scheduler);

ThreadPool.QueueUserWorkItem(_ =>

{

Thread.Sleep(sleepTime);

work.Dispose();

});

}

Ověříme také, že metoda Run vrátí řízení po zrušení objektu Work, i když nezpracovala žádné tásky.

[TestMethod]

public void Run_When_Zero_Tasks_Added_And_Cancel_Work_Then_Returns_Zero()

{

cancelWorkAfterTimeout();

var result = m_scheduler.Run();

Assert.AreEqual(0, result);

}

Další testy kontrolují, že metoda Run vyřídí všechny tásky, poté vrátí řízení a v návratové hodnotě máme správný počet vyřízených tásků

[TestMethod]

public void Run_When_More_Tasks_Added_And_Cancel_Work_Then_All_Tasks_Are_Executed()

{

bool wasTask1Called = false;

bool wasTask2Called = false;

m_scheduler.Dispatch(() =>

{

wasTask1Called = true;

});

m_scheduler.Dispatch(() =>

{

wasTask2Called = true;

});

cancelWorkAfterTimeout();

m_scheduler.Run();

Assert.IsTrue(wasTask1Called && wasTask2Called);

}

[TestMethod]

public void Run_When_Two_Tasks_Added_And_Cancel_Work_Then_Returns_Two()

{

const int NUMBER_OF_SCHEDULED_TASKS = 2;

Enumerable.Range(0, NUMBER_OF_SCHEDULED_TASKS)

.Select(_ => m_scheduler.Dispatch(() =>

{

})).ToArray();

cancelWorkAfterTimeout();

var executedTasksCount = m_scheduler.Run();

Assert.AreEqual(NUMBER_OF_SCHEDULED_TASKS, executedTasksCount);

}

U paralelního/asynchronního kódu se občas nevyhneme  testům, které jsou nebezpečné, protože porušují některou z F.I.R.S.T zásad pro unit/integrační testy.

Další test ověřuje, že metoda Run nevrátí řízení, dokud nezruším objekt Work. Nebezpečný je proto, že objekt Work zruším po 3 sekundách a předpokládám, že doba, po kterou běží test, je delší než dvě sekundy. Testy, které pracují takto vágně s časovými intervaly, částečně porušují F a R akronymu F.I.R.S.T. Tento test určitě není rychlý (Fast), protože běží několik sekund, a také není zcela“opakovatelný” (“Repeatable”). Pragmaticky vzato je ale tento test - stejně jako některé další méně bezpečné testy v knihovně RStein.Async- velmi užitečný a dostatečně bezpečný, takže všem pohoršeným puristům se omlouvám a přeju jim jejich ideální svět. Ať hodí kamenem…Veselý obličej

[TestMethod]

public void Run_When_Work_Exists_And_Zero_Tasks_Then_Method_Does_Not_Return()

{

const int WORK_CANCEL_DELAY_MS = 3000;

const double RUN_MIN_DURATION_S = 2.0;

var time = StopWatchUtils.MeasureActionTime(() =>

{

cancelWorkAfterTimeout(WORK_CANCEL_DELAY_MS);

m_scheduler.Run();

});

Assert.IsTrue(time.TotalSeconds > RUN_MIN_DURATION_S);

}

I další test není zrovna “košer”. Ověřuje, že metoda Run vrátí řízení “ihned”, když neexistuje žádný tásk a objekt Work byl sice IoScheduleru předán, ale byl ještě před voláním metody Run zrušen. Pojem “ihned“ zde nabývá netradičního významu “řízení z metody Run musí být vráceno za méně než půl sekundy”.

[TestMethod]

public void Run_When_Work_Canceled_And_Zero_Tasks_Then_Method_Returns_Immediately()

{

const double RUN_MAX_DURATION_S = 0.5;

var work = new Work(m_scheduler);

work.Dispose();

var time = StopWatchUtils.MeasureActionTime(() => m_scheduler.Run());

Assert.IsTrue(time.TotalSeconds < RUN_MAX_DURATION_S);

}

Poslední test, na který se podíváme v tomto článku, je test, který ověřuje, že když je metoda Run současně volána z více threadů (v testu jsou použity tři thready), tak jsou všechny tásky vyřízeny a součet návratových hodnot metod Run, tedy celkový počet všech vyřízených tásků bez ohledu na to, v kterém threadu k vyřízení tásku došlo, je roven počtu tásků, který jsme do IoServiceScheduleru poslali.

[TestMethod]

public async Task Run_When_Called_From_Multiple_Threads_Then_All_Tasks_Executed()

{

const int NUMBER_OF_SCHEDULED_TASKS = 100;

const int DEFAULT_TASK_SLEEP = 100;

const int NUMBER_OF_WORKER_THREAD = 3;

var countDownEvent = new CountdownEvent(NUMBER_OF_WORKER_THREAD);

int executedTasks = 0;

var allTasks = Enumerable.Range(0, NUMBER_OF_SCHEDULED_TASKS).Select(_ => m_scheduler.Post(() => Thread.Sleep(DEFAULT_TASK_SLEEP))).ToArray();

Enumerable.Range(0, NUMBER_OF_WORKER_THREAD).Select(_ => ThreadPool.QueueUserWorkItem(__ =>

{

int tasksExecutedInThisThread = m_scheduler.Run();

Interlocked.Add(ref executedTasks, tasksExecutedInThisThread);

countDownEvent.Signal();

})).ToArray();

await Task.WhenAll(allTasks);

countDownEvent.Wait();

Assert.AreEqual(NUMBER_OF_SCHEDULED_TASKS, executedTasks);

}

Testů pro metodu  Run je více, protože je potřeba otestovat  hraniční případy, ale z ukázaných testů by mělo být zřejmé, jak metoda Run pracuje.

Podívejme se teď na kód v metodě Run.

public virtual int Run()

{

checkIfDisposed();

return runTasks(withWorkCancelToken());

}

private CancellationToken withWorkCancelToken()

{

lock (m_workLockObject)

{

return (existsWork()

? m_workCancelTokenSource.Token

: withoutCancelToken());

}

}

private CancellationToken withoutCancelToken()

{

return CancellationToken.None;

}

Metoda Run zavolá metodu runTasks, které předá CancelToken, jestliže existuje alespoň jeden objekt Work, abychom po zrušení objektu Work dále neblokovali metodou Run propůjčený thread. Jestliže objekt Work neexistuje, je předána konstanta CancellationToken.None = na zrušení objekt Work reagovat nebudeme a metoda Run vrátí řízení ihned poté, co vyřídí všechny tásky.

Metoda runTasks

private int runTasks(CancellationToken cancellationToken, int maxTasks = UNLIMITED_MAX_TASKS)

{

try

{

setCurrentThreadAsServiceAllFlags(maxTasks);

return runTasksCore(cancellationToken);

}

finally

{

resetThreadAsServiceAllFlags();

}

}

private void setCurrentThreadAsServiceAllFlags(int maxTasks)

{

resetThreadAsServiceAllFlags();

setThreadAsServiceFlag();

m_isServiceThreadFlags.Value.MaxOperationsAllowed = maxTasks;

}

private void setThreadAsServiceFlag()

{

m_isServiceThreadFlags.Value.IsServiceThread = true;

}

private void resetThreadAsServiceAllFlags()

{

m_isServiceThreadFlags.Value.ResetData();

}

Metoda runTasks kromě odkazu na CancelToken, který se má použít, má argument maxTasks. Ten udává, kolik tásků je možné nyní vyřídit. Výchozí hodnota “počet tásků není omezen” metodě Run vyhovuje.

Metoda runTasks si nejprve “přivlastní” aktuální thread.  “Přivlastněním” threadu mám na mysli to, že do threadově lokální proměnné m_isServiceThreadFlags si poznamenáme, že současný thread je nyní thread IoServiceScheduleru a že může být použit pro vyřizování tásků, a také si poznačíme, kolik tásků můžeme nyní vyřídit. Poté vyvoláme metodu runTasksCore.

V sekci finally metoda runTaks zaručí, že poté, co metoda runTasksCore doběhne, tak se vlastnictví threadu vzdáme a  v proměnné isServiceThreadFlags nastavíme voláním metody resetThreadAsServiceAllFlags() výchozí hodnoty.

V metodě runTasksCore  konečně zpracujeme existující tásky:

private int runTasksCore(CancellationToken cancellationToken)

{

bool searchForTask = true;

var usedCancellationToken = cancellationToken;

var serviceData = m_isServiceThreadFlags.Value;

while (searchForTask)

{

searchForTask = false;

m_stopCancelTokenSource.Token.ThrowIfCancellationRequested();

try

{

Task task;

if (!tryGetTask(usedCancellationToken, out task))

{

continue;

}

m_stopCancelTokenSource.Token.ThrowIfCancellationRequested();

searchForTask = TryExecuteTaskInline(task, true) && !tasksLimitReached();

m_stopCancelTokenSource.Token.ThrowIfCancellationRequested();

}

catch (OperationCanceledException e)

{

Trace.WriteLine(e);

if (m_stopCancelTokenSource.IsCancellationRequested)

{

break;

}

usedCancellationToken = CancellationToken.None;

searchForTask = !tasksLimitReached();

}

}

return serviceData.ExecutedOperationsCount;

}

private bool tryGetTask(CancellationToken cancellationToken, out Task task)

{

if (cancellationToken != CancellationToken.None)

{

return m_tasks.TryTake(out task, Timeout.Infinite, cancellationToken);

}

return m_tasks.TryTake(out task);

}

private bool tasksLimitReached()

{

var serviceData = m_isServiceThreadFlags.Value;

if ((serviceData.MaxOperationsAllowed == UNLIMITED_MAX_TASKS) ||

(serviceData.ExecutedOperationsCount < serviceData.MaxOperationsAllowed))

{

return false;

}

return true;

}

public override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)

{

checkIfDisposed();

if (!isInServiceThread())

{

return false;

}

if (tasksLimitReached())

{

return false;

}

bool taskExecutedNow = false;

try

{

m_isServiceThreadFlags.Value.ExecutedOperationsCount++;

taskExecutedNow = task.RunOnProxyScheduler();

}

finally

{

if (!taskExecutedNow)

{

m_isServiceThreadFlags.Value.ExecutedOperationsCount--;

}

}

return taskExecutedNow;

}

Metoda runTasksCore  používá předaný cancellationToken a do proměnné serviceData si uloží odkaz na m_isServiceThreadFlags, protože  potřebujeme vědět, kolik tásků jsme již zpracovali a kontrolovat, jestli jsme nepřekročili maximální počet tásků. Cyklus while běží do té doby, dokud máme hledat a zpracovávat další tásk. Proměnná searchForTask je inicializována na true, takže se vnoříme do cyklu, ve kterém ihned proměnnou searchForTask nastavíme na false, protože nevíme, jestli další tásky existují.

Metoda tryGetTask se pokusí vrátit další tásk. Všimněte si, že na kolekci m_tasks zavoláme metodu m_tasks.TryTake(out task, Timeout.Infinite, cancellationToken), která vrátí stávající tásk v kolekci, nebo zablokuje thread do té doby, dokud nebude do kolekce tásk přidán anebo dokud nebude stornován cancellationToken. Tato varianta metody TryTake se použije, když existuje objekt Work a cancellationToken tedy nemá hodnotu CancellationToken.None. Jestliže objekt Work neexistuje, použijeme na kolekci metodu m_tasks.TryTake(out task), která buď ihned vrátí tásk, nebo zjistí, že kolekce je prázdná a proměnnou tásk nastaví na hodnotu null. K blokaci threadu ale nikdy nedojde.

Vraťme se do metody runTasksCore. Jestliže nebyl tásk nalezen, vrátíme se na začátek cyklu while - proměnná searchForTask má hodnotu false, a proto cyklus while i metoda runTasksCore skončí. Když je tásk nalezen, pokusíme se ho pomocí metody TryExecuteTaskInline spustit. Metoda TryExecuteTaskInline vždy ověří, že jsme v threadu, který nyní patří IoServiceScheduleru, a zkontroluje, že jsme nepřekročili maximální počet tásků, které můžeme v tomto vlákně vyřídit. Možná se divíte, proč kontroluju, že jsme v threadu IoServiceScheduleru, když jsme dříve tento příznak nastavili. Nezapomeňte, že metodu TryExecuteTaskInline používá i TPL a že může být vyvolána při vytváření tásku, kdy žádný thread “nevlastníme”.
Metoda TryExecuteTaskInline se pokusí přes ProxyScheduler tásk spustit. Jestliže byl tásk vykonán, tak na threadově lokální proměnné m_isServiceThreadFlags inkrementuje počet již zpracovaných tásků.

Metoda runTasksCore dovolí zpracovat další tásk jen tehdy, jestliže metoda TryExecuteTaskInline tásk úspěšně spustila a současně nebylo dosaženo maximálního počtu tásků, které lze zpracovat.
searchForTask = TryExecuteTaskInline(task, true) && !tasksLimitReached();

Všimněte si, že v metodě runTasksCore na několika místech kontrolujeme, jestli nemáme ukončit zpracování tásků, protože byla volána metoda Dispose IoServiceScheduleru.

m_stopCancelTokenSource.Token.ThrowIfCancellationRequested();

Tato opakovaná kontrola může být drahá a měli bychom pomocí výkonnostních testů zjistit, jestli si tolik kontrol můžeme dovolit. Frekvence kontroly stavu CancelTokenu by měla být kompromisem mezi tím, že zareagujeme v našem kódu na stornování operace velmi rychle, ale současně neplatíme za toto časté monitorování příliš velké výkonnostní penále. Tipnul bych si, že na tomto místě  jsou v IoServiceScheduleru ještě výkonnostní rezervy, ale jasno budeme mít  až po spuštění profileru. V této fázi se snažíme hlavně o to, aby chování IoServiceScheduleru bylo v souladu se zadáním. Pustit profiler můžeme kdykoli později.

Sekce catch v runTasksCore reaguje na stornování CancelTokenu, resp. CancelTokenů. Připomeňme si ji.

....

catch (OperationCanceledException e)

{

Trace.WriteLine(e);

if (m_stopCancelTokenSource.IsCancellationRequested)

{

break;

}

usedCancellationToken = CancellationToken.None;

searchForTask = !tasksLimitReached();

}

}

....

Jestliže byl stornován m_m_stopCancelTokenSource.Token,  k čemuž dojde po volání metody Dispose, tak přes klíčové slovo break rychle ukončíme další zpracování tásků. Když je ale výjimka OperationCanceledException vyvolána pro CancelToken, který jsme dostali jako argument metody  - a u metody Run víme, že CancelToken předaný do metody runTaksCore reprezentuje to, že existuje alespoň jeden objekt Work – pokračujeme ve zpracování tásků, jen předtím nastavíme CancelToken používaný pro získání tásku z kolekce m_tasks na hodnotu CancellationToken.None, protože bez existence objektu Work už nemáme právo blokovat propůjčené vlákno v metodě tryGetTask.

Metoda runTaksCore po dokončení cyklu while vrátí počet zpracovaných tásků.

return serviceData.ExecutedOperationsCount;

Stejnou hodnotu vrátí svým klientům i veřejná metoda Run, ze které jsme vyšli.

A tady je odměna. I když vám může být z toho pitvání vnitřností metody Run špatně, nejste v tom sami, a i všechny testy pro metodu Run mají zelenou barvu. Veselý obličej

image

Odměnou nám ale spíš bude to, že metody RunOne, Poll a PollOne můžeme napsat s využitím metody runTasksCore.

U metody RunOne víme, že bez ohledu na počet tásků čekajících na vyřízení, musí vždy vyřídit maximálně jeden tásk. Zde je ukázka několika testů.

[TestMethod]

public void RunOne_When_More_Tasks_Added_Then_Only_First_Task_Is_Executed()

{

bool wasTask1Called = false;

bool wasTask2Called = false;

m_scheduler.Dispatch(() =>

{

wasTask1Called = true;

});

m_scheduler.Dispatch(() =>

{

wasTask2Called = true;

});

m_scheduler.RunOne();

Assert.IsTrue(wasTask1Called && !wasTask2Called);

}

[TestMethod]

public void RunOne_When_Two_Tasks_Added_Then_Returns_One()

{

const int NUMBER_OF_SCHEDULED_TASKS = 2;

const int NUMBER_OF_RUNNED_TASKS = 1;

Enumerable.Range(0, NUMBER_OF_SCHEDULED_TASKS)

.Select(_ => m_scheduler.Dispatch(() =>

{

})).ToArray();

var executedTasksCount = m_scheduler.RunOne();

Assert.AreEqual(NUMBER_OF_RUNNED_TASKS, executedTasksCount);

}

[TestMethod]

public void RunOne_When_More_Tasks_Added_And_Cancel_Work_Then_Only_First_Task_Is_Executed()

{

bool wasTask1Called = false;

bool wasTask2Called = false;

m_scheduler.Dispatch(() =>

{

wasTask1Called = true;

});

m_scheduler.Dispatch(() =>

{

wasTask2Called = true;

});

cancelWorkAfterTimeout();

m_scheduler.RunOne();

Assert.IsTrue(wasTask1Called && !wasTask2Called);

}

[TestMethod]

public void RunOne_When_Two_Tasks_Added_And_Cancel_Work_Then_Returns_One()

{

const int NUMBER_OF_SCHEDULED_TASKS = 2;

const int RUNNED_TASKS = 1;

Enumerable.Range(0, NUMBER_OF_SCHEDULED_TASKS)

.Select(_ => m_scheduler.Dispatch(() =>

{

})).ToArray();

cancelWorkAfterTimeout();

var executedTasksCount = m_scheduler.RunOne();

Assert.AreEqual(RUNNED_TASKS, executedTasksCount);

}

Také bychom měli ověřit pomocí dalších "ne-zcela-bezpečných" testů, že metoda RunOne nevrátí řízení do té doby, dokud nevyřídí alespoň jeden tásk.

//Unsafe test

[TestMethod]

public void RunOne_When_Zero_Tasks_Then_Method_Does_Not_Return()

{

const int SCHEDULE_WORK_AFTER_MS = 3000;

const double RUN_MIN_DURATION_S = 2.0;

var time = StopWatchUtils.MeasureActionTime(() =>

{

scheduleTaskAfterDelay(SCHEDULE_WORK_AFTER_MS);

m_scheduler.RunOne();

});

Assert.IsTrue(time.TotalSeconds > RUN_MIN_DURATION_S);

}

//Unsafe test

[TestMethod]

public void RunOne_When_Work_Canceled_And_Zero_Tasks_Then_Method_Does_Not_Return()

{

const int SCHEDULE_WORK_AFTER_MS = 3000;

const double RUN_MIN_DURATION_S = 2.0;

var work = new Work(m_scheduler);

work.Dispose();

var time = StopWatchUtils.MeasureActionTime(() =>

{

scheduleTaskAfterDelay(SCHEDULE_WORK_AFTER_MS);

m_scheduler.RunOne();

}

);

Assert.IsTrue(time.TotalSeconds > RUN_MIN_DURATION_S);

}

//Unsafe test

Myslím, že princip činnosti metody RunOne je zřejmý, takže můžeme přistoupit k napsání metody RunOne.

public const int POLLONE_RUNONE_MAX_TASKS = 1;

public virtual int RunOne()

{

checkIfDisposed();

return runTasks(withGlobalCancelToken(), POLLONE_RUNONE_MAX_TASKS);

}

private CancellationToken withGlobalCancelToken()

{

return m_stopCancelTokenSource.Token;

}

To je skutečně vše. Zavoláme metodu runTasks, předáme jí m_stopCancelTokenSource.Token, který se bude používat při vyzvedávání tásků z kolekce m_tasks, a omezíme počet vyřízených tásků na jeden. Když metodu RunOne spustíte v době, kdy IoServiceScheduler žádné tásky neobsahuje, pak počká buď na to, až bude tásk do scheduleru přidán, nebo až metoda Dispose stornuje m_stopCancelTokenSource.Token. Jestliže je v IoServiceScheduleru po zavolání metody RunOne alespoň jeden tásk k vyřízení, metoda jej zpracuje a vrátí řízení.

Testy pro metody Poll a PollOne si můžete projít sami.
Metoda Poll pracuje podobně jako metoda Run, jen  nikdy nezablokuje stávající thread a po vyřízení všech čekajících tásků ihned svou činnost ukončí.

public virtual int Poll()

{

checkIfDisposed();

return runTasks(withoutCancelToken());

}

Metodě runTasks nepředáme CancelToken, takže vyzvednutí tásku z kolekce m_tasks nebude blokující. Připomenu, že výchozí hodnota druhého argumentu maxTasks metody runTasks je "počet není omezen".

Metoda PollOne stejně jako metoda RunOne vyřídí maximálně jeden tásk, ale pokud žádný tásk v IoServiceScheduleru není, tak nikdy neblokuje thread a ihned vrátí řízení.

public virtual int PollOne()

{

checkIfDisposed();

return runTasks(withoutCancelToken(), maxTasks: POLLONE_RUNONE_MAX_TASKS);

}

Komentář  k metodě PollOne už asi není třeba. Pokud tápete, doporučuju si projít testy pro metodu PollOne.

IoServiceScheduler je funkční, ještě nám zbývá vytvořit metody Dispatch, Post a Wrap z rozhrani IAsioTaskService.

Metoda Dispatch.

public virtual Task Dispatch(Action action)

{

checkIfDisposed();

if (action == null)

{

throw new ArgumentNullException("action");

}

var task = Task.Factory.StartNew(action,

CancellationToken.None,

TaskCreationOptions.None,

ProxyScheduler.AsTplScheduler());

return task;

}

public virtual Task Dispatch(Func<Task> function)

{

checkIfDisposed();

if (function == null)

{

throw new ArgumentNullException("function");

}

var task = Task.Factory.StartNew(function,

CancellationToken.None,

TaskCreationOptions.None,

ProxyScheduler.AsTplScheduler()).Unwrap();

return task;

}

public override void QueueTask(Task task)

{

checkIfDisposed();

m_tasks.Add(task);

}

private bool isInServiceThread()

{

return m_isServiceThreadFlags.Value.IsServiceThread;

}

Metoda Dispatch vytvoří z předaného delegáta Task pomocí TaskFactory z TPL a jako cílový scheduler předá vlastní ProxyScheduler, takže TPL nakonec použije metody TryExecuteTaskInline a QueueTask z našeho IoServiceScheduleru.

Jak jsem psal výše, metoda Dispatch může spustit delegáta  ihned v aktuálním threadu (“inline”), jestliže je sama zavolána v  threadu, který je nyní propůjčen IoServiceScheduleru. Taková situace nastane vždy, když se tásk běžící v IoServiceScheduleru snaží přes metodu Dispatch do stejné instance IoServiceScheduleru přidat další tásk. Projdete-li si znovu kód metody TryExecuteTaskInline, který je v gistu výše v tomto článku, uvidíte, že metoda dovolí spuštění tásku “inline”, jestliže metoda isInServiceThread vrátí true.

Metoda Post stejně jako Dispatch vytvoří nový tásk, ale musí u ní platit, že nikdy nedovolí vykonání tásku "inline".

public virtual Task Post(Action action)

{

checkIfDisposed();

if (action == null)

{

throw new ArgumentNullException("action");

}

return postInner(() => Dispatch(action));

}

public virtual Task Post(Func<Task> function)

{

checkIfDisposed();

if (function == null)

{

throw new ArgumentNullException("function");

}

return postInner(() => Dispatch(function));

}

private Task postInner(Func<Task> dispatcher)

{

bool oldIsInServiceThread = m_isServiceThreadFlags.Value.IsServiceThread;

try

{

clearCurrentThreadAsServiceFlag();

return dispatcher();

}

finally

{

m_isServiceThreadFlags.Value.IsServiceThread = oldIsInServiceThread;

}

}

Metoda Post používá metodu Dispatch, ale v metodě postInner ještě před voláním metody Dispatch vždy dočasně odstraníme u současného threadu příznak, že jde o thread vlastněný IoServiceSchedulerem (clearCurrentThreadAsServiceFlag()), a proto metoda TryExecuteTaskInline nepovolí vykonání delegáta "inline".

Metody Wrap slouží k vytvoření delegáta, pomocí kterého vytvoříte později tásk v IoServiceScheduleru, na kterém byla metoda Wrap volána. Kromě metod Wrap, které vracejí delegáta Action, jsem do IoServiceScheduleru přidal metody WrapAsTask, které vracejí delegáta Func<Task> a dovolují tak nejen zařadit nový tásk ke zpracování, ale také počkat na dokončení tásku. Metody Wrap i WrapAsTask používají již popsanou metodu Dispatch.

public virtual Action Wrap(Action action)

{

checkIfDisposed();

if (action == null)

{

throw new ArgumentNullException("action");

}

return () => Dispatch(action);

}

public virtual Action Wrap(Func<Task> function)

{

checkIfDisposed();

if (function == null)

{

throw new ArgumentNullException("function");

}

return () => Dispatch(function);

}

public virtual Func<Task> WrapAsTask(Action action)

{

checkIfDisposed();

if (action == null)

{

throw new ArgumentNullException("action");

}

return () => Dispatch(action);

}

public virtual Func<Task> WrapAsTask(Func<Task> function)

{

checkIfDisposed();

if (function == null)

{

throw new ArgumentNullException("function");

}

return () => Dispatch(function);

}

Jestliže by vám nebylo jasné, jak metoda Wrap pracuje, nezbývá mi, než znovu podotknout, že se můžete podívat na testy metody Wrap v IoServiceScheduleru.
Vím, že tento díl byl hodně dlouhý a únavný (na počátku jste byli varováni! Mrkající veselý obličej), ale ještě horší mi přišlo rozdělit povídání o IoServiceScheduleru do několika dílů.  Jak jsem sliboval v úvodu, další díly by už měly být stravitelnější.

Už příště se podíváme, jak můžeme využít IoServiceScheduler při psaní “coroutines”, a v článku  s pořadovým číslem čtyři také zjistíme, že napsat ThreadPoolScheduler, který používá na těžkou práci  IoServiceScheduler, je triviální problém na pár řádků. U ThreadPoolScheduleru také poznáme, jak se dají IoServiceScheduleru propůjčit některé thready na delší dobu. A ani potom s IoServiceSchedulerem ještě neskončíme, protože se nám bude hodit i při řešení problémů v dalších dílech seriálu.



Tuesday, 03 June 2014 11:02:00 (Central Europe Standard Time, UTC+01:00)       
Comments [0]  C#


Comments are closed.