Programowanie reaktywne w RxJava

Programowanie reaktywne z użyciem RxJava

Najbardziej znany przykład wykorzystanego w praktyce programowania reaktywnego to komórki w programie MS Excel. W komórkach obliczane są wartości z użyciem formuł zapisanych w innych komórkach. Jeśli któraś z tych formuł się zmieni to wszystkie komórki zależne ulegają zmianie wartości. Jeśli więc komórka zależy od innych, to będzie reagowała zmianą swo­jego stanu na zmianę stanu innej komórki od której zależy. Nazwa programowanie reaktywne odzwierciedla koncept reagowania na zmiany. Wadą tego podejścia jest to, że czas przetwarzania wszystkich elementów jest dłuższy niż w klasycznym podejściu, ale w zamian za to otrzymujemy możliwość natychmiastowego reagowania na każdy element który został przetworzony.

Programowanie reaktywne to nic innego jak rozszerzenie znanego już od dawna wzorca obserwator:

  • Observable – dostarcza strumień danych do przetwarzania.
  • Observer – przetwarza kolejno dane z użyciem następujących metod:
    • onNext() – przetwarzanie kolejnego elementu,
    • onCompleted() – po zakończeniu przetwarzania wszystkich elementów,
    • onError() – w momencie wystąpienia błędu w trakcie przetwarzania elementów.
String[] letters = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
  i -> result += i,  // onNext
  Throwable::printStackTrace, // OnError
  () -> result += "_finished" // OnCompleted
);
assertTrue(result.equals("abcdefghij_finished"));

W powyższym przykładzie dołączona została subskrypcja na strumień danych co umożliwia otrzymywanie powiadomień o pojawieniu się nowych wartości. Przykład który zrealizowany zostanie w tym artykule to praktyczny program który z użyciem programowania reaktywnego i mechanizmu wątków pobierze tytuły artykułów z Wikipedii i zapisze je wszystkie do zdefiniowanego pliku. Zaczynamy od nowego projektu i dodania zależności do pliku pom.xml:

<dependencies>
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.7.2</version>
    </dependency>
 
    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxjava</artifactId>
        <version>1.3.8</version>
    </dependency>
</dependencies>

Klasa WikiManager która służy do pobrania listy artykułów:

public class WikiManager {
 
    public static String [] urls = {
            "http://en.wikibooks.org/wiki/Shelf:Computer_science",
            "http://en.wikibooks.org/wiki/Shelf:Computer_software",
            "http://en.wikibooks.org/wiki/Shelf:Web_applications"};
 
    private static List<String> getWikiNewsFromUrl(String url) throws IOException {
        List<String> news = new ArrayList();
        Document doc = Jsoup.connect(url).get();
        Elements newsHeadlines = doc.getElementsByAttribute("href");
        newsHeadlines.forEach(newsTitle -> news.add(newsTitle.attr("title")));
        return news;
    }
 
    public static List getWikiNewsFromUrlApi(String url) {
        List wikiLinks = null;
        try {
             wikiLinks = getWikiNewsFromUrl(url);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wikiLinks;
    }
}

Klasa FileHelper która służy do zapisu pobranych danych do pliku – jeśli plik nie istnieje to zostanie utworzony:

public class FileHeper {
 
    private static void saveToFileListElements(String fileNAme, List<String> content) throws IOException {
        content.removeAll(Arrays.asList(""));
        FileWriter fileWriter = new FileWriter(fileNAme, true);
        content.forEach(article -> {
            try {
                fileWriter.write(article.trim());
                fileWriter.write(System.lineSeparator());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        fileWriter.close();
    }
 
    public static void saveToFileListElementsApi(String fileNAme, List<String> content){
        try {
            saveToFileListElements(fileNAme,content);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Klasa która z użyciem programowania reaktywnego – RxJava oraz z użyciem zdefiniowanych adresów URL pobierze artykuły z Wikipedii a następnie zapisze je do pliku. Każde pobranie listy artykułów oraz zapis ich do pliku odbywa się w oddzielnym wątku. Po zakończeniu przetwarzania wykonana zostanie metoda onCompleted().

public class ProcessWikiLinksController {
 
    public void process() {
 
        Observer<String> observers = new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("process completed");
            }
 
            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }
 
            @Override
            public void onNext(String s) {
               Runnable runnable = () -> {
                   FileHeper.saveToFileListElementsApi("article.txt", WikiManager.getWikiNewsFromUrlApi(s));
               };
               Thread thread = new Thread(runnable);
               thread.start();
            }
        };
 
        Observable.from(WikiManager.urls).subscribe(observers);
    }
 
    public static void main(String[] args) {
        new ProcessWikiLinksController().process();
    }
 
}

W wyniku zostanie utworzony (jeśli już nie istnieje) w katalogu projektu plik article.txt. Kolejne wykonanie programu nie tworzy na nowo pliku tyko dopisuje do niego dane.

Zobacz kod na GitHubie i zapisz się na bezpłatny newsletter!

.

 

Leave a comment

Your email address will not be published.


*