How to create a Flowable that updates from multiple data?

Issue

I’m building a chat application and I have the following situation: on my database, I have 3 sources of data but two of them depends on the first.

A -> is a Contact
B -> is the last unread message
C -> is the messages count

I need to first fetch the contacts, then using its ID I need to fetch the other two. The requirement is to keep watching for any data change in the Contact information, or unread message or message count. How can I do that using RxJava just updating the necessary not to block the UI? (Some people told me I could use Flowable for that).

What I’ve tried so far:

fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
        Flowable.fromIterable(contacts)
            .flatMapSingle { contact ->
                getLastUnreadMessage(contact.id)
                    .materialize()
                    .zipWith(
                        getUnreadCount(contact.id)
                    ) { msg: Notification<Messages>, unreadCount: Int ->
                        Chat(contact, msg.value, unreadCount)
                    }
            }.toList().toFlowable()
    }.subscribeOn(schedulers.io).observeOn(schedulers.main)

In the viewModel

var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())

But it seems it just updates once then it doesn’t update any more data.

Solution

The problem in your code is, that you only observe changes of contacts list and get last messages and unread count only once per contact.

This should work:

  fun queryAllChats() = dao.queryContactsFlowable()
    .switchMap { contacts ->
      val chats = contacts.map { contact ->
        Flowable.combineLatest(
          getUnreadCount(contact.id), // Flowable
          getLastUnreadMessage(contact.id), // Flowable
          { unreadCount, unreadMessages ->
            Chat(contact, unreadMessages, unreadCount)
          })
      }
      [email protected] Flowable.combineLatest(chats) {
        it.map { it as Chat }
      }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

Answered By – Dominik Setniewski

This Answer collected from stackoverflow, is licensed under cc by-sa 2.5 , cc by-sa 3.0 and cc by-sa 4.0

Leave a Reply

(*) Required, Your email will not be published