Flutter - State management with RxDart Streams

Written by dacianflorea | Published 2022/05/30
Tech Story Tags: flutter | rxdart | reactive-programming | state-management | dart | dart-programming-language | mobile-app-development | flutter-for-mobile-app

TLDRIn Flutter we need to find a way to manage all 3 phases (Loading, Success, Error) within the app life-cycle in an easy-to-test and easy-to-use way. This article uses Dart streams and RxDart library to build the screen state. The code is based on a functional programming style, meaning we can have a chain of transformation functions applied to a stream. This function acts as a filter. It calls its body each time a value is emitted by the stream. It checks if it is an even number and returns the element.via the TL;DR App

What is State Management and Why do we need it?

In Flutter we have widgets that define our screens. A widget is a Flutter component that can be built out of 0, 1 or multiple widgets which all together can become a screen. These screens display some information to us and most of the time we need this information to change. For example, if we have an application that displays a list of universities we need a screen with a list of widgets where each widget displays some information for a specific university. This screen should also display something when it loads the data (e.g. a loading spinner) and it also has to show something when it failed to load the data. We need to find a way to manage all 3 phases (Loading, Success, Error) within the app life-cycle in an easy-to-test and easy-to-use way.

These 3 phases, the moment when the screen is loading the data, the moment when the data is successfully loaded and the moment when there was something wrong and an error is displayed, build the screen state.

What we will achieve here is to manage the screen state using Dart streams and RxDart library.

codebase for the examples within this article

What is RxDart?

Rx stands for ReactiveX and it comes from reactive programming. In Dart, RxDart comes with a bunch of extensions over Streams and StreamControllers and they introduce a lot of other specific Rx components such as BehaviourSubject, ReplaySubjects, MergeStreams, CombineStreams etc. Rx library is based on functional programming style, meaning we can have a chain of transformation functions applied to a stream:

return _universityEndpoint
        .getUniversitiesByCountry(country)
        .safeApiConvert((p0) => p0.map((e) => e.toDomain()).toList())
        .map((event) => transformToSomethingElse(event))
        .flatMap((data) => tranformIntoAStream(data))
        .mergeWith([stream2, stream3]);

“Functional programming consists in building software by composing pure functions while avoiding shared state and mutable data.

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.

Together, functional programming and reactive programming form a combination of functional and reactive techniques that can represent an elegant approach to event-driven programming – with values that change over time and where the consumer reacts to the data as it comes in. This technology brings together different implementations of its core principles, some authors came up with a document that defines the common vocabulary for describing the new type of applications.” - https://www.baeldung.com/rx-java

Since RxDart uses Streams it follows the Observer pattern. What you must always know when working with streams is that they never work unless you subscribe to them.

// Stream.fromIterable builds a stream which emits the values from the given list
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7])
      // where function calls its body each time a value is emitted by the stream
      // and checks if it is an even number. This function acts like a filter.
			.where((element) {
          // if the element is even this function returns true
          // and the element passes the filter
          // otherwise it returns false and the element is filtered out.
		      return element % 2 == 0;
	    });

You must always subscribe to a stream to get any information from it!

The above code does nothing because there is no other code that subscribes/listens to it. So what we can do to make it work is to call listen on this stream.

var evenNumbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7])
			.where((element) {
		      return element % 2 == 0;
	    });
evenNumbers.listen((element){
	print(element);
});

Now, what .listen function does is to create a subscription to the evenNumbers stream and listen to its events. The moment .listen is called and registered as a subscription for evenNumbers stream, the stream code starts working. Firstly it takes the first element from the list then it checks it in the .where function and then if it passes the test (in our case is an even number) it is sent in the listen callback and then it’s printed. So the above code prints 2 4 6

When we manually listen for streams we have to be really careful because we can easily add memory leaks to our code. When we listen to a stream, a subscription is created. This subscription is stored in memory and therefore if it’s not disposed/deleted it will remain there, causing memory leaks. What we can do to prevent this to happen is store the subscription somewhere and then, we don’t need them anymore we dispose of them.

CompositeSubscription subscriptions = CompositeSubscription();
var evenNumbers =
    Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]).where((element) {
  return element % 2 == 0;
});
subscriptions.add(evenNumbers.listen((element) {
  print(element);
}));

// other code ....
// .....

// When we are done with our streams
subscriptions.dispose();

Dart provides us with an easy way to handle our subscriptions. There is CompositeSubscription which is similar to a list of subscriptions. Each time we want to listen to a stream, we add the listen inside of our subscription list, and at the end when we don’t need them anymore we call dispose function which closes all the subscriptions from memory.

More information about RxDart can be found on their GitHub page https://github.com/ReactiveX/rxdart

A non-disposed Stream subscription produces memory leaks

