Observable

Subscription

The final goal of an Observable, whether used directly or after operators have been applied to it, is to be subscribed to. A subscription is performed by an Observer.

Ideally an Observer would be an instance of a class defining these 3 methods.

class Observer:

    def on_next(self, value):
       '''Called with each value generated by the Observable'''

    def on_completed(self):
       '''Called when the Observable can no longer produce values'''

    def on_error(self, error):
       '''Called when an error has occurred in the chain'''

Note

A class with these 3 methods is provided by AnPyLar; the end user can override whichever methods are needed.

In practice the following suffices

class Observer:

    def on_next(self, value):
        # Receive the next value generated by the Observable and do something
        pass

or a simple callable

def observer(value):
    # Receive the next value generated by the Observable and do something
    pass

Subscribing:

  • ObservableSource.subscribe(on_next, on_completed=None, on_error=None, observer=None)

or

  • with chained operations:

    ObservableSource \
        .op1() \
        .op2() \
        .subscribe(on_next, on_completed=None, on_error=None, observer=None)
    

The parameters work as follows:

  • If observer is not None, it will be taken as the target for the values. It is assumed to be an object with the following methods:

    • on_next, on_completed, on_error

    Note

    In this case on_next is not used, but because it has no default value in the signature, something must still be passed. Any value will do, including None.

  • If observer is None

    • If on_completed is None, it will then be sought as an attribute of on_next
    • If on_error is None, it will then be sought as an attribute of on_next
    • on_next will be scanned for an attribute called on_next. If found, that attribute will be used; else the passed value itself will be used as the callable.
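
The resolution rules above can be sketched in plain Python. This is only an illustration of the documented behavior and not AnPyLar's actual code; the helper resolve_callbacks and the class Collector are hypothetical names introduced for the example.

```python
def resolve_callbacks(on_next, on_completed=None, on_error=None, observer=None):
    """Hypothetical helper mirroring the documented subscribe() rules."""
    if observer is not None:
        # The observer is the target and is assumed to have all 3 methods;
        # whatever was passed as on_next is ignored
        return observer.on_next, observer.on_completed, observer.on_error

    # Missing callbacks are sought as attributes of on_next
    if on_completed is None:
        on_completed = getattr(on_next, 'on_completed', None)
    if on_error is None:
        on_error = getattr(on_next, 'on_error', None)

    # Use the on_next attribute if there is one, else the passed value itself
    on_next = getattr(on_next, 'on_next', on_next)
    return on_next, on_completed, on_error


class Collector:
    """Hypothetical observer that defines only on_next."""

    def __init__(self):
        self.values = []

    def on_next(self, value):
        self.values.append(value)


collector = Collector()
nxt, completed, error = resolve_callbacks(collector)
nxt(6)
print(collector.values)  # [6]
print(completed, error)  # None None
```

Passing a plain callable goes through the same path: it has no on_next attribute, so the callable itself is returned as the on_next target.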

The pattern one will usually apply:

observable.subscribe(callable)

A practical example:

Observable.of(3).map(lambda x: x * 2).subscribe(print)

which prints:

6

Sources

The following methods create an Observable using the provided parameters.

from_

from_(iterable)

Generates an observable from iterable, generating as many values as elements are present in iterable

of

of(*args)

Generates an observable from *args, generating as many values as arguments are provided

range

range(start, count, step=1)

Generates an observable that will issue count events starting with start and increasing each iteration by step
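
Note that the second parameter is a count and not a stop value, unlike Python's built-in range. A minimal sketch of the values the source is described to issue, using a hypothetical helper range_values that is not part of AnPyLar:

```python
def range_values(start, count, step=1):
    """Hypothetical helper: the values the range source is described to issue."""
    # count values in total, beginning at start and advancing by step
    return [start + i * step for i in range(count)]


print(range_values(5, 3))           # [5, 6, 7]
print(range_values(0, 4, step=10))  # [0, 10, 20, 30]
```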

throw_ (source)

throw_(throw)

Creates an Observable that delivers an error, using throw as the error value

Note

There is also a throw_ operator. See below.

Operators

The following operators can be applied to Observables. They can be chained.

all

all(predicate)

Check if all items generated by the Observable meet the condition determined by predicate

catch_exception

catch_exception(handler)

Switch to another observable if an error has been produced and on_error would be invoked in the subscriber.

  • If handler is an Observable, it will be switched to
  • Else, handler must be a callable accepting the value and returning the value to switch to
    • If the returned value is an Observable, it will be the result to switch to
    • Else, the returned value will be converted using Observable.of

debounce

debounce(ms)

Delay the generated value by the amount of milliseconds ms and discard the delayed value if a new one is produced.
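
The effect can be sketched without timers by working on timestamped events. The function debounce_events below is a hypothetical helper and not part of AnPyLar; it assumes that a value arriving exactly when the delay expires does not discard the pending one.

```python
def debounce_events(events, ms):
    """Hypothetical sketch of debounce on (time_ms, value) pairs sorted by time.

    Returns the (delivery_time_ms, value) pairs that survive.
    """
    delivered = []
    for i, (t, value) in enumerate(events):
        is_last = i + 1 == len(events)
        # A pending value survives only if no newer value arrives
        # before its delay has expired
        if is_last or events[i + 1][0] >= t + ms:
            delivered.append((t + ms, value))
    return delivered


events = [(0, 'a'), (100, 'ab'), (150, 'abc'), (700, 'abcd')]
print(debounce_events(events, 300))  # [(450, 'abc'), (1000, 'abcd')]
```

The first two values are discarded because a newer value arrives within 300 ms of each; this is the usual pattern for reacting to text input only once typing pauses.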

defval

defval(defval)

This operator is specific to AnPyLar. It provides a default value for the Observable and its chain of operators, and is intended for elements that subscribe in the background (as HTML nodes do, for example)

It is intended to be used as the last operator in a chain (although with care it can be used earlier in the chain)

delay

distinct

distinct(predicate=None)

Let the value through if it has not been seen before. If predicate is provided it will be used to assess if a value is distinct from previous values

Note

Use with care, because on long running observables, the buffer will grow with no limits.

distinct_until_changed

distinct_until_changed(predicate=None)

Let the value through only if it differs from the last value seen. If predicate is provided it will be used to assess if a value is distinct from the previous value
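
The difference between distinct and distinct_until_changed can be sketched with plain generators. Both functions below are hypothetical stand-ins and not AnPyLar code; they ignore the optional predicate and assume that distinct_until_changed lets a value through when it differs from the value immediately before it.

```python
def distinct_sketch(values):
    """Hypothetical sketch: forward a value only the first time it is seen."""
    seen = []  # grows without bound on long-running sources
    for value in values:
        if value not in seen:
            seen.append(value)
            yield value


def distinct_until_changed_sketch(values):
    """Hypothetical sketch: forward a value only if it differs from the last one."""
    missing = object()  # sentinel meaning "nothing seen yet"
    last = missing
    for value in values:
        if last is missing or value != last:
            yield value
        last = value  # only one value is remembered


data = [1, 1, 2, 1, 3, 3]
print(list(distinct_sketch(data)))                # [1, 2, 3]
print(list(distinct_until_changed_sketch(data)))  # [1, 2, 1, 3]
```

The second variant keeps a single value of state, so it does not share the unbounded buffer growth of distinct.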

do_action

do_action(action)

Forwards each value after calling action with it.

The value is forwarded as is, regardless of what action does or returns
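
A minimal sketch of this behavior over a plain iterable; do_action_sketch is a hypothetical name and not the AnPyLar implementation.

```python
def do_action_sketch(values, action):
    """Hypothetical sketch: call action for its side effect, forward the value."""
    for value in values:
        action(value)  # side effect only; the return value is discarded
        yield value


log = []
forwarded = list(do_action_sketch([1, 2], lambda v: log.append(v * 100)))
print(forwarded)  # [1, 2]
print(log)        # [100, 200]
```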

filter

filter(predicate)

Forward the value only if predicate evaluates the value to True

first

first(predicate=None)

Take only the first value.

If predicate is provided, it will be used to determine which is the first value.

first_or_default

first_or_default(predicate=None, default_value=None)

Take only the 1st value. If predicate is provided, it will be invoked with each value and the 1st value will be that for which predicate returns True

If no first value can be delivered, the operator will deliver default_value on completion.
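
The selection logic can be sketched over a plain iterable. The function first_or_default_sketch is a hypothetical illustration and not AnPyLar code; reaching the end of the iterable stands in for completion of the Observable.

```python
def first_or_default_sketch(values, predicate=None, default_value=None):
    """Hypothetical sketch of the first_or_default selection logic."""
    for value in values:
        # Without a predicate the very first value is taken
        if predicate is None or predicate(value):
            return value
    # The source completed without delivering a first value
    return default_value


def is_even(x):
    return x % 2 == 0


print(first_or_default_sketch([1, 4, 6], is_even))                # 4
print(first_or_default_sketch([1, 3], is_even, default_value=-1))  # -1
print(first_or_default_sketch([7, 8]))                             # 7
```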

map

map(fn)

Apply fn to each generated value, forwarding each result

nop

nop(*args, **kwargs)

A no-operation operator. It will simply forward values.

publish

publish()

Freezes an observable source to make it multicast. Several subscriptions can take place without values being forwarded to them until the observable is unfrozen.

It supports the following additional operations:

  • auto_connect(count)

    Unfreeze the observable after count subscriptions have taken place

  • connect()

    Unfreeze the observable
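
The freeze and unfreeze behavior can be sketched with a small stand-in class. PublishSketch is hypothetical and not the AnPyLar implementation; it holds a fixed list of values and accepts only a plain on_next callable.

```python
class PublishSketch:
    """Hypothetical stand-in for a published (multicast) observable."""

    def __init__(self, values):
        self._values = list(values)
        self._subscribers = []
        self._auto_count = None
        self._connected = False

    def subscribe(self, on_next):
        self._subscribers.append(on_next)
        # auto_connect: unfreeze once enough subscriptions have taken place
        if (self._auto_count is not None
                and len(self._subscribers) >= self._auto_count):
            self.connect()

    def auto_connect(self, count):
        self._auto_count = count
        return self

    def connect(self):
        if self._connected:
            return
        self._connected = True
        # Every value is delivered to every subscriber
        for value in self._values:
            for on_next in self._subscribers:
                on_next(value)


first, second = [], []
shared = PublishSketch([1, 2])
shared.subscribe(first.append)
shared.subscribe(second.append)
print(first, second)  # [] []  (still frozen)
shared.connect()
print(first, second)  # [1, 2] [1, 2]
```

With auto_connect(2) in place of the explicit connect() call, the second subscribe would trigger delivery by itself.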

switch_map

switch_map(handler)

Switch to another observable based on handler:

  • If handler is an Observable, it will be switched to it
  • Else, handler must be a callable accepting the value and returning the value to switch to
    • If the return value is an Observable it will be the result to switch to
    • Else, the return value will be converted using Observable.of
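
The rules for choosing the observable to switch to can be sketched as follows. ObservableSketch and resolve_switch_target are hypothetical names used only for illustration; the real Observable class is not needed to follow the logic.

```python
class ObservableSketch:
    """Hypothetical stand-in for Observable, holding a fixed list of values."""

    def __init__(self, values):
        self.values = list(values)

    @classmethod
    def of(cls, *args):
        return cls(args)


def resolve_switch_target(handler, value):
    """Hypothetical sketch of how the handler yields the switch target."""
    if isinstance(handler, ObservableSketch):
        return handler  # an observable handler is switched to directly
    result = handler(value)  # otherwise handler is called with the value
    if isinstance(result, ObservableSketch):
        return result
    return ObservableSketch.of(result)  # plain results are wrapped


fixed = ObservableSketch([0])
print(resolve_switch_target(fixed, 3) is fixed)                 # True
print(resolve_switch_target(lambda x: x * 10, 3).values)        # [30]
print(resolve_switch_target(lambda x: ObservableSketch([x, x]), 3).values)  # [3, 3]
```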

take

take(count)

Forward at most count values before ending the subscription.

throw_ (operator)

throw_(throw)

Generate an error, using throw as the error value

Note

This is the operator and not the throw_ source.