Monday

[Java8] Stream이란? 본문

언어/Java

[Java8] Stream이란?

뉴비2 2022. 3. 13. 20:57

1. Stream이란?

스트림(stream)이란 자바8 API에 새로 추가된 기능입니다. 스트림을 이용하면 sql 구문처럼 무엇을해야하는지를 컴퓨터에게 알려주면 결과를 받아볼 수 있습니다. 즉, 내부 구현 방법에는 관심을 갖지 않아도 결과를 얻을 수 있습니다. 이런 방식을 선언형 처리 방식이라고 합니다. 또한, 스트림은 복수 개의 연산(sql로 예를 들면, select, from, where 등)들을 이을 수 있다. 이 뿐만 아니라, 스트림을 이용하면 데이터를 투명하게 병렬로 처리할 수 있습니다. 스트림을 1줄로 표현하면 "데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소"라고 할 수 있습니다. 여기서 위 문장의 의미에 대해 더 자세히 살펴보겠습니다.

  • 연속된 요소 : 컬렉션과 마찬가지로 스트림은 특정 요소 형식으로 이루어진 연속된 값 집합의 인터페이스를 제공합니다.
  • 소스 : 스트림은 컬렉션, 배열, I/O 자원 등의 데이터 제공 소스로부터 데이터를 가져올 수 있습니다.
  • 데이터 처리 연산 : 스트림은 데이터베이스와 비슷한 연산을 지원할 수 있습니다.

2. Stream의 특징

스트림은 "파이프라이닝", "내부 반복"이라는 2가지의 큰 특징을 갖고 있습니다. 각 특징에 대해서 더 자세히 살펴보겠습니다.

  • 파이프라이닝
    - 스트림에서 사용되는 대부분 연산은 연산끼리 연결하여 커다란 연산 파이프라인을 구성할 수 있습니다. 또한, 연산이 "중간 연산", "최종 연산"으로 분류되어 최종 연산을 호출 할 때만 계산이 실행되는 게으름(lazy) 최적화가 가능합니다. 뿐만 아니라, 중간 연산 과정에서 불필요한 연산을 최소화 하는 쇼트서킷 최적화도 가능합니다. 이 부분은 글 후반부에 예제를 통해 살펴보겠습니다.
  • 내부 반복
    - 스트림의 연산은 for-each구문같은 반복문의 내용을 내포하고 있습니다. 예를 들어, 스트림 연산 중 하나인 map연산은 stream 각 요소를 순회하며 인자로 전달된 함수를 적용합니다. 이는 병렬성을 stream이 관리한다는 면에서 큰 장점입니다. 즉, 우리가 병렬로 처리를 할 때 단일 스레드로 for-each 구문을 사용할 지, 멀티 스레드로 구현할 지 고민 할 필요가 없다는 것입니다. 

3. Stream 연산

스트림은 "데이터 소스"로 부터 데이터를 얻어 스트림을 열고, "중간 연산"으로 연산을 이으며, "최종 연산"을 통해 스트림을 닫습니다.

예제를 통해 알아보겠습니다.

List<String> names = menu.stream() // ArrayList 형식의 menu 변수에서 stream 얻기
				.filter(dish -> dish.getCalories() > 300) // 중간 연산 - 칼로리300이상 얻기
                .map(Dish::getName) //중간연산 - 요리 이름 얻기
                .limit(3) //중간연산 - 결과 중 3개만 반환받기
                .collect(toList()); // 최종연산 - 결과를 리스트로 반환받기

위 예시를 보면 ArrayList 형식의 menu 변수에서 데이터를 얻어 스트림을 열고, 중간 연산(filter, map, limit)을 사용하여 연산을 연결하였으며, collect 최종 연산을 통해 스트림을 닫아 결과를 받았습니다. 초반부에서 얘기한 것과 같이 스트림은 최종연산을 부를 때 실제 연산이 수행됩니다. 

 

4. Stream 연산 종류

스트림은 말씀드린 것과 같이 크게 "중간 연산", "최종 연산"으로 나뉘어집니다. 하지만 여기서 "중간 연산"은 상태 유무를 기준으로 한 번 더 분리할 수 있습니다. 상태란 연산 과정에서 "내부 요소에 대한 참조 순서", "이전 값", "요소 간 순서" 등 필요한 정보가 존재하냐를 의미합니다. 예를 들어, sorted와 distinct같은 연산은 "스트림의 정렬 순서" 혹은 "중복 제거를 위한 스트림 과거 이력"을 알고 있어야합니다. 이런 연산들은 "상태가 있는 중간 연산"이라고 합니다.

상태가 없는 중간 연산

