In Flutter, screens are defined by widgets. A widget is a Flutter component that can be built out of zero, one, or multiple other widgets, which together can make up a screen. These screens display information, and most of the time that information needs to change. For example, an application that displays a list of universities needs a screen with a list of widgets, where each widget displays information about one university. The screen should also display something while it loads the data (e.g. a loading spinner) and something else when loading fails. We need a way to manage all 3 phases (Loading, Success, Error) within the app life-cycle that is easy to test and easy to use.
These 3 phases (the moment when the screen is loading the data, the moment when the data has loaded successfully, and the moment when something went wrong and an error is displayed) make up the screen state.
In this article we will manage the screen state using Dart streams and the RxDart library.
codebase for the examples within this article
Rx stands for ReactiveX and it comes from reactive programming. In Dart, RxDart adds a set of extension methods on top of `Stream` and `StreamController`, and it introduces other Rx-specific components such as `BehaviorSubject`, `ReplaySubject`, `MergeStream`, `CombineLatestStream`, etc. The Rx library follows a functional programming style, meaning we can apply a chain of transformation functions to a stream:
return _universityEndpoint
    .getUniversitiesByCountry(country)
    .safeApiConvert((p0) => p0.map((e) => e.toDomain()).toList())
    .map((event) => transformToSomethingElse(event))
    .flatMap((data) => transformIntoAStream(data))
    .mergeWith([stream2, stream3]);
“Functional programming consists in building software by composing pure functions while avoiding shared state and mutable data.
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.
Together, functional programming and reactive programming form a combination of functional and reactive techniques that can represent an elegant approach to event-driven programming – with values that change over time and where the consumer reacts to the data as it comes in. This technology brings together different implementations of its core principles, some authors came up with a document that defines the common vocabulary for describing the new type of applications.” - https://www.baeldung.com/rx-java
Since RxDart uses Streams it follows the Observer pattern. What you must always know when working with streams is that they never work unless you subscribe to them.
// Stream.fromIterable builds a stream which emits the values from the given list
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7])
// where function calls its body each time a value is emitted by the stream
// and checks if it is an even number. This function acts like a filter.
.where((element) {
// if the element is even this function returns true
// and the element passes the filter
// otherwise it returns false and the element is filtered out.
return element % 2 == 0;
});
You must always subscribe to a stream to get any information from it!
The above code does nothing because there is no other code that subscribes/listens to it. So what we can do to make it work is to call listen on this stream.
var evenNumbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7])
.where((element) {
return element % 2 == 0;
});
evenNumbers.listen((element){
print(element);
});
Calling `.listen` creates a subscription to the `evenNumbers` stream and starts delivering its events. The moment `.listen` is called and registered as a subscription, the stream code starts working: it takes the first element from the list, checks it in the `.where` function, and, if it passes the test (in our case, if it is an even number), passes it to the `listen` callback, where it is printed. The above code therefore prints 2, 4 and 6, each on its own line.
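To make the "nothing happens until you listen" point concrete, here is a small sketch that uses only `dart:async`. The names `checkedCount` and `buildEvenNumbers` are made up for this illustration; the counter simply records how often the filter has run.

```dart
import 'dart:async';

/// Counts how many elements the filter has inspected so far.
int checkedCount = 0;

/// Builds the filtered stream; nothing runs until someone listens.
Stream<int> buildEvenNumbers() {
  return Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]).where((element) {
    checkedCount++;
    return element % 2 == 0;
  });
}

Future<void> main() async {
  final evenNumbers = buildEvenNumbers();
  // No subscription yet, so the filter has not been called at all.
  print('checked before listen: $checkedCount'); // 0

  // toList() subscribes internally and completes when the stream is done.
  final result = await evenNumbers.toList();
  print('checked after listen: $checkedCount'); // 7
  print(result); // [2, 4, 6]
}
```

The filter body only starts running once `toList()` subscribes, which is the same thing that happens when `listen` is called directly.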
When we listen to streams manually we have to be careful, because it is easy to introduce memory leaks. Listening to a stream creates a subscription. The subscription is kept in memory, so if it is never cancelled it stays there, together with everything its callback references, and causes a memory leak. To prevent this, we store the subscriptions somewhere and cancel them when we no longer need them.
CompositeSubscription subscriptions = CompositeSubscription();
var evenNumbers =
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]).where((element) {
return element % 2 == 0;
});
subscriptions.add(evenNumbers.listen((element) {
print(element);
}));
// other code ....
// .....
// When we are done with our streams
subscriptions.dispose();
RxDart gives us an easy way to handle our subscriptions: `CompositeSubscription`, which acts like a list of subscriptions. Each time we listen to a stream we add the resulting subscription to the composite, and when we no longer need them we call its `dispose` function, which cancels all of the subscriptions it holds.
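What cancelling buys us can be seen with a single plain `StreamSubscription` from `dart:async`, which is what a `CompositeSubscription` holds internally. The sketch below is only an illustration: `collectThenCancel` is a hypothetical helper, and it assumes `take` is at least 1.

```dart
import 'dart:async';

/// Listens to a periodic stream, cancels after [take] events,
/// and returns everything that was received.
Future<List<int>> collectThenCancel(int take) async {
  final received = <int>[];
  final done = Completer<void>();

  // Emits 0, 1, 2, ... every 10 ms for as long as someone is subscribed.
  final ticks = Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (count) => count,
  );

  final subscription = ticks.listen((value) {
    received.add(value);
    if (received.length == take && !done.isCompleted) {
      done.complete();
    }
  });

  await done.future;
  // Without this call the timer behind the stream would keep firing.
  await subscription.cancel();

  // Wait a little to show that no further events arrive after cancel().
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return received;
}

Future<void> main() async {
  print(await collectThenCancel(3)); // [0, 1, 2]
}
```

If the `cancel()` call is removed, the periodic timer keeps running and the callback keeps being invoked, which is exactly the kind of leak described above.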
More information about RxDart can be found on their GitHub page https://github.com/ReactiveX/rxdart
A non-disposed Stream subscription produces memory leaks
Before managing the state, we need to create a class that encapsulates our app states (`Loading`, `Data`, `Error`). To avoid lots of boilerplate code for the usual model methods (`copyWith`, `equals`, `toString`, `when`, etc.) I’m using the freezed library.
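import 'package:freezed_annotation/freezed_annotation.dart';

// Generated part file; the name assumes this class lives in app_result.dart.
part 'app_result.freezed.dart';

@freezed
class AppResult<T> with _$AppResult<T> {
  const AppResult._();
  // Each factory is one state; the type parameter has to be forwarded.
  const factory AppResult.data(T value) = Data<T>;
  const factory AppResult.loading() = Loading<T>;
  const factory AppResult.appError([String? message]) = AppError<T>;
}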
Now that we have our `AppResult` class, we can use it to handle our states. Whenever we want to handle these 3 states somewhere in the app, we make the function return an object of type `AppResult<OurData>`. This object can be in one of the 3 states (`Data`, `Loading`, `Error`).
After creating the state, we have to handle it. Since we are using `freezed`, it already generates a `when` method for us that lets us run an action for each possible state.
appResult.when(
data: (data) {
// code run when appResult has data state
},
loading: () {
// code run when appResult has loading state
},
appError: (appError) {
// code run when appResult has error state
},
);
If we don’t use `freezed`, we have to check manually which state we are currently in, for example with a chain of type checks (this assumes `Data`, `Loading` and `AppError` are hand-written subclasses of `AppResult`):
if (appResult is Data) {
  // code run when appResult has data state
} else if (appResult is Loading) {
  // code run when appResult has loading state
} else if (appResult is AppError) {
  // code run when appResult has error state
}
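As a rough idea of what doing this by hand involves, here is a minimal freezed-free sketch of the same three states with a hand-written `when`. It is not the code freezed generates; the class layout and the `describe` helper are assumptions made for illustration.

```dart
/// Hand-written stand-in for the freezed-generated AppResult.
abstract class AppResult<T> {
  const AppResult();

  /// Calls exactly one of the callbacks, depending on the concrete state.
  R when<R>({
    required R Function(T value) data,
    required R Function() loading,
    required R Function(String? message) appError,
  }) {
    final self = this;
    if (self is Data<T>) return data(self.value);
    if (self is Loading<T>) return loading();
    if (self is AppError<T>) return appError(self.message);
    throw StateError('Unknown AppResult subtype: $runtimeType');
  }
}

class Data<T> extends AppResult<T> {
  final T value;
  const Data(this.value);
}

class Loading<T> extends AppResult<T> {
  const Loading();
}

class AppError<T> extends AppResult<T> {
  final String? message;
  const AppError([this.message]);
}

/// Hypothetical helper that turns a state into a short description.
String describe(AppResult<List<String>> result) {
  return result.when(
    data: (value) => 'data: ${value.length} item(s)',
    loading: () => 'loading',
    appError: (message) => 'error: ${message ?? 'unknown'}',
  );
}

void main() {
  print(describe(const Loading())); // loading
  print(describe(const Data(['MIT', 'ETH']))); // data: 2 item(s)
  print(describe(const AppError('timeout'))); // error: timeout
}
```

Even this small version needs one class per state plus the dispatch method, and it still lacks `copyWith`, equality and `toString`, which is the boilerplate freezed takes care of.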
Before jumping into the code, I want to briefly describe the application built using RxDart for state management. It is an app that shows a list of universities that can be queried by country.
It uses:
- RxDart for data streams and state management
- Retrofit with Dio for handling API calls
- Freezed for avoiding model boilerplate code
- Json Serializer for serializing and de-serializing JSON data
- [http://universities.hipolabs.com](http://universities.hipolabs.com) for getting universities’ data
This article only focuses on RxDart state management.
Since we are following MVVM with Clean Architecture, we have the following flow of data:
- The `Widget`/`Screen` is opened
- The `ViewModel` is initialized along with it
- The `ViewModel` calls the use case to initialize the data
For example:
- `UniversitiesScreen` is opened
- `UniversitiesScreen` initializes the `ViewModel`
- `UniversitiesScreen` subscribes to the stream with universities data from the `ViewModel`
- `UniversitiesViewModel` calls `GetUniversitiesByCountryUseCase` to get the universities
- `GetUniversitiesByCountryUseCase` calls `UniversitiesRepository` to get the universities’ data
- `UniversitiesRepository` calls `UniversityRemoteDataSource` to get the universities’ data
- `UniversityRemoteDataSource` calls `UniversityEndpoint`, which does a request to the API to fetch the data
When there is a result from the API call, the data travels all the way back through the streams:
- `UniversityEndpoint` gets the data (error, data, loading)
- `UniversityRemoteDataSource`
- `UniversitiesRepository`
- `GetUniversitiesByCountryUseCase`
- `UniversitiesViewModel`
- `UniversitiesScreen`, where it is handled according to its state and displayed
Since we are using streams, we can easily send as many values as we want on the same stream.
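One way to picture several values travelling on the same stream is a generator that first emits a loading marker and then the outcome. This is only a sketch: `loadUniversities` and its `fetch` parameter are hypothetical, and plain strings stand in for the `AppResult` states to keep the example self-contained.

```dart
import 'dart:async';

/// Hypothetical loader: emits a "loading" marker first, then the outcome.
/// In the article's app these would be AppResult values instead of strings.
Stream<String> loadUniversities(
  Future<List<String>> Function() fetch,
) async* {
  yield 'loading';
  try {
    final names = await fetch();
    yield 'data: ${names.join(', ')}';
  } catch (e) {
    yield 'error: $e';
  }
}

Future<void> main() async {
  final ok = await loadUniversities(() async => ['MIT', 'ETH']).toList();
  print(ok); // [loading, data: MIT, ETH]

  final failed = await loadUniversities(
    () async => throw Exception('no network'),
  ).toList();
  print(failed); // [loading, error: Exception: no network]
}
```

A single subscriber receives both the loading marker and the final state, so the screen does not need a separate mechanism to track whether a request is in progress.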
Here we have the `UniversitiesScreen`:
class UniversitiesScreen extends StatefulWidget {
const UniversitiesScreen({Key? key}) : super(key: key);
@override
State<UniversitiesScreen> createState() => _UniversitiesScreenState();
}
class _UniversitiesScreenState extends State<UniversitiesScreen> {
final UniversitiesViewModel _viewModel = UniversitiesViewModel();
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("RxDart State"),
),
body: Column(
children: [
Container(
margin: const EdgeInsets.all(10),
child: TextField(
onChanged: _viewModel.searchByCountry,
decoration: const InputDecoration(
labelText: 'Search', suffixIcon: Icon(Icons.search)),
),
),
Expanded(
child: StreamBuilder(
stream: _viewModel.universities,
builder: (BuildContext context,
AsyncSnapshot<AppResult<UniversityScreenState>> snapshot) {
return snapshot.data?.when(
data: (e) => _buildUniversities(e.universities),
loading: () => _buildLoading(),
appError: (e) => _buildError(e.toString()),
apiError: (e) => _buildError(e.toString())) ??
_buildLoading();
},
),
),
],
),
);
}
Widget _buildUniversities(List<UniversityScreenModel> universities) {
return ListView.builder(
itemCount: universities.length,
itemBuilder: (BuildContext context, int index) {
return Card(
elevation: 5,
margin: const EdgeInsets.all(10),
child: Container(
padding: const EdgeInsets.all(25),
child: Column(
children: [
Text("Name: ${universities[index].name}"),
Text("Country: ${universities[index].country}"),
Text("Website: ${universities[index].website}"),
],
),
),
);
},
);
}
Widget _buildLoading() {
return const Center(
child: CircularProgressIndicator(),
);
}
Widget _buildError(String error) {
return Center(
child: Text(
error,
style:
Theme.of(context).textTheme.headline3?.copyWith(color: Colors.red),
),
);
}
}
Our screen is built from a `Scaffold` with an `AppBar` and a column containing a `TextField` for searching the universities by country, followed by the list of universities.
For the list of universities, we can see that we have a `StreamBuilder` there:
StreamBuilder(
stream: _viewModel.universities,
builder: (BuildContext context,
AsyncSnapshot<AppResult<UniversityScreenState>> snapshot) {
return snapshot.data?.when(
data: (e) => _buildUniversities(e.universities),
loading: () => _buildLoading(),
appError: (e) => _buildError(e.toString()),
apiError: (e) => _buildError(e.toString())) ??
_buildLoading();
},
),
We are using the `StreamBuilder` here because all of our data is managed by Streams, remember? The `StreamBuilder` manages the subscription to the universities stream automatically (remember that for a stream to work we have to subscribe/listen to it and then take care of the subscription to prevent memory leaks), and it gives us a `builder` callback where we build the actual widget to display based on the latest result.
We can also subscribe to the stream manually and handle state changes with the `setState` callback. Below is an example of how to listen to a stream and how to dispose of the subscriptions.
final UniversitiesViewModel _viewModel = UniversitiesViewModel();
final CompositeSubscription _subscriptions = CompositeSubscription();
AppResult<UniversityScreenState> _screenState = const AppResult.loading();
@override
void initState() {
super.initState();
_subscriptions.add(_viewModel.universities.listen((event) {
setState(() {
_screenState = event;
});
}));
}
@override
void dispose() {
_subscriptions.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("RxDart State"),
),
body: Column(
children: [
Container(
margin: const EdgeInsets.all(10),
child: TextField(
onChanged: _viewModel.searchByCountry,
decoration: const InputDecoration(
labelText: 'Search', suffixIcon: Icon(Icons.search)),
),
),
Expanded(
child: _screenState.when(
data: (e) => _buildUniversities(e.universities),
loading: () => _buildLoading(),
appError: (e) => _buildError(e.toString()),
apiError: (e) => _buildError(e.toString())),
),
],
),
);
}
}
The `ViewModel` is initialized exactly as before, but this time we use a variable to hold the screen state. Since we don’t have any data stored locally and always have to fetch it from the server, we want to show a loading indicator when the screen is first displayed, so we initialize the `_screenState` variable with the loading state.
Then, before the screen is built, in the `initState` callback we subscribe to the universities data stream and register the subscription in our subscriptions list `_subscriptions`. In the subscription callback, we call `setState` and store the new value in our `_screenState` variable. This is where the magic happens. When `setState` is called, Flutter marks this widget’s state as needing a rebuild and schedules a new frame, so the `build` method runs again and rebuilds the widgets using the new `_screenState`.
To prevent memory leaks, we cancel all the subscriptions in the list inside the `dispose` callback.
This is all we have to do for managing the screen state on the UI.
In the `UniversitiesViewModel` there are only a few lines of code.
class UniversitiesViewModel {
final GetUniversitiesByCountryUseCase _getUniversitiesByCountryUseCase;
final Subject<String?> _searchByCountry = PublishSubject();
late Stream<AppResult<UniversityScreenState>> universities;
UniversitiesViewModel(
{GetUniversitiesByCountryUseCase? getUniversitiesByCountryUseCase})
: _getUniversitiesByCountryUseCase = getUniversitiesByCountryUseCase ??
GetUniversitiesByCountryUseCase() {
universities = _searchByCountry
.startWith(null)
.flatMap((value) => _getUniversitiesByCountryUseCase.invoke(value));
}
void searchByCountry(String country) {
_searchByCountry.add(country);
}
}
Here we have something new that comes from RxDart: a `Subject`, created as a `PublishSubject`. A `Subject` is a special kind of `StreamController` that is also a `Stream` itself, so we can both send values into it and listen to it. The `Subject` called `_searchByCountry` is used to send the search query to the use case, which provides us with the universities from that country. Since we don’t want any other class to be able to send information via this subject, we make it private.
Now that we have a way to send our requests, we also need a way for the view to get the information it needs. For this, we create a `Stream` variable that carries the screen state. This variable, `universities`, must be initialized in the `viewModel`’s constructor. There we also apply some rules/transformations to it:
// We assign our _searchByCountry Subject to the universities variable
universities = _searchByCountry
// When the application is open we also want to load some universities and because
// there we don't want to choose a country by default we just tell the stream
// connection to send null when it's connected for the first time
.startWith(null)
// Now we want to call our useCase to get us the universities for the specified country
// This is done by using flatMap, which maps each emitted value to a new stream
// and forwards the events of that stream
.flatMap((value) => _getUniversitiesByCountryUseCase.invoke(value));
Now that we have initialized our stream of data, there is nothing else for us to do. The view has to subscribe to the `universities` stream and it will get the data it needs.
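To try the `startWith` plus `flatMap` chain in isolation, outside of Flutter, a sketch like the following can help. It assumes the `rxdart` package is available; `fakeUseCase` and `buildUniversities` are hypothetical stand-ins for the real use case and ViewModel wiring, and strings replace the `AppResult` states.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

/// Hypothetical stand-in for the use case: emits a loading marker,
/// then a fake result for the given country (null means "all countries").
Stream<String> fakeUseCase(String? country) async* {
  final label = country ?? 'all';
  yield 'loading($label)';
  yield 'data($label)';
}

/// Same chain as in the ViewModel: start with null, then flatMap each query.
Stream<String> buildUniversities(Stream<String?> queries) {
  return queries.startWith(null).flatMap((value) => fakeUseCase(value));
}

Future<void> main() async {
  final searchByCountry = PublishSubject<String?>();
  final subscription = buildUniversities(searchByCountry).listen(print);

  // Let the initial null query run, then simulate the user typing.
  await Future<void>.delayed(const Duration(milliseconds: 10));
  searchByCountry.add('Romania');
  await Future<void>.delayed(const Duration(milliseconds: 10));

  await subscription.cancel();
  await searchByCountry.close();
}
```

The initial `null` triggers a first load as soon as the view subscribes, and every later `add` on the subject starts another inner stream whose events are forwarded to the same listener.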
As you may have noticed, the screen has a Search `TextField` used for getting the user’s input for a specific country:
TextField(
onChanged: _viewModel.searchByCountry,
decoration: const InputDecoration(
labelText: 'Search', suffixIcon: Icon(Icons.search)),
)
There we set a callback function for the `onChanged` action, so that it is called whenever the text changes. In our case, we use the `searchByCountry` function from our `viewModel`. All this function does is send the received string into the internal subject, and then everything happens automatically (the string is sent to `flatMap` → `flatMap` calls `_getUniversitiesByCountryUseCase` → the resulting data is sent back to the listener).
void searchByCountry(String country) {
_searchByCountry.add(country);
}
We have now covered everything related to the actual state management for our Flutter widgets.
In this article I haven’t covered the details of the architectural approach or how the data is fetched. If you don’t want to use MVVM with Clean Architecture, you can make the API call directly in the `viewModel` or even in the widget, then transform the result into a stream of `AppResult<UniversityScreenState>`, and everything will work. As long as you use Streams for getting the data, the above steps can be applied to any architecture you choose.
The main difference between streams and async-await is that a `Stream` delivers multiple values asynchronously over time, whereas an awaited `Future` delivers only a single value.
For many projects that only interact with REST APIs there is no need for streams; a simple await on an async function can do the job. But in that case we have to handle the loading state manually, either in the `viewmodel` or in the widget.
Moreover, if an application needs to connect to a WebSocket, async-await alone can’t help us: we must use a stream that is connected to the WebSocket and continuously delivers the incoming data.
Another use case that is way easier to handle with Streams is when we need to get the data from the local storage, display it on the screen, and then update it from the server, and after it is updated display the updated version.
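The cache-then-server case can be sketched with a generator that emits twice. The function `cachedThenFresh` and its two callbacks are hypothetical; a real app would plug in its own local storage and API client.

```dart
import 'dart:async';

/// Emits the locally cached value first (if any), then the fresh server value.
/// [readCache] and [fetchRemote] are hypothetical callbacks for this sketch.
Stream<List<String>> cachedThenFresh({
  required Future<List<String>?> Function() readCache,
  required Future<List<String>> Function() fetchRemote,
}) async* {
  final cached = await readCache();
  if (cached != null) {
    yield cached; // can be displayed immediately
  }
  yield await fetchRemote(); // replaces the cached version on screen
}

Future<void> main() async {
  final emissions = await cachedThenFresh(
    readCache: () async => ['MIT (cached)'],
    fetchRemote: () async => ['MIT', 'ETH'],
  ).toList();
  print(emissions); // [[MIT (cached)], [MIT, ETH]]
}
```

With a single `Future` the caller would have to make two separate calls and coordinate them; with a stream the screen just renders whatever arrives last.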
Most of the time in real-world applications there are screens that need data from multiple sources. An example is a screen with user info and a list of the user’s books. In this case, we have to combine data from two sources (two streams) and merge it into a `ScreenState` object, e.g.
class GetMainScreenUseCase {
final BooksRepository _booksRepository;
final UserRepository _userRepository;
GetMainScreenUseCase({
BooksRepository? booksRepository,
UserRepository? userRepository,
}) : _booksRepository = booksRepository ?? BooksRepository(),
_userRepository = userRepository ?? UserRepository();
Stream<AppResult<ScreenState>> invoke() {
return CombineLatestStream.combine2(
_userRepository.getUserData(),
_booksRepository.getBooks(),
(userDataResponse, booksResponse) {
// Here we need to check if we got success on both streams
if (userDataResponse is Data &&
booksResponse is Data) {
// Since we got data on both streams now we create our ScreenState
// with the received data.
return AppResult.data(
ScreenState.from(
userDataResponse,
booksResponse,
),
);
}else if(....){
.....
}....
else{
.....
}
},
);
}
}
There can be other cases where events from several sources of the same type should simply be forwarded as they arrive, without being combined; for that you can use `MergeStream`.
Other useful streams are `BehaviorSubject`s, which keep the last emitted value (they save state) and send it immediately to anyone who subscribes. This behavior can be added to plain streams as well, by using the `.shareValue()` extension (or `.shareReplay(maxSize: 1)`). These are useful when we want to keep the screen’s state, e.g. the user is on the main page and sees all the data there, then goes to the details page, and when they come back to the main screen we need to display the same data as before without triggering any DB/API/SharedPreferences call.
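The replay behavior is easy to check in a few lines. This sketch assumes the `rxdart` package; `firstValueForLateSubscriber` is a hypothetical helper and the string values are placeholders for real screen states.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

/// Returns what a late subscriber sees first on a BehaviorSubject
/// that has already emitted two values.
Future<String> firstValueForLateSubscriber() async {
  final screenState = BehaviorSubject<String>();
  screenState.add('loading');
  screenState.add('data: 3 universities');

  // Subscribing now (e.g. the screen is shown again) replays the latest
  // value right away; no new API call is needed.
  final firstSeen = await screenState.first;

  await screenState.close();
  return firstSeen;
}

Future<void> main() async {
  print(await firstValueForLateSubscriber()); // data: 3 universities
}
```

A `PublishSubject` in the same situation would deliver nothing to the late subscriber until the next `add`, which is why it suits one-off events such as search queries better than screen state.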
You can find out more about what these streams provide by reading RxDart documentation and the one from RxJava. https://github.com/ReactiveX/rxdart
https://reactivex.io/documentation/observable.html
Other articles about RxDart and its integration with Flutter: