Reactive Extensions

Inledning

Reactive Extensions eller kort och gott Rx är ett ramverk som används allt mer. Det finns en mängd saker som man kan använda Rx till och därmed också en mängd olika sätt att lära sig Rx. För att göra allt lite mer förvirrande så finns det en äldre version 1 samt en nyare version 2 av Rx. Det som gör det förvirrande är att exempelkod från version 1, vilket är det vanligaste på nätet, skiljer sig en del från version 2.

Vi kommer att titta på Rx version 2 och visa genom exempel i Windows Forms hur det fungerar.

IObservable vs IEnumerable

IObservable följde med som interface i C# 4.0 men utan några implementationer! Det är här Rx kommer in. Rx står för implementationer av IObservable och IObservable.

IEnumerable har du garanterat använt mycket. Det är det interface som array'er, listor m.fl. implementerar. Det gör att vi får uppräkningsbara klasser. Vi kan då använda foreach för att iterera över samlingen.


	int[] data = { 0, 1, 2, 3, 4, 5, 6, 7, 8 };

	foreach(int i in data)
	{
		Console.WriteLine(i);
	}

Det vi gör är att vi aktivt frågar efter nästa data i vår IEnumerable. I exemplet ovan så sker det omedelbart att vi får nästa data, men vad händer om det skulle ta tid att få vår data? Säg att datakällan inte är en enkel lista av int utan en webbtjänst där varje förfrågan efter "nästa data" tar tid. Då blockeras programmet i väntan på "nästa data" och vi riskerar att få följande beteende:

bild

Ett program som inte svarar, dvs. ett användargränssnitt som är låst. Programmet som vi skriver använder bara 1 tråd. Vi kan kalla denna UI-tråden eftersom den också sköter uppritningen av programmet (UI = User Interface). Inom programmering brukar detta kallas för synkrona anrop. Anropen är synkroniserade och vi väntar på resultat. Beteendet att aktivt fråga efter "nästa data" brukar kallas för "pull"-baserat.

Vad händer om vi vänder på det hela? Jo då går vi från IEnumerable till IObserverable. Vi går från data som vi räknar upp till data som vi övervakar (observe)! Vi går också från synkron programmering till asynkron programmering. Vi går från "pull"-baserat till "push"-baserat.

Istället för att låsa programmet i väntan på data så sätter vi upp en data-källa som vi övervakar och reagerar på när data dyker upp. Alltså helt omvänt. Vårt program kan då göra andra saker i väntan på att något skall bli klart. Vi jobbar asynkront! Hur löser man detta? Rent tekniskt handlar det om att programmet skapar extra trådar som ligger och övervakar. Så fort man går över till asynkront så handlar det om fler trådar. Fler trådar gör också programmet mer komplext.

Events

Vi skapar ett Windows Forms projekt med en ListBox, en TextBox och en Timer för att visa lite exempel. Programmet nedan prenumererar på eventet MouseMove för formuläret. Det kommer att triggas då muspekaren rör sig över bakgrunden i formuläret men ej över knappen eller listboxen.

Events exempel

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace ReactiveTest
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
            this.MouseMove += Form1_MouseMove;
        }

        void Form1_MouseMove(object sender, MouseEventArgs e)
        {
            listBox1.Items.Add(string.Format("Got X={0} Y={1}", e.X, e.Y));
        }

        private void button1_Click(object sender, EventArgs e)
        {
            this.MouseMove -= Form1_MouseMove;
        }
		
        private void timer1_Tick(object sender, EventArgs e)
        {
            textBox1.Text = DateTime.Now.ToString("HH:mm:ss,ff");
        }		
    }
}

Vad vi har här är ganska typiskt Windows Forms. Faktum är att events är en ganska typisk tillämpning för Rx som vi strax skall se exempel på. Vi kopplar oss på ett event och reagerar så fort muspekaren rörs. Det resulterar i väldigt många snabba utskrifter. Vi har även en Timer där vi reagerar på eventet Tick var 100:e millisekund.

Vad händer med programmet om själva reaktionen på eventet MouseMove tar tid? Vi kan simulera detta genom att låta tråden sova lite, vilket motsvarar en beräkningsintensiv operation.


	void Form1_MouseMove(object sender, MouseEventArgs e)
	{
		listBox1.Items.Add(string.Format("Got X={0} Y={1}", e.X, e.Y));
		Thread.Sleep(1000);
	}

Inte helt oväntat så låses programmet i minst 1 sekund varje gång vi rör musen. Uppritningen upphör att fungera vilket syns då varken tiden eller muspositionen ritas ut längre. Programmet svarar ej. En annan sak som händer är att events försvinner. Under tiden vi väntar i 1 sekund så missar vi events från musen och från timern. Detta beror på att tråden som övervakar events i vårt program är samma tråd som också exekverar dem.

Rx och Events

Kanske dags att vi skapar en Observable nu! Till det använder vi Rx. För att lägga till Rx i projektet så använder vi NuGet. Paketet heter "Reactive Extensions - Main Library". Vi får då tillgång till klassen Observable som kan skapa just "observables" från t.ex. events.

Rx exempel 1

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace ReactiveTest
{
    public partial class Form1 : Form
    {
        IDisposable subscription;

        public Form1()
        {
            InitializeComponent();
            var mouseMoves = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
            subscription = mouseMoves.Subscribe(e => Form1_MouseMove(e.Sender, e.EventArgs));
        }

        void Form1_MouseMove(object sender, MouseEventArgs e)
        {
            listBox1.Items.Add(string.Format("Got X={0} Y={1}", e.X, e.Y));
            Thread.Sleep(1000);
        }

        private void button1_Click(object sender, EventArgs e)
        {
            subscription.Dispose();
        }

        private void timer1_Tick(object sender, EventArgs e)
        {
            textBox1.Text = DateTime.Now.ToString("HH:mm:ss,ff");
        }
    }
}

