Java 8 – stream & parallel stream & collect

Java 8 – stream & parallel stream & collect

Stream.collect() pozwala tworzyć nowe struktury danych, przetwarzać je oraz umożliwia wyciąganie informacji które są potrzebne. Składnia jest następująca:

collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner)

gdzie trzeci parametr używany jest w parallel streams.

 final List<Person> persons = Arrays.asList(
                new Person("Max",    18),
                new Person("Peter",  23),
                new Person("Pamela", 24),
                new Person("David",  12));
final List<Person> newPersonsList = persons
               .stream()
               .collect(
                       LinkedList::new,
                       (result, element) -> {
                           System.out.println(
                                   "accumulator: " + result + " => " + element
                           );
                           result.add(element);
                       },
 
                       // combiner is only used in parallel streams,
                       (p1, p2) -> {
                           System.out.println(
                                   "combiner: " + p1 + "," + p2
                           );
                       }
               );
 
 System.out.println(newPersonsList);

Wynik:

accumulator: [] => Person{name='Max', age=18}
accumulator: [Person{name='Max', age=18}] => Person{name='Peter', age=23}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}] => Person{name='Pamela', age=24}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}] => Person{name='David', age=12}
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Przykład z parallel streams:

 final List<Person> newParallelList = persons
        .parallelStream()
         .collect(
                  () -> {
                   System.out.println("create list");
                      return new LinkedList<>();
                   },
                   result, element) -> {
                      final String threadName = Thread.currentThread().getName();
                      System.out.println(
                                 "[" + threadName + "] accumulator: " + 
                                 result + " + " + element
                       );
                       result.add(element);
                    },
 
                    // combiner is only used in parallel streams
                    (subResult1, subResult2) -> {
                        final String threadName = 
                              Thread.currentThread().getName();
                        System.out.println(
                                "[" + threadName + "] combine: " + 
                                subResult1 + " + " + subResult2
                        );
                          subResult1.addAll(subResult2);
                    }
          );

Wynik:

create list
create list
create list
[main] accumulator: [] + Person{name='Pamela', age=24}
[ForkJoinPool.commonPool-worker-1] accumulator: [] + Person{name='Peter', age=23}
create list
[main] accumulator: [] + Person{name='Max', age=18}
[ForkJoinPool.commonPool-worker-4] accumulator: [] + Person{name='David', age=12}
[ForkJoinPool.commonPool-worker-4] combine: [Person{name='Pamela', age=24}] + [Person{name='David', age=12}]
[main] combine: [Person{name='Max', age=18}] + [Person{name='Peter', age=23}]
[main] combine: [Person{name='Max', age=18}, Person{name='Peter', age=23}] + [Person{name='Pamela', age=24}, Person{name='David', age=12}]
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Czy przetwarzanie równoległe jest zawsze dobre? Otóż nie! Dlaczego? Polecam artykuł http://dzone.com/articles/think-twice-using-java-8. Ogólnie rzecz ujmując chodzi o to, że jeśli pewne zadanie zajmuje zbyt wiele czasu to blokuje ono pozostałe zadania uruchomione równolegle. Co więcej nie ma możliwości na chwilę obecną zdefiniowania puli wątków dla strumieni przetwarzanych równolegle (pula wątków oznacza, że nowe wątki są tworzone aż do uzyskania maksymalnego rozmiaru. Po jego osiągnięciu nowe są tworzone tylko wtedy, gdy jakiś wątek zakończy swoją pracę).

Są dwa wyjścia. Pierwsze to albo upewnić się czy zadania idące równolegle nie zajmują dużo czasu i nie blokują pozostałych. Drugie to czekać i nie korzystać z funkcjonalności parallel stream aż firma Oracle da możliwość definiowania puli wątków dla strumieni przetwarzanych równolegle.

Zobacz kod na GitHubie i zapisz się na bezpłatny newsletter!

.

Leave a comment

Your email address will not be published.


*