Parce que les développeurs ne sont pas les seuls à en baver : Les joies du sysadmin

Découverte de ReactiveX – Partie 1

Découverte de ReactiveX – Partie 1

ReactiveX est une librairie utilisée pour créer des programmes se basant sur de l’asynchronisme en utilisant le pattern Observer.

Le pattern observer au premier plan

Ce design pattern est au centre de l’architecture de la librairie. L’équipe de dév part du postulat que les techniques comme les Future (sur lesquels reposent fortement Vert.X par exemple) ne sont pas adaptées à l’imbrications d’appels asynchrones (ça peut devenir en effet rapidement compliqué de gérer des appels enchainés).

A l’inverse, le modèle Observable de ReactiveX permet de gérer les différentes réponses aux événements asynchrones aussi simplement que si elles étaient contenues dans une collection, par exemple.

ReactiveX est polyglotte

Il existe des implémentations de ReactiveX dans de nombreux langages (RxJava, RxJS, Rx.NET, RxScala, RxClojure, RxSwift…).

Dans cet article nous nous intéresserons à RxJava.

Les concepts de la librairie

La librairie offre une série de concepts pour gérer l’asynchronisme dans l’ensemble d’une application. Ces concepts sont les suivants :

Observable

Pour ceux qui ne connaissent pas ce design pattern, un observable est un objet auquel des observateurs (observers) peuvent souscrire pour rester informés des événements d’intérêt aux yeux de l’observable (un peu comme s’inscrire à une newsletter).

Ce pattern facilite les exécutions concurrentes car chaque Observable peut réaliser ses opérations et informer les Observers alors qu’ils sont entrain de réaliser d’autres opérations de leur côté.

Les operators

La librairie aurait très peu d’intérêt s’il ne s’agissait que d’une implémentation du pattern Observer. La grande force de la librairie repose dans ses extensions réactives (d’où ReactiveX) : ce sont les operators.

Ces operators permettent de transformer / combiner / manipuler / travailler les séquences de données générées par les Observables.

Vous trouverez sur ce lien la liste complète des Operator. Ils sont également classés par catégorie dont on peut noter les principales ici :

  • Les operators de création : pour la création d’objets observables (Create(), From(), Repeat() etc…)
  • Les operators de transformation : pour transformer les données émises par un observable (Buffer(), Map(), Groupby() …)
  • Les operators de filtre : pour filtrer les résultats émis par les observables (Filter(), ELementAt(), Distinct() ….)
  • Les operators de combinaison : pour combiner plusieurs observables (And / Then / When(), Merge(), Join() …)
  • Les operators de gestion d’exception (Catch(), Retry())

Consultez la documentation pour un listing complet.

Un mot sur la programmation réactive

Il faut bien comprendre qu’il s’agit d’une manière de penser différente de ce qu’on a pu utiliser jusqu’à présent avec les langages impératifs usuels. La programmation réactive, c’est programmer avec des flux de données asynchrones.

C’est le même principe que lorsque l’on clique sur un bouton : l’événement de clic est placé dans une file (un bus) en attente d’être traité. La programmation réactive n’applique pas seulement ce principe aux clics, mais à (presque) tout le reste de l’application (données, variable, fonctions etc…) .

Un flux est une séquence d’événements ordonnés dans le temps.

Hello World

Pour commencer, le classique Hello World en RxJava :

import io.reactivex.Observable;

public class HelloWorld {
   public static void main(String[] args) {
      Observable.just("Hello world").subscribe(System.out::println);
   }
}

Dans ce code, on passe la chaine de caractères « Hello World » à la méthode just() de Observable qui va nous créer un Observable ayant pour paramètre « Hello World ».

A cet observable, on indique un subscriber, la méthode System.out.println.

Récupération des utilisateurs de Github

Bon rien de bien fascinant là-dedans. Maintenant imaginons que l’on veuille récupérer une liste d’utilisateurs de github et qu’on veuille afficher uniquement les 3 premiers ?

Github fournit une URL pour cela : https://api.github.com/users

Commençons par écrire une fonction qui renvoit le contenu de cette URL :

