use std::{collections::BTreeMap, fmt, sync::Arc};
use events::{Gap, RoomEvents};
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, SyncTimelineEvent},
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
events::{
relation::RelationType,
room::{message::Relation, redaction::SyncRoomRedactionEvent},
AnyMessageLikeEventContent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent,
AnySyncMessageLikeEvent, AnySyncTimelineEvent,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
};
use tracing::{trace, warn};
use super::{
paginator::{Paginator, PaginatorState},
AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
};
use crate::{client::WeakClient, room::WeakRoom};
pub(super) mod events;
#[derive(Clone)]
pub struct RoomEventCache {
pub(super) inner: Arc<RoomEventCacheInner>,
}
impl fmt::Debug for RoomEventCache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RoomEventCache").finish_non_exhaustive()
}
}
impl RoomEventCache {
pub(super) fn new(
client: WeakClient,
room_id: OwnedRoomId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(client, room_id, all_events_cache)) }
}
pub async fn subscribe(
&self,
) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
let state = self.inner.state.read().await;
let events = state.events.events().map(|(_position, item)| item.clone()).collect();
Ok((events, self.inner.sender.subscribe()))
}
pub fn pagination(&self) -> RoomPagination {
RoomPagination { inner: self.inner.clone() }
}
pub async fn event(&self, event_id: &EventId) -> Option<SyncTimelineEvent> {
if let Some((room_id, event)) =
self.inner.all_events.read().await.events.get(event_id).cloned()
{
if room_id == self.inner.room_id {
return Some(event);
}
}
let state = self.inner.state.read().await;
for (_pos, event) in state.events.revents() {
if event.event_id().as_deref() == Some(event_id) {
return Some(event.clone());
}
}
None
}
pub async fn event_with_relations(
&self,
event_id: &EventId,
filter: Option<Vec<RelationType>>,
) -> Option<(SyncTimelineEvent, Vec<SyncTimelineEvent>)> {
let mut relation_events = Vec::new();
let cache = self.inner.all_events.read().await;
if let Some((_, event)) = cache.events.get(event_id) {
Self::collect_related_events(&cache, event_id, &filter, &mut relation_events);
Some((event.clone(), relation_events))
} else {
None
}
}
fn collect_related_events(
cache: &RwLockReadGuard<'_, AllEventsCache>,
event_id: &EventId,
filter: &Option<Vec<RelationType>>,
results: &mut Vec<SyncTimelineEvent>,
) {
if let Some(related_event_ids) = cache.relations.get(event_id) {
for (related_event_id, relation_type) in related_event_ids {
if let Some(filter) = filter {
if !filter.contains(relation_type) {
continue;
}
}
if results.iter().any(|e| {
e.event_id().is_some_and(|added_related_event_id| {
added_related_event_id == *related_event_id
})
}) {
continue;
}
if let Some((_, ev)) = cache.events.get(related_event_id) {
results.push(ev.clone());
Self::collect_related_events(cache, related_event_id, filter, results);
}
}
}
}
pub(crate) async fn save_event(&self, event: SyncTimelineEvent) {
if let Some(event_id) = event.event_id() {
let mut cache = self.inner.all_events.write().await;
self.inner.append_related_event(&mut cache, &event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
}
}
pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = SyncTimelineEvent>) {
let mut cache = self.inner.all_events.write().await;
for event in events {
if let Some(event_id) = event.event_id() {
self.inner.append_related_event(&mut cache, &event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
}
}
}
}
pub(super) struct RoomEventCacheInner {
room_id: OwnedRoomId,
pub sender: Sender<RoomEventCacheUpdate>,
pub state: RwLock<RoomEventCacheState>,
all_events: Arc<RwLock<AllEventsCache>>,
pub pagination_batch_token_notifier: Notify,
pub paginator: Paginator<WeakRoom>,
}
impl RoomEventCacheInner {
fn new(
client: WeakClient,
room_id: OwnedRoomId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
let sender = Sender::new(32);
let weak_room = WeakRoom::new(client, room_id);
Self {
room_id: weak_room.room_id().to_owned(),
state: RwLock::new(RoomEventCacheState {
events: RoomEvents::default(),
waited_for_initial_prev_token: false,
}),
all_events: all_events_cache,
sender,
pagination_batch_token_notifier: Default::default(),
paginator: Paginator::new(weak_room),
}
}
fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
let mut handled_read_marker = false;
trace!("Handling account data");
for raw_event in account_data {
match raw_event.deserialize() {
Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
if handled_read_marker {
continue;
}
handled_read_marker = true;
let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
event_id: ev.content.event_id,
});
}
Ok(_) => {
}
Err(e) => {
let event_type = raw_event.get_field::<String>("type").ok().flatten();
warn!(event_type, "Failed to deserialize account data: {e}");
}
}
}
}
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
self.handle_timeline(
updates.timeline,
updates.ephemeral.clone(),
updates.ambiguity_changes,
)
.await?;
self.handle_account_data(updates.account_data);
Ok(())
}
async fn handle_timeline(
&self,
timeline: Timeline,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if timeline.limited {
trace!("limited timeline, clearing all previous events and pushing new events");
self.replace_all_events_by(
timeline.events,
timeline.prev_batch,
ephemeral_events,
ambiguity_changes,
)
.await?;
} else {
trace!("adding new events");
self.append_new_events(
timeline.events,
timeline.prev_batch,
ephemeral_events,
ambiguity_changes,
)
.await?;
}
Ok(())
}
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
Ok(())
}
pub(super) async fn replace_all_events_by(
&self,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
let mut state = self.state.write().await;
state.reset();
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
self.append_events_locked_impl(
&mut state.events,
sync_timeline_events,
prev_batch.clone(),
ephemeral_events,
ambiguity_changes,
)
.await?;
self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;
Ok(())
}
async fn append_new_events(
&self,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
self.append_events_locked_impl(
&mut self.state.write().await.events,
sync_timeline_events,
prev_batch,
ephemeral_events,
ambiguity_changes,
)
.await
}
fn append_related_event(
&self,
cache: &mut RwLockWriteGuard<'_, AllEventsCache>,
event: &SyncTimelineEvent,
) {
if let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() {
if let AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(ev)) =
&ev
{
if let Some(redacted_event_id) = ev.content.redacts.as_ref().or(ev.redacts.as_ref())
{
cache
.relations
.entry(redacted_event_id.to_owned())
.or_default()
.insert(ev.event_id.to_owned(), RelationType::Replacement);
}
} else {
let relationship = match ev.original_content() {
Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
if let Some(relation) = c.relates_to {
match relation {
Relation::Replacement(replacement) => {
Some((replacement.event_id, RelationType::Replacement))
}
Relation::Reply { in_reply_to } => {
Some((in_reply_to.event_id, RelationType::Reference))
}
Relation::Thread(thread) => {
Some((thread.event_id, RelationType::Thread))
}
_ => None,
}
} else {
None
}
}
Some(AnyMessageLikeEventContent::PollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::PollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::Reaction(c)) => {
Some((c.relates_to.event_id, RelationType::Annotation))
}
_ => None,
};
if let Some(relationship) = relationship {
cache
.relations
.entry(relationship.0)
.or_default()
.insert(ev.event_id().to_owned(), relationship.1);
}
}
}
}
async fn append_events_locked_impl(
&self,
room_events: &mut RoomEvents,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if sync_timeline_events.is_empty()
&& prev_batch.is_none()
&& ephemeral_events.is_empty()
&& ambiguity_changes.is_empty()
{
return Ok(());
}
{
if let Some(prev_token) = &prev_batch {
room_events.push_gap(Gap { prev_token: prev_token.clone() });
}
room_events.push_events(sync_timeline_events.clone());
let mut cache = self.all_events.write().await;
for ev in &sync_timeline_events {
if let Some(event_id) = ev.event_id() {
self.append_related_event(&mut cache, ev);
cache.events.insert(event_id.to_owned(), (self.room_id.clone(), ev.clone()));
}
}
}
if prev_batch.is_some() {
self.pagination_batch_token_notifier.notify_one();
}
{
if !sync_timeline_events.is_empty() {
let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents {
events: sync_timeline_events,
origin: EventsOrigin::Sync,
});
}
if !ephemeral_events.is_empty() {
let _ = self
.sender
.send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
}
if !ambiguity_changes.is_empty() {
let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
}
}
Ok(())
}
}
pub(super) struct RoomEventCacheState {
pub events: RoomEvents,
pub waited_for_initial_prev_token: bool,
}
impl RoomEventCacheState {
pub(super) fn reset(&mut self) {
self.events.reset();
self.waited_for_initial_prev_token = false;
}
}
#[cfg(test)]
mod tests {
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_test::async_test;
use ruma::{
event_id,
events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
room_id, user_id, RoomId,
};
use crate::test_utils::{events::EventFactory, logged_in_client};
#[async_test]
async fn test_event_with_redaction_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.redaction(original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_edit_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.text_msg("* An edited event")
.edit(
original_id,
RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
)
.event_id(related_id)
.into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_reply_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_thread_reply_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_reaction_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.text_msg("Original event").event_id(original_id).into(),
f.reaction(original_id, ":D".to_owned()).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_poll_response_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.poll_start("Poll start event", "A poll question", vec!["An answer"])
.event_id(original_id)
.into(),
f.poll_response("1", original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_poll_end_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let room_id = room_id!("!galette:saucisse.bzh");
let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
assert_relations(
room_id,
f.poll_start("Poll start event", "A poll question", vec!["An answer"])
.event_id(original_id)
.into(),
f.poll_end("Poll ended", original_id).event_id(related_id).into(),
f,
)
.await;
}
#[async_test]
async fn test_event_with_filtered_relationships() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let associated_related_id = event_id!("$recursive_related");
let room_id = room_id!("!galette:saucisse.bzh");
let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
let related_event = event_factory
.text_msg("* Edited event")
.edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
.event_id(related_id)
.into();
let associated_related_event =
event_factory.redaction(related_id).event_id(associated_related_id).into();
let client = logged_in_client(None).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
room_event_cache.save_event(original_event).await;
room_event_cache.save_event(related_event).await;
room_event_cache.save_event(associated_related_event).await;
let filter = Some(vec![RelationType::Replacement]);
let (event, related_events) =
room_event_cache.event_with_relations(original_id, filter).await.unwrap();
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert_eq!(related_events.len(), 2);
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
let related_event_id = related_events[1].event_id().unwrap();
assert_eq!(related_event_id, associated_related_id);
let filter = Some(vec![RelationType::Thread]);
let (event, related_events) =
room_event_cache.event_with_relations(original_id, filter).await.unwrap();
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert!(related_events.is_empty());
}
#[async_test]
async fn test_event_with_recursive_relation() {
let original_id = event_id!("$original");
let related_id = event_id!("$related");
let associated_related_id = event_id!("$recursive_related");
let room_id = room_id!("!galette:saucisse.bzh");
let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
let related_event = event_factory
.text_msg("* Edited event")
.edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
.event_id(related_id)
.into();
let associated_related_event =
event_factory.redaction(related_id).event_id(associated_related_id).into();
let client = logged_in_client(None).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
room_event_cache.save_event(original_event).await;
room_event_cache.save_event(related_event).await;
room_event_cache.save_event(associated_related_event).await;
let (event, related_events) =
room_event_cache.event_with_relations(original_id, None).await.unwrap();
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_id);
assert_eq!(related_events.len(), 2);
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
let related_event_id = related_events[1].event_id().unwrap();
assert_eq!(related_event_id, associated_related_id);
}
async fn assert_relations(
room_id: &RoomId,
original_event: SyncTimelineEvent,
related_event: SyncTimelineEvent,
event_factory: EventFactory,
) {
let client = logged_in_client(None).await;
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
let room = client.get_room(room_id).unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let original_event_id = original_event.event_id().unwrap();
room_event_cache.save_event(original_event).await;
let unrelated_id = event_id!("$2");
room_event_cache
.save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
.await;
let related_id = related_event.event_id().unwrap();
room_event_cache.save_event(related_event).await;
let (event, related_events) =
room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
let cached_event_id = event.event_id().unwrap();
assert_eq!(cached_event_id, original_event_id);
assert_eq!(related_events.len(), 1);
let related_event_id = related_events[0].event_id().unwrap();
assert_eq!(related_event_id, related_id);
}
}