Java8 Stream API

 

개요

Java8에서 도입된 Stream API에 대해 알아보자. Stream API는 lambda식, Optional등과 함께 반드시 알아야 할 Java8의 기능이다.

Stream생성

Stream을 만드는 방법은 다양하다. 한 가지 중요한 특징은 stream을 만들어서 연산을 적용하더라도 원본 데이터는 변하지 않는다는 점이다. 따라서 하나의 원본 데이터에서 여러 개의 stream을 만들어서 사용해도 무방하다.

 

빈 Stream 생성

Stream<String> emtpyStream = Stream.empty();
 

Collection으로부터 생성

List나 Set과 같은 Collection으로부터 Stream을 만들 수 있다.

Collection<String> list = Arrays.asList("s1", "s2", "s3");
Stream<String> streamFromList = list.stream();

Set<String> set = new HashSet<>(Arrays.asList("a", "b", "b"));
Stream<String> streamFromSet = set.stream();
 

배열로부터 생성

String[] arr = new String[]{"s1", "s2", "s3"};
Stream<String> stream = Arrays.stream(arr);
 

builder를 사용하여 생성

Stream<String> stream =
  Stream.<String>builder().add("s1").add("s2").add("s3").build();
 

Stream.generate()를 사용하여 무한 스트림 생성

generate()메소드는 Supplier<T>를 인자로 받아들여서 무한의 stream을 반환한다.

무한하기 때문에 실제로 사용하기 위해서는 limit()으로 제한해야 한다.

Stream<String> stream =
  Stream.generate(() -> "sameText").limit(10);
 

Stream.iterate()로 무한 스트림 생성

Stream의 iterate메소드를 사용하면 첫 번째 인자와 두 번째 인자의 람다식을 반복 적용하여 무한의 스트림을 얻을 수 있다.

Stream<Integer> stream = Stream.iterate(1, n -> n * 2).limit(5);
 

위 예에서는 첫 번째 값은 1 그 다음 값은 2, 4, 8, 16, 32.... 이 된다.

1: 1 * 2-> 2

2: 2 * 2 -> 4

4: 4 * 2 -> 8

8: 8 * 2 -> 16

16: 16 * 2 -> 32

프리미티브 타입의 Stream

프리미티브 타입으로 구성된 Stream을 위해 IntStream, LongStream, DoubleStream이 존재한다.

IntStream intStream = IntStream.range(1, 5);
LongStream longStream = LongStream.rangeClosed(1, 5);
 

위 예에서 range()메서드는 1,2,3,4가 되고 rangeClosed()메서드는 1L,2L,3L,4L,5L이 된다.

한편 다음과 같이 Random한 정수값의 스트림을 만드는 것도 가능하다.

Random random = new Random();        
IntStream ints = random.ints(0, 5).limit(5);
ints.forEach(System.out::println);
 

결과는 다음과 같다. 랜덤한 정수가 5개 출력된다.

4
2
4
0
1
 

문자열을 정규식으로 분리하여 Stream만들기

java.util.regex.Pattern의 splitAsStream을 사용하면 문자열을 정규식 패턴으로 분리하여 Stream을 만들 수 있다.

Pattern pattern = Pattern.compile(".@");
Stream<String> stream = pattern.splitAsStream("aa@aaa@aaaa@");
stream.forEach(s -> System.out.println(s));
 

결과는 다음과 같다.

a
aa
aaa
 

파일을 읽어서 줄 단위로 Stream 만들기

파일을 읽어서 줄 단위로 분리하여 stream을 만들 수 있다.

Path path = Paths.get("C:\\input.txt");
Stream<String> stream =
        Files.lines(path, Charset.forName("UTF-8"));
stream.forEach(s -> System.out.println(s));
 

Stream의 재사용은 불가능

Stream에 대해 Terminal Operation을 수행한 뒤에는 해당 Stream을 사용할 수 없다.

Stream에 대한 Operation은 크게 Intermediate와 Terminal이 있다.

두 오퍼레이션을 조합하여 Stream pipeline을 만들게 되는데 Terminal Operation이 항상 마지막 Operation이 되어야 한다.

 

다음 코드에서 findAny()와 findFirst()는 Terminal Operation이다.

Stream<String> stream =
        Arrays.asList("s1", "s2", "s3").stream();
Optional<String> anyElement = stream.findAny();
Optional<String> firstElement = stream.findFirst(); // <- Exception
 