연산 종류 반환 형식 사용 시기 사용법
filter Stream<T> filter(Predicate<T> 함수) 처럼 사용하며 특정 요소들을 필터링 할 때 사용합니다. filter(isNumber())
map Stream<R> map(Function<T, R> 함수) 형식으로 사용하며, 각 요소들에 대해 변환 작업을 할 때 사용합니다. map(String::tostring)
flatMap Stream<R> flatmap(Function<T, Stream<R> 함수) 형식으로 사용하며, 각 요소를 Stream<R>로 매핑합니다. 이 의미는 각 요소마다 스트림을 생성한다는 것이며, 배열을 각 요소로 분해하는 역할을 합니다. 즉 차원을 1단계 낮추는 역할을 합니다. flatMap(Arrays::stream) - flatMap 연산 이전에 Stream<배열> 형태였다면, flatMap 이후 각 요소로 분해됩니다.
takeWhile Stream<T> takeWhile(Predicate<T> 함수) 형식으로 사용하며, 정렬된 스트림에서 특정 조건을 만족하는 만큼만 스트림을 자를 수 있습니다. filter와 다르게 특정 조건을 불만족하면 더 이상 다음 요소를 확인하지 않습니다. takeWhile(dish -> dish.getCalories() < 320) - 정렬된 dish 스트림에서 칼로리 320미만 요리만 추출
dropWhile Stream<T> dropWhile(Predicate<T> 함수) 형식으로 사용하며, takeWhile과 반대되는 개념입니다. 특정 조건을 만족하는 만큼 스트림을 버립니다. takeWhile(dish -> dish.getCalories() < 320) - 정렬된 dish 스트림에서 칼로리 320미만 요리를 버리고 나머지 요리들만 추출

상태가 있는 중간 연산

연산 종류 반환 형식 사용 시기 사용법
distinct Stream<T> 스트림 내에서 중복 값을 제거한 결과를 받아야 할 때 사용합니다. distinct()
limit Stream<T> 결과의 갯수를 제한할 때 사용합니다. limit(3)
skip Stream<T> 스트림에서 앞 요소 N개를 건너뛰고 결과를 받을 때 사용합니다. skip(3) - 3개 건너뛰고 결과 받음
sorted Stream<T> sorted(Comparator<T> 함수) 형식으로 사용되며, 스트림 요소들을 정렬할 때 사용합니다. sorted(comparing(Dish::getName))

최종 연산

연산 종류 반환 형식 사용 시기 사용법
anyMatch boolean 특정 조건을 만족하는 요소가 있는지 확인할 때 사용합니다. anyMatch(isEvenNumber())
noneMatch boolean 특정 조건을 만족하는 요소가 1개도 없는 지 확인할 때 사용합니다. noneMatch(number -> number>9)
allMatch boolean 특정 조건을 모든 요소가 만족하는 지 확인할 때 사용합니다. allMatch(number -> number<=10)
findAny Optional<T> 특정 조건을 만족하는 요소 아무거나 1개를 반환받습니다. findAny(number -> number>9)
findFirst Optional<T> 특정 조건을 만족하는 요소 중 제일 처음 요소를 반환받습니다. 보통 정렬과 함께 사용하며, 순서가 상관이 없다면 findAny를 사용하는 것이 좋습니다. .sorted(comparint(Dish::getName)
.findFirst()
forEach void 각 요소마다 특정 함수를 수행하고 끝나야 할 때 사용합니다. foreach(n -> System.out.println(n))
collect R 스트림 요소들을 모은 결과를 생성해야 할 때 사용합니다. 주로 스트림 결과를 List로 생성할 때 사용합니다. collect(toList())
reduce Optional<T> 스트림 요소를 모두 합쳐 결과를 만들어야 할 때 사용합니다. 예를 들어, 모든 요소를 더하거나, 곱하거나, 문자열을 이을 때 사용합니다. reduce((a,b) -> a+b))
count long 요소의 갯수를 셀 때 사용합니다. count()

5. collect 연산

collect 최종 연산은 reduce 연산과 비슷하게 스트림 요소들을 하나의 결과로 합치는 연산입니다. collect 연산은 Collector라는 인터페이스(=어떻게 요소들을 하나로 합칠것인가를 정의한 인터페이스)를 사용하여 커스텀한 결과물을 만들 수 있어서 특별한 최종 연산입니다.  collect(Collector 인터페이스 구현체) 형태로 사용됩니다. 참고로 collect와 reduce와 차이를 말하자면, 아래에서 더 살펴보겠지만 중간 결과물이 새로운 리스트를 생성하는 것처럼 가변 컨테이너 관련 작업이면서 병렬성을 확보해야하면 collect 메소드를 사용하는게 더 좋습니다. 

5-1) Collectors 클래스

Collectors 클래스는 자바에서 기본적으로 제공하는 Collector 인터페이스 생성을 도와주는 클래스입니다. 즉, 팩토리 메소드를 제공하여 Collector 인터페이스를 제공합니다. Collectors에서 제공하는 메소드의 기능은 크게 3가지로 나눌 수 있습니다.

  • 스트림 요소를 하나의 값으로 합치며 요약하는 기능
    : 최소값, 최대값, 카운팅, 문자열 연결과 같은 요약 연산입니다. 종합적인 통계 정보를 제공하는 summarizingInt 기능도 있으며, 범용적으로 더 다양한 기능을 구현할 수 있게 reducing() 팩토리 메소드를 이용할 수도 있습니다. 

  • 그룹화 기능
    : SQL의 groupby와 비슷한 기능입니다. 특정 기준으로 그룹핑하거나, 그룹핑을 중첩한 이중 그룹핑과 같이 그룹핑을 반복해서 중첩 그룹핑이 가능합니다. 다음과 같은 상황에 사용합니다.

    1)  간단한 그룹화 : stream().collect(Collectors.groupingby(그룹핑 키 반환 함수))
    2) 그룹화 후 각 그룹마다 연산 적용 : stream.collect(Collectors.groupingby(그룹핑 키 함수, Collector 구현체))
    3) 중첩 그룹핑 : straem.collect(Collectors.groupingby(첫 번째 키 함수, groupingby(두 번째 키 함수, ...))

  • 분할(partition) 기능
    : 스트림을 2개의 그룹으로 나누는 기능입니다. Predicate 함수를 이용하여 참, 거짓 그룹으로 분리합니다. 분할은 다음과 같이 사용할 수 있습니다.

    1) 간단한 분할 : stream.collect(Collectors.partitionBy(분할 기준 참, 거짓 반환 함수))
    2) 분할 후 각 분할마다 연산 적용 : stream.collect(Collectors.partitionBy(분할 기준 참, 거짓 반환 함수, Collector 구현체))

5-2) Collector  인터페이스

Collector 인터페이스는 어떻게 스트림을 리듀싱(하나로 합칠지) 할 지를 정의합니다. Collector 인터페이스에 정의된 메소드를 구현함으로써 커스텀 컬렉터를 개발 할 수 있습니다. Collector 인터페이스는 코드로 다음과 같이 구현되어있습니다. 

/* 파라미터
*  T: 수집될 스트림 항목의 제네릭 형식
*  A: 수집 과정에서 중간 결과를 누적하는 객체의 형식
*  R: 수집 연산 결과 객체의 형식
*/
public interface Collector<T, A, R> {
    // 결과를 담을 컨테이너를 만드는 함수
    Supplier<A> supplier();

    //중간 과정들을 누적하는 함수(ex. 리스트에 요소 추가하기)
    BiConsumer<A, T> accumulator();

    //중간 과정을 누적한 작은 결과 2개를 합치는 함수, 이 함수를 잘 정의해서 리듀싱을 병렬로 처리 가능
    BinaryOperator<A> combiner();

    // 중간 결과를 최종 결과물로 변환하는 함수 
    Function<A, R> finisher();

    //스트림을 병렬로 리듀스 할지, 어떤 최적화를 할 지 힌트를 제공하는 함수
    Set<Characteristics> characteristics();
}

여기서 Characteristics 메서드는 컬렉트 연산을 어떻게 처리할 지 힌트를 제공하는 함수로 다음과 같이 총 3개의 힌트가 존재합니다.

  • UNORDERED : 리듀싱의 결과는 스트림의 요소 순서나 누적 순서에 영향을 받지 않는다.
  • CONCURRENT : 멀티 스레드 환경에서 accumulator를 병렬로 처리할 수 있게 합니다. 단, UNORDERED와 함께 사용하거나, 그렇지 않다면 스트림 요소 참조 순서가 의미 없을 때 사용하면 됩니다.
  • IDENTITY_FINISH : 중간 결과를 단순히 최종 결과로 반환할 때 사용합니다. 즉, 최종 결과를 반환하기 전 중간 결과와 최종 결과의 객체의 차이가 없을 때 사용하면 됩니다.

6. Stream 병렬 처리

스트림은 parallel() 함수를 적용함으로써 병렬 처리를 할 수 있습니다. 내부적으로 ForkJoinPool을 사용하여 동작합니다. 다만, 사용 시에 주의가 많이 필요합니다. 기본적으로 오토박싱과 레이스 컨디션 등 병렬 처리에서 고려해야 할 요소들이 많습니다. 이를 신경쓰지 않고 사용한다면 순차처리보다 더 안좋은 성능을 냄으로 많은 주의가 필요하고 꼭 벤치마크와 같이 성능을 측정하여 사용하시는 것을 추천드립니다.

  • 주의사항 1) 병렬 스트림을 사용 시에는 성능이 직관적이지 않으므로 직접 측청해본다.
  • 주의사항 2) 병렬 스트링을 처리할 때 기본형 특화 스트림 사용 등 적절한 자료구조를 선택해야 병렬 처리 성능이 향상된다.

또한, Stream 병렬 처리는 내부적으로 알아서 처리량을 작은 조각으로 분할하여 처리합니다. 원리는 각 컬렉션 마다  Spliterator 인터페이스가 구현되어 있고, Spliterator는 어떻게 작은 조각으로 분할할지에 대해서 정의하고 있습니다. 이 Spliterator를 커스터마이징 함으로써 자기 자신만의 분할 방법을 만들 수 도 있습니다.

Comments