Java 8 – stream & parallel stream & collect

Java 8 – stream & parallel stream & collect

Stream.collect() pozwala tworzyć nowe struktury danych, przetwarzać je oraz umożliwia wyciąganie informacji które są potrzebne. Składnia jest następująca:

collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner)

gdzie trzeci parametr używany jest w parallel streams.

 final List<Person> persons = Arrays.asList(
                new Person("Max",    18),
                new Person("Peter",  23),
                new Person("Pamela", 24),
                new Person("David",  12));
final List<Person> newPersonsList = persons
               .stream()
               .collect(
                       LinkedList::new,
                       (result, element) -> {
                           System.out.println(
                                   "accumulator: " + result + " => " + element
                           );
                           result.add(element);
                       },
 
                       // combiner is only used in parallel streams,
                       (p1, p2) -> {
                           System.out.println(
                                   "combiner: " + p1 + "," + p2
                           );
                       }
               );
 
 System.out.println(newPersonsList);

Wynik:

accumulator: [] => Person{name='Max', age=18}
accumulator: [Person{name='Max', age=18}] => Person{name='Peter', age=23}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}] => Person{name='Pamela', age=24}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}] => Person{name='David', age=12}
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Przykład z parallel streams:

 final List<Person> newParallelList = persons
        .parallelStream()
         .collect(
                  () -> {
                   System.out.println("create list");
                      return new LinkedList<>();
                   },
                   result, element) -> {
                      final String threadName = Thread.currentThread().getName();
                      System.out.println(
                                 "[" + threadName + "] accumulator: " + 
                                 result + " + " + element
                       );
                       result.add(element);
                    },
 
                    // combiner is only used in parallel streams
                    (subResult1, subResult2) -> {
                        final String threadName = 
                              Thread.currentThread().getName();
                        System.out.println(
                                "[" + threadName + "] combine: " + 
                                subResult1 + " + " + subResult2
                        );
                          subResult1.addAll(subResult2);
                    }
          );

Wynik:

create list
create list
create list
[main] accumulator: [] + Person{name='Pamela', age=24}
[ForkJoinPool.commonPool-worker-1] accumulator: [] + Person{name='Peter', age=23}
create list
[main] accumulator: [] + Person{name='Max', age=18}
[ForkJoinPool.commonPool-worker-4] accumulator: [] + Person{name='David', age=12}
[ForkJoinPool.commonPool-worker-4] combine: [Person{name='Pamela', age=24}] + [Person{name='David', age=12}]
[main] combine: [Person{name='Max', age=18}] + [Person{name='Peter', age=23}]
[main] combine: [Person{name='Max', age=18}, Person{name='Peter', age=23}] + [Person{name='Pamela', age=24}, Person{name='David', age=12}]
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Czy przetwarzanie równoległe jest zawsze dobre? Otóż nie! Dlaczego? Polecam artykuł http://dzone.com/articles/think-twice-using-java-8. Ogólnie rzecz ujmując chodzi o to, że jeśli pewne zadanie zajmuje zbyt wiele czasu to blokuje ono pozostałe zadania uruchomione równolegle. Co więcej nie ma możliwości na chwilę obecną zdefiniowania puli wątków dla strumieni przetwarzanych równolegle (pula wątków oznacza, że nowe wątki są tworzone aż do uzyskania maksymalnego rozmiaru. Po jego osiągnięciu nowe są tworzone tylko wtedy, gdy jakiś wątek zakończy swoją pracę).

Są dwa wyjścia. Pierwsze to albo upewnić się czy zadania idące równolegle nie zajmują dużo czasu i nie blokują pozostałych. Drugie to czekać i nie korzystać z funkcjonalności parallel stream aż firma Oracle da możliwość definiowania puli wątków dla strumieni przetwarzanych równolegle.

Zobacz kod na GitHubie i zapisz się na bezpłatny newsletter!

.

Leave a comment

Your email address will not be published.


*


Ta strona wykorzystuje pliki cookie. Używamy informacji zapisanych za pomocą plików cookies w celu zapewnienia maksymalnej wygody w korzystaniu z naszego serwisu. Mogą też korzystać z nich współpracujące z nami firmy badawcze oraz reklamowe. Jeżeli wyrażasz zgodę na zapisywanie informacji zawartej w cookies kliknij na „akceptuję". Jeśli nie wyrażasz zgody, ustawienia dotyczące plików cookies możesz zmienić w swojej przeglądarce. kliknij po więcej informacji

Ta strona wykorzystuje pliki cookie. Używamy informacji zapisanych za pomocą plików cookies w celu zapewnienia maksymalnej wygody w korzystaniu z naszego serwisu. Mogą też korzystać z nich współpracujące z nami firmy badawcze oraz reklamowe. Jeżeli wyrażasz zgodę na zapisywanie informacji zawartej w cookies kliknij na „akceptuję". Jeśli nie wyrażasz zgody, ustawienia dotyczące plików cookies możesz zmienić w swojej przeglądarce.

zamknij