use std::{
collections::{btree_map, BTreeMap},
fmt,
time::Duration,
};
pub use matrix_sdk_base::sync::*;
use matrix_sdk_base::{
debug::{DebugInvitedRoom, DebugKnockedRoom, DebugListOfRawEventsNoId},
sync::SyncResponse as BaseSyncResponse,
};
use ruma::{
api::client::sync::sync_events::{
self,
v3::{InvitedRoom, KnockedRoom},
},
events::{presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyToDeviceEvent},
serde::Raw,
time::Instant,
OwnedRoomId, RoomId,
};
use tracing::{debug, error, warn};
use crate::{event_handler::HandlerKind, Client, Result, Room};
/// The outcome of a sync, as exposed by this crate.
///
/// It is assembled by [`SyncResponse::new`] from the base crate's sync
/// response plus the `next_batch` token.
#[derive(Clone, Default)]
pub struct SyncResponse {
/// The batch token of this sync. The sync loop stores it as the token to
/// use for the following sync request.
pub next_batch: String,
/// The per-room updates carried by this sync.
pub rooms: RoomUpdates,
/// The raw presence events carried by this sync.
pub presence: Vec<Raw<PresenceEvent>>,
/// The raw global account data events carried by this sync.
pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
/// The raw to-device events carried by this sync.
pub to_device: Vec<Raw<AnyToDeviceEvent>>,
/// The notifications carried by this sync, keyed by room ID.
pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
}
impl SyncResponse {
pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
base_response;
Self { next_batch, rooms, presence, account_data, to_device, notifications }
}
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SyncResponse {
    /// Formats the response for debugging.
    ///
    /// Raw event lists are printed through `DebugListOfRawEventsNoId`, and
    /// the output is marked as non-exhaustive because `presence` is not
    /// included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { next_batch, rooms, account_data, to_device, notifications, .. } = self;

        let mut out = f.debug_struct("SyncResponse");
        out.field("next_batch", next_batch);
        out.field("rooms", rooms);
        out.field("account_data", &DebugListOfRawEventsNoId(account_data));
        out.field("to_device", &DebugListOfRawEventsNoId(to_device));
        out.field("notifications", notifications);
        out.finish_non_exhaustive()
    }
}
/// An update for a single room, paired with the [`Room`] it applies to.
///
/// There is one variant per room section of a sync response that the client
/// dispatches updates for.
#[derive(Clone)]
pub enum RoomUpdate {
/// Update for a room found in the `leave` section of the sync.
Left {
/// The room the update applies to.
room: Room,
/// The data the sync carried for this room.
updates: LeftRoomUpdate,
},
/// Update for a room found in the `join` section of the sync.
Joined {
/// The room the update applies to.
room: Room,
/// The data the sync carried for this room.
updates: JoinedRoomUpdate,
},
/// Update for a room found in the `invite` section of the sync.
Invited {
/// The room the update applies to.
room: Room,
/// The data the sync carried for this room.
updates: InvitedRoom,
},
/// Update for a room found in the `knocked` section of the sync.
Knocked {
/// The room the update applies to.
room: Room,
/// The data the sync carried for this room.
updates: KnockedRoom,
},
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for RoomUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Left { room, updates } => {
f.debug_struct("Left").field("room", room).field("updates", updates).finish()
}
Self::Joined { room, updates } => {
f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
}
Self::Invited { room, updates } => f
.debug_struct("Invited")
.field("room", room)
.field("updates", &DebugInvitedRoom(updates))
.finish(),
Self::Knocked { room, updates } => f
.debug_struct("Knocked")
.field("room", room)
.field("updates", &DebugKnockedRoom(updates))
.finish(),
}
}
}
impl Client {
/// Processes a raw `/sync` response and runs the registered handlers on it.
///
/// The response is first handed to the base client; with the
/// `e2e-encryption` feature enabled a backup is then possibly triggered,
/// and finally the sync response handlers are called.
///
/// # Errors
///
/// Returns the error of the base client's processing or of the handler
/// dispatch, whichever fails first.
pub(crate) async fn process_sync(
    &self,
    response: sync_events::v3::Response,
) -> Result<BaseSyncResponse> {
    // The processing future is boxed before being awaited (presumably to
    // keep the size of this function's own future down).
    let processed = Box::pin(self.base_client().receive_sync_response(response)).await?;

    #[cfg(feature = "e2e-encryption")]
    self.encryption().backups().maybe_trigger_backup();

    self.call_sync_response_handlers(&processed).await?;

    Ok(processed)
}
/// Dispatches the content of a processed sync response to event handlers,
/// room update subscribers and notification handlers.
///
/// Order of operations:
/// 1. global account data, presence and to-device events,
/// 2. the full set of room updates on the room-updates channel,
/// 3. joined, left, invited and knocked rooms in that order, each with a
///    per-room update followed by its event handlers,
/// 4. notification handlers.
///
/// Rooms that cannot be found are logged and skipped.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by an event handler
/// dispatch.
#[tracing::instrument(skip(self, response))]
pub(crate) async fn call_sync_response_handlers(
    &self,
    response: &BaseSyncResponse,
) -> Result<()> {
    let BaseSyncResponse {
        rooms: room_updates,
        presence: presence_events,
        account_data: global_account_data,
        to_device: to_device_events,
        notifications: notifications_by_room,
    } = response;

    let handlers_started = Instant::now();

    // Events that are not tied to any room.
    self.handle_sync_events(HandlerKind::GlobalAccountData, None, global_account_data).await?;
    self.handle_sync_events(HandlerKind::Presence, None, presence_events).await?;
    self.handle_sync_events(HandlerKind::ToDevice, None, to_device_events).await?;

    // The result of the send is deliberately discarded.
    let _ = self.inner.room_updates_sender.send(room_updates.clone());

    // Joined rooms.
    for (room_id, joined) in &room_updates.join {
        let Some(room) = self.get_room(room_id) else {
            error!(?room_id, "Can't call event handler, room not found");
            continue;
        };

        self.send_room_update(room_id, || RoomUpdate::Joined {
            room: room.clone(),
            updates: joined.clone(),
        });

        // Destructure without `..` so that a new field forces a decision here.
        let JoinedRoomUpdate {
            timeline,
            state,
            account_data: room_account_data,
            ephemeral,
            unread_notifications: _,
            ambiguity_changes: _,
        } = joined;

        let target = Some(&room);
        self.handle_sync_events(HandlerKind::RoomAccountData, target, room_account_data)
            .await?;
        self.handle_sync_state_events(target, state).await?;
        self.handle_sync_timeline_events(target, &timeline.events).await?;
        self.handle_sync_events(HandlerKind::EphemeralRoomData, target, ephemeral).await?;
    }

    // Left rooms.
    for (room_id, left) in &room_updates.leave {
        let Some(room) = self.get_room(room_id) else {
            error!(?room_id, "Can't call event handler, room not found");
            continue;
        };

        self.send_room_update(room_id, || RoomUpdate::Left {
            room: room.clone(),
            updates: left.clone(),
        });

        let LeftRoomUpdate {
            timeline,
            state,
            account_data: room_account_data,
            ambiguity_changes: _,
        } = left;

        let target = Some(&room);
        self.handle_sync_events(HandlerKind::RoomAccountData, target, room_account_data)
            .await?;
        self.handle_sync_state_events(target, state).await?;
        self.handle_sync_timeline_events(target, &timeline.events).await?;
    }

    // Invited rooms: only stripped state is available.
    for (room_id, invited) in &room_updates.invite {
        let Some(room) = self.get_room(room_id) else {
            error!(?room_id, "Can't call event handler, room not found");
            continue;
        };

        self.send_room_update(room_id, || RoomUpdate::Invited {
            room: room.clone(),
            updates: invited.clone(),
        });

        let stripped_events = &invited.invite_state.events;
        self.handle_sync_events(HandlerKind::StrippedState, Some(&room), stripped_events)
            .await?;
    }

    // Knocked rooms: only stripped state is available.
    for (room_id, knocked) in &room_updates.knocked {
        let Some(room) = self.get_room(room_id) else {
            error!(?room_id, "Can't call event handler, room not found");
            continue;
        };

        self.send_room_update(room_id, || RoomUpdate::Knocked {
            room: room.clone(),
            updates: knocked.clone(),
        });

        let stripped_events = &knocked.knock_state.events;
        self.handle_sync_events(HandlerKind::StrippedState, Some(&room), stripped_events)
            .await?;
    }

    debug!("Ran event handlers in {:?}", handlers_started.elapsed());

    let notifications_started = Instant::now();

    // Create every notification handler future first (handler-major, then
    // room, then notification), and only await them afterwards, one by one.
    let mut pending = Vec::new();
    for handler in &*self.notification_handlers().await {
        for (room_id, room_notifications) in notifications_by_room {
            let Some(room) = self.get_room(room_id) else {
                warn!(?room_id, "Can't call notification handler, room not found");
                continue;
            };

            for notification in room_notifications {
                pending.push((handler)(notification.clone(), room.clone(), self.clone()));
            }
        }
    }

    for fut in pending {
        fut.await;
    }

    debug!("Ran notification handlers in {:?}", notifications_started.elapsed());

    Ok(())
}
fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
if let btree_map::Entry::Occupied(entry) =
self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
{
let tx = entry.get();
if tx.receiver_count() == 0 {
entry.remove();
} else {
_ = tx.send(make_msg());
}
}
}
/// Waits for one second, using the timer that matches the target
/// architecture: `gloo_timers` on `wasm32`, `tokio` everywhere else.
async fn sleep() {
    // `1_000` is passed to the gloo timer, i.e. the same one-second pause
    // expressed in milliseconds.
    #[cfg(target_arch = "wasm32")]
    let pause = gloo_timers::future::TimeoutFuture::new(1_000);

    #[cfg(not(target_arch = "wasm32"))]
    let pause = tokio::time::sleep(Duration::from_secs(1));

    pause.await;
}
/// Runs a single sync for the sync loop and advances the loop's token.
///
/// On success, `sync_settings.token` is set to the `next_batch` of the
/// response so that the next iteration continues from there. On failure the
/// error is logged and handed back to the caller; the token is left as is.
///
/// # Errors
///
/// Returns whatever error `sync_once` returned.
pub(crate) async fn sync_loop_helper(
    &self,
    sync_settings: &mut crate::config::SyncSettings,
) -> Result<SyncResponse> {
    let synced = match self.sync_once(sync_settings.clone()).await {
        Ok(synced) => synced,
        Err(e) => {
            error!("Received an invalid response: {e}");
            return Err(e);
        }
    };

    sync_settings.token = Some(synced.next_batch.clone());

    Ok(synced)
}
/// Throttles the sync loop so that it cannot spin faster than roughly once
/// per second.
///
/// If the previous call happened one second ago or less, this waits for one
/// second before returning. In every case `last_sync_time` is updated to the
/// moment this function returns.
///
/// `last_sync_time` is `None` before the first call, in which case no delay
/// is applied.
pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
    if let Some(previous) = last_sync_time {
        if previous.elapsed() <= Duration::from_secs(1) {
            Self::sleep().await;
        }
    }

    // Record the time *after* the optional sleep. Storing a timestamp taken
    // before the sleep would make the next call see more than one second
    // elapsed purely because of that sleep, so the throttle would only kick
    // in on every other iteration.
    *last_sync_time = Some(Instant::now());
}
}