What is App State and how do we Create it?

Before managing the state, we need to create a class that encapsulates our app states (Loading, Data, Error). To avoid lots of boilerplate code for creating different methods on models (copyWith, equals, toString, when etc) I’m using freezed library.

@freezed
class AppResult<T> with _$AppResult {
  const AppResult._();

  const factory AppResult.data(T value) = Data;

  const factory AppResult.loading() = Loading;

  const factory AppResult.appError([String? message]) = AppError;
}

Now that we have our AppResult class, we can use it to handle our states. So what we are going to do is whenever we want to handle these 3 states somewhere in the app we will make functions to return an object of type AppResult<OurData> This object can have one of the 3 states (Data, Loading, Error).

After creating the state, we have to handle it. Since we are using freezed it already builds for us a when method that allows us to do an action for each possible state.

appResult.when(
    data: (data) {
	// code run when appResult has data stata
	},
    loading: () {
	// code run when appResult has loading state
	},
    appError: (appError) {
	// code run when appResult has error state
	},
);

If we don’t use freezed then we have to manually write a switch to check which state we are currently in. e.g.

switch(appResult){
    case AppResult.data:
    // code run when appResult has loading state
      break;
    case AppResult.loading:
    // code run when appResult has loading state
      break;
    case AppResult.error:
    // code run when appResult has error state
      break;
}

State Management with RxDart

Before jumping into the code I want to describe a little the application built using RxDart for state management. There will be an app that provides a list of universities that can be queried by their country.

There we are using:

  • RxDart for data streams and state management
  • MVVM with Clean Architecture as a design pattern
  • Retrofit with Dio for handling API calls
  • Freezed for avoiding models boilerplate code
  • Json Serializer for helping us with serializing and de-serializing JSON data
  • Free API [http://universities.hipolabs.com](http://universities.hipolabs.com) for getting universities’ data

This article only focuses on RxDart state management.

Since we are following the MVVM with Clean Architecture it means we have the following flow of data:

  1. Wiget/Screen is opened
  2. When it is initialized, its ViewModel is initialized with it as well
  3. The ViewModel calls the usecase for initializing the data
  4. The usecase calls the repository to get the data
  5. Based on where the data is and the logic behind it, the repository calls an endpoint to an API or queries the local database, or gets the data from shared preferences. The data comes back exactly through the same classes in reverse order using a stream.

e.g:

  1. UniversitiesScreen is opened
  2. UniversitiesScreen initializes the ViewModel
  3. UniversitiesScreen subscribes to the stream with universities data from ViewModel
  4. UniversitiesViewModel calls GetUniversitiesByCountryUseCase to get the universities
  5. The use-case calls UniversitiesRepository to get the universities’ data
  6. The repository calls the UniversityRemoteDataSource to get the universities’ data
  7. The data source using UniversityEndpoint does an API request to the API to fetch the data

When there is a result from the API call the data goes all the way back via the streams.

  1. UniversityEndpoint gets the data (error, data, loading)
  2. it sends it back to UniversityRemoteDataSource
  3. sends it back to UniversitiesRepository
  4. sends it back to GetUniversitiesByCountryUseCase
  5. sends it back to UniversitiesViewModel
  6. then it finally arrives on the UniversitiesScreen where it is handled by its state and displayed

Since we are using streams, we can easily send how many values we want on the same stream.

Let’s jump into the code!

Here we have the UniversitiesScreen:

class UniversitiesScreen extends StatefulWidget {
  const UniversitiesScreen({Key? key}) : super(key: key);

  @override
  State<UniversitiesScreen> createState() => _UniversitiesScreenState();
}

class _UniversitiesScreenState extends State<UniversitiesScreen> {
  final UniversitiesViewModel _viewModel = UniversitiesViewModel();

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text("RxDart State"),
      ),
      body: Column(
        children: [
          Container(
            margin: const EdgeInsets.all(10),
            child: TextField(
              onChanged: _viewModel.searchByCountry,
              decoration: const InputDecoration(
                  labelText: 'Search', suffixIcon: Icon(Icons.search)),
            ),
          ),
          Expanded(
            child: StreamBuilder(
              stream: _viewModel.universities,
              builder: (BuildContext context,
                  AsyncSnapshot<AppResult<UniversityScreenState>> snapshot) {
                return snapshot.data?.when(
                        data: (e) => _buildUniversities(e.universities),
                        loading: () => _buildLoading(),
                        appError: (e) => _buildError(e.toString()),
                        apiError: (e) => _buildError(e.toString())) ??
                    _buildLoading();
              },
            ),
          ),
        ],
      ),
    );
  }

  Widget _buildUniversities(List<UniversityScreenModel> universities) {
    return ListView.builder(
      itemCount: universities.length,
      itemBuilder: (BuildContext context, int index) {
        return Card(
          elevation: 5,
          margin: const EdgeInsets.all(10),
          child: Container(
            padding: const EdgeInsets.all(25),
            child: Column(
              children: [
                Text("Name: ${universities[index].name}"),
                Text("Country: ${universities[index].country}"),
                Text("Website: ${universities[index].website}"),
              ],
            ),
          ),
        );
      },
    );
  }

  Widget _buildLoading() {
    return const Center(
      child: CircularProgressIndicator(),
    );
  }

  Widget _buildError(String error) {
    return Center(
      child: Text(
        error,
        style:
            Theme.of(context).textTheme.headline3?.copyWith(color: Colors.red),
      ),
    );
  }
}