Terminal Operation 뒤에 다시 해당 stream을 다시 사용하려 했기 때문에 위 코드를 실행하면 다음과 같이 IllegalStateException이 발생한다.

Exception in thread "main" java.lang.IllegalStateException: 
stream has already been operated upon or closed
 

위 코드는 다음과 같이 바꿔야 한다.

List<String> list = Arrays.asList("s1", "s2", "s3");
Optional<String> anyElement = list.stream().findAny();
Optional<String> firstElement = list.stream().findFirst();
 

Stream Pipeline

보통 Stream에는 원천(source), 중간 연산(intermediate operation), 종료 연산(terminal operation)으로 구성된 데이터 변형 pipeline을 적용한다.

 

중간 연산(intermediate operation)은 데이터가 변형된 새로운 stream을 반환한다.

이러한 중간 연산은 다음과 같이 chain형식으로 이어서 기술하는 것이 보편적이다.

List<String> list = Arrays.asList("s1", "s2", "s3");

Stream<String> stream = list.stream()
                            .map(s -> s + "_postfix")
                            .map(s -> "prefix_" + s);

stream.forEach(System.out::println);
 

출력결과는 다음과 같다.

prefix_s1_postfix
prefix_s2_postfix
prefix_s3_postfix
 

이런 식으로 데이터를 변환하다가 최종 결과로 변환하는 것이 종료 연산(terminal operation)이다. 다시 강조하지만 하나의 stream에 대해서는 한번만 종료 연산을 수행할 수 있다. 그리고 종료 연산은 반드시 마지막에 적용해야 하고 이후에는 그 어떤 연산도 적용해서는 안 된다. 다음 코드는 collect라는 종료 연산으로 각 string을 전부 이어붙인 결과를 반환하고 있다.

List<String> list = Arrays.asList("s1", "s2", "s3");
String joined = list.stream()
                     .map(s -> s + "_postfix")
                     .map(s -> "prefix_" + s)
                     .collect(Collectors.joining());
System.out.println(joined);
 

결과는 다음과 같다.

prefix_s1_postfixprefix_s2_postfixprefix_s3_postfix
 

게으른 처리

중간 연산은 게으르게 처리된다. 즉, 실제로 수행되어야 하는 마지막 순간까지는 실행되지 않는다. 마지막 순간에 필요한 최소한의 연산만 수행하기 때문에 이러한 처리를 게으른 처리라고 한다. 만약에 stream에 대한 종료 연산이 호출되지 않는다면 중간 연산은 호출되지 않는다.

 

다음 예를 보자.

List<String> list = Arrays.asList("s1", "s2", "s3");
list.stream()
    .map(s -> {
        System.out.println("called");
        return s + "_postfix";
    })
    .findFirst();
 

최종 연산인 findFirst()는 첫 요소만을 필요로 하기 때문에 map()함수는 세 번 호출되는 것이 아니라 한번만 호출된다.

적용 순서의 중요성

Stream에 중간 연산을 적용할 때는 그 순서가 중요하다. 비록 게으르게 처리가 된다고 해도 쓸데없는 연산이 발생하지 않도록 적용 순서를 현명하게 적용해야 한다.

List<String> list = Arrays.asList("s1", "s2", "a1", "a2");
String joined = list.stream()
                    .map(s -> s + "_postfix")
                    .filter(s -> s.startsWith("s"))
                    .collect(Collectors.joining());
System.out.println(joined);
 

위 코드를 보면 map을 먼저하고 filter를 하고 있다.

하지만 잘 생각해보면 filter를 먼저 했으면 map이 2번만 호출되어도 되었을 것이다. 위 예에서는 사소한 차이지만 작성하는 코드에 따라서는 커다란 성능차이가 발생할 수 있다. 따라서 stream 파이프라인을 구성할 때는 가급적 filter(), skip(), distinct()같은 것을 먼저 적용하여 연산 횟수를 줄이는 것이 좋다.

reduce() 함수

max(), min()등과 같이 Stream에 적용할 수 있는 다양한 종료 연산이 있지만 reduce()나 collect()를 사용하면

보다 풍부한 로직을 적용할 수 있다. 먼저 reduce()함수의 사용법에 대해 알아보자. reduce()함수를 사용하는 방법은 크게 세 가지가 있다.

 

reduce(accumulator함수)

OptionalInt reducedInt = IntStream.of(1, 2, 3, 4, 5).reduce((x, y) -> x + y);
System.out.println(reducedInt.getAsInt());
 

