matrix_sdk_crypto/store/
crypto_store_wrapper.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
use std::{future, ops::Deref, sync::Arc};

use futures_core::Stream;
use futures_util::StreamExt;
use matrix_sdk_common::store_locks::CrossProcessStoreLock;
use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
use tokio::sync::{broadcast, Mutex};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{debug, trace, warn};

use super::{caches::SessionStore, DeviceChanges, IdentityChanges, LockableCryptoStore};
use crate::{
    olm::InboundGroupSession,
    store,
    store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
    CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
};

/// A wrapper for crypto store implementations that adds update notifiers.
///
/// This is shared between [`StoreInner`] and
/// [`crate::verification::VerificationStore`].
#[derive(Debug)]
pub(crate) struct CryptoStoreWrapper {
    user_id: OwnedUserId,
    device_id: OwnedDeviceId,

    store: Arc<DynCryptoStore>,

    /// A cache for the Olm Sessions.
    sessions: SessionStore,

    /// The sender side of a broadcast stream that is notified whenever we get
    /// an update to an inbound group session.
    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

    /// The sender side of a broadcast stream that is notified whenever we
    /// receive an `m.room_key.withheld` message.
    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,

    /// The sender side of a broadcast channel which sends out secrets we
    /// received as a `m.secret.send` event.
    secrets_broadcaster: broadcast::Sender<GossippedSecret>,

    /// The sender side of a broadcast channel which sends out devices and user
    /// identities which got updated or newly created.
    identities_broadcaster:
        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
}

impl CryptoStoreWrapper {
    pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
        let room_keys_received_sender = broadcast::Sender::new(10);
        let room_keys_withheld_received_sender = broadcast::Sender::new(10);
        let secrets_broadcaster = broadcast::Sender::new(10);
        // The identities broadcaster is responsible for user identities as well as
        // devices, that's why we increase the capacity here.
        let identities_broadcaster = broadcast::Sender::new(20);

