When Iron Man becomes reactive, RxJava

This part of the series is focused on the benefits that some funcional touches can provide a to our projects.

A framework like RxJava of ReactiveX can easily help to handle different running environments running on tasks in background or in the UI thread. On android that has always been a nightmare for all us.

This article also focuses on how the operators can minimize the time in common development tasks, Reactive Extensions offers a wide range of operators to make your live easier.

As always, most of the code and snippets are uploaded to Github, feel free to comment, open an issue or complaining!.

Avengers app on Github

In the first part of this series we talked about Dagger 2, as we go forward we'll see as the coupling between our layers will be less and increase scalability.


RetroLambda

Sometimes, in huge applications developed in Java, or large frameworks like android, it's really difficult or even impossible (android) to use features from Java 8 like the Lambda expressions.

Retrolambda it's a Backport that comes to solve this problem, translating the Java 8 bytecode to previous Java versions like the v7, or even v5 or v6, allowing us to use Lambda among other features.

Retrolambda can be used with the Gradle plugin or Maven, I've choosen Gradle that comes for free in Android Studio, to use it you only have to add the Retrolambda plugin on the root build.gradle and apply it in your module build.gradle, setting the language level of Android Studio to 1.8 all it's done.

build.gradle (root)

    dependencies {
        classpath 'me.tatarka:gradle-retrolambda:3.1.0'

{your module}/build.gradle


    apply plugin: 'me.tatarka.retrolambda'

    android { 

        ...

        compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
    }
}

RetroLambda allows you to write less boilerplate code, also clarifies our code making it more readable. In this example by Dan Lew, you can taste the diference.

Without RetroLambda

Observable.just("Hello, world!")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
              System.out.println(s);
        }
    });

With RetroLambda

Observable.just("Hello, world!") .subscribe(
    s -> System.out.println(s)
);

In the Avengers example

mCharacterSubscription = mGetCharacterInformationUsecase
    .execute().subscribe(
        character -> onAvengerReceived(character),
        error     -> manageError(error)
    );

mComicsSubscription = mGetCharacterComicsUsecase
    .execute().subscribe(
        comics -> Observable.from(comics).subscribe(
            comic -> onComicReceived(comic)),
        error  -> manageError(throwable)
    );


ReactiveX

ReactiveX is a collection of Open Source projects among their main principles are the Observer pattern, the pattern Iterator and the functional programming.

ReactiveX it's also defined as an API for asynchronous programming, in fact it's really easy to implement asynchronous task with these frameworks.


ReactiveX as an asynchronous client

The great thing about dealing with ReactiveX is that you can create a fully asynchronous API or client, and then in the implementation decide whether the code will be treated asynchronously or in a separate Thread a Threadpool or synchronously.

So we have an observable API rather than a blocking API.

public interface Usecase<T> {

    Observable<T> execute();
}
public interface Repository {

    Observable<Character> getCharacter (final int characterId);

    Observable<List<Comic>> getCharacterComics (final int characterId);
}


What is RxJava

RxJava is an implementation of the Reactive Extensions made by Netflix. There are implementations for the vast majority of programming languages including Javascript, Python, Ruby, Go and many more.


Observables & Observers

An Observable emits an object or series of objects, these objects are consumed or received by the Observer subscribed to the 'Observable'.


Is necessary that an Observer has to be registered into an Observable, if not the Observer won't emit anything. When the Observer is registered, an object of the type Subscription is created, which is used to unsubscribe from the Observable, this is useful for Activities and Fragments on the onStop or onPause methods, for example.

mCharacterSubscription = mGetCharacterInformationUsecase
    .execute().subscribe( ... );

@Override
public void onStop() {

    if (!mCharacterSubscription.isUnsubscribed())
        mCharacterSubscription.unsubscribe();

    if (!mComicsSubscription.isUnsubscribed())
        mComicsSubscription.unsubscribe();
}

Whenever an Observer subscribes to an Observable has to take into account three methods.


I love this image :)


Communicating components

Let's take a look at how to use the GetCharacterInformationUsecase usecase. All usecases implement the interface Usecase <T>:

public interface Usecase<T> {

    Observable<T> execute();
}

When a usecase is run this return an object of the type Observable, this is useful to be able to chain observables & operators with the least effort, we will see the great power that these operators soon.

When we run the GetCharacterInformationUsecase we say to our repository to make a request to our corresponding data source:

@Override
public Observable<Character> execute() {

    return mRepository.getCharacter(mCharacterId);
        // .awesomeRxStuff();
}

The presenter AvengerDetailPresenter will be our Observer for this usecase will be who subscribes to the events sent by the Observable, this is done through the method subscribe, which connects the Observer with the Observable.

onNext and onError methods are implemented to manage the results of the operation. The onCompleted method is not implemented in this case it's not necessary.

    mCharacterSubscription = mGetCharacterInformationUsecase
        .execute().subscribe(
            character   -> onAvengerReceived(character),
            error       -> manageError(error));


Retrofit & RxJava

Retrofit from Square, RxJava supports methods of the type rx. Observable so the requests can be observed using Observers and modified and transformed by operators.

You must be aware of where are you calling it, Retrofit executes the requests in the thread where your Observable leaves, so if you call it from the UI thread (an activity or Fragment) would get an error. Let's talk about Schedulers!.

Schedulers

The http://reactivex.io/documentation/scheduler.html allow to use multhreading between operators and Observables. This can be used with different threads, a Thread Executor, or preset Schedulers, for example, for input and output operations exists the Schedulers.io ().

RxAndroid are a few specific android utilities for RxJava by Jake Wharton & Matthias Käppler, which includes some Schedulers to manage the threads of the Android platform.

It also provides the possibility of using an android Handler for concurrency management.

    @Override
    public Observable<Character> execute() {

        return mRepository.getCharacter(mCharacterId)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
    }

This example demonstrates the ease provided by Rx for managing threads in Android, which has always been a terror to all :D

Operators

ReactiveX great power lies in the operators, they allow to manipulate, transform and combine objects issued by the Observables.

Let's think about a list of comics of a character, comics have a specific year, and we want to show the comics to a given year. ReactiveX comes to help us!

This filtering is done use of operator filter, which allows using a predicate to discriminate between the comics, in such a way, ask the user why year wants to filter and that year uses predicate to display comics to date.

public Observable<Comic> filterByYear(String year) {

    if (mComics != null) {
        return Observable.from(mComics).filter(
            comic -> {
                for (ComicDate comicDate : comic.getDates())
                    if (comicDate.getDate().startsWith(year))
                        return true;

                return false;
        });

    }

    return null;
}

 Error Handling

Another great example of how the Rx operators can save us some time and give us productivity are the error handling Operators.

Imagine that a user makes a request to the network, and coincidentally is on the bus or subway passing through tunnel, connectivity in that case will be affected.

When we receive a SocketTimeoutException byRetrofit, what we will do will be to take advantage of the operator retry.

The operator retry will accept a predicate as in the case of the operator 'filter', if we return true in that predicate the magic of Rx will issue the 'Observable' again making the Retrofit request for us.

If as most 3 'SocketTimeoutExceptions' are fired, the program flow will go to the onError method to manage the error.

@Override
public Observable<List<Comic>> getCharacterComics(int characterId) {

    final String comicsFormat   = "comic";
    final String comicsType     = "comic";

    return mMarvelApi.getCharacterComics(
        characterId, comicsFormat, comicsType)
            .retry((attemps, error) -> 
                error instanceof SocketTimeoutException && 
                attemps < MAX_ATTEMPS);
}

Resources