위 예에서는 reduce()에 람다 함수만을 인자로 넘겨주고 있다. 이는 1,2,3,4,5로 구성된 stream에 대해 2개씩 점진적으로 적용되어 마지막 하나의 값이 도출된다.

  • 1,2 -> 3
  • 3,3 -> 6
  • 6,4 -> 10
  • 10,5 -> 15

reduce(초기값, accumulator함수)

int reducedInt = IntStream.of(1, 2, 3, 4, 5).reduce(10, (x, y) -> x +y);
System.out.println(reducedInt);
 

위 예에서는 reduce()에 초기값과 람다함수를 넘겨주고 있다. 그러면 초기값과 stream의 첫 번째 값이 람다식에 적용되고 이어서 그 결과값과 2번째 요소가 람다식에 적용되는 것이 반복되어 마지막 하나의 값이 도출되게 된다.

10(초기값),1(첫 번째 요소) -> 11

11, 2(두 번째 요소) -> 13

13, 3(세 번째 요소) -> 16

16, 4(네 번째 요소) -> 20

20, 5(다섯 번째 요소) -> 25 (최종 결과)

reduce(초기값, accumulator함수, combiner함수)

combiner는 parallelStream을 사용할 때에만 유효하다. 이 경우 위 두 경우와는 전혀 다른 방식으로 reduce함수가 동작하니 주의가 필요하다.

int reducedParallel = Arrays.asList(1, 2, 3)
                            .parallelStream()
                            .reduce(1, (x,y) -> (x+y), 
                                       (x,y) -> (x+y));
System.out.println(reducedParallel);
 

위 코드에서 1은 초기값, 두 번째 인자는 accumulator함수, 세 번째 인자는 combiner함수다.

먼저 스트림의 각 요소들에 대해 각각 초기값과 accumulator함수가 적용된다. 그러면, 1,2,3 이 2,3,4가 된다.

그리고 이 2,3,4에 대해 combiner함수가 적용되어 2+3=5 -> 5+4=9가 된다.

collect()함수

이번에는 사실상 가장 많이 사용되는 종료연산인 collect()함수에 대해 알아보자. collect()함수는 Collectors타입의 인자를 받아들인다. 대표적인 사용예는 Stream을 Collection으로 변환하는 것이다.

List<Integer> list = Arrays.asList(1, 2, 3);
Stream<Integer> steam = list.stream().map(x -> x + 1);
List<Integer> convertedList  = stream.collect(Collectors.toList());
 

위 예에서는 원래의 List를 stream으로 변환하여 처리한 뒤 다시 List로 변환하고 있다. 이때 다음과 같이 Set으로 변환할 수도 있다.

Set<Integer> convertedSet  = stream.collect(Collectors.toSet());
 

객체의 stream 다루기

이번에는 우리가 정의한 class의 객체들로 구성된 stream을 다루는 방법에 대해 알아보도록 하자.

먼저 다음과 같이 Book이란 class를 정의한다. (getter, setter, constructor는 생략하였다)

public class Book {
    String author;
    Integer price;
    String title;
}
 

먼저 이 Book의 객체로 구성된 Collection을 만든다.

Book book1 = new Book("author1", 3000, "title1");
Book book2 = new Book("author2", 6000, "title2");
Book book3 = new Book("author3", 3000, "title3");

List<Book> books = Arrays.asList(book1, book2, book3);
 

다음과 같이 Author필드로만 구성된 List를 얻을 수 있다. map으로 변환한 후 collect로 List로 변환하고 있다.

List<String> authors = books.stream()
                            .map(Book::getAuthor)
                            .collect(Collectors.toList());
 

또한 다음과 같이 title필드를 하나의 String으로 합친 결과를 얻을 수도 있다.

String titleJoined = books.stream()
                          .map(Book::getTitle)
                          .collect(Collectors.joining(",", "[", "]"));
 

위 코드를 출력해보면 다음과 같다.

[title1,title2,title3]
 

한편 정수 필드인 price의 평균이나 합을 구하는 Collectors함수도 있다.

double averagePrice = books.stream()
        .collect(Collectors.averagingInt(Book::getPrice)); // 4000.0

int totalPrice = books.stream()
        .collect(Collectors.summingInt(Book::getPrice)); // 12000
 

합이나 평균뿐만 아니라 기본적인 통계데이터를 담은 객체를 반환해주는 summarizingInt라는 함수도 있다.

