Observables

For the reference see: Observable

AnPyLar ships with its own version of the Reactive Programming extensions (or the Observer Pattern). This is not an attempt to reinvent the wheel: as with AnPyLar itself, it hadn’t been done before (in the browser, that is).

It builds on being able to run Python in the browser and on the inherently asynchronous nature of how things are executed there. For the implementation, the following reference sources have been used:

Note

The Reactive Extensions contain a lot of operators. AnPyLar has implemented some of the most useful ones first and will gradually add more.

Where needed, the implementation deviates by joining concepts together (Observable and Subject in bindings) to simplify the usage of the library and patterns for the end user.

In case you are not willing to read the above documents, let’s go for a quick explanation:

  • You create an Observable which has a source that generates values
  • You can connect operations to the Observable that will process the generated values (simple observation, editing, filtering, switching to a different source)
  • You can attach end-of-chain subscriptions that will receive the generated and processed values for consumption
  • Whenever a value is generated by the source, the observable will push the value through the chain of operations all the way down to subscribers.
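
The four points above can be sketched in plain Python. The ToyObservable class below is a hypothetical illustration written for this page, not AnPyLar's implementation; the names from_iterable and filter are assumptions chosen for the sketch. It only shows how a source pushes values through chained operations down to a subscriber:

```python
class ToyObservable:
    """A toy push-based observable (illustration only, not AnPyLar code)."""

    def __init__(self, source):
        # source: a function that receives an `emit` callable and
        # pushes every generated value into it
        self._source = source

    @classmethod
    def from_iterable(cls, iterable):
        def source(emit):
            for value in iterable:
                emit(value)
        return cls(source)

    def map(self, func):
        # new observable that transforms each upstream value
        def source(emit):
            self._source(lambda value: emit(func(value)))
        return ToyObservable(source)

    def filter(self, predicate):
        # new observable that forwards only the values passing the predicate
        def source(emit):
            self._source(
                lambda value: emit(value) if predicate(value) else None)
        return ToyObservable(source)

    def subscribe(self, on_next):
        # subscribing starts the source, which pushes values down the chain
        self._source(on_next)


received = []
ToyObservable.from_iterable([1, 2, 3, 4]) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * 10) \
    .subscribe(received.append)

print(received)  # [20, 40]
```

Nothing happens in this sketch until subscribe is called: the subscription is what starts the source and lets the values flow through the operations.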

Example 1

from anpylar import Observable

# Observable from iterable, with each value mapped to value * 2
o = Observable.from_([1, 2, 3]).map(lambda x: x * 2)

o.subscribe(print)  # subscribe print to each mapped value

This will generate the following (in the developer console of the browser):

2
4
6

Note

You can test this simple script with anpylar-serve, without creating a complicated structure, by placing the contents in a file named index.py and running:

anpylar-serve --auto-serve index.py

We had the following:

  • A source: [1, 2, 3]. This can be made into an Observable with Observable.from_, which will generate as many values as the iterable has

  • A processing operation via map, which takes a callable and calls it with each of the values passed from the source

    Our callable was a simple value doubler: lambda x: x * 2

  • An end-of-chain subscription with subscribe(print). print is already a callable, so we don’t need to wrap it in a lambda: it will simply print out whatever it is given. We could have been more expressive with something like:

    o.subscribe(lambda x: print('My doubled value is:', x))
    

Example 2

This is taken straight out of the original Tour of Pyroes tutorial (Part 6 - Networking: http), and it could actually be part of a real web application.

from anpylar import Component


class PyroSearchComponent(Component):
    selector = 'pyro-search'

    bindings = {
        'pyroes': [],
        'searchterm': '',
    }

    def __init__(self):
        # connect searchterm to the found pyroes to be displayed
        self.searchterm_ \
            .debounce(300) \
            .distinct_until_changed() \
            .switch_map(lambda x: self.pyro_search.search(x) if x else []) \
            .catch_exception(lambda e: print('search error:', e) or []) \
            .subscribe(self.pyroes_)

Recall that in AnPyLar the declaration of a binding also declares an associated observable, named after the binding with an appended _. So:

bindings = {
    'pyroes': [],
    'searchterm': '',
}

has also generated two observables, reachable as self.pyroes_ and self.searchterm_.

See also

You can have another look at the Bindings section.

Concentrating on the Observables

# connect searchterm to the found pyroes to be displayed
self.searchterm_ \
    .debounce(300) \
    .distinct_until_changed() \
    .switch_map(lambda x: self.pyro_search.search(x) if x else []) \
    .catch_exception(lambda e: print('search error:', e) or []) \
    .subscribe(self.pyroes_)

The chain of events

  • self.searchterm_ is the Observable generated by the binding.

  • debounce(300) holds back each generated value for 300ms, waiting to see whether a new value is generated in that time. If that happens, the original value is discarded and the new one undergoes a new waiting period before being delivered further. This models events like typing and avoids searching too early, while the end user is still typing the characters that will make up the complete search term.

  • distinct_until_changed() remembers the previous value and will only forward a new value if it differs from the previous one. This avoids looking for the same search term twice in a row.

  • switch_map(lambda x: self.pyro_search.search(x) if x else []) chooses which new Observable to deliver. In this case it chooses between an empty [] and the return value of self.pyro_search.search(x) (which, by the way, will also be an Observable), using the incoming value x as the key for the choice. In effect, it avoids sending a query over the network if the value typed by the end user is empty, because that case can be resolved locally.

  • catch_exception(lambda e: print('search error:', e) or []) catches any errors. In our case it logs them to the console and replaces the result delivered to the subscription with an empty [].

  • subscribe(self.pyroes_) subscribes our self.pyroes_ observable to the processed value.

    Our self.pyroes_ observable is callable because it has been generated by a binding, and being callable is a property of this type of observable (this design choice was made to allow patterns such as the one seen in this subscription).
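
The chain of events above can be approximated offline in plain Python. This is a sketch under stated assumptions, not AnPyLar code: debounce is simulated over a list of timestamped keystrokes instead of real timers, fake_search is a hypothetical stand-in for self.pyro_search.search, ToyBinding is a hypothetical stand-in for the callable self.pyroes_ observable, and the names and keystrokes are made-up data. The cancelling of an in-flight search that switch_map implies is not modelled.

```python
def debounce(events, wait_ms):
    """Offline debounce over (timestamp_ms, value) pairs sorted by time.

    A value survives only if no newer value arrives within wait_ms.
    """
    kept = []
    for i, (ts, value) in enumerate(events):
        is_last = i == len(events) - 1
        if is_last or events[i + 1][0] - ts >= wait_ms:
            kept.append(value)
    return kept


def distinct_until_changed(values):
    """Forward a value only if it differs from the previous one."""
    out = []
    for value in values:
        if not out or out[-1] != value:
            out.append(value)
    return out


def fake_search(term):
    """Hypothetical stand-in for self.pyro_search.search(term)."""
    if term == 'boom':
        raise RuntimeError('network down')  # simulated failure
    names = ['Pyrelene', 'Pyrosaurus', 'Magmapy']  # made-up data
    return [n for n in names if term in n.lower()]


def search_or_empty(term):
    """Mimic the switch_map choice plus the catch_exception fallback."""
    try:
        return fake_search(term) if term else []
    except Exception as e:
        print('search error:', e)
        return []


class ToyBinding:
    """Callable value holder standing in for self.pyroes_."""

    def __init__(self, value):
        self.value = value
        self.history = []

    def __call__(self, value):
        # being callable lets the object be used directly as a subscriber
        self.value = value
        self.history.append(value)


# (timestamp in ms, content of the search box); made-up keystrokes
typed = [(0, 'p'), (100, 'py'), (200, 'pyr'), (700, 'pyro'),
         (800, 'pyr'), (1300, ''), (1800, 'boom')]

pyroes = ToyBinding([])
terms = distinct_until_changed(debounce(typed, 300))
for term in terms:
    pyroes(search_or_empty(term))

print(terms)           # ['pyr', '', 'boom']
print(pyroes.history)  # [['Pyrelene', 'Pyrosaurus'], [], []]
```

Of the seven keystrokes only three terms reach the search step: the quick 'pyro' then 'pyr' correction is absorbed by debounce, and the repeated 'pyr' is dropped by distinct_until_changed. The empty term never reaches fake_search, and the failing one ends up as an empty list.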

This is already a complex observable chain, and a working one. See Part 6 - Networking: http for a full working app.