use std::{
collections::{BTreeMap, BTreeSet},
fmt,
ops::Deref,
result::Result as StdResult,
str::Utf8Error,
sync::{Arc, RwLock as StdRwLock},
};
use eyeball_im::{Vector, VectorDiff};
use futures_util::Stream;
use once_cell::sync::OnceCell;
#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
mod observable_map;
mod traits;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use observable_map::ObservableMap;
use ruma::{
events::{
presence::PresenceEvent,
receipt::ReceiptEventContent,
room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::warn;
use crate::{
event_cache::store as event_cache_store,
rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
pub(crate) mod ambiguity_map;
mod memory_store;
pub mod migration_helpers;
mod send_queue;
#[cfg(any(test, feature = "testing"))]
pub use self::integration_tests::StateStoreIntegrationTests;
pub use self::{
memory_store::MemoryStore,
send_queue::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
SentMediaInfo, SentRequestKey, SerializableEventContent,
},
traits::{
ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
},
};
/// Error type returned by state store operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// An arbitrary boxed error, displayed transparently. It has no `From`
/// conversion; use [`StoreError::backend`] to wrap a concrete error.
#[error(transparent)]
Backend(Box<dyn std::error::Error + Send + Sync>),
/// A `serde_json` error, converted automatically via `From`.
#[error(transparent)]
Json(#[from] serde_json::Error),
/// A Matrix identifier failed to parse; converted automatically from
/// `ruma::IdParseError`.
#[error(transparent)]
Identifier(#[from] ruma::IdParseError),
/// The store failed to be unlocked.
#[error("The store failed to be unlocked")]
StoreLocked,
/// A passphrase was supplied for a store that is not encrypted.
#[error("The store is not encrypted but was tried to be opened with a passphrase")]
UnencryptedStore,
/// Encrypting or decrypting store data failed; converted automatically from
/// [`StoreEncryptionError`].
#[error("Error encrypting or decrypting data from the store: {0}")]
Encryption(#[from] StoreEncryptionError),
/// Encoding or decoding store data failed; converted automatically from
/// `Utf8Error`.
#[error("Error encoding or decoding data from the store: {0}")]
Codec(#[from] Utf8Error),
/// The database format changed in an incompatible way. Per the error
/// message, the first field is the current version and the second one is
/// the latest version.
#[error(
"The database format changed in an incompatible way, current \
version: {0}, latest version: {1}"
)]
UnsupportedDatabaseVersion(usize, usize),
/// Redacting an event failed. The redaction error is both included in the
/// message and exposed as the error source.
#[error("Redaction failed: {0}")]
Redaction(#[source] ruma::canonical_json::RedactionError),
}
impl StoreError {
    /// Build a [`StoreError::Backend`] out of any error type.
    ///
    /// The error is moved onto the heap and type-erased, so any concrete
    /// error that is `Send + Sync + 'static` can be wrapped without a
    /// dedicated variant.
    #[inline]
    pub fn backend<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
        // `Box<dyn Error + Send + Sync>: From<E>` boxes the error for us.
        Self::Backend(error.into())
    }
}
pub type Result<T, E = StoreError> = std::result::Result<T, E>;
/// A wrapper around a [`DynStateStore`] that additionally keeps the session
/// metadata, the sync token and the known rooms in memory.
///
/// Every field is behind an `Arc`, so clones share the same state. The type
/// dereferences to the wrapped store.
#[derive(Clone)]
pub(crate) struct Store {
/// The wrapped state store implementation.
pub(super) inner: Arc<DynStateStore>,
/// The session metadata; written once by `set_session_meta`.
session_meta: Arc<OnceCell<SessionMeta>>,
/// The sync token; `set_session_meta` fills it in from the value stored
/// under `StateStoreDataKey::SyncToken`.
pub(super) sync_token: Arc<RwLock<Option<String>>>,
/// The rooms known to this store, keyed by room ID.
rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
/// An async mutex handed out by `sync_lock()`.
// NOTE(review): presumably serialises sync processing against other
// writers; confirm against the callers of `sync_lock()`.
sync_lock: Arc<Mutex<()>>,
}
impl Store {
    /// Create a new [`Store`] wrapping the given state store.
    ///
    /// The session metadata, sync token and room map all start out empty.
    pub fn new(inner: Arc<DynStateStore>) -> Self {
        let rooms = Arc::new(StdRwLock::new(ObservableMap::new()));

        Self {
            inner,
            session_meta: Arc::default(),
            sync_token: Arc::default(),
            rooms,
            sync_lock: Arc::default(),
        }
    }

    /// Get access to the sync lock of this store.
    pub fn sync_lock(&self) -> &Mutex<()> {
        &*self.sync_lock
    }

    /// Load all room infos from the inner store, applying migrations.
    ///
    /// Room infos that were changed by a migration are written back to the
    /// inner store. A failure to write them back is only logged; the
    /// (migrated) room infos are returned regardless.
    async fn load_room_infos(&self) -> Result<Vec<RoomInfo>> {
        let mut room_infos = self.inner.get_room_infos().await?;

        // Collect a copy of every room info that a migration actually touched.
        let mut migrated = BTreeMap::new();
        for info in &mut room_infos {
            if info.apply_migrations(Arc::clone(&self.inner)).await {
                migrated.insert(info.room_id.clone(), info.clone());
            }
        }

        // Nothing was migrated, so there is nothing to persist.
        if migrated.is_empty() {
            return Ok(room_infos);
        }

        let changes = StateChanges { room_infos: migrated, ..Default::default() };
        if let Err(error) = self.inner.save_changes(&changes).await {
            warn!("Failed to save migrated room infos: {error}");
        }

        Ok(room_infos)
    }

    /// Set the session metadata and restore the in-memory state from the
    /// inner store.
    ///
    /// This restores every stored room, loads the stored sync token and only
    /// then records `session_meta`.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the room infos or the sync token fails.
    ///
    /// # Panics
    ///
    /// Panics if the session metadata was already set, or if the rooms lock
    /// is poisoned.
    pub async fn set_session_meta(
        &self,
        session_meta: SessionMeta,
        room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
    ) -> Result<()> {
        let stored_infos = self.load_room_infos().await?;

        {
            // The guard of this synchronous lock must not live across the
            // `.await` points below, hence the dedicated scope.
            let mut rooms = self.rooms.write().unwrap();

            for info in stored_infos {
                let room = Room::restore(
                    &session_meta.user_id,
                    self.inner.clone(),
                    info,
                    room_info_notable_update_sender.clone(),
                );
                rooms.insert(room.room_id().to_owned(), room);
            }
        }

        let stored_value = self.get_kv_data(StateStoreDataKey::SyncToken).await?;
        let stored_token = stored_value.and_then(|value| value.into_sync_token());
        *self.sync_token.write().await = stored_token;

        self.session_meta.set(session_meta).expect("Session Meta was already set");

        Ok(())
    }

    /// The session metadata, or `None` if it has not been set yet.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.session_meta.get()
    }

    /// Get a snapshot of all the rooms this store knows about.
    pub fn rooms(&self) -> Vec<Room> {
        let rooms = self.rooms.read().unwrap();
        rooms.iter().cloned().collect()
    }

    /// Get a snapshot of all the rooms whose state matches `filter`.
    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
        let rooms = self.rooms.read().unwrap();
        rooms.iter().filter(|candidate| filter.matches(candidate.state())).cloned().collect()
    }

    /// Get the current rooms together with a stream of subsequent updates to
    /// the room list.
    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
        let rooms = self.rooms.read().unwrap();
        rooms.stream()
    }

    /// Look up the room with the given ID, if the store knows it.
    pub fn room(&self, room_id: &RoomId) -> Option<Room> {
        let rooms = self.rooms.read().unwrap();
        rooms.get(room_id).cloned()
    }

    /// Check whether a room with the given ID is known to this store.
    #[cfg(feature = "experimental-sliding-sync")]
    pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
        let rooms = self.rooms.read().unwrap();
        rooms.get(room_id).is_some()
    }

    /// Return the room with the given ID, creating it with the given
    /// `room_type` if the store does not know it yet.
    ///
    /// # Panics
    ///
    /// Panics if the session metadata has not been set, or if the rooms lock
    /// is poisoned.
    pub fn get_or_create_room(
        &self,
        room_id: &RoomId,
        room_type: RoomState,
        room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
    ) -> Room {
        let session =
            self.session_meta.get().expect("Creating room while not being logged in");

        // Only invoked when the room is not in the map yet.
        let make_room = || {
            Room::new(
                &session.user_id,
                self.inner.clone(),
                room_id,
                room_type,
                room_info_notable_update_sender,
            )
        };

        let mut rooms = self.rooms.write().unwrap();
        rooms.get_or_create(room_id, make_room).clone()
    }

    /// Remove the room with the given ID from the inner store and, once that
    /// succeeded, from the in-memory room map.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner store fails to remove the room; the
    /// in-memory map is left untouched in that case.
    pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
        self.inner.remove_room(room_id).await?;

        let mut rooms = self.rooms.write().unwrap();
        rooms.remove(room_id);

        Ok(())
    }
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for Store {
    /// Format the store, listing every field except `sync_lock`; the output
    /// is marked as non-exhaustive.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("Store");
        builder.field("inner", &self.inner);
        builder.field("session_meta", &self.session_meta);
        builder.field("sync_token", &self.sync_token);
        builder.field("rooms", &self.rooms);
        builder.finish_non_exhaustive()
    }
}
impl Deref for Store {
    type Target = DynStateStore;

    /// Borrow the wrapped state store, so that its methods can be called
    /// directly on a [`Store`].
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
/// A collection of state changes that is handed to the state store as a
/// single unit (see `StateStore::save_changes`).
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
/// The sync token associated with these changes, if any.
pub sync_token: Option<String>,
/// Global account data events, keyed by event type.
pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
/// Presence events, keyed by the user ID of the sender.
pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
/// Minimal room member events, keyed by room ID and then by user ID.
pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
/// User IDs whose profiles should be deleted, keyed by room ID.
pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
/// State events, keyed by room ID, then event type, then state key.
pub state:
BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
/// Room account data events, keyed by room ID and then by event type.
pub room_account_data:
BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
/// Room infos, keyed by room ID.
pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
/// Receipt event contents, keyed by room ID.
pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
/// Redaction events, keyed by room ID and then by the ID of the redacted
/// event.
pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
/// Stripped state events, keyed by room ID, then event type, then state
/// key.
pub stripped_state: BTreeMap<
OwnedRoomId,
BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
>,
/// Sets of user IDs keyed by room ID and then by a string.
// NOTE(review): the string key is presumably the ambiguous display name;
// confirm against `ambiguity_map`.
pub ambiguity_maps: BTreeMap<OwnedRoomId, BTreeMap<String, BTreeSet<OwnedUserId>>>,
}
impl StateChanges {
pub fn new(sync_token: String) -> Self {
Self { sync_token: Some(sync_token), ..Default::default() }
}
pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
self.presence.insert(event.sender, raw_event);
}
pub fn add_room(&mut self, room: RoomInfo) {
self.room_infos.insert(room.room_id.clone(), room);
}
pub fn add_room_account_data(
&mut self,
room_id: &RoomId,
event: AnyRoomAccountDataEvent,
raw_event: Raw<AnyRoomAccountDataEvent>,
) {
self.room_account_data
.entry(room_id.to_owned())
.or_default()
.insert(event.event_type(), raw_event);
}
pub fn add_stripped_member(
&mut self,
room_id: &RoomId,
user_id: &UserId,
event: Raw<StrippedRoomMemberEvent>,
) {
self.stripped_state
.entry(room_id.to_owned())
.or_default()
.entry(StateEventType::RoomMember)
.or_default()
.insert(user_id.into(), event.cast());
}
pub fn add_state_event(
&mut self,
room_id: &RoomId,
event: AnySyncStateEvent,
raw_event: Raw<AnySyncStateEvent>,
) {
self.state
.entry(room_id.to_owned())
.or_default()
.entry(event.event_type())
.or_default()
.insert(event.state_key().to_owned(), raw_event);
}
pub fn add_redaction(
&mut self,
room_id: &RoomId,
redacted_event_id: &EventId,
redaction: Raw<SyncRoomRedactionEvent>,
) {
self.redactions
.entry(room_id.to_owned())
.or_default()
.insert(redacted_event_id.to_owned(), redaction);
}
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
self.receipts.insert(room_id.to_owned(), event);
}
}
/// Configuration bundling the stores to use: a state store, an event cache
/// store and, when the `e2e-encryption` feature is enabled, a crypto store.
#[derive(Clone)]
pub struct StoreConfig {
/// The crypto store; only present with the `e2e-encryption` feature.
#[cfg(feature = "e2e-encryption")]
pub(crate) crypto_store: Arc<DynCryptoStore>,
/// The state store.
pub(crate) state_store: Arc<DynStateStore>,
/// The event cache store, wrapped in its lock type.
pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
/// The name passed to `EventCacheStoreLock::new` whenever an event cache
/// store lock is built by this configuration.
cross_process_store_locks_holder_name: String,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for StoreConfig {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
fmt.debug_struct("StoreConfig").finish()
}
}
impl StoreConfig {
    /// Create a new store configuration in which every store is an
    /// in-memory store.
    ///
    /// `cross_process_store_locks_holder_name` is handed to the event cache
    /// store lock and kept for locks created later on.
    #[must_use]
    pub fn new(cross_process_store_locks_holder_name: String) -> Self {
        #[cfg(feature = "e2e-encryption")]
        let crypto_store = matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store();
        let state_store = Arc::new(MemoryStore::new());
        let event_cache_store = event_cache_store::EventCacheStoreLock::new(
            event_cache_store::MemoryStore::new(),
            cross_process_store_locks_holder_name.clone(),
        );

        Self {
            #[cfg(feature = "e2e-encryption")]
            crypto_store,
            state_store,
            event_cache_store,
            cross_process_store_locks_holder_name,
        }
    }

    /// Replace the crypto store with the given one.
    #[cfg(feature = "e2e-encryption")]
    pub fn crypto_store(self, store: impl IntoCryptoStore) -> Self {
        Self { crypto_store: store.into_crypto_store(), ..self }
    }

    /// Replace the state store with the given one.
    pub fn state_store(self, store: impl IntoStateStore) -> Self {
        Self { state_store: store.into_state_store(), ..self }
    }

    /// Replace the event cache store with the given one, wrapping it in a
    /// lock that uses the holder name this configuration was created with.
    pub fn event_cache_store<S>(self, event_cache_store: S) -> Self
    where
        S: event_cache_store::IntoEventCacheStore,
    {
        let holder_name = self.cross_process_store_locks_holder_name.clone();
        let lock = event_cache_store::EventCacheStoreLock::new(event_cache_store, holder_name);

        Self { event_cache_store: lock, ..self }
    }
}