在 Flutter 中有两种处理异步操作的方式 Future
和 Stream
用来处理连续的异步操作。比如往水杯倒水,将一个水杯倒满为一个 Future
,连续的将多个水杯倒满就是 Stream
Stream 详解
| abstract class Stream<T> { Stream(); }
它们的区别在于同步流会在执行 add
或 close
方法时立即向流的监听器 StreamSubscription
带有控制流方法的流。 可以向它的流发送数据,错误和完成事件,也可以检查数据流是否已暂停,是否有监听器。sync
| abstract class StreamController<T> implements StreamSink<T> { Stream<T> get stream; }
StreamController _streamController = StreamController( onCancel: () {}, onListen: () {}, onPause: () {}, onResume: () {}, sync: false, );
流事件的入口。提供 add
| abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { Future close(); Future get done; }
流的监听器。提供 cacenl
, resume
| abstract class StreamSubscription<T> { }
StreamSubscription subscription = StreamController().stream.listen(print); subscription.onDone(() => print('done'));
使用流数据渲染 UI 界面的部件。
| StreamBuilder( stream: stream, initialData: 'loading...', builder: (context, AsyncSnapshot snapshot) { if (snapshot.hasData) { Map data = snapshot.data; return Text(data), } return CircularProgressIndicator(); }, )
创建 Stream
在 Dart 有几种方式创建 Stream
- 从现有的生成一个新的流
,使用 map
| Stream<int> intStream = StreamController<int>().stream;
Stream<int> evenStream = intStream.where((int n) => n.isEven);
Stream<int> doubleStream = intStream.map((int n) => n * 2);
Stream<int> biggerStream = intStream.takeWhile((int n) => n > 10);
- 使用
| Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; } }
Stream stream = countStream(10); stream.listen(print);
- 使用
| StreamController<Map> _streamController = StreamController( onCancel: () {}, onListen: () {}, onPause: () {}, onResume: () {}, sync: false, );
Stream _stream = _streamController.stream;
- 使用
| Future<int> _delay(int seconds) async { await Future.delayed(Duration(seconds: seconds)); return seconds; }
List<Future> futures = []; for (int i = 0; i < 10; i++) { futures.add(_delay(3)); }
Stream _futuresStream = Stream.fromFutures(futures);
应用 Stream
Stream Counter
把 Flutter 的默认项目改用 Stream
| import 'dart:async'; import 'package:flutter/material.dart';
class StreamCounter extends StatefulWidget { @override _StreamCounterState createState() => _StreamCounterState(); }
class _StreamCounterState extends State<StreamCounter> { StreamController<int> _counterStreamController = StreamController<int>( onCancel: () { print('cancel'); }, onListen: () { print('listen'); }, );
int _counter = 0; Stream _counterStream; StreamSink _counterSink;
void _incrementCounter() { if (_counter > 9) { _counterSink.close(); return; } _counter++; _counterSink.add(_counter); }
void _closeStream() { _counterStreamController.close(); }
@override void initState() { super.initState(); _counterSink = _counterStreamController.sink; _counterStream = _counterStreamController.stream; }
@override void dispose() { super.dispose(); _counterSink.close(); _counterStreamController.close(); }
@override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text('Stream Counter'), ), body: Center( child: Column( mainAxisAlignment: MainAxisAlignment.center, children: <Widget>[ Text('You have pushed the button this many times:'), StreamBuilder<int>( stream: _counterStream, initialData: _counter, builder: (context, snapshot) { if (snapshot.connectionState == ConnectionState.done) { return Text( 'Done', style: Theme.of(context).textTheme.bodyText2, ); }
int number = snapshot.data; return Text( '$number', style: Theme.of(context).textTheme.bodyText2, ); }, ), ], ), ), floatingActionButton: Row( mainAxisAlignment: MainAxisAlignment.center, children: <Widget>[ FloatingActionButton( onPressed: _incrementCounter, tooltip: 'Increment', child: Icon(Icons.add), ), SizedBox(width: 24.0), FloatingActionButton( onPressed: _closeStream, tooltip: 'Close', child: Icon(Icons.close), ), ], ), ); } }
NetWork Status
监听手机的网络链接状态,首先添加 connectivity
| dependencies: connectivity: ^0.4.8+2
| import 'dart:async'; import 'package:connectivity/connectivity.dart'; import 'package:flutter/material.dart';
class NetWorkStatus extends StatefulWidget { @override _NetWorkStatusState createState() => _NetWorkStatusState(); }
class _NetWorkStatusState extends State<NetWorkStatus> { StreamController<ConnectivityResult> _streamController = StreamController(); StreamSink _streamSink; Stream _stream; String _result;
void _checkStatus() async { final ConnectivityResult result = await Connectivity().checkConnectivity();
if (result == ConnectivityResult.mobile) { _result = 'mobile'; } else if (result == ConnectivityResult.wifi) { _result = 'wifi'; } else if (result == ConnectivityResult.none) { _result = 'none'; }
setState(() {}); }
@override void initState() { super.initState(); _stream = _streamController.stream; _streamSink = _streamController.sink; _checkStatus(); Connectivity().onConnectivityChanged.listen( (ConnectivityResult result) { _streamSink.add(result); }, ); }
@override dispose() { super.dispose(); _streamSink.close(); _streamController.close(); }
@override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text('Network Status'), ), body: Center( child: StreamBuilder<ConnectivityResult>( stream: _stream, builder: (context, AsyncSnapshot snapshot) { if (snapshot.hasData) { if (snapshot.data == ConnectivityResult.mobile) { _result = 'mobile'; } else if (snapshot.data == ConnectivityResult.wifi) { _result = 'wifi'; } else if (snapshot.data == ConnectivityResult.none) { return Text('还没有链接网络'); } }
if (_result == null) { return CircularProgressIndicator(); }
return ResultText(_result); }, ), ), ); } }
class ResultText extends StatelessWidget { final String result;
const ResultText(this.result);
@override Widget build(BuildContext context) { return RichText( text: TextSpan( style: TextStyle(color: Colors.black), text: '正在使用', children: [ TextSpan( text: ' $result ', style: TextStyle( color: Colors.red, fontSize: 20.0, fontWeight: FontWeight.bold, ), ), TextSpan(text: '链接网络'), ], ), ); } }
Random Article
| dependencies: dio: ^3.0.9 flutter_html: ^0.11.1
| import 'dart:async';
import 'package:dio/dio.dart'; import 'package:flutter/material.dart'; import 'package:flutter_html/flutter_html.dart';
class RandomArticle extends StatefulWidget { @override _RandomArticleState createState() => _RandomArticleState(); }
class _RandomArticleState extends State<RandomArticle> { static Dio _dio = Dio( BaseOptions(baseUrl: 'https://interface.meiriyiwen.com'), );
static Future<Map> _getArticle() async { Response response = await _dio.get( '/article/random', queryParameters: {"dev": 1}, );
final data = response.data['data']; return data; }
Stream<Map> _futuresStream;
@override void initState() { List<Future<Map>> futures = []; for (int i = 0; i < 10; i++) { futures.add(_getArticle()); }
_futuresStream = Stream<Map>.fromFutures(futures); super.initState(); }
@override Widget build(BuildContext context) { return Scaffold( appBar: AppBar(title: Text('Random Article')), body: SingleChildScrollView( child: Center( child: StreamBuilder<Map>( stream: _futuresStream, builder: (context, AsyncSnapshot snapshot) { if (snapshot.hasData) { Map article = snapshot.data;
return Container( child: Column( children: <Widget>[ SizedBox(height: 24.0), Text( article['title'], style: TextStyle(fontSize: 24.0), ), Padding( padding: const EdgeInsets.only( top: 12.0, left: 12.0, right: 12.0, bottom: 60.0, ), child: Html( data: article['content'], ), ), ], ), ); } return CircularProgressIndicator(); }, ), ), ), ); } }
Broadcast Stream
| import 'dart:async';
import 'package:flutter/material.dart';
class BroadcastStream extends StatefulWidget { @override _BroadcastStreamState createState() => _BroadcastStreamState(); }
class _BroadcastStreamState extends State<BroadcastStream> { StreamController<int> _streamController = StreamController<int>.broadcast(); StreamSubscription _subscription1; StreamSubscription _subscription2; StreamSubscription _subscription3;
int _count = 0; int _s1 = 0; int _s2 = 0; int _s3 = 0;
@override void initState() { _subscription1 = _streamController.stream.listen((n) { setState(() { _s1 += 1; }); });
_subscription2 = _streamController.stream.listen((n) { setState(() { _s2 += 2; }); });
_subscription3 = _streamController.stream.listen((n) { setState(() { _s3 -= 1; }); });
super.initState(); }
void _add() { if (_count > 10) { _subscription1.cancel(); } _count++; _streamController.add(_count); }
@override void dispose() { super.dispose(); _streamController.close(); _subscription1.cancel(); _subscription2.cancel(); _subscription3.cancel(); }
@override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text('Broadcast Stream'), ), body: Container( width: double.infinity, height: MediaQuery.of(context).size.height, child: Column( mainAxisAlignment: MainAxisAlignment.center, crossAxisAlignment: CrossAxisAlignment.center, children: [ Text('Count: $_count'), SizedBox(height: 12.0), Text('S1: $_s1'), SizedBox(height: 12.0), Text('S2: $_s2'), SizedBox(height: 12.0), Text('S3: $_s3'), SizedBox(height: 12.0), FloatingActionButton( onPressed: _add, child: Icon(Icons.plus_one), ), ], ), ), ); } }
是处理异步编程的方式之一,它提供一个了异步的事件序列,并在你准备好接受时发送。在 Dart 中流分为同步流和异步流,以及单订阅流和广播流,有多种方式创建 Stream
