\


 Tuesday, 02 February 2010
Hrátky s Reaktivním frameworkem (RX extenze)

V předchozím článku jsem ukazoval, jak volat asynchronně metody z C# Posterous API v Silverlightu. C# Posterous API nabízí asynchronní zpracování pomocí jednoho z doporučovaného přístupu k asynchronním operacím v .Net Frameworku – metoda s konvenčním sufixem Async (LoadPostsAsync) spustí vykonání operace v jiném vlákně a výsledky operace jsou nabídnuty v argumentech události, která je (opět) jen dle jmenné konvence spojena s asynchronní operací (událost LoadPostsCompleted). C# Posterous API nenabízí ve svém rozhraní  metody pro podporu dalšího a již od verze 1.0 .Net Frameworku přítomného asynchronního vzoru, který je spojen s dvojicí metod začínajících prefixem Begin a End. (BeginGetRequest, EndGetRequest, BeginRead, EndRead apod.)

Dále předpokládám, že oba přístupy k vytváření asynchronních opreací znáte a že jste si vědomi i toho, jak se způsob práce s asynchronními API odlišuje od práce s běžnými synchronnními metodami.

V již odkazovaném článku bylo dobře patrné, jak je řízení toku asynchronních operaci odlišné od sady volání běžných synchronních operací.

Pro připomenutí:

posterousAccount.SitesLoaded += (o, e) =>
                      {
                          throwIfAsyncEx(e.Exception);
                          posterousAccount.PrimarySite.PostsLoaded += (_, e2) =>
                                                                          {
                                                                              throwIfAsyncEx(e2.Exception);
                                                                              Posts = (from p in e2.Value
                                                                                      select new ViewPost
                                                                                                 {
                                                                                                     Title = p.Title,
                                                                                                     Body = p.Body,
                                                                                                     Url = p.Url

                                                                                                 }).ToList();                                                                                                            
                                                                              
                                                                          };
                          posterousAccount.PrimarySite.LoadAllPostsAsync();
                      };



posterousAccount.LoadSitesAsync();

Jediné, co tento kód dělá, je, že nejprve (!) nahraje všechny blogy (příkaz k asynchronnímu nahrání posterousAccount.LoadSitesAsync(); je na posledním (!) řádku. Na prvním (!) řádku máme zpracování výsledku volání metody LoadSitesAsync, ve kterém opět nejdříve (!) lambdou přihlášenou k odběru události  PostsLoaded (posterousAccount.PrimarySite.PostsLoaded += (_, e2)) řekneme, jak zpracujeme výsledek následného (!) volání další asynchronní metody (posterousAccount.LoadSitesAsync());. Tato “inverzní“ práce s asynchronními metodami a zpracováním jejich výsledku je na hony a možná ještě dále vzdálena intuitivní práci se synchronními metodami.:-)

Zkusme se nyní podívat, jak by nám s “převrácením starších asynchronních metod z hlavy zpět na synchronní nohy” mohl pomoci RX Framework. Úplné základy v tomto článku nezazní a začátečníky odkazuji na sérii přednášek na Channel 9, kde dozvíte i zajímavé podrobnosti o genezi celého RX Frameworku  a matematické dualitě rozhraní IEnumerable a IObservable (jinými slovy o společných rysech dobře známých GoF návrhových vzorů Iterátor a Observer).

Současné příklady jsou vytvořeny v aplikaci Windows Forms pro .Net 3.5. Silverlight má své zvláštnosti a a rozchození příkladů v SL si zaslouží další článek, protože teď by řešení problémů specifických pro SL zamlžovalo cíl příkladu. Aplikace je pro .Net 3.5, protože stejná aplikace pro .Net 4.0 hlásí konflikt (ambiguous reference) mezi NF typy a RX typy.

Upozornění: Nic z toho, co napíšu neberte ani jako dogmata ani, nedej bože, jako best practices. RX Framework je v Betě, zdokumentován je mizerně a z jednoho řádku u každé metody se dá jen těžko bez dalších experimetů vytušit, co přesně metoda dělá. Tento článek je výsledkem hraní si pro účely jednoho projektu, kam se RX extenze hodí  a zjednodušují (alespoň to tak prozatím vypadá :-) ) dost rutinních činností.

Zde j výsledek našeho snažení, abychom měli motivaci se RX Frameworkem zabývat.

 var resultPosts = from sites in account.GetSites()
                              from site in sites.ToObservable()
                              from posts in site.GetPosts()
                              from post in posts.ToObservable()
                              where post.Private == false
                              select post;

Získání blogů (Sites) i blogpostů (post) je stále asynchronní, ale výsledný kód vypadá jako běžný LINQ (To Enumerable) dotaz. Žádné inverzní volání a práce s výsledkem, jen prostý dotaz, jehož zvláštností je pouze to, že v některých místech voláme metodu ToObservable.

Jak jsem dosáhl tohoto výsledku?

Podíváme-li se na první řádek, vidíme, že voláme metodu account.GetSites. Metoda GetSites součástí C# Posterous API není a jedná se o extenzní metodu. Tato extenzní metoda je zvláštní tím, že její návratovou hodnotou je je jedno z klíčových rozhraní v RX Frameworku – rozhraní IObservable<T>.

        public static IObservable<IEnumerable<IPosterousSite>> GetSites(this IPosterousAccount account)

Rozhrani IObservable má v RX Frameworku podobný význam jako rozhraní IEnumerable v celém .Net Frameworku.  Zjednodušeně můžeme rozhraní IObservable popsat jako ceduli, kterou třída implementující rozhraní dává celému světu najevo: “Miluju voyery, jestliže chcete sledovat, co se ve mně děje, dejte mi sem pozorovatele a já na sebe všechno podstatné, co se od této chvíle stane, postupně  vyzvoním ”.

Rozhraní IObservable je tedy příslib, že zainteresovaný pozorovatel dostane data, která třída podporující toto rozhraní nabízí. Svůj zájem pozorovatel deklaruje tak, že předá odkaz sám na sebe do metody Subscribe.

public interface IObservable<T> 
{
 IDisposable Subscribe(IObserver<T> observer); 
}

Pozorovatel (IObserver) reaguje (proto reaktivní framework) na informace, které jsou mu poskytnuty objektem podporujícím rozhraní IObservable.

public interface IObserver<T> 
{ 
void OnCompleted(); 
void OnNext(T value); 
void OnError(Exception exn); 
}

Metoda OnNext je na IObserver volána vždy, když Observable objekt má k dispozici další data. Metodou OnError Observable objekt signalizuje chyby a metodou OnCompleted Observeru říká “jsem u konce, nic dalšího už pro tebe nemám”.

Naše metoda GetSites tedy říká – zavolejte mě a já vám nabídnu IObservable objekt, který, až budou data k dispozici, vašemu observeru (IObserver) vydá kolekci (IEnumerable) objektů IPosterousSite.

Extenzní metoda GetSites vypadá takto:

 public static IObservable<IEnumerable<IPosterousSite>> GetSites(this IPosterousAccount account)
        {
             
            checkAccountNotNull(account);            
             var sitesEvents = Observable.FromEvent<EventArgsValue<IEnumerable<IPosterousSite>>>(handler => account.SitesLoaded += handler,
                                                                                                handler => account.SitesLoaded -= handler)
                                        .Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT);



             return sitesEvents.GetFinalObservableEvents(account.LoadSitesAsync);
            
        }

 

Po kontrole, zda předaný IPosterousAccount není null, využijeme pomocnou metodu Observable.FromEvent z RX Frameworku, která nám vrátí IObservable objekt. Tento IObservable objekt notifikuje případného observera o každé nastalé události sites.Loaded. V našem případě Observera notifikuje o právě jedné události, protože jsme použili metodu Take (Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT)) a konstanta DEFAULT_TAKE_EVENTS_COUNT má hodnotu 1. Jak si můžete všimnout, metoda FromEvent nám dovoluje s událostmi, které postupně nastávají, zacházet jako (s potenciálně nekonečnou) kolekcí hodnot. Metodě FromEvent jsme pouze museli říct, jaká třída nese argumenty událost (EventArgsValue<IEnumerable<IPosterousSite>) a poskytli jsme ji dva delegáty pro registraci/deregistraci obslužných handlerů, které nám RX Framework předá  (handler => account.SitesLoaded += handler, handler => account.SitesLoaded –= handler). U našeho volání metody Take bych ještě poznamenal, že po vyvolání první události dojde automaticky RX Frameworkem k deregistraci obslužného handleru.