Programmet fungerar just nu likadant men vad är skillnaden? Vi skapar en Observable med metoden FromEventPattern. Vi behöver då ange vilken typ av EventArgs samt objektet (this) och namnet på eventet som vi är intresserade av. När vi har vår Observable så kan vi göra en prenumeration (subscription). Här har vi valt att återanvända metoden Form1_MouseMove. Vi skulle kunna använda en anonym delegat för vi får en IDisposable av vår prenumeration. Denna IDisposable kan vi använda om vi vill avsluta prenumerationen.

(Överkurs) I första exemplet hade vi också kunnat använda en anonym delegat istället för metoden Form1_MouseMove. Problemet med events är då att du kan inte avsluta prenumerationen på eventhandlern. I exemplet med Rx får vi alltid en IDisposable vid prenumeration.

Nu kan vi göra en enkel justering för att hantera flödet av events.


	var mouseMoves = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
	subscription = mouseMoves.Throttle(TimeSpan.FromMilliseconds(200))
		.Subscribe(e => Form1_MouseMove(e.Sender, e.EventArgs));

Vi använder metoden Throttle på vår Observable. Metoden gör så att alla events ignoreras tills dess att det gått en tidsperiod utan events (200ms) och rapporterar då senaste eventet. Det som händer nu är en krash när vi slutar röra muspekaren!

bild

Rx har nu automatiskt skapat en ny tråd för att övervaka vår Observable och som också sköter Throttle-funktionen. UI-komponenter, som i detta fall listboxen, får endast ändras av den tråd som skapade dem. Dvs. endast UI-tråden får göra ändringar som triggar en ny uppritning. Vi kan tvinga Rx att köra på rätt tråd genom att använda ObserveOn.


	var mouseMoves = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
	subscription = mouseMoves.Throttle(TimeSpan.FromMilliseconds(200))
		.ObserveOn(SynchronizationContext.Current)
		.Subscribe(e => Form1_MouseMove(e.Sender, e.EventArgs));

Nu börjar programmet fungera bättre! Timern fungerar nu under tiden vi rör muspekaren men så fort vi slutar röra muspekaren så får vi efter 200ms ett event. Eftersom vi kör eventet på UI-tråden för att undvika krash så blockas vi nu fortfarande i 1 sekund. Tänk nu på att detta är ett exempel. Vi hade ju inte behövt göra något UI-relaterat i eventhanteraren och hade då sluppit att köra den på UI-tråden och hade då ej behövt blockera något alls.

SynchronizationContext.Current anger den aktuella tråden i detta fall UI-tråden. För andra typer av projekt WPF, Windows Phone, Silverlight etc. så kan denna bit skilja sig lite. Du kan behöva använda en "Dispatcher" istället som du skickar till ObserveOn. Det finns även fler Rx bibliotek som gör att du skulle kunna skicka komponenten (formuläret) du vill övervaka till ObserveOn.

En bieffekt är nu att det senaste eventet hela tiden rapporteras när vi inte rör muspekaren. Detta kan vi också justera:


	var mouseMoves = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
	subscription = mouseMoves.Throttle(TimeSpan.FromMilliseconds(200))
		.ObserveOn(SynchronizationContext.Current)
		.DistinctUntilChanged()
		.Subscribe(e => Form1_MouseMove(e.Sender, e.EventArgs));

DistinctUntilChanged ser till att inga dubletter av events rapporteras.

Rx och LINQ

Det går bra att göra urval med hjälp av LINQ på Observables. Exemplet ovan kan skrivas om så att vi får en Observable av typen Point. Samtidigt passar vi på att ersätta Form1_MouseMove med en anonym delegat.

Rx exempel 2

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace ReactiveTest
{
    public partial class Form1 : Form
    {
        IDisposable subscription;

        public Form1()
        {
            InitializeComponent();
            subscription = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
                .Select(e => new Point(e.EventArgs.X, e.EventArgs.Y))
                .Throttle(TimeSpan.FromMilliseconds(200))
                .ObserveOn(SynchronizationContext.Current)
                .DistinctUntilChanged()
                .Subscribe(p => {
                    listBox1.Items.Add(string.Format("Got X={0} Y={1}", p.X, p.Y));
                    Thread.Sleep(1000);
                });
        }

        private void button1_Click(object sender, EventArgs e)
        {
            subscription.Dispose();
        }

        private void timer1_Tick(object sender, EventArgs e)
        {
            textBox1.Text = DateTime.Now.ToString("HH:mm:ss,ff");
        }
    }
}

Kort om Rx

Nu har vi bara skrapat lite på ytan vad det gäller Rx. Det finns många trevliga metoder i Rx bl.a. Delay, Sample och Buffer. Det finns också många tillämpningar där Rx verkligen glänser. Många saker som är enkelt att göra i Rx är otroligt svåra att göra utan Rx. Bara en sån sak som att skapa trådar som sköter övervakning samt synkronisering med en annan tråd är många rader kod som du slipper skriva. Slipper man många rader kod så slipper man även många buggar.

Några övningar

Övning 1

Skriv om exemplet ovan med Sample så att muspositionen rapporteras var 2:a sekund.

Övning 2

Lägg till en TextBox där du övervakar TextChanged och skapar en Observable<string> med den ändrade texten. 500ms efter att du slutat skriva i textboxen så skall det nya värdet rapporteras till användaren via en MessageBox.

Lämna ett svar

Din e-postadress kommer inte publiceras. Obligatoriska fält är märkta *

Scroll to top