        Self {
            user_id: user_id.to_owned(),
            device_id: device_id.to_owned(),
            store: store.into_crypto_store(),
            sessions: SessionStore::new(),
            room_keys_received_sender,
            room_keys_withheld_received_sender,
            secrets_broadcaster,
            identities_broadcaster,
        }
    }

    /// Save the set of changes to the store.
    ///
    /// Also responsible for sending updates to the broadcast streams such as
    /// `room_keys_received_sender` and `secrets_broadcaster`.
    ///
    /// # Arguments
    ///
    /// * `changes` - The set of changes that should be stored.
    pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
        let room_key_updates: Vec<_> =
            changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();

        let withheld_session_updates: Vec<_> = changes
            .withheld_session_info
            .iter()
            .flat_map(|(room_id, session_map)| {
                session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
                    room_id: room_id.to_owned(),
                    session_id: session_id.to_owned(),
                    withheld_event: withheld_event.clone(),
                })
            })
            .collect();

        // If our own identity verified status changes we need to do some checks on
        // other identities. So remember the verification status before
        // processing the changes
        let own_identity_was_verified_before_change = self
            .store
            .get_user_identity(self.user_id.as_ref())
            .await?
            .as_ref()
            .and_then(|i| i.own())
            .is_some_and(|own| own.is_verified());

        let secrets = changes.secrets.to_owned();
        let devices = changes.devices.to_owned();
        let identities = changes.identities.to_owned();

        if devices
            .changed
            .iter()
            .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
        {
            // If our own device key changes, we need to clear the
            // session cache because the sessions contain a copy of our
            // device key.
            self.sessions.clear().await;
        } else {
            // Otherwise add the sessions to the cache.
            for session in &changes.sessions {
                self.sessions.add(session.clone()).await;
            }
        }

        self.store.save_changes(changes).await?;

        // If we updated our own public identity, log it for debugging purposes
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            for updated_identity in
                identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
            {
                let master_key = updated_identity.master_key().get_first_key();
                let user_signing_key = updated_identity.user_signing_key().get_first_key();
                let self_signing_key = updated_identity.self_signing_key().get_first_key();

                debug!(
                    ?master_key,
                    ?user_signing_key,
                    ?self_signing_key,
                    previously_verified = updated_identity.was_previously_verified(),
                    verified = updated_identity.is_verified(),
                    "Stored our own identity"
                );
            }
        }

        if !room_key_updates.is_empty() {
            // Ignore the result. It can only fail if there are no listeners.
            let _ = self.room_keys_received_sender.send(room_key_updates);
        }

        if !withheld_session_updates.is_empty() {
            let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
        }

        for secret in secrets {
            let _ = self.secrets_broadcaster.send(secret);
        }

        if !devices.is_empty() || !identities.is_empty() {
            // Mapping the devices and user identities from the read-only variant to one's
            // that contain side-effects requires our own identity. This is
            // guaranteed to be up-to-date since we just persisted it.
            let maybe_own_identity =
                self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());

            // If our identity was not verified before the change and is now, that means
            // this could impact the verification chain of other known
            // identities.
            if let Some(own_identity_after) = maybe_own_identity.as_ref() {
                // Only do this if our identity is passing from not verified to verified,
                // the previously_verified can only change in that case.
                if !own_identity_was_verified_before_change && own_identity_after.is_verified() {
                    debug!("Own identity is now verified, check all known identities for verification status changes");
                    // We need to review all the other identities to see if they are verified now
                    // and mark them as such
                    self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
                        own_identity_after,
                    )
                    .await?;
                }
            }

            let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
        }

        Ok(())
    }

    async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
        &self,
        own_identity_after: &OwnUserIdentityData,
    ) -> Result<(), CryptoStoreError> {
        let tracked_users = self.store.load_tracked_users().await?;
        let mut updated_identities: Vec<UserIdentityData> = Default::default();
        for tracked_user in tracked_users {
            if let Some(other_identity) = self
                .store
                .get_user_identity(tracked_user.user_id.as_ref())
                .await?
                .as_ref()
                .and_then(|i| i.other())
            {
                if !other_identity.was_previously_verified()
                    && own_identity_after.is_identity_signed(other_identity)
                {
                    trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
                    other_identity.mark_as_previously_verified();
                    updated_identities.push(other_identity.clone().into());
                }
            }
        }

        if !updated_identities.is_empty() {
            let identity_changes =
                IdentityChanges { changed: updated_identities, ..Default::default() };
            self.store
                .save_changes(Changes {
                    identities: identity_changes.clone(),
                    ..Default::default()
                })
                .await?;

            let _ = self.identities_broadcaster.send((
                Some(own_identity_after.clone()),
                identity_changes,
                DeviceChanges::default(),
            ));
        }

        Ok(())
    }

    pub async fn get_sessions(
        &self,
        sender_key: &str,
    ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
        let sessions = self.sessions.get(sender_key).await;

        let sessions = if sessions.is_none() {
            let mut entries = self.sessions.entries.write().await;

            let sessions = entries.get(sender_key);

            if sessions.is_some() {
                sessions.cloned()
            } else {
                let sessions = self.store.get_sessions(sender_key).await?;
                let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));

                entries.insert(sender_key.to_owned(), sessions.clone());

                Some(sessions)
            }
        } else {
            sessions
        };

        Ok(sessions)
    }

    /// Save a list of inbound group sessions to the store.
    ///
    /// # Arguments
    ///
    /// * `sessions` - The sessions to be saved.
    /// * `backed_up_to_version` - If the keys should be marked as having been
    ///   backed up, the version of the backup.
    ///
    /// Note: some implementations ignore `backup_version` and assume the
    /// current backup version, which is normally the same.
    pub async fn save_inbound_group_sessions(
        &self,
        sessions: Vec<InboundGroupSession>,
        backed_up_to_version: Option<&str>,
    ) -> store::Result<()> {
        let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
        self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;

        if !room_key_updates.is_empty() {
            // Ignore the result. It can only fail if there are no listeners.
            let _ = self.room_keys_received_sender.send(room_key_updates);
        }
        Ok(())
    }

    /// Receive notifications of room keys being received as a [`Stream`].
    ///
    /// Each time a room key is updated in any way, an update will be sent to
    /// the stream. Updates that happen at the same time are batched into a
    /// [`Vec`].
    ///
    /// If the reader of the stream lags too far behind, a warning will be
    /// logged and items will be dropped.
    pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
        let stream = BroadcastStream::new(self.room_keys_received_sender.subscribe());
        Self::filter_errors_out_of_stream(stream, "room_keys_received_stream")
    }

    /// Receive notifications of received `m.room_key.withheld` messages.
    ///
    /// Each time an `m.room_key.withheld` is received and stored, an update
    /// will be sent to the stream. Updates that happen at the same time are
    /// batched into a [`Vec`].
    ///
    /// If the reader of the stream lags too far behind, a warning will be
    /// logged and items will be dropped.
    pub fn room_keys_withheld_received_stream(
        &self,
    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
        let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
        Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
    }

    /// Receive notifications of gossipped secrets being received and stored in
    /// the secret inbox as a [`Stream`].
    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
        let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
        Self::filter_errors_out_of_stream(stream, "secrets_stream")
    }

    /// Returns a stream of newly created or updated cryptographic identities.
    ///
    /// This is just a helper method which allows us to build higher level
    /// device and user identity streams.
    pub(super) fn identities_stream(
        &self,
    ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> {
        let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
        Self::filter_errors_out_of_stream(stream, "identities_stream")
    }

    /// Helper for *_stream functions: filters errors out of the stream,
    /// creating a new Stream.
    ///
    /// `BroadcastStream`s gives us `Result`s which can fail with
    /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
    /// to work with, so here we just emit a warning and drop the errors.
    fn filter_errors_out_of_stream<ItemType>(
        stream: BroadcastStream<ItemType>,
        stream_name: &str,
    ) -> impl Stream<Item = ItemType>
    where
        ItemType: 'static + Clone + Send,
    {
        let stream_name = stream_name.to_owned();
        stream.filter_map(move |result| {
            future::ready(match result {
                Ok(r) => Some(r),
                Err(BroadcastStreamRecvError::Lagged(lag)) => {
                    warn!("{stream_name} missed {lag} updates");
                    None
                }
            })
        })
    }

    /// Creates a `CrossProcessStoreLock` for this store, that will contain the
    /// given key and value when hold.
    pub(crate) fn create_store_lock(
        &self,
        lock_key: String,
        lock_value: String,
    ) -> CrossProcessStoreLock<LockableCryptoStore> {
        CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
    }
}

impl Deref for CryptoStoreWrapper {
    type Target = DynCryptoStore;

    fn deref(&self) -> &Self::Target {
        self.store.deref()
    }
}

#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        let user_id = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(user_id, user_id, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        first
            .store()
            .inner
            .store
            .sessions
            .get(&sender_key)
            .await
            .expect("We should have a session in the cache.");

        let device_data = first
            .get_device(user_id, first.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // When we save a new version of our device keys
        first
            .store()
            .save_changes(Changes {
                devices: DeviceChanges { changed: vec![device_data], ..Default::default() },
                ..Default::default()
            })
            .await
            .unwrap();

        // Then the session is no longer in the cache
        assert!(
            first.store().inner.store.sessions.get(&sender_key).await.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}