Proměnná sitesEvents je IObserver tohoto typu.

IObservable<IEvent<EventArgsValue<IEnumerable<IPosterousSite>>>>

Argumenty události jsou vždy zabaleny do instance třidy IEvent, která je vydána zaregistrovanému observeru v jeho metodě OnNext. Všimněte si ale, že návratovou hodnotou metody GetSites je již Observable, který observeru předá hodnoty bez IEvent (IObservable<IEnumerable<IPosterousSite>>).

Vidíme, že na sitesEvents je volána další má extenzní metoda GetFinalObservableEvents, které je předán delegát Action ukazující na asynchronní metodu account.LoadSitesAsync, a výsledek volání GetFinalObservableEvents je vrácen klientovi.

Metoda GetFinalObservableEvents:

  public static IObservable<TEventData> GetFinalObservableEvents<TEventData>(this IObservable<IEvent<EventArgsValue<TEventData>>> sourceEvents, Action runAction)
        {
            if (sourceEvents == null)
            {
                throw new ArgumentNullException("sourceEvents");
            }
            var retObservable = new DelegateObservable<TEventData>(
                observer =>
                    {
                        
                        var eventObserver = new EventObserver<TEventData>(observer);
                        var unsubScribe = sourceEvents.Subscribe(eventObserver);
                        runAction();
                        return unsubScribe;
                    });

            return retObservable;
        }

 

Metoda GetFnalObservableEvents vrací opět Observable, ale tentokrát jde o Observable typu IObservable<TEventData>  - jinými slovy, v našem případě IObservable<IEnumerable<IPosterousSite>>. Jak je toho dosaženo? Zdrojový IObservable objekt nazvaný sourceEvents je předán instanci třídy DelegateObservable, což je v současném scénaři již ten hledaný Observable podporující rozhraní IObservable<IEnumerable<IPosterousSite>>. DelegateObservable je tedy adaptér, který převádí události zabalené do IEvent na “rozbalené” hodnoty očekávané observerem. DelegateObservable je můj pomocný IObservable, který dostává do konstruktoru lambdu představující tělo jeho metody Subscribe, abychom nemuseli reimplementovat rozhraní IObservable v různých třídách stále dokola.

Výpis třídy DelegateObservable

 public class DelegateObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> m_subscribeDel;

        public DelegateObservable(Func<IObserver<T>, IDisposable> subscribeDelegate)
        {
            m_subscribeDel = subscribeDelegate;
            if (m_subscribeDel == null)
            {
                throw new ArgumentNullException("subscribeDelegate");
            }
        }

        #region Implementation of IObservable<out T>

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }
            
           return m_subscribeDel(observer);
        }

        #endregion
    }

Předaná lambda v našem případě vytvoří instanci třídy eventsObserver, což je observer, který bude zpracovávat přicházející události, a do konstruktoru mu podhodí observer předaný klientským kódem – eventsObserver je tedy další adaptér, který je zdopovědný za “rozbalení” dat z instance IEvent a za předání těchto dat klientskému (“konečnému”) observeru.

