Peter Goodman

A software engineer and leader living in Auckland building products and teams. Originally from Derry, Ireland.

I have been meaning to get into Rx for a while now but hadn’t quite found the excuse or opportunity to do so. While playing with SignalR it became apparent that Rx was something that could help, and while researching it I saw a lot of people talking about using Rx in UI frameworks and thought, hmmm, that sounds interesting.

So I set myself a challenge: use Rx to create an auto-complete WPF control covering some of the major use cases I have seen in these types of controls.

The App Shell

The first thing I needed was a WPF application with a MainWindow that would hold my text box, the results and a log window.

[Screenshot: the main window with the search box, results list and log]

The XAML looks something like this:

<Window
    x:Class="AutoCompleteWithRx.MainWindow"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    Title="AutoComplete with Rx"
    Height="451"
    Width="825"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
    xmlns:AutoCompleteWithRx="clr-namespace:AutoCompleteWithRx"
    mc:Ignorable="d"
    d:DataContext="{d:DesignInstance AutoCompleteWithRx:AutoCompleteViewModel}"
    FocusManager.FocusedElement="{Binding ElementName=SearchBox}">
    <Grid>
        <Grid.RowDefinitions>
            <RowDefinition
                Height="Auto" />
            <RowDefinition />
        </Grid.RowDefinitions>
        <Grid.ColumnDefinitions>
            <ColumnDefinition />
            <ColumnDefinition />
        </Grid.ColumnDefinitions>
        <TextBlock
            FontFamily="Segoe WP Light"
            FontSize="26.667"
            Text="Auto-Complete Sample"
            Grid.ColumnSpan="2" />

        <StackPanel
            Grid.Row="1">
            <Label>Search</Label>
            <TextBox
                x:Name="SearchBox"
                Text="{Binding SearchText, UpdateSourceTrigger=PropertyChanged}"/>
            <ProgressBar Height="19" IsIndeterminate="True"/>
            <ListBox
                ItemsSource="{Binding SearchResults}" />
        </StackPanel>
        <StackPanel Grid.Column="1" Orientation="Vertical" Grid.Row="1" d:LayoutOverrides="Height">
            <Label>Log</Label>
            <ListBox
                ItemsSource="{Binding LogOutput}" />
        </StackPanel>
    </Grid>
</Window>

So that is basically the view for now. I have cheated and just instantiated my viewmodel in the code behind class for now.

public partial class MainWindow : Window {
    public MainWindow() {
        InitializeComponent();
        DataContext = new AutoCompleteViewModel();
    }
}

Now that we have a window I need to get my view model up to scratch. I started with simply satisfying the data properties required by my form.

public class AutoCompleteViewModel : INotifyPropertyChanged {

    private string searchText;

    public string SearchText {
        get { return searchText; }
        set {
            if (searchText == value) {
                return;
            }
            searchText = value;
            NotifyPropertyChanged("SearchText");
        }
    }

    private readonly ObservableCollection<string> logOutput = new ObservableCollection<string>();

    public ObservableCollection<string> LogOutput {
        get { return logOutput; }
    }

    private readonly ObservableCollection<string> searchResults = new ObservableCollection<string>();

    public ObservableCollection<string> SearchResults {
        get { return searchResults; }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    protected virtual void NotifyPropertyChanged(string propertyName) {
        if (PropertyChanged != null) {
            PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
        }
    }
}

Subscribing to Property Changed Events using Rx

OK. So that is simple enough, but now I need to make my textbox do something when the user types into it. Before I start, though, I need to add Rx to my project: right-click the project, choose “Manage NuGet Packages”, search for Rx, and install Rx-Main and Rx-WPF.

Let’s start by simply logging the fact that the user has typed something. Add a constructor to our view model class with the following code:

public AutoCompleteViewModel() {
    // Listen to all property change events on SearchText
    var searchTextChanged = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
        ev => PropertyChanged += ev,
        ev => PropertyChanged -= ev
        )
        .Where(ev => ev.EventArgs.PropertyName == "SearchText");

    // Transform the event stream into a stream of strings (the input values)
    var input = searchTextChanged
        .Select(args => SearchText);

    // Log all events in the event stream to the Log viewer
    input.ObserveOn(DispatcherScheduler.Instance)
        .Subscribe(e => LogOutput.Insert(0, 
            string.Format("Text Changed. Current Value - {0}", e)));
}

The first section of slightly unwieldy syntax creates an IObservable from the PropertyChanged event of INotifyPropertyChanged which our view model implements. You can think of the IObservable as a stream of events that will happen or a collection of future objects that will be added to each time the event is raised. Our code will then filter those events to only those concerning the SearchText that our textbox is bound to.

The next line transforms the type of the events in our IObservable from an event pattern wrapping PropertyChangedEventArgs to string, the content of the textbox. Much more useful.

Finally we need to subscribe to the event stream and do something with the string. First, though, we tell the subscription that we want to observe the events on the WPF dispatcher (this helper singleton comes from the Rx-WPF package). Then we simply provide an inline function to execute for each event, in our case inserting into the log collection which our LogOutput list box is bound to.

The result looks like this.

[Screenshot: the log window showing an entry for each text change]
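To try the same event-to-stream conversion outside WPF, here is a minimal console sketch. It assumes the System.Reactive NuGet package; the Person class and its Name and Age properties are hypothetical stand-ins for the view model, not part of the sample application.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive.Linq;

// Hypothetical stand-in for the view model, used only for illustration.
public class Person : INotifyPropertyChanged {
    private string name;
    private int age;

    public string Name {
        get { return name; }
        set { name = value; Raise("Name"); }
    }

    public int Age {
        get { return age; }
        set { age = value; Raise("Age"); }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private void Raise(string propertyName) {
        var handler = PropertyChanged;
        if (handler != null) {
            handler(this, new PropertyChangedEventArgs(propertyName));
        }
    }
}

public static class EventStreamDemo {
    public static List<string> Run() {
        var person = new Person();
        var seen = new List<string>();

        // Convert the .NET event into an observable, keep only Name changes,
        // then project each event to the current value of the property.
        var names = Observable
            .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                handler => person.PropertyChanged += handler,
                handler => person.PropertyChanged -= handler)
            .Where(ev => ev.EventArgs.PropertyName == "Name")
            .Select(ev => person.Name);

        using (names.Subscribe(seen.Add)) {
            person.Name = "Ada";
            person.Age = 36;       // filtered out by Where
            person.Name = "Grace";
        }

        // Disposing the subscription removed the event handler,
        // so this change is not recorded.
        person.Name = "Linus";
        return seen;
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", Run())); // prints "Ada, Grace"
    }
}
```

Disposing the subscription is what detaches the handler from PropertyChanged, which is worth remembering if the view model outlives its subscribers.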

OK. So far so good. Now let’s add an actual search to the mix.

Chaining the Asynchronous Search

Before I add my search function I want to create a structure that will encapsulate the search term and its result.

public struct SearchResult {
    public string SearchTerm { get; set; }
    public IEnumerable<string> Results { get; set; } 
}

Next I add my search function. For now I’ve just created a synchronous function, but chances are you would be calling an external service with async methods (BeginXXX and EndXXX), for which you should use the FromAsyncPattern method instead of the ToAsync I am using.

private SearchResult DoSearch(string searchTerm) {
    return new SearchResult {
        SearchTerm = searchTerm,
        Results =
            phrases.Where(item => item.ToUpperInvariant().Contains(searchTerm.ToUpperInvariant())).ToArray()
    };
}

private readonly string[] phrases = new[] {
    "The badger knows something",
    "Your head looks like a pineapple",
    "etc...",
};

And now for the magic. To invoke our search we add the following to our constructor:

// Wrap the synchronous search in a function that returns an observable
var search = Observable.ToAsync<string, SearchResult>(DoSearch);

// Chain the input event stream and the search stream
var results = from searchTerm in input
              from result in search(searchTerm)
              select result;

// Log the search result and add the results to the results collection
results.ObserveOn(DispatcherScheduler.Instance)
    .Subscribe(result => {
        searchResults.Clear();
        LogOutput.Insert(0, string.Format("Search for '{0}' returned '{1}' items", result.SearchTerm, result.Results.Count()));
        result.Results.ToList().ForEach(item => searchResults.Add(item));
    });

Here we first call ToAsync, which creates a function that takes a string (our search term) and returns an observable. That observable produces a single value (the search result) and then completes.

We chain the incoming input stream with the result of the search function using LINQ query syntax and produce a new observable of SearchResult. We then observe the result, logging and updating the autocomplete items.

The result is this:

[Screenshot: search results and log entries, with a search logged for every key press]

OK. So this is round about where my mind exploded and grey matter started to leak out of my ears. Basically what is happening is that we are treating these incoming events as collections before they actually happen, so we can manipulate them using LINQ and even chain the events, much like pipelining in UNIX / PowerShell.
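The two-from query used for the chaining is sugar for the SelectMany operator. The following console sketch, assuming the System.Reactive package, runs both forms side by side; CountLetters is a hypothetical stand-in for DoSearch.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class ToAsyncDemo {
    // Hypothetical stand-in for DoSearch: any synchronous function will do.
    private static int CountLetters(string term) {
        return term.Length;
    }

    // Blocks until the stream completes, then sorts the values, because the
    // asynchronous calls are not guaranteed to finish in input order.
    private static int[] Collect(IObservable<int> source) {
        return source.ToList().Wait().OrderBy(n => n).ToArray();
    }

    public static int[] RunQuerySyntax() {
        Func<string, IObservable<int>> countAsync =
            Observable.ToAsync<string, int>(CountLetters);
        var terms = new[] { "cr", "crazy" }.ToObservable();

        var results = from term in terms
                      from count in countAsync(term)
                      select count;
        return Collect(results);
    }

    public static int[] RunMethodSyntax() {
        Func<string, IObservable<int>> countAsync =
            Observable.ToAsync<string, int>(CountLetters);
        var terms = new[] { "cr", "crazy" }.ToObservable();

        // The same chaining written with the operator directly.
        var results = terms.SelectMany(term => countAsync(term));
        return Collect(results);
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", RunQuerySyntax()));  // prints "2, 5"
        Console.WriteLine(string.Join(", ", RunMethodSyntax())); // prints "2, 5"
    }
}
```

Because SelectMany merges the inner observables as they produce values, results can arrive in a different order from the inputs. That is why the sketch sorts before printing.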

We’re not done yet. We still have a number of issues.

Using Throttle to enforce an idle period before searching

One of the issues you can see in the above screenshot is that the search was done after every single key press. The owner of the service we are calling will not be too happy with us flooding them with unnecessary searches, so how do we stop this from happening? Typically we would create a timer that waits for a period of inactivity before searching. We would have to reset the timer every time the user typed something, so that the search is not performed for each of the previous characters, and quite a few lines of code would be required to do this.

With Rx we can use the Throttle operator, which gives us exactly this behaviour for free. We change our previous declaration of the input stream to add the Throttle method.

var input = searchTextChanged
    .Throttle(TimeSpan.FromMilliseconds(400))
    .Select(args => SearchText);

Now if we test we should get something like the following:

[Screenshot: the log after adding Throttle, with a search logged only once typing pauses]

That was too easy.
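To watch Throttle on its own, here is a small console sketch, assuming the System.Reactive package. A Subject stands in for the keystroke stream, and the 200 ms idle period and one-second pauses are arbitrary values chosen for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ThrottleDemo {
    public static string[] Run() {
        var keystrokes = new Subject<string>();
        // Throttle emits from a timer thread, so collect into a thread-safe queue.
        var emitted = new ConcurrentQueue<string>();

        using (keystrokes
            .Throttle(TimeSpan.FromMilliseconds(200))
            .Subscribe(emitted.Enqueue)) {

            // A burst of typing: each value resets the idle timer,
            // so only the last one survives.
            keystrokes.OnNext("c");
            keystrokes.OnNext("cr");
            keystrokes.OnNext("cra");
            Thread.Sleep(1000); // idle long enough for "cra" to be emitted

            keystrokes.OnNext("craz");
            keystrokes.OnNext("crazy");
            Thread.Sleep(1000); // idle long enough for "crazy" to be emitted
        }

        return emitted.ToArray();
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", Run())); // prints "cra, crazy"
    }
}
```

Note that Throttle in Rx for .NET behaves like what other libraries call debounce: it waits for silence rather than sampling at a fixed rate.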

Merging two event streams

The next requirement was a longer throttle timeout for strings of fewer than 4 characters, as people tend to type slower at the start of a search. To do this I needed to split my input stream into two different event streams that could be throttled differently.

var input = searchTextChanged
    .Where(ev => SearchText == null || SearchText.Length < 4)
    .Throttle(TimeSpan.FromSeconds(3))
    .Merge(searchTextChanged
        .Where(ev => SearchText != null && SearchText.Length >= 4)
        .Throttle(TimeSpan.FromMilliseconds(400)))
    .Select(args => SearchText);

So above I have simply cut the searchTextChanged stream 2 different ways, throttled them differently and then I can use the Merge operator to join the streams back together again into one.
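The split-and-merge shape is easier to see without the timing. This console sketch, assuming the System.Reactive package, cuts one stream by length, tags each branch instead of throttling it, and merges the branches again; the "short:" and "long:" tags are purely illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo {
    public static List<string> Run() {
        var terms = new Subject<string>();
        var seen = new List<string>();

        // Cut the stream two ways with complementary predicates.
        // In the real pipeline each branch gets its own Throttle;
        // here each branch just labels its values.
        var shortTerms = terms
            .Where(term => term.Length < 4)
            .Select(term => "short:" + term);
        var longTerms = terms
            .Where(term => term.Length >= 4)
            .Select(term => "long:" + term);

        // Merge interleaves values from both branches as they arrive.
        using (shortTerms.Merge(longTerms).Subscribe(seen.Add)) {
            terms.OnNext("cr");
            terms.OnNext("crazy");
            terms.OnNext("cra");
        }

        return seen;
    }

    public static void Main() {
        // prints "short:cr, long:crazy, short:cra"
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because the two predicates are complementary, every value takes exactly one branch, so the merged stream never contains the same event twice.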

Creating a Reactive ICommand for MVVM

The next requirement is that the search should be executed immediately if the user hits the enter key. To do this I decided to use the MVVM ICommand pattern. We need to add the following Input KeyBinding to the TextBox in our XAML so that the enter key will be picked up and we point it at the command we will write below.

<TextBox
    x:Name="SearchBox"
    Text="{Binding SearchText, UpdateSourceTrigger=PropertyChanged}">
    <TextBox.InputBindings>
        <KeyBinding
            Command="{Binding TextBoxEnterCommand}"
            Key="Enter" />
    </TextBox.InputBindings>
</TextBox>

The next thing we need to do is create our own command type. If you use an MVVM framework that understands Rx like ReactiveUI then you should get this for free but implementing a simple version yourself is not difficult. We simply take the RelayCommand defined by Josh Smith and add a Subject<T> exposed as an Observable.

public class ReactiveRelayCommand : ICommand {
    private readonly Action<object> execute;
    private readonly Predicate<object> canExecute;
    
    public ReactiveRelayCommand(Action<object> execute)
        : this(execute, null) {
    }

    public ReactiveRelayCommand(Action<object> execute, Predicate<object> canExecute) {
        if (execute == null)
            throw new ArgumentNullException("execute");

        this.execute = execute;
        this.canExecute = canExecute;
    }

    [DebuggerStepThrough]
    public bool CanExecute(object parameter) {
        return canExecute == null || canExecute(parameter);
    }

    public event EventHandler CanExecuteChanged {
        add { CommandManager.RequerySuggested += value; }
        remove { CommandManager.RequerySuggested -= value; }
    }

    public void Execute(object parameter) {
        execute(parameter);
        executed.OnNext(parameter);
    }

    private readonly Subject<object> executed = new Subject<object>();

    public IObservable<object> Executed {
        get { return executed; }
    }
}

Notice that it is exactly the same as the standard RelayCommand except that we use a Subject<object> as the backing implementation for our IObservable. We call OnNext whenever the command is executed.
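The Subject-behind-an-IObservable pattern can be tried without WPF. In this console sketch, which assumes the System.Reactive package, ClickSource is a hypothetical class that plays the role of the command. It also uses AsObservable, an optional hardening step the command above does not take, so that callers cannot cast the property back to the subject and push their own events.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical event source, playing the role of the command.
public sealed class ClickSource {
    private readonly Subject<string> clicks = new Subject<string>();

    // AsObservable wraps the subject so consumers only see IObservable<string>.
    public IObservable<string> Clicks {
        get { return clicks.AsObservable(); }
    }

    public void Click(string label) {
        clicks.OnNext(label); // push the event to every current subscriber
    }
}

public static class SubjectDemo {
    public static List<string> Run() {
        var source = new ClickSource();
        var seen = new List<string>();

        source.Click("before"); // nobody is subscribed yet, so this is lost

        using (source.Clicks.Subscribe(seen.Add)) {
            source.Click("first");
            source.Click("second");
        }

        return seen;
    }

    public static bool ExposesSubject() {
        // False: the wrapper hides the underlying Subject<string>.
        return new ClickSource().Clicks is ISubject<string>;
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", Run())); // prints "first, second"
        Console.WriteLine(ExposesSubject());         // prints "False"
    }
}
```

A plain Subject does not replay earlier values, which is why the "before" click is never seen by the later subscriber.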

Add the following field and property to the viewmodel class to store our command:

private ReactiveRelayCommand textBoxEnterCommand;
public ReactiveRelayCommand TextBoxEnterCommand {
    get { return textBoxEnterCommand; }
    set { textBoxEnterCommand = value; }
}

Then at the start of our constructor we set up the command itself:

// Setup the command for the enter key on the textbox
textBoxEnterCommand = new ReactiveRelayCommand(obj => { });

Next we need to change our input event stream to take into account our command and merge that with our SearchText property change events so that either can fire our search:

// Transform the event stream into a stream of strings (the input values)
var input = searchTextChanged
    .Where(ev => SearchText == null || SearchText.Length < 4)
    .Throttle(TimeSpan.FromSeconds(3))
    .Merge(searchTextChanged
        .Where(ev => SearchText != null && SearchText.Length >= 4)
        .Throttle(TimeSpan.FromMilliseconds(400)))
    .Select(args => SearchText)
    .Merge(
        textBoxEnterCommand.Executed.Select(e => SearchText));

Notice we have just added the last line to merge the Executed observable on our command. Now when we run we can type a single character and hit enter if we do not want to wait for the idle period. How easy was that? Are we done? Almost.

Removing duplicate consecutive events

We still have an issue: when the user presses Enter, the idle timeout will expire some time later and we will get another event firing, essentially performing our search twice. So how do we stop the second, duplicate event from being fired? Easy, we just use DistinctUntilChanged.

We want a distinct search but we still want to allow the same search to be done later if the user tries another search in between. In other words we can have the same search done twice as long as they aren’t consecutive searches.

// Transform the event stream into a stream of strings (the input values)
var input = searchTextChanged
    .Where(ev => SearchText == null || SearchText.Length < 4)
    .Throttle(TimeSpan.FromSeconds(3))
    .Merge(searchTextChanged
        .Where(ev => SearchText != null && SearchText.Length >= 4)
        .Throttle(TimeSpan.FromMilliseconds(400)))
    .Select(args => SearchText)
    .Merge(
        textBoxEnterCommand.Executed.Select(e => SearchText))
    .DistinctUntilChanged();
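The difference between DistinctUntilChanged and plain Distinct is exactly the "same search again later" case. A small console sketch, assuming the System.Reactive package and a made-up sequence of search terms:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctDemo {
    private static readonly string[] Terms = { "cr", "cr", "crazy", "crazy", "cr" };

    public static IList<string> RunDistinctUntilChanged() {
        // Drops a value only when it equals the one immediately before it.
        return Terms.ToObservable().DistinctUntilChanged().ToList().Wait();
    }

    public static IList<string> RunDistinct() {
        // Drops every value that has been seen at any earlier point.
        return Terms.ToObservable().Distinct().ToList().Wait();
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", RunDistinctUntilChanged())); // prints "cr, crazy, cr"
        Console.WriteLine(string.Join(", ", RunDistinct()));             // prints "cr, crazy"
    }
}
```

With Distinct the user could never repeat a search in the same session, so DistinctUntilChanged is the better fit here.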

Cancelling previous searches with TakeUntil

The last issue here is that when searches return out of sequence, we could get the wrong results displayed. For example, the user searches for cr and hits enter, but then types crazy and hits enter. If the first search takes a long time because of the number of results, its results will end up being displayed after those of the actual current search term. To simulate this we could add some latency to our search.

private readonly Random random = new Random(5);
private SearchResult DoSearch(string searchTerm) {
    Thread.Sleep(random.Next(100, 500)); // Simulate latency
    return new SearchResult {
        SearchTerm = searchTerm,
        Results =
            phrases.Where(item => item.ToUpperInvariant().Contains(searchTerm.ToUpperInvariant())).ToArray()
    };
}

To solve this we need to stop the async stream when more input arrives, which is pretty easy with the TakeUntil operator. The Take operators define when an event stream completes; for an async operation, this effectively cancels the subscription (the search itself may still run, but its result is ignored). TakeUntil takes a second observable as input, and the first event on that observable completes the current stream. To implement this change we simply say that we TakeUntil the input stream in our chaining code.

// Chain the input event stream and the search stream, cancelling searches when input is received
var results = from searchTerm in input
              from result in search(searchTerm).TakeUntil(input)
              select result;
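Both outcomes of TakeUntil can be reproduced deterministically with two subjects. In this console sketch, which assumes the System.Reactive package, pendingSearch is a hypothetical stand-in for the observable returned by search(searchTerm).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilDemo {
    // Returns what would be displayed for the "cr" search, depending on
    // whether new input arrives before or after its result.
    public static List<string> Run(bool newInputArrivesFirst) {
        var input = new Subject<string>();
        var pendingSearch = new Subject<string>(); // stands in for search("cr")
        var shown = new List<string>();

        using (pendingSearch.TakeUntil(input).Subscribe(shown.Add)) {
            if (newInputArrivesFirst) {
                input.OnNext("crazy");                   // completes the guarded stream
                pendingSearch.OnNext("results for cr");  // too late: ignored
            } else {
                pendingSearch.OnNext("results for cr");  // delivered as normal
                input.OnNext("crazy");
            }
        }

        return shown;
    }

    public static void Main() {
        Console.WriteLine(Run(true).Count);                 // prints "0"
        Console.WriteLine(string.Join(", ", Run(false)));   // prints "results for cr"
    }
}
```

TakeUntil only stops the result from being delivered; it does not abort the function wrapped by ToAsync, so a genuinely expensive search would still need its own cancellation mechanism.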

Summary

So we have managed to create the guts of an auto-complete control using Rx in a much more declarative way by responding to events in observables instead of writing lots of imperative code with little items of state sitting about to represent our state machine.

All of the heavy work can be easily seen in a single declaration of intent:

// Setup the command for the enter key on the textbox
textBoxEnterCommand = new ReactiveRelayCommand(obj => { });

// Listen to all property change events on SearchText
var searchTextChanged = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
    ev => PropertyChanged += ev,
    ev => PropertyChanged -= ev
    )
    .Where(ev => ev.EventArgs.PropertyName == "SearchText");

// Transform the event stream into a stream of strings (the input values)
var input = searchTextChanged
    .Where(ev => SearchText == null || SearchText.Length < 4)
    .Throttle(TimeSpan.FromSeconds(3))
    .Merge(searchTextChanged
        .Where(ev => SearchText != null && SearchText.Length >= 4)
        .Throttle(TimeSpan.FromMilliseconds(400)))
    .Select(args => SearchText)
    .Merge(
        textBoxEnterCommand.Executed.Select(e => SearchText))
    .DistinctUntilChanged();

// Log all events in the event stream to the Log viewer
input.ObserveOn(DispatcherScheduler.Instance)
    .Subscribe(e => LogOutput.Insert(0, 
        string.Format("Text Changed. Current Value - {0}", e)));

// Wrap the synchronous search in a function that returns an observable
var search = Observable.ToAsync<string, SearchResult>(DoSearch);

// Chain the input event stream and the search stream, cancelling searches when input is received
var results = from searchTerm in input
              from result in search(searchTerm).TakeUntil(input)
              select result;

// Log the search result and add the results to the results collection
results.ObserveOn(DispatcherScheduler.Instance)
    .Subscribe(result => {
        searchResults.Clear();
        LogOutput.Insert(0, string.Format("Search for '{0}' returned '{1}' items", result.SearchTerm, result.Results.Count()));
        result.Results.ToList().ForEach(item => searchResults.Add(item));
    });

Lots of the ideas and even the odd piece of code were taken from: