matrix_sdk_ui/timeline/pagination.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_rx::StreamExt as _;
use async_stream::stream;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::event_cache::{self, EventCacheError, RoomPaginationStatus};
use tracing::{instrument, warn};
use super::Error;
impl super::Timeline {
/// Add more events to the start of the timeline.
///
/// Returns whether we hit the start of the timeline.
#[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
pub async fn paginate_backwards(&self, mut num_events: u16) -> Result<bool, Error> {
if self.controller.is_live().await {
match self.controller.live_lazy_paginate_backwards(num_events).await {
Some(needed_num_events) => {
num_events = needed_num_events.try_into().expect(
"failed to cast `needed_num_events` (`usize`) into `num_events` (`usize`)",
);
}
None => {
// We could adjust the skip count to a lower value, while passing the requested
// number of events. We *may* have reached the start of the timeline, but since
// we're fulfilling the caller's request, assume it's not the case and return
// false here. A subsequent call will go to the `Some()` arm of this match, and
// cause a call to the event cache's pagination.
return Ok(false);
}
}
Ok(self.live_paginate_backwards(num_events).await?)
} else {
Ok(self.controller.focused_paginate_backwards(num_events).await?)
}
}
/// Add more events to the end of the timeline.
///
/// Returns whether we hit the end of the timeline.
#[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
if self.controller.is_live().await {
Ok(true)
} else {
Ok(self.controller.focused_paginate_forwards(num_events).await?)
}
}
/// Paginate backwards in live mode.
///
/// This can only be called when the timeline is in live mode, not focused
/// on a specific event.
///
/// Returns whether we hit the start of the timeline.
async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
loop {
match self.event_cache.pagination().run_backwards_once(batch_size).await {
Ok(outcome) => {
// As an exceptional contract, restart the back-pagination if we received an
// empty chunk.
if outcome.reached_start || !outcome.events.is_empty() {
return Ok(outcome.reached_start);
}
}
Err(EventCacheError::AlreadyBackpaginating) => {
// Treat an already running pagination exceptionally, returning false so that
// the caller retries later.
warn!("Another pagination request is already happening, returning early");
return Ok(false);
}
// Propagate other errors as such.
Err(err) => return Err(err),
}
}
}
/// Subscribe to the back-pagination status of a live timeline.
///
/// This will return `None` if the timeline is in the focused mode.
///
/// Note: this may send multiple Paginating/Idle sequences during a single
/// call to [`Self::paginate_backwards()`].
pub async fn live_back_pagination_status(
&self,
) -> Option<(RoomPaginationStatus, impl Stream<Item = RoomPaginationStatus>)> {
if !self.controller.is_live().await {
return None;
}
let pagination = self.event_cache.pagination();
let mut status = pagination.status();
let current_value = status.next_now();
let stream = Box::pin(stream! {
let status_stream = status.dedup();
pin_mut!(status_stream);
while let Some(state) = status_stream.next().await {
yield state;
}
});
Some((current_value, stream))
}
}