Our screen is built having a Scaffold with an AppBar and a column with an input TextField for searching the universities by country and then the list of universities.

For the list of universities, we can see that we have a StreamBuilder there:

StreamBuilder(
  stream: _viewModel.universities,
  builder: (BuildContext context,
      AsyncSnapshot<AppResult<UniversityScreenState>> snapshot) {
    return snapshot.data?.when(
            data: (e) => _buildUniversities(e.universities),
            loading: () => _buildLoading(),
            appError: (e) => _buildError(e.toString()),
            apiError: (e) => _buildError(e.toString())) ??
        _buildLoading();
  },
),

We are using the StreamBuilder here because all of our data is managed by Streams, remember? The StreamBuilder is managing the subscription to the university stream automatically (remember that for a stream to work we have to subscribe/listen to it and then take care of the subscription to prevent memory leaks) and it provides us a callback function builder where we can build the actual widget that we want to display based on the result provided.

We can also manually subscribe to the stream and handle the state changing using setState callback. Below is an example of how to listen to a stream and how to dispose of the subscriptions.

final UniversitiesViewModel _viewModel = UniversitiesViewModel();
  final CompositeSubscription _subscriptions = CompositeSubscription();

  AppResult<UniversityScreenState> _screenState = const AppResult.loading();

  @override
  void initState() {
    super.initState();
    _subscriptions.add(_viewModel.universities.listen((event) {
      setState(() {
        _screenState = event;
      });
    }));
  }

  @override
  void dispose() {
    _subscriptions.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text("RxDart State"),
      ),
      body: Column(
        children: [
          Container(
            margin: const EdgeInsets.all(10),
            child: TextField(
              onChanged: _viewModel.searchByCountry,
              decoration: const InputDecoration(
                  labelText: 'Search', suffixIcon: Icon(Icons.search)),
            ),
          ),
          Expanded(
            child: _screenState.when(
                data: (e) => _buildUniversities(e.universities),
                loading: () => _buildLoading(),
                appError: (e) => _buildError(e.toString()),
                apiError: (e) => _buildError(e.toString())),
          ),
        ],
      ),
    );
  }
}

The ViewModel is initialized exactly as before, but this time we are using a variable to keep the app state. Since we don’t have any data stored locally and we always have to fetch it from the server we want to see a loading indicator when we initialize the screen so we also initialize the _screenState variable with the loading state.

Then, before the screen is built, in the initState callback we subscribe to the universities data stream and register the subscription in our subscriptions list _subscriptions. In the subscription callback, you can see that we call setState and then we set the new value in our _screenState variable. Here the magic happens. When setState is called, Flutter checks what was changed and goes into the widget tree, and rebuilds the widgets depending on the changed variables. So it goes in build method and rebuilds the widgets from there using the new _screenState

To prevent memory leaks we dispose of/close all the subscriptions from the list inside dispose callback.

This is all we have to do for managing the screen state on the UI.

In the UniversitiesViewModel there are only a few lines of code.

class UniversitiesViewModel {
  final GetUniversitiesByCountryUseCase _getUniversitiesByCountryUseCase;

  final Subject<String?> _searchByCountry = PublishSubject();

  late Stream<AppResult<UniversityScreenState>> universities;

  UniversitiesViewModel(
      {GetUniversitiesByCountryUseCase? getUniversitiesByCountryUseCase})
      : _getUniversitiesByCountryUseCase = getUniversitiesByCountryUseCase ??
            GetUniversitiesByCountryUseCase() {
    universities = _searchByCountry
        .startWith(null)
        .flatMap((value) => _getUniversitiesByCountryUseCase.invoke(value));
  }

  void searchByCountry(String country) {
    _searchByCountry.add(country);
  }
}

We can notice that here we have something new which comes from RxDart. Here is a Subject which is created using a PublishSubject. These Subjects are some extensions over the StreamController, which is just a stream that allows us to send values through it. This Subject called _searchByCountry is used for sending the search query to the useCase for providing us the universities from that country. Since we don’t want any other class to be able to send information via this subject, we make it private.