static String readUrl(String url) throws IOException {
   URL feedUrl = new URL(url);
   BufferedReader in = new BufferedReader(new InputStreamReader(feedUrl.openStream()));

   String inputLine;
   String res = "";
   while ((inputLine = in.readLine()) != null)
      res += inputLine;
   in.close();

   return res;
}

On va obtenir du json en retour :

[{« login »: »chneukirchen », »id »:139, »avatar_url »: »https://avatars0.githubusercontent.com/u/139?v=3″, »gravatar_id »: » », »url »: »https://api.github.com/users/chneukirchen »…..

On va ensuite créer juste 2 Observables imbriqués : un pour effectuer la requête, l’autre pour traiter la réponse une fois le chargement est effectué :

public static void main(String[] args) throws InterruptedException {
   Observable<String> requestStream = Observable.just("https://api.github.com/users");

   requestStream.subscribe(new Consumer<String>() {
      @Override
      public void accept(String url) throws Exception {
         String json = readUrl(url);

         Observable<String> responseStream = Observable.just(json);
         responseStream.subscribe(new Consumer<String>() {
            @Override
            public void accept(String json) throws Exception {
               System.out.println(json);
            }
         });
      };
   });
}

Très bien. Maintenant on va parser le résultat obtenu pour afficher les utilisateurs dans 3 JLabels :

requestStream.subscribe(new Consumer<String>() {
   @Override
   public void accept(String url) throws Exception {
      String json = readUrl(url);

      Observable<String> responseStream = Observable.just(json);
      responseStream.subscribe(new Consumer<String>() {
         @Override
         public void accept(String json) throws Exception {
            updateCards(json);
         }
      });
   };
});
private void updateCards(String json) {
   ObjectMapper mapper = new ObjectMapper();
   // delete existing cards
   panel.invalidate();
   userCards.forEach(card -> panel.remove(card));
   userCards.clear();
   // JSON from file to Object
   try {
      List<GithubUser> obj = mapper.readValue(json, new TypeReference<List<GithubUser>>() {
      });
      obj.stream().limit(3).forEach(user -> addCard(user));
   } catch (IOException e) {
      e.printStackTrace();
   }
   panel.validate();

}

(Retrouvez le code source complet ici : https://github.com/alexandrelanglais/reactivex-search-users )

On parse le json récupéré avec la librairie jackson et on met à jour un JPanel avec les photos et logins des utilisateurs, ce qui nous donne :

Ok on a notre utilisation de base ReactiveX : un observable qui sert à exécuter la requête, un observer qui l’écoute et créé un autre Observable pour mettre à jour le panel d’utilisateurs lorsque la réponse est reçue.

Essayons de pousser un peu plus loin les possibilités de la librairie RxJava.

Note
A partir de maintenant nous utiliserons les lambdas offertes par Java 8 pour simplifier la lecture du code.

Le code précédent en Java 8 devient :

requestStream.subscribe(theUrl -> {
   String json = readUrl(theUrl);
   Observable<String> responseStream = Observable.just(json);
   responseStream.subscribe(jsonResult -> updateCards(jsonResult));
});

Gestion des erreurs

Voyons comment gérer les erreurs avec ce pattern, par exemple si l’URL fournie ne répond pas.

La méthode subscribe() prend en premier paramètre une action à mener si l’exécution se déroule bien (appel à onNext()). On peut également lui spécifier une 2è action à réaliser en cas d’erreur (appel à onError).

Les différents paramètres de la méthode subscribe

On peut simplement utiliser la méthode subscribe pour lui passer un comportement pour le onError :

requestStream
      .subscribe(theUrl -> {
         String json = readUrl(theUrl);
         Observable.just(json)
               .subscribe(jsonResult -> updateCards(jsonResult));
      }, throwable -> {
         JOptionPane.showMessageDialog(null, throwable.getMessage());
         System.out.println("Error: " + throwable.getMessage());
      });

Ainsi, si la requête échoue (on peut par exemple changer l’url de requête par toto://) :

String url = "toto://api.github.com/users?since=" + ((int) (Math.random() * 500.0));

Fin de la partie 1

On est encore très loin d’avoir exploité toutes les possibilités de RxJava 🙂 J’écrirai prochainement un article détaillant tout ce que l’on peut faire avec le flux d’événements reçu par les observers, comment on les filtre, les manipule, les transforme etc..

 

Laisser un commentaire