IntSummaryStatistics statistics = books.stream()
        .collect(Collectors.summarizingInt(Book::getPrice));
 
IntSummaryStatistics{count=3, sum=12000, min=3000, average=4000.000000, max=6000}
 

Stream에 대해 Grouping하기

Stream의 요소들을 특정 기준으로 그룹을 만들어 Map으로 반환할 수 있다.

다음 예에서는 Book의 Price를 기준으로 그룹핑한 Map을 반환하고 있다.

Map<Integer, List<Book>> groupedMap = books.stream() 
                                           .collect(Collectors.groupingBy(Book::getPrice));

groupedMap.forEach((Integer price, List<Book> bookList) -> {
    System.out.println("# price group: " + price);
    bookList.forEach(book -> {
        System.out.println(book.getTitle());
    });
});
 
# price group: 6000
title2
# price group: 3000
title1
title3
 

predicate으로 그룹핑하기

partitioningBy를 사용하면 predicate, 즉 boolean을 반환하는 함수를 기준으로 그룹을 지정할 수도 있다.

Map<Boolean, List<Book>> groupedMap = books.stream()
        .collect(Collectors.partitioningBy(book -> book.getPrice() > 4000));

groupedMap.forEach((Boolean overPrice, List<Book> bookList) -> {
    System.out.println("# Over Price: " + overPrice);
    bookList.forEach(book -> {
        System.out.println(book.getTitle());
    });
});
 

나만의 Collector 만들기

Java에서는 이미 충분히 다양한 Collector를 제공하고 있지만 필요에 의해 나만의 Collector를 만들어야 할 경우에는 Collector.of를 사용하도록 한다.

Collector<Book, ?, List<Book>> myCollector =
        Collector.of(ArrayList::new, List::add,
                (list, item) -> {
                    list.addAll(item);
                    return list;
                });

List<Book> converted = books.stream().collect(myCollector);
converted.forEach(System.out::println);
 

위 예는 stream의 요소들을 ArrayList에 담아서 반환해주고 있다.

병렬 스트림

병렬 처리를 다루기 위해서는 ExecutorService나 ForkJoin등을 알아야 한다. Java8에서는 parallel stream을 통해 함수형 스타일로 병렬 처리를 기술할 수 있게 되었다.

boolean overPrice = books.parallelStream()
                         .map(book -> book.getPrice() * 100)
                         .anyMatch(price -> price > 400000);
 

그러면, Stream API는 ForkJoin framework를 사용하여 연산을 병렬로 수행한다. 이때 common thread pool 이 사용되며 custom thread pool을 사용하도록 설정할 수 없다. 그럴려면 parallel collectors를 구현해야 한다.

 

parallel stream을 사용할 때는 disk IO나 network IO가 포함되지 않은 순수 연산을 수행해야 한다. 그리고 각 요소별로 소요되는 시간이 비슷하지 않으면 오래 걸리는 한 요소가 끝날 때까지 다음 작업이 진행되지 않으므로 주의해야 한다.

 

parallel stream은 각 요소별로 병렬로 처리된다. 이를 확인하는 코드는 다음과 같다.

boolean overPrice = books.parallelStream()
                    .map(book -> {
                        System.out.println(book.getTitle() + " map() started on " + Thread.currentThread().getName());
                        return book.getPrice() * 100;
                    })
                    .anyMatch(price -> {
                        System.out.println(price + " anyMatch() started on " + Thread.currentThread().getName());
                        return price > 400000;
                    });
 

출력결과는 다음과 같다.

title2 map() started on main
title3 map() started on ForkJoinPool.commonPool-worker-2
600000 anyMatch() started on main
title1 map() started on ForkJoinPool.commonPool-worker-9
300000 anyMatch() started on ForkJoinPool.commonPool-worker-2
300000 anyMatch() started on ForkJoinPool.commonPool-worker-9
 

마무리

Stream API를 사용하면 요소들에 대한 처리를 효과적으로 기술하여 보다 가독성있는 코드를 작성할 수 있게 된다. 파이프라인으로 데이터 처리를 기술하는 것은 리눅스를 비롯해 빅데이터 처리 플랫폼에 이르기까지 다양한 기술에서 사용된다. Stream API를 자유자재로 다룰 수 있으면 Java 활용 능력과 생산성이 무척 높아질 것이다.

 

 

 

 

댓글

이 블로그의 인기 게시물

Java의 Matcher를 활용한 정규식 검색 및 추출

Java Gradle Project에서 Unit Test코드 작성하기