Now that we have a way to send our requests we also need a way for the view to be able to get the information that it needs. For this, we create a Stream variable that will send the app state data through it. This variable universities must be initialized in the viewModel’s constructor. There we also set some rules/transformations to it:

// We assign our _searchByCountry Subject to the universities variable
universities = _searchByCountry
// When the application is open we also want to load some universities and because
// there we don't want to choose a country by default we just tell the stream 
// connection to send null when it's connected for the first time
        .startWith(null)
// Now we want to call our useCase to get us the universities for the specified country
// This is done by using flatMap which maps/transforms a stream into another stream
        .flatMap((value) => _getUniversitiesByCountryUseCase.invoke(value));

Now that we initialized our stream of data, there is nothing else for us to do. The view has to subscribe to universities streams and it will get the needed data.

If you noticed on the screen, there was the Search TextField used for getting user’s input for a specific country

TextField(
  onChanged: _viewModel.searchByCountry,
  decoration: const InputDecoration(
      labelText: 'Search', suffixIcon: Icon(Icons.search)),
)

There we have to set a callback function for onChanged action so the moment the text is changed, it will call the callback function we specify. In our case, we will use the searchByCountry function from our viewModel. What this function does, is that it only sends the string received into the internal subject and then everything happens automatically (the string is sent to flatMap → the flatMap calls _getUniversitiesByCountryUseCase → and then it sends the data back to the listener)

void searchByCountry(String country) {
    _searchByCountry.add(country);
}

Now we just cover everything related to the actual state management for our Flutter widgets.

For this article, I haven’t covered the details of the architectural approach or how the data is fetched. In our case, if you don’t want to use MVVM with Clean Architecture, you can do directly the API call in the viewModel or even in the widget and then transform it into a stream of AppResult<UniversityScreenState> and everything will work. As long as you use Streams for getting the data you can the above steps can be applied for any architecture you choose.

Why Streams and not Async and Await?

The main difference between streams and async-await is that a Stream is used to send multiple data asynchronous whereas async-await is used to only send one element asynchronous.

For lots of the projects which only interact with Rest APIs, there will be no need for streams, a simple await on an async function can do the job. But in that case, we have to handle the loading state manually, maybe in the viewmodel or in the widget.

Moreover, if an application will need to connect to a WebSocket then an async-await can’t help us and we must use a stream that is connected to the WebSocket and then continuously sends us the data.

Another use case that is way easier to handle with Streams is when we need to get the data from the local storage, display it on the screen, and then update it from the server, and after it is updated display the updated version.

Benefits of streams

Most of the time in real-world applications there are screens that needs data from multiple sources. An example can be a screen with user info and a list of users’ books. In this case, we have to combine data from two sources (from two streams in this case) and merge their data into a ScreenState object. e.g

class GetMainScreenUseCase {
  final BooksRepository _booksRepository;
  final UserRepository _userRepository;

  GetMainScreenUseCase({
    BooksRepository? booksRepository,
    UserRepository? userRepository,
  })  : _booksRepository = booksRepository ?? BooksRepository(),
        _userRepository = userRepository ?? UserRepository();

  Stream<AppResult<ScreenState>> invoke() {
    return CombineLatestStream.combine2(
      _userRepository.getUserData(),
      _booksRepository.getBooks(),
      (userDataResponse, booksResponse) {
				// Here we need to check if we got success on both streams
        if (userDataResponse is AppResult.data &&
            booksResponse is AppResult.data) {
					// Since we got data on both streams now we create our ScreenState
					// with the received data.
          return AppResult.data(
            ScreenState.from(
              userDataResponse,
              booksResponse,
            ),
          );
        }else if(....){
          .....
        }....
        else{
          .....
        }
      },
    );
  }
}

There can be other cases where you need to display some data from different sources, for that case you can use MergeStream.

Other useful streams are BehaviourSubjects which are streams which keep the last emitted value (saves state) and directly send it when someone subscribes to it. This behavior can be added to simple streams as well, by using .reply extension. These two are useful when we want to keep the screen’s state e.g. User is on the main page, see all the data there, then they go on the details page, and when they come back to the main screen we need to display the same data as before without triggering any DB/API/SharedPreferences call.

You can find out more about what these streams provide by reading RxDart documentation and the one from RxJava. https://github.com/ReactiveX/rxdart

https://reactivex.io/documentation/observable.html

Other articles about RxDart and its integration with Flutter:


Written by dacianflorea | 💎 Senior Flutter Engineer • Contractor • Freelancer • Consultant | Building clean and scalable mobile applications
Published by HackerNoon on 2022/05/30