스트림은 데이터나 이벤트가 들어오는 통로다. 즉, 파이프나 다리와 같다고 생각하면 된다. 앱을 만들다보면 데이터를 처리할 일이 많은데, 어느 타이밍에 데이터가 들어올지 정확히 알기 어렵다. 스트림은 이와 같은 비동기 작업을 할 때 주로 쓰인다. 예를 들어, 네트워크에서 데이터를 받아 UI에 보여주는 상황을 생각해보자. 언제 네트워크에서 데이터를 받을지 알 수 없다. 이런 문제를 스트림은 데이터 생성과 소비하는 곳을 따로둬서 이 문제를 해결한다.
import 'dart:async';
voidmain() {
// 1초마다 데이터 1개를 최대 5개까지 만듦.
Stream stream = Stream.periodic(Duration(seconds: 1), (int x) => x).take(5);
// 이벤트 처리
stream.listen(print);
}
한쪽 끝에 값을 넣고 다른 쪽 끝에 listener가 있으면 listener가 해당 값을 받는다. 스트림에는 여러 개의 listener가 있을 수 있으며, 파이프 라인에 넣으면 모든 listener가 동일한 값을 받는다.
스트림 생성
스트림을 만드는 방법은 다양하다. 즉시 만들거나 혹은 일정 시간마다 만들거나 Future를 사용해 만들 수도 있다.
import 'dart:async';
// 동기적인 iterable 형식을 반환하는 함수Iterable<int> getIterable(int maxNum) sync* {
for (int i = 0; i < maxNum; i++) {
yield i;
}
}
// 비동기적인 future 형식을 반환하는 함수FuturegetData(intvalue) async {
await Future.delayed(Duration(seconds: value));
print('Fetched Data');
return'$value초 대기후의 데이터';
}
voidmain() {
// 일반적인 데이터를 다룰 때.
Stream.fromIterable(<int>[1, 2, 3, 4, 5])
.listen((int x) => print('iterable: $x'));
// 동기적인 Iterable 반환 함수를 이용한 데이터를 다룰 때.
Stream.fromIterable(getIterable(5)).listen((int x) {
print('getIterable: $x');
});
// 스트림 데이터를 생성 후 5초간 초당 데이터를 소비한다.
Stream.periodic(Duration(seconds: 1), (x) => x).take(5).listen((int x) {
print('periodic: $x');
});
// 비동기적인 future 객체를 받아 소비하는 스트림. 3초간 기다린 데이터를 받는다.
Stream.fromFuture(getData(3)).listen((event) {
print('from future: $event');
});
// 비동기적인 future 객체 List를 받아 소비하는 스트림. 5, 7초간 기다린 데이터를 받는다.
Stream.fromFutures(>[getData(5), getData(7)]).listen((event) {
print('from futures: $event');
});
}
/* 결과
iterable: 1
getIterable: 0
iterable: 2
getIterable: 1
iterable: 3
getIterable: 2
iterable: 4
getIterable: 3
iterable: 5
getIterable: 4
periodic: 0
periodic: 1
periodic: 2
Fetched Data
from future: 3초 대기후의 데이터
periodic: 3
periodic: 4
Fetched Data
from futures: 5초 대기후의 데이터
Fetched Data
from futures: 7초 대기후의 데이터
*/
스트림 데이터 다루기
스트림 데이터 다루는 방법은 다양한다. 다음의 코드는 스트림 데이터의 첫 번째, 마지막 데이터 그리고 스트림의 길이를 가져오는 코드이다.
import 'dart:async';
voidmain() {
Stream<int> stream = Stream.fromIterable([1, 2, 3, 4, 5]);
// 첫 번째 데이터를 가져온다.
stream.first.then((value) => print('stream.first: $value'));
// Stream은 기본적으로 하나의 listen만 가능하다. 그래서 아래처럼 stream을 갱신했다.
stream = Stream.fromIterable([1, 2, 3, 4, 5]);
// 마지막 데이터를 가져온다.
stream.last.then((value) => print('stream.last: $value'));
stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]);
// 스트림의 전체 길이
stream.length.then((value) => print('stream.length: $value'));
}
/* 결과
stream.first: 1
stream.last: 5
stream.length: 7
*/
스트림은 기본적으로 싱글 서브스크립션(Single Subscription)이다. 싱글 서브스크립션은 단 한곳에서만 listen 할 수 있다. 여러곳에서 listen 을 하려면 Broadcast 로 변경해줘야 한다.
listen (구독) 한 후 : 약한 연결, 점선 화살표, StreamSubscription과 EventSource가 연결된다.
위의 예제들을 보면 listen() 에서 이벤트 처리를 한 것 같았지만, 실제로는 StreamSubscription에서 이벤트 콜백을 다룬다. 새로운 이벤트가 생기거나 에러가 생기면, StreamSubscription에서 이를 처리한다. 뿐만 아니라 StreamSubscription은 이벤트 소스와의 연결도 끊을 수 있다.
StreamSubscriptoin은 이벤트 처리를 콜백을 정하여 실행한다.
listen 콜백 함수는 스트림 유형 중 하나인 StreamSubscription (구독 신청)을 반환한다. 이것은 스트림 구독을 관리하는 데 사용할 수 있다. 구독의 가장 일반적인 사용법은 더 이상 데이터를 수신할 필요가 없을 때 청취를 취소하는 것이다. 그리고 스트림 구독은 앱이 종료되기 전까지 항상 열려있으므로 메모리 누수가 없는지 확인해야 한다. 아래의 코드는 등록 된 콜백에 대한 구독 객체를 생성해서 관리하기 위해 StreamSubscription으로 받아주고 있다.
import 'dart:async';
voidmain() {
Stream<int> stream =
new Stream.periodic(new Duration(milliseconds: 200), (x) => x);
StreamSubscription streamSubscription = stream.listen(null);
// onData : 데이터를 하나씩 처리하는 이벤트
streamSubscription.onData((data) {
print('data: $data');
if (data == 5) {
streamSubscription.cancel();
}
});
// onError : 에러가 발생했을 때 처리하는 이벤트
streamSubscription.onError((err) => print('에러: $err'));
// onDone : 스트림의 끝에 도달했을 때(더 이상 데이터가 없을 때) 처리하는 이벤트
streamSubscription.onDone(() => print('스트림 작업 완료'));
}
/* 결과
data: 0
data: 1
data: 2
data: 3
data: 4
data: 5
*/
기본적으로 만들어지는 스트림은 한 곳에서만 listen 할 수 있다. 그래서 다음의 코드를 실행해보면 "Bad state: Stream has already been listened to." 라는 에러가 발생한다.
import 'dart:async';
voidmain() {
Stream<int> stream =
new Stream.periodic(new Duration(milliseconds: 200), (x) => x).take(5);
stream.listen((event) => print(event));
stream.listen((event) => print(event));
}
/* 결과 - 에러 발생
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:493:9)
...
*/
Flutter에서 Stream을 다룰 때, 가장 흔히 볼 수 있는 것은 "Stream has already been..." 에러 메시지다. 또 다른 예를 보도록하자.
import 'dart:async';
voidmain() {
StreamController streamController = new StreamController();
Stream stream = streamController.stream;
stream.listen((event) => print('1st stream: $event'));
stream.listen((event) => print('2nd stream: $event'));
streamController.sink.add('Hello');
streamController.sink.add('World');
streamController.close();
}
/* 결과 - 에러 발생
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
...
*/
위의 코드가 실행되면 마찬가지로 "Bad state..." 스트림 에러가 발생한다. 그 이유는 위에서 설명했듯이 기본적으로 만들어지는 스트림은 한 곳에서만 listen할 수 있기 때문이다. 그래서 두 가지 유형의 스트림이 존재한다.
단일 구독 스트림 (Single) : 파일이나 웹 요청을 읽는 것과 같은 유형에 사용된다. 먼저 구독한 구독자가 올바른 순서로 올바른 정보를 모두 얻도록하려면 스트림이 존재하는 수명 주기 동안 한 번만 구독할 수 있는 제한이 필요하다. 이럴때 사용한다.
방송 스트림 (Broadcast) : 이 유형의 스트림은 여러 곳에서 listen 할 수 있다.
StreamController streamController = new StreamController.broadcast();
위의 코드처럼 broadcast로 선언해주면 에러가 발생하지 않는다. 그리고 만약, Stream을 직접 구현하고 사용중에 있다면, 구독 중 Stream에 변경 사항 발생 시 스트림을 정리(cancel)하고 다시 연결해야 한다. 그렇지 않고 여러 번 구독하면 메모리 누수가 발생한다.