Třída EventObserver:

 public class EventObserver<T> : IObserver<IEvent<EventArgsValue<T>>>                        
    {
        private readonly IObserver<T> m_innerObserver;
        private bool m_exceptionOccured;

        public EventObserver(IObserver<T> innerObserver)
        {
            if (innerObserver == null)
            {
                throw new ArgumentNullException("innerObserver");
            }
            
            m_innerObserver = innerObserver;
            m_exceptionOccured = false;
        }

        #region Implementation of IObserver<T>
        

        public virtual void OnNext(IEvent<EventArgsValue<T>> eventData)
        {
            if (eventData.EventArgs.Exception != null)
            {
                OnError(eventData.EventArgs.Exception);
                return;
            }
            //Rozbalení a předání dat Observeru
            m_innerObserver.OnNext(eventData.EventArgs.Value);
        }

        public virtual void OnError(Exception exception)
        {
            m_innerObserver.OnError(exception);
            m_exceptionOccured = true;
        }

        public virtual void OnCompleted()
        {
           //Chyba ukončí sekvenci sama o sobě
            if (!m_exceptionOccured)
            {
                m_innerObserver.OnCompleted();                
            }
        }

Třída EventObserver implementuje rozhraní IObservable s těmito generickými argumenty - IObserver<IEvent<EventArgsValue<T>>> . V C# Posterous API všechny události předávají svá data v instanci třídy EventArgsValue<T>, což znamená, že pro naše účely je EventObserver univerzálně použitelný observer pro zpracování výsledků asynchronní operace.

Pro úplnost zde je výpis třídy EventArgsValue

 public class EventArgsValue<T> : EventArgs
    {
        private readonly T m_value;
        private readonly Exception m_exception;

        internal EventArgsValue(T value, Exception exception)
        {
            m_value = value;
            m_exception = exception;
        }
            
        public T Value
        {
            get
            {
                return m_value;
            }
        }

        public Exception Exception
        {
            get
            {
                return m_exception;
            }
        }
    }

V lambdě předané do objektu DelegateObservable také musíme spustit asynchronní operaci – to je volání Action delegáta runAction, který nám předala již metoda GetSites. Každý IObservable také z metody vrací objekt implementující rozhraní IDisposable – volání metody Dispose dovoluje klientovi odpojit se objektu IObservable. Lambda  předaná do instance DelegateObservable vrátí  IDisposable objekt, který je vydán po připojení EventObservera k “streamu události“ (sourceEvents).

Přidání extenzních metod k dalším třídám je triviální – zde je extenzní metoda pro IPosterousSite, která nahraje všechny blogposty.

public static IObservable<IEnumerable<IPosterousPost>> GetPosts(this IPosterousSite site)
        {            

            throwIfSiteNull(site);
            var postsEvents = Observable.FromEvent<EventArgsValue<IEnumerable<IPosterousPost>>>(handler => site.PostsLoaded += handler,
                                                                                               handler => site.PostsLoaded -= handler)
                                      .Take(GlobalConstants.DEFAULT_TAKE_EVENTS_COUNT);
            

            return postsEvents.GetFinalObservableEvents(site.LoadAllPostsAsync);
            
            

        }
 
 

A nyní se můžeme znovu podívat, jak naše API použijeme v klientském kódu – díky alternativní implementaci Query vzoru v RX frameworku  můžeme používat staré dobré známé LINQ dotazy.

Metoda loadPosts:

private void loadPosts()
        {
            toolStripStatusLabel1.Text = TEXT_LOAD_DATA_START;

            IPosterousApplication app = PosterousApplication.Current;
            IPosterousAccount account = app.GetPosterousAccount("<Posterous user name>", "Posterous password");

            var syncContext = SynchronizationContext.Current;

            var resultPosts = from sites in account.GetSites()
                              from site in sites.ToObservable()
                              from posts in site.GetPosts()
                              from post in posts.ToObservable()
                              where post.Private == false
                              select post;

            resultPosts.Subscribe(post =>
                                      {
                                          lock (m_threadsSet)
                                          {
                                              m_threadsSet.Add(Thread.CurrentThread.ManagedThreadId);
                                          }


                                          syncContext.Post(_ =>
                                                               {
                                                                   var UCpost = new UC_Post
                                                                                    {
                                                                                        Title = post.Title,
                                                                                        Body = post.Body
                                                                                    };

                                                                   flowLayoutPanel1.Controls.Add(UCpost);
                                                               },
                                                           null

                                              );
                                      },

                                  ex => syncContext.Post(_ =>
                                                             {

                                                                 throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex);
                                                             },

                                                         null),

                                  () => syncContext.Post(_ =>
                                                             {
                                                                 lock (m_threadsSet)
                                                                 {
                                                                     m_threadsSet.Run(tId => lstThreads.Items.Add(tId));
                                                                 }

                                                                 toolStripStatusLabel1.Text = TEXT_LOAD_DATA_END;
                                                             },
                                                         null

                                            ));



        }

V proměnné resultPosts jsou uloženy všechny blogposty (IPosterousPost) ze všech blogů (IPosterousSite). Blogy i blogposty jsou nahrány asynchronně, ale v klientském kódu nevidíme žádná specialitky kvůli asynchronnímu nahrávání dat. Na proměnných sites i posts v dotazu je volána další extenzní metoda z RX Frameworku ToObservable, protože jak víme, výsledkem volání asynchronních metod byly typy IEnumerable<IPosterousPost> a IEnumerable<IPosterousSite>.

