Hrátky s Reaktivním frameworkem (RX extenze)
V předchozím článku jsem ukazoval, jak volat asynchronně metody z C# Posterous API v Silverlightu. C# Posterous API nabízí asynchronní zpracování pomocí jednoho z doporučovaného přístupu k asynchronním operacím v .Net Frameworku – metoda s konvenčním sufixem Async (LoadPostsAsync) spustí vykonání operace v jiném vlákně a výsledky operace jsou nabídnuty v argumentech události, která je (opět) jen dle jmenné konvence spojena s asynchronní operací (událost LoadPostsCompleted). C# Posterous API nenabízí ve svém rozhraní metody pro podporu dalšího a již od verze 1.0 .Net Frameworku přítomného asynchronního vzoru, který je spojen s dvojicí metod začínajících prefixem Begin a End. (BeginGetRequest, EndGetRequest, BeginRead, EndRead apod.)
Dále předpokládám, že oba přístupy k vytváření asynchronních opreací znáte a že jste si vědomi i toho, jak se způsob práce s asynchronními API odlišuje od práce s běžnými synchronnními metodami.
V již odkazovaném článku bylo dobře patrné, jak je řízení toku asynchronních operaci odlišné od sady volání běžných synchronních operací.
Pro připomenutí:
posterousAccount.SitesLoaded += (o, e) =>
{
throwIfAsyncEx(e.Exception);
posterousAccount.PrimarySite.PostsLoaded += (_, e2) =>
{
throwIfAsyncEx(e2.Exception);
Posts = (from p in e2.Value
select new ViewPost
{
Title = p.Title,
Body = p.Body,
Url = p.Url
}).ToList();
};
posterousAccount.PrimarySite.LoadAllPostsAsync();
};
posterousAccount.LoadSitesAsync();
Jediné, co tento kód dělá, je, že nejprve (!) nahraje všechny blogy (příkaz k asynchronnímu nahrání posterousAccount.LoadSitesAsync(); je na posledním (!) řádku. Na prvním (!) řádku máme zpracování výsledku volání metody LoadSitesAsync, ve kterém opět nejdříve (!) lambdou přihlášenou k odběru události PostsLoaded (posterousAccount.PrimarySite.PostsLoaded += (_, e2)) řekneme, jak zpracujeme výsledek následného (!) volání další asynchronní metody (posterousAccount.LoadSitesAsync());. Tato “inverzní“ práce s asynchronními metodami a zpracováním jejich výsledku je na hony a možná ještě dále vzdálena intuitivní práci se synchronními metodami.
Zkusme se nyní podívat, jak by nám s “převrácením starších asynchronních metod z hlavy zpět na synchronní nohy” mohl pomoci RX Framework. Úplné základy v tomto článku nezazní a začátečníky odkazuji na sérii přednášek na Channel 9, kde dozvíte i zajímavé podrobnosti o genezi celého RX Frameworku a matematické dualitě rozhraní IEnumerable a IObservable (jinými slovy o společných rysech dobře známých GoF návrhových vzorů Iterátor a Observer).
Současné příklady jsou vytvořeny v aplikaci Windows Forms pro .Net 3.5. Silverlight má své zvláštnosti a a rozchození příkladů v SL si zaslouží další článek, protože teď by řešení problémů specifických pro SL zamlžovalo cíl příkladu. Aplikace je pro .Net 3.5, protože stejná aplikace pro .Net 4.0 hlásí konflikt (ambiguous reference) mezi NF typy a RX typy.
Upozornění: Nic z toho, co napíšu neberte ani jako dogmata ani, nedej bože, jako best practices. RX Framework je v Betě, zdokumentován je mizerně a z jednoho řádku u každé metody se dá jen těžko bez dalších experimetů vytušit, co přesně metoda dělá. Tento článek je výsledkem hraní si pro účely jednoho projektu, kam se RX extenze hodí a zjednodušují (alespoň to tak prozatím vypadá ) dost rutinních činností.
Zde j výsledek našeho snažení, abychom měli motivaci se RX Frameworkem zabývat.
var resultPosts = from sites in account.GetSites()
from site in sites.ToObservable()
from posts in site.GetPosts()
from post in posts.ToObservable()
where post.Private == false
select post;
Získání blogů (Sites) i blogpostů (post) je stále asynchronní, ale výsledný kód vypadá jako běžný LINQ (To Enumerable) dotaz. Žádné inverzní volání a práce s výsledkem, jen prostý dotaz, jehož zvláštností je pouze to, že v některých místech voláme metodu ToObservable.
Jak jsem dosáhl tohoto výsledku?
Podíváme-li se na první řádek, vidíme, že voláme metodu account.GetSites. Metoda GetSites součástí C# Posterous API není a jedná se o extenzní metodu. Tato extenzní metoda je zvláštní tím, že její návratovou hodnotou je je jedno z klíčových rozhraní v RX Frameworku – rozhraní IObservable<T>.
public static IObservable<IEnumerable<IPosterousSite>> GetSites(this IPosterousAccount account)
Rozhrani IObservable má v RX Frameworku podobný význam jako rozhraní IEnumerable v celém .Net Frameworku. Zjednodušeně můžeme rozhraní IObservable popsat jako ceduli, kterou třída implementující rozhraní dává celému světu najevo: “Miluju voyery, jestliže chcete sledovat, co se ve mně děje, dejte mi sem pozorovatele a já na sebe všechno podstatné, co se od této chvíle stane, postupně vyzvoním ”.
Rozhraní IObservable je tedy příslib, že zainteresovaný pozorovatel dostane data, která třída podporující toto rozhraní nabízí. Svůj zájem pozorovatel deklaruje tak, že předá odkaz sám na sebe do metody Subscribe.
public interface IObservable<T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Pozorovatel (IObserver) reaguje (proto reaktivní framework) na informace, které jsou mu poskytnuty objektem podporujícím rozhraní IObservable.
public interface IObserver<T>
{
void OnCompleted();
void OnNext(T value);
void OnError(Exception exn);
}
Metoda OnNext je na IObserver volána vždy, když Observable objekt má k dispozici další data. Metodou OnError Observable objekt signalizuje chyby a metodou OnCompleted Observeru říká “jsem u konce, nic dalšího už pro tebe nemám”.
Naše metoda GetSites tedy říká – zavolejte mě a já vám nabídnu IObservable objekt, který, až budou data k dispozici, vašemu observeru (IObserver) vydá kolekci (IEnumerable) objektů IPosterousSite.
Extenzní metoda GetSites vypadá takto:
public static IObservable<IEnumerable<IPosterousSite>> GetSites(this IPosterousAccount account)
{
checkAccountNotNull(account);
var sitesEvents = Observable.FromEvent<EventArgsValue<IEnumerable<IPosterousSite>>>(handler => account.SitesLoaded += handler,
handler => account.SitesLoaded -= handler)
.Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT);
return sitesEvents.GetFinalObservableEvents(account.LoadSitesAsync);
}
Po kontrole, zda předaný IPosterousAccount není null, využijeme pomocnou metodu Observable.FromEvent z RX Frameworku, která nám vrátí IObservable objekt. Tento IObservable objekt notifikuje případného observera o každé nastalé události sites.Loaded. V našem případě Observera notifikuje o právě jedné události, protože jsme použili metodu Take (Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT)) a konstanta DEFAULT_TAKE_EVENTS_COUNT má hodnotu 1. Jak si můžete všimnout, metoda FromEvent nám dovoluje s událostmi, které postupně nastávají, zacházet jako (s potenciálně nekonečnou) kolekcí hodnot. Metodě FromEvent jsme pouze museli říct, jaká třída nese argumenty událost (EventArgsValue<IEnumerable<IPosterousSite>) a poskytli jsme ji dva delegáty pro registraci/deregistraci obslužných handlerů, které nám RX Framework předá (handler => account.SitesLoaded += handler, handler => account.SitesLoaded –= handler). U našeho volání metody Take bych ještě poznamenal, že po vyvolání první události dojde automaticky RX Frameworkem k deregistraci obslužného handleru.
Proměnná sitesEvents je IObserver tohoto typu.
IObservable<IEvent<EventArgsValue<IEnumerable<IPosterousSite>>>>
Argumenty události jsou vždy zabaleny do instance třidy IEvent, která je vydána zaregistrovanému observeru v jeho metodě OnNext. Všimněte si ale, že návratovou hodnotou metody GetSites je již Observable, který observeru předá hodnoty bez IEvent (IObservable<IEnumerable<IPosterousSite>>).
Vidíme, že na sitesEvents je volána další má extenzní metoda GetFinalObservableEvents, které je předán delegát Action ukazující na asynchronní metodu account.LoadSitesAsync, a výsledek volání GetFinalObservableEvents je vrácen klientovi.
Metoda GetFinalObservableEvents:
public static IObservable<TEventData> GetFinalObservableEvents<TEventData>(this IObservable<IEvent<EventArgsValue<TEventData>>> sourceEvents, Action runAction)
{
if (sourceEvents == null)
{
throw new ArgumentNullException("sourceEvents");
}
var retObservable = new DelegateObservable<TEventData>(
observer =>
{
var eventObserver = new EventObserver<TEventData>(observer);
var unsubScribe = sourceEvents.Subscribe(eventObserver);
runAction();
return unsubScribe;
});
return retObservable;
}
Metoda GetFnalObservableEvents vrací opět Observable, ale tentokrát jde o Observable typu IObservable<TEventData> - jinými slovy, v našem případě IObservable<IEnumerable<IPosterousSite>>. Jak je toho dosaženo? Zdrojový IObservable objekt nazvaný sourceEvents je předán instanci třídy DelegateObservable, což je v současném scénaři již ten hledaný Observable podporující rozhraní IObservable<IEnumerable<IPosterousSite>>. DelegateObservable je tedy adaptér, který převádí události zabalené do IEvent na “rozbalené” hodnoty očekávané observerem. DelegateObservable je můj pomocný IObservable, který dostává do konstruktoru lambdu představující tělo jeho metody Subscribe, abychom nemuseli reimplementovat rozhraní IObservable v různých třídách stále dokola.
Výpis třídy DelegateObservable
public class DelegateObservable<T> : IObservable<T>
{
private readonly Func<IObserver<T>, IDisposable> m_subscribeDel;
public DelegateObservable(Func<IObserver<T>, IDisposable> subscribeDelegate)
{
m_subscribeDel = subscribeDelegate;
if (m_subscribeDel == null)
{
throw new ArgumentNullException("subscribeDelegate");
}
}
#region Implementation of IObservable<out T>
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
return m_subscribeDel(observer);
}
#endregion
}
Předaná lambda v našem případě vytvoří instanci třídy eventsObserver, což je observer, který bude zpracovávat přicházející události, a do konstruktoru mu podhodí observer předaný klientským kódem – eventsObserver je tedy další adaptér, který je zdopovědný za “rozbalení” dat z instance IEvent a za předání těchto dat klientskému (“konečnému”) observeru.
Třída EventObserver:
public class EventObserver<T> : IObserver<IEvent<EventArgsValue<T>>>
{
private readonly IObserver<T> m_innerObserver;
private bool m_exceptionOccured;
public EventObserver(IObserver<T> innerObserver)
{
if (innerObserver == null)
{
throw new ArgumentNullException("innerObserver");
}
m_innerObserver = innerObserver;
m_exceptionOccured = false;
}
#region Implementation of IObserver<T>
public virtual void OnNext(IEvent<EventArgsValue<T>> eventData)
{
if (eventData.EventArgs.Exception != null)
{
OnError(eventData.EventArgs.Exception);
return;
}
//Rozbalení a předání dat Observeru
m_innerObserver.OnNext(eventData.EventArgs.Value);
}
public virtual void OnError(Exception exception)
{
m_innerObserver.OnError(exception);
m_exceptionOccured = true;
}
public virtual void OnCompleted()
{
//Chyba ukončí sekvenci sama o sobě
if (!m_exceptionOccured)
{
m_innerObserver.OnCompleted();
}
}
Třída EventObserver implementuje rozhraní IObservable s těmito generickými argumenty - IObserver<IEvent<EventArgsValue<T>>> . V C# Posterous API všechny události předávají svá data v instanci třídy EventArgsValue<T>, což znamená, že pro naše účely je EventObserver univerzálně použitelný observer pro zpracování výsledků asynchronní operace.
Pro úplnost zde je výpis třídy EventArgsValue
public class EventArgsValue<T> : EventArgs
{
private readonly T m_value;
private readonly Exception m_exception;
internal EventArgsValue(T value, Exception exception)
{
m_value = value;
m_exception = exception;
}
public T Value
{
get
{
return m_value;
}
}
public Exception Exception
{
get
{
return m_exception;
}
}
}
V lambdě předané do objektu DelegateObservable také musíme spustit asynchronní operaci – to je volání Action delegáta runAction, který nám předala již metoda GetSites. Každý IObservable také z metody vrací objekt implementující rozhraní IDisposable – volání metody Dispose dovoluje klientovi odpojit se objektu IObservable. Lambda předaná do instance DelegateObservable vrátí IDisposable objekt, který je vydán po připojení EventObservera k “streamu události“ (sourceEvents).
Přidání extenzních metod k dalším třídám je triviální – zde je extenzní metoda pro IPosterousSite, která nahraje všechny blogposty.
public static IObservable<IEnumerable<IPosterousPost>> GetPosts(this IPosterousSite site)
{
throwIfSiteNull(site);
var postsEvents = Observable.FromEvent<EventArgsValue<IEnumerable<IPosterousPost>>>(handler => site.PostsLoaded += handler,
handler => site.PostsLoaded -= handler)
.Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT);
return postsEvents.GetFinalObservableEvents(site.LoadAllPostsAsync);
}
A nyní se můžeme znovu podívat, jak naše API použijeme v klientském kódu – díky alternativní implementaci Query vzoru v RX frameworku můžeme používat staré dobré známé LINQ dotazy.
Metoda loadPosts:
private void loadPosts()
{
toolStripStatusLabel1.Text = TEXT_LOAD_DATA_START;
IPosterousApplication app = PosterousApplication.Current;
IPosterousAccount account = app.GetPosterousAccount("<Posterous user name>", "Posterous password");
var syncContext = SynchronizationContext.Current;
var resultPosts = from sites in account.GetSites()
from site in sites.ToObservable()
from posts in site.GetPosts()
from post in posts.ToObservable()
where post.Private == false
select post;
resultPosts.Subscribe(post =>
{
lock (m_threadsSet)
{
m_threadsSet.Add(Thread.CurrentThread.ManagedThreadId);
}
syncContext.Post(_ =>
{
var UCpost = new UC_Post
{
Title = post.Title,
Body = post.Body
};
flowLayoutPanel1.Controls.Add(UCpost);
},
null
);
},
ex => syncContext.Post(_ =>
{
throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex);
},
null),
() => syncContext.Post(_ =>
{
lock (m_threadsSet)
{
m_threadsSet.Run(tId => lstThreads.Items.Add(tId));
}
toolStripStatusLabel1.Text = TEXT_LOAD_DATA_END;
},
null
));
}
V proměnné resultPosts jsou uloženy všechny blogposty (IPosterousPost) ze všech blogů (IPosterousSite). Blogy i blogposty jsou nahrány asynchronně, ale v klientském kódu nevidíme žádná specialitky kvůli asynchronnímu nahrávání dat. Na proměnných sites i posts v dotazu je volána další extenzní metoda z RX Frameworku ToObservable, protože jak víme, výsledkem volání asynchronních metod byly typy IEnumerable<IPosterousPost> a IEnumerable<IPosterousSite>.
Důležité je, že zpracování dotazu je opět “lazy” – to znamená, že k získání dat dojde až poté, co k resultsPosts zaregistruju svého Observera metodou Subscribe (analogie “Lazy” vyhodnocování v LINQ To IEnumerable a procházení dotazu v cyklu foreach). Metoda Subscribe má několik variant a jedna z nich nám dovoluje pro metody OnNext, OnError a OnCompleted předat delegáty, aniž bychom byli nuceni vytvářet svou třídu implementující rozhraní IObserver.
První delegát (OnNext) vezme předaný post a vytvoří pro něj UserControl, který vloží do FlowPanelu na formuláři. Ještě předtím pro zajímavost do Hashsetu ukládám identifikátory vláken, které se na zpracování dotazu podílí. S prvky na formuláři můžeme pracovat jen z UI threadu, a proto je vložení User controlu provedeno přes SynchronizationContext uložený do proměnné syncContext před spuštěním dotazu.
Druhý delegát (OnError) pouze přes SynchronizationContext zpropaguje výjimku, která nastala při asynchronním zpracování, do UI threadu. Všimněte si, jak je zpracování výjimek jednoduché – rozdíl vynikne při srovnání s opakovaným voláním metody throwIfAsyncEx v kódu na začátku tohoto článku.
Třetí delegát (OnCompleted) naplní listbox na formuláři ID použitých threadů a změní text ve status baru.
Zde je výsledný formulář. V listboxu nahoře si můžete všimnout, že u mě byly k vykonání dotazu použity celkem 3 thready.
Tím bychom mohli skončit, ale RX Framework má pro asynchronní operace ještě další zajímavou podporu. Pomocí metody Observable.FromAsyncPattern můžeme vytvořit IObservable rychle a bezpracně ze standardního a výše již zmíněného asynchronního Begin/End vzoru. V C# Posterous API metody Begin*/End* nejsou, proto je zkusme dodat pomocí extenzních metod.
Rozhraní IPosterousAccount bude obohaceno o extenzní metody BeginLoadSites a EndLoadSite.
Metoda BeginLoadSites
public static IAsyncResult BeginLoadSites(this IPosterousAccount account, AsyncCallback callback, object context)
{
checkAccountNotNull(account);
var loadSiteAction = new Action(account.LoadSites);
return RXEventsHelper.GetAsyncResultEx(loadSiteAction, callback, context);
}
Jak vidíte, přesně dle konvencí .Net vzoru metoda vrací odkaz na rozhraní IAsyncResult a přijímá callBack, což je tedy u tohoto vzoru metoda, která má být vyvolána po dokončení asynchronního zpracování, a jak také vzor vyžaduje, posledním argumentem je libovolný objekt reprezentující libovolný “stavový token” operace, který v metodě End* klient používá pro korelací mezi požadavkem a odpovědí.
Veškerá práce je přenesena na metodu GetAsyncResultEx v mém RXEventsHelperu – metoda vyžaduje, abyste ji poslali v delegátu Action metodu, která má být spuštěna asynchronně.
Metoda RXEventsHelper.GetAsyncResultEx.
public static IAsyncResult GetAsyncResultEx(Action runAction, AsyncCallback callback, object context)
{
if (runAction == null)
{
throw new ArgumentNullException("runAction");
}
Exception ex = null;
var proxyCallback = new AsyncCallback(ar =>
{
IAsyncResult proxyResult = new AsyncResultEx(ar, runAction);
callback(proxyResult);
});
return runAction.BeginInvoke(proxyCallback, context);
}
Hlavním trikem je využití možností delegátů – každý delegát v .Net Frameworku vždy obsahuje asynchronní metody BeginInvoke a EndInvoke, které splňují nároky asynchronního vzoru. My tedy na předaném delegátu runAction zavoláme metodu BeginInvoke, ale místo klientské callBack Funkce podhodíme svou proxy funkci (proxyCallback), která po dokončení asynchronního volání připraví pro naši End metodu vlastní IAsyncResult (AsyncResultEx).
Třída AsyncResultEx zapouzdřuje původní IAsyncResult (argument ar předaný do konstruktoru v předešlém výpisu) a navíc, když na její instanci zavoláme metodu EndAction, na předaném delegátovi (argument runAction v předešlém výpisu) je zavolána metoda EndInvoke, čehož využije naše metoda EndLoadSites.
Třída AsyncResultEx
public class AsyncResultEx : IAsyncResult
{
#region private variables
private IAsyncResult m_originalAsyncResult;
private readonly Action m_originaldelegate;
#endregion private variables
public AsyncResultEx(IAsyncResult origAsyncResult, Action originaldelegate)
{
if (origAsyncResult == null)
{
throw new ArgumentNullException("origAsyncResult");
}
m_originalAsyncResult = origAsyncResult;
m_originaldelegate = originaldelegate;
}
#region properties
public virtual IAsyncResult OriginalAsyncResult
{
get
{
return m_originalAsyncResult;
}
}
public virtual Action OriginalDelegate
{
get
{
return m_originaldelegate;
}
}
#endregion properties
#region methods
public virtual void EndAction()
{
if (OriginalDelegate != null)
{
OriginalDelegate.EndInvoke(OriginalAsyncResult);
}
}
#endregion methods
#region Implementation of IAsyncResult
public virtual bool IsCompleted
{
get
{
return m_originalAsyncResult.IsCompleted;
}
}
public virtual object AsyncState
{
get
{
return m_originalAsyncResult.AsyncState;
}
}
public virtual WaitHandle AsyncWaitHandle
{
get
{
return m_originalAsyncResult.AsyncWaitHandle;
}
}
public virtual bool CompletedSynchronously
{
get
{
return m_originalAsyncResult.CompletedSynchronously;
}
}
#endregion
}
Extenzní metoda EndLoadSites
public static IEnumerable<IPosterousSite> EndLoadSites(this IPosterousAccount account, IAsyncResult result)
{
checkAccountNotNull(account);
var exResult = result as AsyncResultEx;
if (exResult == null)
{
throw new ArgumentException("result");
}
exResult.EndAction();
return account.Sites;
}
Zde vidíme volání metody EndAction na podhozené instanci AsyncResultE. Poté metoda EndLoadSites jen vrátí kolekci Sites objektu account, protože ta nyní již musí být po asynchronním volání naplněna daty.
Se stávající infrastrukturou si opět si můžeme rychle připravit další Begin a End metody. Zde jsou extenzní metody BeginLoadPosts a EndLoadPosts pro IPosterousSite.
public static IAsyncResult BeginLoadPosts(this IPosterousSite site, AsyncCallback callback, object context)
{
throwIfSiteNull(site);
var loadPostsAction = new Action(site.LoadAllPosts);
return RXEventsHelper.GetAsyncResultEx(loadPostsAction, callback, context);
}
public static IEnumerable<IPosterousPost> EndLoadPosts(this IPosterousSite site, IAsyncResult result)
{
throwIfSiteNull(site);
var exResult = result as AsyncResultEx;
if (exResult == null)
{
throw new ArgumentException("result");
}
exResult.EndAction();
return site.Posts;
}
A metoda loadPosts2, která dělá to samé, co předchozí metoda loadPosts, ale používá naše nové extenzní Begin/End metody.
private void loadPosts2()
{
toolStripStatusLabel1.Text = TEXT_LOAD_DATA_START;
IPosterousApplication app = PosterousApplication.Current;
IPosterousAccount account = app.GetPosterousAccount("posterousname", "posterouspassword");
var syncContext = SynchronizationContext.Current;
var resultPosts = from sites in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)())
from site in sites.ToObservable()
from posts in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)())
from post in posts.ToObservable()
where post.Private == false
select post;
resultPosts.Subscribe(post =>
{
lock (m_threadsSet)
{
m_threadsSet.Add(Thread.CurrentThread.ManagedThreadId);
}
syncContext.Post(_ =>
{
var UCpost = new UC_Post
{
Title = post.Title,
Body = post.Body
};
flowLayoutPanel1.Controls.Add(UCpost);
},
null
);
},
ex => syncContext.Post(_ =>
{
throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex);
},
null),
() => syncContext.Post(_ =>
{
lock (m_threadsSet)
{
m_threadsSet.Run(tId => lstThreads.Items.Add(tId));
}
toolStripStatusLabel1.Text = TEXT_LOAD_DATA_END;
},
null
)
);
}
Upozornil bych jen na dvě specialitky či zrádná místa, která (alespoň v této BETA verzi RX) dělají kód méně intuitvním, než by bylo žádoucí:
Jedná se o tyto dva řádky:
from sites in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)())
from posts in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)())
Metoda FromAsyncPattern přijímá delegáty na naše asynchronní metody, ale místo spolehnutí se na typovou inference jsem musel generický argument předat explicitně(Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>) – pokud argument nezadáte, kompilátor hlásí “ambigous reference”.
Dále je patrné, že výsledek funkce FromAsyncPattern, kterým je další funkce vracející IObservable, je předán jako argument metodě Observable.Defer. Metoda Observable.Defer zajistí, že k vyhodnocení předané funkce dojde až poté, co je k výsledkům dotazu přihlášen observer – jinými slovy, metoda Defer nám pomáhá zachovat “lazy” vyhodnocení dotazu.
Dotaz bude fungovat i v této podobě (bez Defer):
var resultPosts = from sites in Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)()
from site in sites.ToObservable()
from posts in Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)()
from post in posts.ToObservable()
where post.Private == false
select post;
Ale jeho vyhodnocení už není “lazy”. Všimněte si závorek na konci výrazu - FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)() - výsledné IObservable získám okamžitým zavoláním funkce vrácené z metody FromAsyncPattern. Vadit vám to začne v okamžiku, kdy zkonstruujete dotaz, ihned se odpálí asynchronní volání, dojde k chybě a vy budete mít v aplikaci neošetřenou výjimku v threadu na pozadí, protože IObserver ještě není přihlášen (není možné zavolat druhého delegáta - ex => syncContext.Post(_ => { throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex); }, null), ).
Snad se vám tato exkurze líbila. Já ještě na RX Framework konečný názor nemám, ale něco neodbytného ve mně říká, že by mohlo jít o další LINQ, který otřese programátorským světem. Některé extenze pravděpodobně zahrnu do samostatného jmenného prostoru v C# Posterous API.
Tuesday, 02 February 2010 07:43:00 (Central Europe Standard Time, UTC+01:00)
.NET Framework | C# Posterous API | LINQ | Návrhové vzory | RX Extensions | Windows Forms
Ukázka práce s Posterous API – zálohování blogu
Stáhnout výsledné exe -RSPosterousBackup.
Po jednoduchém přehledu možností mého C# Posterous API wrapperu se nyní podíváme, jak se dá API použít k zálohování vašeho blogu. Pro účely tohoto článku předpokládám, že jste úvodní článek o API wrapperu četli.
Zálohovač blogu (RSPosterousBackup.exe) je jednoduchá konzolová aplikace, které stačí předat uživatelské jméno (parametr –u) a heslo (parametr –p) vašeho účtu na Posterous a také adresář vašem počítači (parametr bd), do kterého chcete blog zazálohovat.
Jednoduchá ukázka:
RSPosterousBackup.exe -u:rene@renestein.net -p:mojeheslo -bd:c:\_Archiv\PosterousBackup
Blog uživatelského účtu reneATrenestein.net s heslem mojeheslo bude zazálohován do adresáře c:\_Archiv\PosterousBackup.
Program referencuje samozřejmě knihovnu RStein.Posterous.API a pro (své, uznávám ) pobavení jsem také použil RX for .Net Framework 3.5 SP1. Z RX Frameworku jsou referencovány assembly System.CoreEx, System.Interactive a System.Threading. Nejzajímavější na RX Frameworku je, že pro verzi 3.5 .Net Frameworku zpřístupňuje Parallel Linq – PLINQ, který je součástí připravovaného .Net Frameworku 4.0.
Ještě upozornění – v žádném případě nechci tvrdit, že kód, který uvidíte, využívá PLINQ správným způsobem. Program je jen pískovištěm, na kterém jsem si zkoušel a ověřoval, co RX a PLINQ umí a jak vypadá výsledný kód.
Po spuštění RSPosterousBackup.exe funkce Main pouze předá argumenty z příkazové řádky metodě BackupData v třídě BackupEngine, která představuje výkonný mozek celého zálohovače Posterous blogu.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace RSPosterousBackup
{
class Program
{
static void Main(string[] args)
{
var engine = new BackupEngine();
engine.BackupData(args);
Console.ReadLine();
}
}
}
Zde je třída BackupEngine
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using RStein.Posterous.API;
namespace RSPosterousBackup
{
public class BackupEngine
{
#region constants
public static readonly string POSTEROUS_USER = "-u";
public static readonly string POSTEROUS_PASSWORD = "-p";
public static readonly string BACKUP_DIRECTORY= "-bd";
public static readonly string HELP_KEY = "-?";
public static readonly string HELP_ALT_KEY = "-h";
public static readonly string POSTEROUS_FILE_EXTENSION = ".posterous";
public static readonly string POSTEROUS_MEDIA_FORMAT = "Media_{0}";
public static readonly string POSTEROUS_COMMENT_FORMAT = "Comment_{0}";
public static readonly string POSTEROUS_SITE_FORMAT = "Site_{0}";
#endregion constants
#region private variables
#endregion private variables
#region constructors
public BackupEngine()
{
}
#endregion constructors
public void BackupData(string[] commandLine)
{
if (commandLine == null)
{
throw new ArgumentNullException("commandLine");
}
var parser = new CommandLineParser();
Dictionary<string, string> cmSwitches = parser.Parse(commandLine);
Debug.Assert(cmSwitches != null);
if (cmSwitches.Keys.Any(key => (key.Equals(HELP_KEY, StringComparison.OrdinalIgnoreCase)) ||
(key.Equals(HELP_ALT_KEY,StringComparison.OrdinalIgnoreCase))))
{
logMessage(GlobalConstants.CMDLINE_SHOW_USAGE);
return;
}
if (!checkSwitches(cmSwitches))
{
logMessage(GlobalConstants.CMDLINE_SHOW_USAGE);
return;
}
backupDataInner(cmSwitches);
}
private void backupDataInner(Dictionary<string, string> cmSwitches)
{
string pUser = cmSwitches[POSTEROUS_USER];
string pPassword = cmSwitches[POSTEROUS_PASSWORD];
string backupDir = cmSwitches[BACKUP_DIRECTORY];
try
{
IPosterousAccount account = PosterousApplication.Current.GetPosterousAccount(pUser, pPassword);
if (!Directory.Exists(backupDir))
{
Directory.CreateDirectory(backupDir);
}
var currDirBackupName = new StringBuilder(DateTime.Now.ToLocalTime().ToString());
currDirBackupName.ToSafeFileName();
string currentBackupDir = Path.Combine(backupDir, currDirBackupName.ToString());
account.Sites.Run(site =>
{
string sitePath = Path.Combine(currentBackupDir, String.Format(POSTEROUS_SITE_FORMAT, site.Id.ToString()));
Directory.CreateDirectory(sitePath);
});
var processedPosts = (from site in account.Sites.AsParallel()
from post in site.Posts.AsParallel()
.Do(postPost =>
Console.WriteLine(String.Format(GlobalConstants.POST_BACKUP_MESSAGE_FORMAT,
postPost.Title,
Thread.CurrentThread.ManagedThreadId)))
.Do(postPost =>
{
string siteDir = Path.Combine(currentBackupDir, String.Format(POSTEROUS_SITE_FORMAT, site.Id.ToString()));
var titlBuilder = new StringBuilder(postPost.Title + postPost.Id.ToString());
titlBuilder.ToSafeFileName();
string postDirName = Path.Combine(siteDir,
titlBuilder.ToString()
);
Directory.CreateDirectory(postDirName);
string postFileName = Path.Combine(postDirName, titlBuilder.ToString() + POSTEROUS_FILE_EXTENSION);
using (var fileStream = File.Open(postFileName, FileMode.Create))
using (var stremWriter = new StreamWriter(fileStream, Encoding.UTF8))
{
stremWriter.Write(postPost.Body);
}
postPost.Media
.AsParallel()
.Run(media =>
{
var mediaName = new StringBuilder(String.Format(POSTEROUS_MEDIA_FORMAT, Guid.NewGuid()));
mediaName.ToSafeFileName();
string mediaFile = Path.Combine(postDirName, mediaName.ToString());
using (var fileStream = File.Open(mediaFile, FileMode.Create))
{
media.Content.CopyToStream(fileStream);
}
});
postPost.Comments
.AsParallel()
.Run(comment =>
{
var commentFileName = new StringBuilder(String.Format(POSTEROUS_COMMENT_FORMAT, Guid.NewGuid()));
commentFileName.ToSafeFileName();
string commentFile = Path.Combine(postDirName, commentFileName.ToString());
using (var fileStream = File.Open(commentFile, FileMode.Create))
using (var stremWriter = new StreamWriter(fileStream, Encoding.UTF8))
{
stremWriter.WriteLine(comment.Author.Name);
stremWriter.Write(comment.Body);
}
});
})
select post).ToList();
logMessage(String.Format(GlobalConstants.FINAL_BACKUP_MESSAGE_FORMAT, processedPosts.Count));
}
catch (Exception e)
{
Trace.WriteLine(e);
Console.WriteLine(e);
}
}
private bool checkSwitches(Dictionary<string, string> cmSwitches)
{
return ckeckIfSwitchNullOrEmpty(POSTEROUS_USER, cmSwitches) &&
ckeckIfSwitchNullOrEmpty(POSTEROUS_PASSWORD, cmSwitches) &&
ckeckIfSwitchNullOrEmpty(BACKUP_DIRECTORY, cmSwitches);
}
private bool ckeckIfSwitchNullOrEmpty(string switchKey, Dictionary<string, string> cmSwitches)
{
string val;
cmSwitches.TryGetValue(switchKey, out val);
if (String.IsNullOrEmpty(val))
{
logMessage(String.Format(GlobalConstants.INVALID_CML_SWITCH_FORMAT_STRING_EX, switchKey));
return false;
}
return true;
}
private void logMessage(string message)
{
Console.WriteLine(message);
}
}
}
Třída BackupEngine nejdříve v metodě BackupData pomocí instance třídy CommandLineParser rozpársuje příkazový řádek a voláním pomocné metody checkSwitches zkontroluje, zda byly předány všechny vyžadované parametry. Jestliže nějaký parametr chybí nebo byl zadán parametr pro zobrazení nápovědy (-h, –?), program zobrazí nápovědu a k zálohování nedojde.
Ihned po dokončení všech předběžných kontrol je volána privátní metoda backupDataInner, která je odpovědná za zálohování blogu. Metoda backupDataInner získá odkaz na Posterous účet (PosterousApplication.Current.GetPosterousAccount(pUser, pPassword); a poté pro každou Site (samostatný blog) založí nový podadresář v adresáři, jehož názvem je aktuální lokální datum a čas a který je vytvořen v adresáři předaném v parametru –bd uživatelem.
Adresářová struktura pro každý zálovaný blog:
<adresář určený –bd přepínačem>\<adresář - názvem je aktuální datum a čas>\Site_<Site Id>
Příklad adresářové struktury:
"c:\_Archiv\PosterousBackup\20.1.2010 15_57_07\Site_851694"
Zálohován tedy není jeden blog, ale všechny blogy, které jsou asociovány s daným posterous účtem.
Můžete si všimnout, že pro založení podaresáře používvám jednu z RX extenzních metod – metodu Run.
account.Sites.Run(site =>
{
string sitePath = Path.Combine(currentBackupDir, String.Format(POSTEROUS_SITE_FORMAT, site.Id.ToString()));
Directory.CreateDirectory(sitePath);
});
Metoda Run je náhradou za extenzní metodu ForEach, kterou jste si dříve museli sami dopsat nebo jste byli nuceni použít metodu ForEach ve třídě List takto.
account.Sites.ToList().ForEach(site => {//zbytek kódu lambdy identický s kódem výše…});
Zdůrazním, že metoda Run vykoná nějaký vedlejší efekt nad každým elementem v IEnumerable
Signatura metody Run:
public static void Run<TSource>(
this IEnumerable<TSource> source,
Action<TSource> action
)
Dále do proměnné processedPosts uložíme pomocí speciálního a pro naši kratochvíli jediného LINQ dotazu všechny zpracované blogposty (instance podporující rozhraní IPosterousPost). Jak vidíte, stačí se přes Posterous API dotázat do kolekce Sites (blogy) a poté z každého blogu zpracovat všechny blogspoty.
from site in account.Sites.AsParallel()
from post in site.Posts.AsParallel()…
Na více místech v dotazu si můžete všimnout volání extenzní metody AsParalllel, kterým dáváte najevo, že zpracování jednotlivých blogpostů a také médií (IPosterousMedium) a komentářů (IPosterousComment) může proběhnout ve více vláknech – o detaily se ale postará PLINQ, vy sami žádná nová vlákna nespouštíte ani nespravujete.
Můžete si také všimnout, že na několika místech volám extenzní metodu Do .Metoda Do pracuje podobně jako před chvílí zmiňovaná metoda Run. Na každý element v zdrojové kolekci aplikuje předanou funkci, ale poté na rozdíl od metody Run element předá k dalšímu zpracování.
Signatura metody Do:
public static IEnumerable<TSource> Do<TSource>(
this IEnumerable<TSource> source,
Action<TSource> action
)
Zde vypíšeme přes RX extenzní metodu Do titulek právě zpracovávaného blogpostu a Id vlákna, které blogspot zpracovává. Tato metoda je zde jen na ukázku, že každý element ve zdrojové kolekci je metodou Do předán dále ke zpracování
.Do(postPost =>
Console.WriteLine(String.Format(GlobalConstants.POST_BACKUP_MESSAGE_FORMAT,
postPost.Title,
Thread.CurrentThread.ManagedThreadId)))
Na metodu Do navazuje další metoda Do, ve které proběhne zpracování každého blogspotu. V adresáří každé Site (blogu) je vytvořen pro každý blogspot vytvořen nový podadresář, jehož názvem je titulek (vlastnost Title) společně s Id blogpostu. Do tohoto podadresáře je uložen text blogspotu. Soubor s textem blogspotu má příponu posterous a také jsou do podadresáře uložena média (zvukové, obrazové a video soubory) a všechny komentáře k blogspotu.
Hlavní část programu je za námi. Zde jsou ještě výpisy pomocných tříd.
Třída CommandLineParser pro pársování hodnot předaných uživatelem v příkazovém řádku.
using System;
using System.Collections.Generic;
using System.Linq;
namespace RSPosterousBackup
{
public class CommandLineParser
{
#region constants
public static readonly char COMMAND_PARTS_SEPARATOR = '-';
public static readonly char COMMAND_VALUE_SEPARATOR = ':';
public const int MIN_KEY_PARTS = 1;
public const int MAX_KEY_PARTS = 2;
#endregion constants
#region constructors
public CommandLineParser()
{
}
#endregion constructors
#region methods
public virtual Dictionary<string, string> Parse (string[] commandLine)
{
if (commandLine == null)
{
throw new ArgumentNullException("commandLine");
}
return parseInner(commandLine);
}
private Dictionary<string, string> parseInner(string[] commandLine)
{
var dict = (from part in commandLine
let keyValuePair = part.Split(new[] {COMMAND_VALUE_SEPARATOR}, MAX_KEY_PARTS)
select new
{
Key = keyValuePair.First().Trim().ToLower(),
Value = keyValuePair.Length > MIN_KEY_PARTS ? keyValuePair.Last().Trim() : String.Empty
}).ToDictionary(kv => kv.Key, kv => kv.Value);
return dict;
}
#endregion methods
}
}
Konstanty
namespace RSPosterousBackup
{
public static class GlobalConstants
{
public static readonly string INVALID_CML_SWITCH_FORMAT_STRING_EX = "Invalid switch {0}";
public static readonly char SAFE_FILE_PATH_CHAR = '_';
public static readonly string POST_BACKUP_MESSAGE_FORMAT = "Processing post: {0} - in thread {1}";
public static readonly string FINAL_BACKUP_MESSAGE_FORMAT = "Total posts: {0}";
public static readonly string CMDLINE_SHOW_USAGE =
@"Usage:
RSPosterousBackup.exe -u:<posterous user name> p:<posterous password> -bd:<backup directory>
Example:RSPosterousBackup.exe -u:GIC@Roma.com p:Rubicon -bd:c:\PosterousBackup
RSPosterousBackup.exe -? - show this help";
}
}
Třída StringBuilderExtensions s extenzní metodou ToSafeFileName, která v navrhovaném jménu souboru nahradí nepovolené znaky podtržítkem.
using System.IO;
using System.Linq;
using System.Text;
namespace RSPosterousBackup
{
public static class StringBuilderExtensions
{
public static void ToSafeFileName(this StringBuilder builder)
{
Path.GetInvalidFileNameChars().Run(ch => builder.Replace(ch, GlobalConstants.SAFE_FILE_PATH_CHAR));
}
}
}
Stáhnout výsledné exe -RSPosterousBackup.
Wednesday, 20 January 2010 17:54:21 (Central Europe Standard Time, UTC+01:00)
.NET Framework | C# Posterous API | LINQ | RX Extensions