Java 8 – stream & parallel stream & collect


Java 8 – stream & parallel stream & collect

Stream.collect() pozwala tworzyć nowe struktury danych, przetwarzać je oraz umożliwia wyciąganie informacji które są potrzebne. Składnia jest następująca:

collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner)

gdzie trzeci parametr używany jest w parallel streams.

 final List<Person> persons = Arrays.asList(
                new Person("Max",    18),
                new Person("Peter",  23),
                new Person("Pamela", 24),
                new Person("David",  12));
final List<Person> newPersonsList = persons
               .stream()
               .collect(
                       LinkedList::new,
                       (result, element) -> {
                           System.out.println(
                                   "accumulator: " + result + " => " + element
                           );
                           result.add(element);
                       },

                       // combiner is only used in parallel streams,
                       (p1, p2) -> {
                           System.out.println(
                                   "combiner: " + p1 + "," + p2
                           );
                       }
               );

 System.out.println(newPersonsList);

Wynik:

accumulator: [] => Person{name='Max', age=18}
accumulator: [Person{name='Max', age=18}] => Person{name='Peter', age=23}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}] => Person{name='Pamela', age=24}
accumulator: [Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}] => Person{name='David', age=12}
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Przykład z parallel streams:

 final List<Person> newParallelList = persons
        .parallelStream()
         .collect(
                  () -> {
                   System.out.println("create list");
                      return new LinkedList<>();
                   },
                   result, element) -> {
                      final String threadName = Thread.currentThread().getName();
                      System.out.println(
                                 "[" + threadName + "] accumulator: " + 
                                 result + " + " + element
                       );
                       result.add(element);
                    },

                    // combiner is only used in parallel streams
                    (subResult1, subResult2) -> {
                        final String threadName = 
                              Thread.currentThread().getName();
                        System.out.println(
                                "[" + threadName + "] combine: " + 
                                subResult1 + " + " + subResult2
                        );
                          subResult1.addAll(subResult2);
                    }
          );

Wynik:

create list
create list
create list
[main] accumulator: [] + Person{name='Pamela', age=24}
[ForkJoinPool.commonPool-worker-1] accumulator: [] + Person{name='Peter', age=23}
create list
[main] accumulator: [] + Person{name='Max', age=18}
[ForkJoinPool.commonPool-worker-4] accumulator: [] + Person{name='David', age=12}
[ForkJoinPool.commonPool-worker-4] combine: [Person{name='Pamela', age=24}] + [Person{name='David', age=12}]
[main] combine: [Person{name='Max', age=18}] + [Person{name='Peter', age=23}]
[main] combine: [Person{name='Max', age=18}, Person{name='Peter', age=23}] + [Person{name='Pamela', age=24}, Person{name='David', age=12}]
[Person{name='Max', age=18}, Person{name='Peter', age=23}, Person{name='Pamela', age=24}, Person{name='David', age=12}]

Czy przetwarzanie równoległe jest zawsze dobre? Otóż nie! Dlaczego? Polecam artykuł https://dzone.com/articles/think-twice-using-java-8. Ogólnie rzecz ujmując chodzi o to, że jeśli pewne zadanie zajmuje zbyt wiele czasu to blokuje ono pozostałe zadania uruchomione równolegle. Co więcej nie ma możliwości na chwilę obecną zdefiniowania puli wątków dla strumieni przetwarzanych równolegle (pula wątków oznacza, że nowe wątki są tworzone aż do uzyskania maksymalnego rozmiaru. Po jego osiągnięciu nowe są tworzone tylko wtedy, gdy jakiś wątek zakończy swoją pracę).

Są dwa wyjścia. Pierwsze to albo upewnić się czy zadania idące równolegle nie zajmują dużo czasu i nie blokują pozostałych. Drugie to czekać i nie korzystać z funkcjonalności parallel stream aż firma Oracle da możliwość definiowania puli wątków dla strumieni przetwarzanych równolegle.

Kod źródłowy do wglądu na GitHub!

Jeśli chcesz uzyskać dostęp do GitHuba na 30 dni i pobrać kod źródłowy wyślij smsa o treśći DOSTEP.EDUSESSION na numer 7943. Tyle wiedzy a koszt to tylko 9 PLN (11.07 PLN z VAT).





Leave a comment

Your email address will not be published.


*