Důležité je, že zpracování dotazu je opět “lazy” – to znamená, že k získání dat dojde až poté, co k resultsPosts zaregistruju svého Observera metodou Subscribe (analogie “Lazy” vyhodnocování v LINQ To IEnumerable a procházení dotazu v cyklu foreach). Metoda Subscribe má několik variant a jedna z nich nám dovoluje pro metody OnNext, OnError a OnCompleted předat delegáty, aniž bychom byli nuceni vytvářet svou třídu implementující rozhraní IObserver.

První delegát (OnNext) vezme předaný post a vytvoří pro něj UserControl, který vloží do FlowPanelu na formuláři. Ještě předtím pro zajímavost do Hashsetu ukládám identifikátory vláken, které se na zpracování dotazu podílí. S prvky na formuláři můžeme pracovat jen z UI threadu, a proto je vložení User controlu provedeno přes SynchronizationContext uložený do proměnné syncContext před spuštěním dotazu.

Druhý delegát (OnError)  pouze přes SynchronizationContext zpropaguje výjimku, která nastala při asynchronním zpracování, do UI threadu. Všimněte si, jak je zpracování výjimek jednoduché – rozdíl vynikne při srovnání s opakovaným voláním metody throwIfAsyncEx v kódu na začátku tohoto článku.

Třetí delegát (OnCompleted) naplní listbox na formuláři ID použitých threadů a změní text ve status baru.

Zde je výsledný formulář. V listboxu nahoře si můžete všimnout, že u mě byly k vykonání dotazu použity celkem 3 thready.

 

AsyncForm

 

Tím bychom mohli skončit, ale RX Framework má pro asynchronní operace ještě další zajímavou podporu. Pomocí metody Observable.FromAsyncPattern můžeme vytvořit IObservable rychle a bezpracně ze standardního a výše již zmíněného asynchronního Begin/End vzoru. V C# Posterous API metody Begin*/End* nejsou, proto je zkusme dodat pomocí extenzních metod.

Rozhraní IPosterousAccount bude obohaceno o extenzní metody BeginLoadSites a EndLoadSite.

Metoda BeginLoadSites

public static IAsyncResult BeginLoadSites(this IPosterousAccount account, AsyncCallback callback, object context)
        {
            checkAccountNotNull(account);
            var loadSiteAction = new Action(account.LoadSites);
            
            return RXEventsHelper.GetAsyncResultEx(loadSiteAction, callback, context);
       
            
        }

Jak vidíte, přesně dle konvencí .Net vzoru metoda vrací odkaz na rozhraní IAsyncResult a přijímá callBack, což je tedy u tohoto vzoru metoda, která má být vyvolána po dokončení asynchronního zpracování, a jak také vzor vyžaduje, posledním argumentem je libovolný objekt reprezentující libovolný “stavový token” operace, který v metodě End* klient používá pro korelací mezi požadavkem a odpovědí.

Veškerá práce je přenesena na metodu GetAsyncResultEx v mém RXEventsHelperu – metoda vyžaduje, abyste ji poslali v delegátu Action metodu, která má být spuštěna asynchronně.

Metoda RXEventsHelper.GetAsyncResultEx.

 public static IAsyncResult GetAsyncResultEx(Action runAction, AsyncCallback callback, object context)
        {
            if (runAction == null)
            {
                throw new ArgumentNullException("runAction");
            }

            Exception ex = null;
            
            var proxyCallback = new AsyncCallback(ar =>
                                                      {
                                                          IAsyncResult proxyResult = new AsyncResultEx(ar, runAction);                                                                                         
                                                           callback(proxyResult);
                                                      });

            return runAction.BeginInvoke(proxyCallback, context);
                      

            
                       
        }

Hlavním trikem je využití možností delegátů – každý delegát v .Net Frameworku vždy obsahuje asynchronní metody BeginInvoke a EndInvoke, které splňují nároky asynchronního vzoru. My tedy na předaném delegátu runAction zavoláme metodu BeginInvoke, ale místo klientské callBack Funkce podhodíme svou proxy funkci (proxyCallback), která po dokončení asynchronního volání připraví pro naši End metodu vlastní IAsyncResult (AsyncResultEx).

Třída AsyncResultEx zapouzdřuje původní IAsyncResult  (argument ar předaný  do konstruktoru v předešlém výpisu) a navíc, když na její instanci zavoláme metodu EndAction, na předaném delegátovi (argument runAction v předešlém výpisu) je zavolána metoda EndInvoke, čehož využije naše metoda EndLoadSites.

Třída AsyncResultEx

 public class AsyncResultEx : IAsyncResult
    {
        
        #region private variables
        private IAsyncResult m_originalAsyncResult;
        private readonly Action m_originaldelegate;
        #endregion private variables

        public AsyncResultEx(IAsyncResult origAsyncResult, Action originaldelegate)
        {
            if (origAsyncResult == null)
            {
                throw new ArgumentNullException("origAsyncResult");
            }


            m_originalAsyncResult = origAsyncResult;
            m_originaldelegate = originaldelegate;
        }

        #region properties
        public virtual IAsyncResult OriginalAsyncResult
        {
            get
            {
                return m_originalAsyncResult;
            }

        }

        public virtual Action OriginalDelegate
        {
            get
            {
                return m_originaldelegate;
            }
        }
        
        #endregion properties

        #region methods

        public virtual void EndAction()
        {
            
            if (OriginalDelegate != null)
            {
                OriginalDelegate.EndInvoke(OriginalAsyncResult);
            }                                                

        }
            
        #endregion methods
        #region Implementation of IAsyncResult

        
        public virtual bool IsCompleted
        {
            get
            {
                return m_originalAsyncResult.IsCompleted;
            }
        }

        public virtual object AsyncState
        {
            get
            {
                return m_originalAsyncResult.AsyncState;
            }
        }

        public virtual WaitHandle AsyncWaitHandle
        {
            get
            {
                return m_originalAsyncResult.AsyncWaitHandle;
            }
        }

        public virtual bool CompletedSynchronously
        {
            get
            {
                return m_originalAsyncResult.CompletedSynchronously;
            }
        }
                
        #endregion
    }

Extenzní metoda EndLoadSites

 public static IEnumerable<IPosterousSite> EndLoadSites(this IPosterousAccount account, IAsyncResult result)
        {
            checkAccountNotNull(account);
            
            var exResult = result as AsyncResultEx;
            
            if (exResult == null)
            {
                throw new ArgumentException("result");
            }
            
            exResult.EndAction();
                                               
            return account.Sites;
        }

Zde vidíme volání metody EndAction na podhozené instanci AsyncResultE. Poté metoda EndLoadSites jen vrátí kolekci Sites objektu account, protože ta  nyní již musí být po asynchronním volání naplněna daty.

Se stávající infrastrukturou si opět si můžeme rychle připravit další Begin a End metody. Zde jsou extenzní metody BeginLoadPosts a EndLoadPosts pro IPosterousSite.

public static IAsyncResult BeginLoadPosts(this IPosterousSite site, AsyncCallback callback, object context)
        {
            throwIfSiteNull(site);
            var loadPostsAction = new Action(site.LoadAllPosts);
            
            return RXEventsHelper.GetAsyncResultEx(loadPostsAction, callback, context);
        }

        public static IEnumerable<IPosterousPost> EndLoadPosts(this IPosterousSite site, IAsyncResult result)
        {
            throwIfSiteNull(site);
            var exResult = result as AsyncResultEx;

            if (exResult == null)
            {
                throw new ArgumentException("result");
            }

            exResult.EndAction();

            return site.Posts;
        }

 

A metoda loadPosts2, která dělá to samé, co předchozí metoda loadPosts, ale používá naše nové extenzní Begin/End metody.

 private void loadPosts2()
        {
            toolStripStatusLabel1.Text = TEXT_LOAD_DATA_START;

            IPosterousApplication app = PosterousApplication.Current;
            IPosterousAccount account = app.GetPosterousAccount("posterousname", "posterouspassword");

            var syncContext = SynchronizationContext.Current;            


            var resultPosts = from sites in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)())
                              from site in sites.ToObservable()
                              from posts in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)())
                              from post in posts.ToObservable()
                              where post.Private == false
                              select post;

            resultPosts.Subscribe(post =>
                                      {
                                          lock (m_threadsSet)
                                          {
                                              m_threadsSet.Add(Thread.CurrentThread.ManagedThreadId);
                                          }


                                          syncContext.Post(_ =>
                                                               {
                                                                   var UCpost = new UC_Post
                                                                                    {
                                                                                        Title = post.Title,
                                                                                        Body = post.Body
                                                                                    };

                                                                   flowLayoutPanel1.Controls.Add(UCpost);
                                                               },
                                                           null

                                              );
                                      },


                                  ex => syncContext.Post(_ =>
                                                             {
                                                                 
                                                                 throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex);
                                                             },

                                                         null),

                                  () => syncContext.Post(_ =>
                                                             {
                                                                 lock (m_threadsSet)
                                                                 {
                                                                     m_threadsSet.Run(tId => lstThreads.Items.Add(tId));
                                                                 }

                                                                 toolStripStatusLabel1.Text = TEXT_LOAD_DATA_END;
                                                             },
                                                         null

                                            )


                );


        }

Upozornil bych jen na dvě specialitky či zrádná místa, která (alespoň v této BETA verzi RX) dělají kód méně intuitvním, než by bylo žádoucí:

Jedná se o tyto dva řádky:

from sites in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)())  
from posts in Observable.Defer(() => Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)()) 

Metoda FromAsyncPattern přijímá delegáty na naše asynchronní metody, ale místo spolehnutí se na typovou inference jsem musel generický argument předat explicitně(Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>) – pokud argument nezadáte, kompilátor hlásí “ambigous reference”.

Dále je patrné, že výsledek funkce FromAsyncPattern, kterým je další funkce vracející IObservable, je předán jako argument metodě Observable.Defer. Metoda Observable.Defer zajistí, že k vyhodnocení předané funkce dojde až poté, co je k výsledkům dotazu přihlášen observer – jinými slovy, metoda Defer nám pomáhá zachovat “lazy” vyhodnocení dotazu.

Dotaz bude fungovat i v této podobě (bez Defer):

var resultPosts = from sites in  Observable.FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)()
                              from site in sites.ToObservable()
                              from posts in Observable.FromAsyncPattern<IEnumerable<IPosterousPost>>(site.BeginLoadPosts, site.EndLoadPosts)()
                              from post in posts.ToObservable()
                              where post.Private == false
                              select post;

Ale jeho vyhodnocení už není “lazy”. Všimněte si závorek na konci výrazu - FromAsyncPattern<IEnumerable<IPosterousSite>>(account.BeginLoadSites, account.EndLoadSites)()  - výsledné IObservable získám okamžitým zavoláním funkce vrácené z metody FromAsyncPattern. Vadit vám to začne v okamžiku, kdy zkonstruujete dotaz, ihned se odpálí asynchronní volání, dojde k chybě a vy budete mít v aplikaci neošetřenou výjimku v threadu na pozadí, protože IObserver ještě není přihlášen (není možné zavolat druhého delegáta - ex => syncContext.Post(_ => { throw new ApplicationException(ASYNC_EXCEPTION_TEXT, ex); }, null), ).

 

Snad se vám tato exkurze líbila. Já ještě na RX Framework konečný názor nemám, ale něco neodbytného ve mně říká, že by mohlo jít o další LINQ, který otřese programátorským světem. :-) Některé extenze pravděpodobně zahrnu do samostatného jmenného prostoru v C# Posterous API.



Tuesday, 02 February 2010 07:43:00 (Central Europe Standard Time, UTC+01:00)       
Comments [3]  .NET Framework | C# Posterous API | LINQ | Návrhové vzory | RX Extensions | Windows Forms


Tuesday, 02 February 2010 14:45:03 (Central Europe Standard Time, UTC+01:00)
Moc pěkný článek! Já zatím koketuji se System.Interactive, který je taky součástí Rx Frameworku. Asynchonních událostí v ASP.NET MVC moc k řešení nemám, kde bych Reactive využil, ale přednášky, co jsem viděl, mne velice zuajaly. :)
Tuesday, 02 February 2010 14:46:55 (Central Europe Standard Time, UTC+01:00)
Díky Aleši, System.Interactive už mám připojený ke každému projektu - Do, Run, Merge, (vylepšené Concat) se hodí skoro vždy.:)
Wednesday, 03 February 2010 21:48:33 (Central Europe Standard Time, UTC+01:00)
Pekne poctenicko, na RX fr jsem koukal hned jak vysla prvni verejna verze, toto je super pripomenuti. A zrovna se mi bude hodit....
Comments are closed.