#![cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
use std::collections::BTreeMap;
use async_stream::stream;
use futures_core::Stream;
use futures_util::{stream_select, StreamExt};
use matrix_sdk_base::crypto::{
IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
};
use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use super::Room;
use crate::{
encryption::identities::{IdentityUpdates, UserIdentity},
event_handler::EventHandlerDropGuard,
Client, Error, Result,
};
/// State owned by the stream returned from [`IdentityStatusChanges::create_stream`].
///
/// `create_stream` builds one value of this type and moves it into the stream
/// it returns.
#[derive(Debug)]
pub struct IdentityStatusChanges {
/// Per-room identity tracker: supplies the initial state and converts each
/// incoming [`RoomIdentityChange`] into the resulting
/// [`IdentityStatusChange`]s.
room_identity_state: RoomIdentityState<Room>,
/// Guard for the room member event handler registered by
/// `wrap_room_member_events`. Never read; it is held only so that it is
/// dropped together with this struct.
_drop_guard: EventHandlerDropGuard,
}
impl IdentityStatusChanges {
/// Create a stream of identity status changes for the members of `room`.
///
/// The stream first yields the room's current state, filtered by
/// `filter_for_initial_update` (our own user and `Verified` entries are
/// removed), if anything is left after filtering. After that, every identity
/// update or room member event is passed to the room identity state, and the
/// resulting changes are yielded once our own user has been filtered out.
///
/// Every yielded batch is sorted and non-empty.
///
/// # Errors
///
/// Returns an error if the user identities stream cannot be created, or
/// [`Error::InsufficientData`] if the client has no user ID.
pub async fn create_stream(
room: Room,
) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
// Both change sources are set up before the room identity state is created
// further down. NOTE(review): presumably so that no change arriving in
// between is missed - keep this order unless confirmed otherwise.
let identity_updates = wrap_identity_updates(&room.client).await?;
let (drop_guard, room_member_events) = wrap_room_member_events(&room);
let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
// `state` is moved into the stream below, so the drop guard lives exactly as
// long as the returned stream.
let mut state = IdentityStatusChanges {
room_identity_state: RoomIdentityState::new(room).await,
_drop_guard: drop_guard,
};
Ok(stream!({
// Initial batch: the current state, minus ourselves and minus `Verified`
// entries.
let mut current_state =
filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
if !current_state.is_empty() {
current_state.sort();
yield current_state;
}
// Subsequent batches: one per incoming change that affects someone other
// than ourselves.
while let Some(item) = unprocessed_stream.next().await {
let mut update = filter_non_self(
state.room_identity_state.process_change(item).await,
&own_user_id,
);
if !update.is_empty() {
update.sort();
yield update;
}
}
}))
}
}
/// Reduce the initial room state to the entries reported on subscription.
///
/// Removes every entry for `own_user_id` and every entry whose `changed_to`
/// is [`IdentityState::Verified`]. The relative order of the remaining entries
/// is unchanged.
fn filter_for_initial_update(
    mut input: Vec<IdentityStatusChange>,
    own_user_id: &UserId,
) -> Vec<IdentityStatusChange> {
    input.retain(|change| {
        if change.user_id == own_user_id {
            // Entries about our own identity are never reported.
            return false;
        }
        change.changed_to != IdentityState::Verified
    });
    input
}
/// Remove every change that concerns `own_user_id`, keeping the order of the
/// remaining changes.
fn filter_non_self(
    input: Vec<IdentityStatusChange>,
    own_user_id: &UserId,
) -> Vec<IdentityStatusChange> {
    input.into_iter().filter(|change| change.user_id != own_user_id).collect()
}
/// Merge the identity-update stream and the room-member-event stream into a
/// single stream of [`RoomIdentityChange`]s using `stream_select!`.
///
/// Both inputs must be `Unpin`.
fn combine_streams(
identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
stream_select!(identity_updates, room_member_events)
}
/// Subscribe to the client's user identity updates and wrap each one in
/// [`RoomIdentityChange::IdentityUpdates`].
///
/// # Errors
///
/// Fails if the client's user identities stream cannot be obtained.
async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
    let sdk_updates = client.encryption().user_identities_stream().await?;
    let room_changes =
        sdk_updates.map(|updates| RoomIdentityChange::IdentityUpdates(to_base_updates(updates)));
    Ok(room_changes)
}
/// Convert this crate's `IdentityUpdates` into the `matrix_sdk_base` crypto
/// store equivalent.
///
/// Only the `new` and `changed` maps are carried over; `unchanged` is left at
/// its default value.
fn to_base_updates(input: IdentityUpdates) -> matrix_sdk_base::crypto::store::IdentityUpdates {
    use matrix_sdk_base::crypto::store::IdentityUpdates as BaseIdentityUpdates;

    let new = to_base_identities(input.new);
    let changed = to_base_identities(input.changed);
    BaseIdentityUpdates { new, changed, unchanged: Default::default() }
}
/// Replace every [`UserIdentity`] in the map with its underlying
/// `matrix_sdk_base` identity, keeping the same keys.
fn to_base_identities(
    input: BTreeMap<OwnedUserId, UserIdentity>,
) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
    let mut base_identities = BTreeMap::new();
    for (user_id, identity) in input {
        base_identities.insert(user_id, identity.underlying_identity());
    }
    base_identities
}
/// Register a room member event handler on `room` and expose the events it
/// receives as a stream of [`RoomIdentityChange::SyncRoomMemberEvent`]s.
///
/// Events whose state key is our own user ID are not forwarded.
///
/// Returns the drop guard for the registered handler together with the
/// receiving end of a channel with capacity 16.
fn wrap_room_member_events(
    room: &Room,
) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
    let (sender, receiver) = mpsc::channel(16);
    let own_user_id = room.own_user_id().to_owned();

    // NOTE(review): `send` is awaited inside the handler, so a full channel
    // presumably holds the handler up until the stream is polled - confirm that
    // this is acceptable for callers that poll slowly.
    let handle = room.client.add_room_event_handler(
        room.room_id(),
        move |event: SyncRoomMemberEvent| async move {
            if *event.state_key() != own_user_id {
                // The outcome of the send is deliberately discarded.
                let _: Result<_, _> =
                    sender.send(RoomIdentityChange::SyncRoomMemberEvent(event)).await;
            }
        },
    );

    let drop_guard = room.client.event_handler_drop_guard(handle);
    (drop_guard, ReceiverStream::new(receiver))
}
#[cfg(test)]
mod tests {
use std::{
pin::{pin, Pin},
time::Duration,
};
use futures_core::Stream;
use futures_util::FutureExt;
use matrix_sdk_base::crypto::{IdentityState, IdentityStatusChange};
use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
use test_setup::TestSetup;
use tokio_stream::{StreamExt, Timeout};
#[async_test]
async fn test_when_user_becomes_unpinned_we_report_it() {
    // Given a room containing Bob, whose identity is pinned
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.pin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob becomes unpinned
    setup.unpin_bob().await;

    // Then a single pin violation for Bob is reported
    let reported = next_change(&mut pin!(stream)).await;
    assert_eq!(reported[0].user_id, setup.bob_user_id());
    assert_eq!(reported[0].changed_to, IdentityState::PinViolation);
    assert_eq!(reported.len(), 1);
}
#[async_test]
async fn test_when_user_becomes_verification_violation_we_report_it() {
    // Given a room containing Bob, whose identity is verified
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.verify_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob's identity changes
    setup.unpin_bob().await;

    // Then a single verification violation for Bob is reported
    let reported = next_change(&mut pin!(stream)).await;
    assert_eq!(reported[0].user_id, setup.bob_user_id());
    assert_eq!(reported[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(reported.len(), 1);
}
#[async_test]
async fn test_when_user_becomes_pinned_we_report_it() {
    // Given a room containing Bob, whose identity is unpinned
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob becomes pinned
    setup.pin_bob().await;

    // Then the initial pin violation is reported first
    let initial = next_change(&mut stream).await;
    assert_eq!(initial[0].user_id, setup.bob_user_id());
    assert_eq!(initial[0].changed_to, IdentityState::PinViolation);
    assert_eq!(initial.len(), 1);

    // And then the change to pinned
    let after_pin = next_change(&mut stream).await;
    assert_eq!(after_pin[0].user_id, setup.bob_user_id());
    assert_eq!(after_pin[0].changed_to, IdentityState::Pinned);
    assert_eq!(after_pin.len(), 1);
}
#[async_test]
async fn test_when_user_becomes_verified_we_dont_report_it() {
    // Given a room containing Bob
    let setup = TestSetup::new_room_with_other_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob becomes verified, and afterwards his identity changes
    setup.verify_bob().await;
    setup.unpin_bob().await;

    // Then the first thing reported is the verification violation, i.e. nothing
    // was emitted for the verification itself
    let first_reported = next_change(&mut stream).await;
    assert_eq!(first_reported[0].user_id, setup.bob_user_id());
    assert_eq!(first_reported[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(first_reported.len(), 1);
}
#[async_test]
async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
    // Given a room containing Bob, who is unpinned and currently has identity A
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob becomes verified
    setup.verify_bob().await;

    // Then the initial pin violation is reported first
    let initial = next_change(&mut stream).await;
    assert_eq!(initial[0].user_id, setup.bob_user_id());
    assert_eq!(initial[0].changed_to, IdentityState::PinViolation);
    assert_eq!(initial.len(), 1);

    // And then the change to verified
    let after_verify = next_change(&mut stream).await;
    assert_eq!(after_verify[0].user_id, setup.bob_user_id());
    assert_eq!(after_verify[0].changed_to, IdentityState::Verified);
    assert_eq!(after_verify.len(), 1);
}
#[async_test]
async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
    // Given a room containing Bob, who was verified with identity B
    let setup = TestSetup::new_room_with_other_bob().await;
    setup
        .verify_bob_with(
            IdentityChangeDataSet::key_query_with_identity_b(),
            IdentityChangeDataSet::master_signing_keys_b(),
            IdentityChangeDataSet::self_signing_keys_b(),
        )
        .await;

    // And whose identity has changed since
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob becomes verified again
    setup.verify_bob().await;

    // Then the initial verification violation is reported first
    let initial = next_change(&mut stream).await;
    assert_eq!(initial[0].user_id, setup.bob_user_id());
    assert_eq!(initial[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(initial.len(), 1);

    // And then the change to verified
    let after_verify = next_change(&mut stream).await;
    assert_eq!(after_verify[0].user_id, setup.bob_user_id());
    assert_eq!(after_verify[0].changed_to, IdentityState::Verified);
    assert_eq!(after_verify.len(), 1);
}
#[async_test]
async fn test_when_an_unpinned_user_joins_we_report_it() {
    // Given a room that Bob has not joined, and Bob is unpinned
    let mut setup = TestSetup::new_just_me_room().await;
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob joins the room
    setup.bob_joins().await;

    // Then a single pin violation for Bob is reported
    let reported = next_change(&mut pin!(stream)).await;
    assert_eq!(reported[0].user_id, setup.bob_user_id());
    assert_eq!(reported[0].changed_to, IdentityState::PinViolation);
    assert_eq!(reported.len(), 1);
}
#[async_test]
async fn test_when_an_verification_violating_user_joins_we_report_it() {
    // Given a room that Bob has not joined
    let mut setup = TestSetup::new_just_me_room().await;

    // And Bob was verified, but his identity has changed since
    setup.verify_bob().await;
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob joins the room
    setup.bob_joins().await;

    // Then a single verification violation for Bob is reported
    let reported = next_change(&mut pin!(stream)).await;
    assert_eq!(reported[0].user_id, setup.bob_user_id());
    assert_eq!(reported[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(reported.len(), 1);
}
#[async_test]
async fn test_when_a_verified_user_joins_we_dont_report_it() {
    // Given a room that Bob has not joined, and Bob is verified
    let mut setup = TestSetup::new_just_me_room().await;
    setup.verify_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob joins the room, and afterwards his identity changes
    setup.bob_joins().await;
    setup.unpin_bob().await;

    // Then the first thing reported is the verification violation, i.e. nothing
    // was emitted for the join itself
    let mut stream = pin!(stream);
    let first_reported = next_change(&mut stream).await;
    assert_eq!(first_reported[0].user_id, setup.bob_user_id());
    assert_eq!(first_reported[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(first_reported.len(), 1);
}
#[async_test]
async fn test_when_a_pinned_user_joins_we_do_not_report() {
    // Given a room that Bob has not joined, and Bob is pinned
    let mut setup = TestSetup::new_just_me_room().await;
    setup.pin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob joins the room
    setup.bob_joins().await;

    // Then, after waiting a little, polling the stream once yields nothing
    tokio::time::sleep(Duration::from_millis(200)).await;
    let polled = stream.next().now_or_never();
    assert!(polled.is_none());
}
#[async_test]
async fn test_when_an_unpinned_user_leaves_we_report_it() {
    // Given a room containing Bob, whose identity is unpinned
    let mut setup = TestSetup::new_room_with_other_bob().await;
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob leaves the room
    setup.bob_leaves().await;

    // Then the initial pin violation is reported first
    let initial = next_change(&mut stream).await;
    assert_eq!(initial[0].user_id, setup.bob_user_id());
    assert_eq!(initial[0].changed_to, IdentityState::PinViolation);
    assert_eq!(initial.len(), 1);

    // And then Bob is reported as pinned
    let after_leave = next_change(&mut stream).await;
    assert_eq!(after_leave[0].user_id, setup.bob_user_id());
    assert_eq!(after_leave[0].changed_to, IdentityState::Pinned);
    assert_eq!(after_leave.len(), 1);
}
#[async_test]
async fn test_multiple_identity_changes_are_reported() {
    // Given a room that Bob has not joined, and Bob is unpinned
    let mut setup = TestSetup::new_just_me_room().await;
    setup.unpin_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;
    let mut stream = pin!(stream);

    // When Bob joins
    setup.bob_joins().await;
    let on_join = next_change(&mut stream).await;

    // And Bob is pinned
    setup.pin_bob().await;
    let on_pin = next_change(&mut stream).await;

    // And Bob leaves, rejoins and becomes unpinned again
    setup.bob_leaves().await;
    setup.bob_joins().await;
    setup.unpin_bob().await;
    let on_unpin = next_change(&mut stream).await;

    // And Bob leaves once more
    setup.bob_leaves().await;
    let on_leave = next_change(&mut stream).await;

    // Then every reported change is about Bob
    assert_eq!(on_join[0].user_id, setup.bob_user_id());
    assert_eq!(on_pin[0].user_id, setup.bob_user_id());
    assert_eq!(on_unpin[0].user_id, setup.bob_user_id());
    assert_eq!(on_leave[0].user_id, setup.bob_user_id());

    // And the reported states alternate between violation and pinned
    assert_eq!(on_join[0].changed_to, IdentityState::PinViolation);
    assert_eq!(on_pin[0].changed_to, IdentityState::Pinned);
    assert_eq!(on_unpin[0].changed_to, IdentityState::PinViolation);
    assert_eq!(on_leave[0].changed_to, IdentityState::Pinned);

    // And each batch contains exactly one change
    assert_eq!(on_join.len(), 1);
    assert_eq!(on_pin.len(), 1);
    assert_eq!(on_unpin.len(), 1);
    assert_eq!(on_leave.len(), 1);
}
#[async_test]
async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
    // Given a room containing Bob, whose identity is unpinned
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.unpin_bob().await;

    // When we subscribe to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // Then, without anything else happening, a single pin violation for Bob is
    // reported
    let reported = next_change(&mut pin!(stream)).await;
    assert_eq!(reported[0].user_id, setup.bob_user_id());
    assert_eq!(reported[0].changed_to, IdentityState::PinViolation);
    assert_eq!(reported.len(), 1);
}
#[async_test]
async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
    // Given a room containing Bob, whose identity is verified
    let setup = TestSetup::new_room_with_other_bob().await;
    setup.verify_bob().await;

    // And we are subscribed to identity status changes
    let stream = setup.subscribe_to_identity_status_changes().await;

    // When Bob's identity changes
    setup.unpin_bob().await;

    // Then the first thing reported is the verification violation, i.e. no
    // initial update was emitted for the verified state
    let first_reported = next_change(&mut pin!(stream)).await;
    assert_eq!(first_reported[0].user_id, setup.bob_user_id());
    assert_eq!(first_reported[0].changed_to, IdentityState::VerificationViolation);
    assert_eq!(first_reported.len(), 1);
}
/// Wait for the next batch of changes from a timeout-wrapped stream.
///
/// # Panics
///
/// Panics if the stream has ended, or if the timeout elapsed before a batch
/// arrived.
async fn next_change(
    changes: &mut Pin<&mut Timeout<impl Stream<Item = Vec<IdentityStatusChange>>>>,
) -> Vec<IdentityStatusChange> {
    let next_item = changes.next().await;
    let timeout_result = next_item.expect("Should not reach end of changes stream");
    timeout_result.expect("Should not time out waiting for a change")
}
mod test_setup {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures_core::Stream;
use matrix_sdk_base::{
crypto::{
testing::simulate_key_query_response_for_verification, IdentityStatusChange,
OtherUserIdentity,
},
RoomState,
};
use matrix_sdk_test::{
test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
};
use ruma::{
api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
events::room::member::MembershipState,
owned_user_id, OwnedUserId, TransactionId, UserId,
};
use serde_json::json;
use tokio_stream::{StreamExt as _, Timeout};
use wiremock::{
matchers::{header, method, path_regex},
Mock, MockServer, ResponseTemplate,
};
use crate::{
encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
};
/// Test fixture: a logged-in client, a room known to that client, and helpers
/// for changing Bob's identity and room membership.
pub(super) struct TestSetup {
/// The client under test.
client: Client,
/// User ID of the other user ("Bob") whose identity and membership the
/// helpers manipulate.
bob_user_id: OwnedUserId,
/// Builder for the sync responses that are fed to the client.
sync_response_builder: SyncResponseBuilder,
/// The room on which identity status changes are observed.
room: Room,
}
impl TestSetup {
/// Build a fixture whose room has not (yet) received a member event for Bob.
pub(super) async fn new_just_me_room() -> Self {
    let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
    let room = create_just_me_room(&client, &mut sync_response_builder).await;
    Self { client, bob_user_id, sync_response_builder, room }
}
/// Build a fixture whose room already contains Bob as a joined member.
pub(super) async fn new_room_with_other_bob() -> Self {
    let (client, bob, mut builder) = Self::init().await;
    let room = create_room_with_other_member(&mut builder, &client, &bob).await;
    Self { client, bob_user_id: bob, sync_response_builder: builder, room }
}
/// Bob's user ID, borrowed from the fixture.
pub(super) fn bob_user_id(&self) -> &UserId {
&self.bob_user_id
}
/// Put Bob's identity into the pinned state.
///
/// If the client already knows an identity for Bob, that identity is pinned.
/// Otherwise Bob is given identity A via a key query response, after which he
/// is expected to be pinned.
///
/// # Panics
///
/// Panics if Bob already has an identity that is pinned, if pinning fails, or
/// if Bob is not pinned afterwards.
pub(super) async fn pin_bob(&self) {
    // Look the identity up once and reuse it, instead of checking `is_some()`
    // and then fetching it a second time behind an `expect`.
    if let Some(identity) = self.bob_user_identity().await {
        assert!(
            !self.bob_is_pinned().await,
            "pin_bob() called when the identity is already pinned!"
        );
        identity.pin().await.expect("Should not fail to pin");
    } else {
        self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a()).await;
    }
    assert!(self.bob_is_pinned().await);
}
/// Unpin Bob, leaving him with identity B; shorthand for
/// `unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b())`.
pub(super) async fn unpin_bob(&self) {
self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
}
/// Unpin Bob so that he ends up with the `requested` identity.
///
/// Bob is first given the other identity, which is pinned if it is not
/// already, and is then switched to the requested one.
///
/// `requested` is compared against identity A by serialised master key;
/// anything that does not match identity A is treated as identity B.
///
/// # Panics
///
/// Panics if Bob is still pinned afterwards.
pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
    /// Serialise the first master key in the response to JSON.
    fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
        let (_, master_key) = key_query_response
            .master_keys
            .first_key_value()
            .expect("Master key should have a value");
        serde_json::to_string(master_key).expect("Should be able to serialise master key")
    }

    let identity_a = IdentityChangeDataSet::key_query_with_identity_a();
    let identity_b = IdentityChangeDataSet::key_query_with_identity_b();
    let wants_identity_a = master_key_json(&requested) == master_key_json(&identity_a);

    // Start from the identity that was NOT requested, so that the final switch
    // is a change away from a pinned identity.
    let (starting_identity, final_identity) =
        if wants_identity_a { (identity_b, identity_a) } else { (identity_a, identity_b) };

    self.change_bob_identity(starting_identity).await;
    if !self.bob_is_pinned().await {
        self.pin_bob().await;
    }
    self.change_bob_identity(final_identity).await;

    assert!(!self.bob_is_pinned().await);
}
/// Verify Bob using identity A; shorthand for `verify_bob_with` with the
/// identity A key query, master signing keys and self signing keys.
pub(super) async fn verify_bob(&self) {
self.verify_bob_with(
IdentityChangeDataSet::key_query_with_identity_a(),
IdentityChangeDataSet::master_signing_keys_a(),
IdentityChangeDataSet::self_signing_keys_a(),
)
.await;
}
/// Give Bob the identity described by `key_query` and mark it as verified by
/// us.
///
/// The verification is simulated: the signature upload request produced by
/// verifying Bob's identity is turned into a key query response, which is then
/// fed back to the client as if it had been sent.
///
/// # Panics
///
/// Panics if any step fails, or if Bob is not verified afterwards.
pub(super) async fn verify_bob_with(
    &self,
    key_query: KeyQueryResponse,
    master_signing_key: serde_json::Value,
    self_signing_key: serde_json::Value,
) {
    // Feed `key_query` to the client so that Bob has the identity in question.
    self.change_bob_identity(key_query).await;

    let own_user_id = self.client.user_id().expect("I should have a user id");
    let own_identity = self
        .client
        .encryption()
        .get_user_identity(own_user_id)
        .await
        .expect("Should not fail to get own user identity")
        .expect("Should have an own user identity")
        .underlying_identity()
        .own()
        .expect("Our own identity should be of type Own");

    let bob_identity = self.bob_crypto_other_identity().await;
    let signature_upload =
        bob_identity.verify().await.expect("Should be able to verify other identity");

    let response_after_verification = simulate_key_query_response_for_verification(
        signature_upload,
        own_identity,
        own_user_id,
        self.bob_user_id(),
        master_signing_key,
        self_signing_key,
    );

    let transaction_id = TransactionId::new();
    self.client
        .mark_request_as_sent(&transaction_id, &response_after_verification)
        .await
        .unwrap();

    assert!(self.bob_is_verified().await);
}
/// Process a sync response in which Bob's membership is `Join`.
pub(super) async fn bob_joins(&mut self) {
self.bob_membership_change(MembershipState::Join).await;
}
/// Process a sync response in which Bob's membership is `Leave`.
pub(super) async fn bob_leaves(&mut self) {
self.bob_membership_change(MembershipState::Leave).await;
}
/// Subscribe to the room's identity status changes, wrapping the stream so
/// that each item is subject to a 5 second timeout.
///
/// # Panics
///
/// Panics if subscribing fails.
pub(super) async fn subscribe_to_identity_status_changes(
    &self,
) -> Timeout<impl Stream<Item = Vec<IdentityStatusChange>>> {
    let changes = self
        .room
        .subscribe_to_identity_status_changes()
        .await
        .expect("Should be able to subscribe");
    changes.timeout(Duration::from_secs(5))
}
/// Create the pieces shared by both constructors: a logged-in client with
/// cross-signing bootstrapped, Bob's user ID (`@bob:localhost`) and a default
/// sync response builder.
///
/// Panics if the client has no Olm machine or if bootstrapping cross-signing
/// fails.
async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
// NOTE(review): `_server` is not returned, so it goes out of scope when `init`
// returns. Presumably the mock server stops serving at that point - confirm
// that nothing later in the tests needs the client to reach it.
let (client, _server) = create_client_and_server().await;
// The value returned by `olm_machine()` is only used as a temporary within
// this one statement.
client
.olm_machine()
.await
.as_ref()
.expect("We should have an Olm machine")
.bootstrap_cross_signing(true)
.await
.expect("Should be able to bootstrap cross-signing");
let bob_user_id = owned_user_id!("@bob:localhost");
let sync_response_builder = SyncResponseBuilder::default();
(client, bob_user_id, sync_response_builder)
}
/// Feed `key_query_response` to the client as the response to a freshly
/// generated transaction ID, then return Bob's resulting identity.
///
/// # Panics
///
/// Panics if the client fails to process the response, or if Bob has no
/// identity of the "other user" kind afterwards.
async fn change_bob_identity(
    &self,
    key_query_response: get_keys::v3::Response,
) -> OtherUserIdentity {
    let transaction_id = TransactionId::new();
    self.client
        .mark_request_as_sent(&transaction_id, &key_query_response)
        .await
        .expect("Should not fail to send identity changes");

    self.bob_crypto_other_identity().await
}
/// Process a sync response that sets Bob's membership in the test room to
/// `new_state`, then check that the room reflects it.
///
/// # Panics
///
/// Panics if processing the sync or looking up the member fails, if the
/// member's membership differs from `new_state`, or if the member is missing
/// for any state other than `Leave`.
async fn bob_membership_change(&mut self, new_state: MembershipState) {
    let member_event =
        StateTestEvent::Custom(sync_response_member(&self.bob_user_id, new_state.clone()));
    let room_update = JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(member_event);
    let sync_response =
        self.sync_response_builder.add_joined_room(room_update).build_sync_response();
    self.room.client.process_sync(sync_response).await.unwrap();

    let member = self
        .room
        .get_member_no_sync(&self.bob_user_id)
        .await
        .expect("Should not fail to get member");

    match member {
        Some(member) => {
            assert_eq!(*member.membership(), new_state);
        }
        // A member that has left is allowed to be absent.
        None if matches!(new_state, MembershipState::Leave) => {}
        None => {
            panic!("Member should exist")
        }
    }
}
/// Whether Bob's identity is pinned, i.e. does not need user approval.
async fn bob_is_pinned(&self) -> bool {
    let identity = self.bob_crypto_other_identity().await;
    !identity.identity_needs_user_approval()
}
/// Whether Bob's identity is currently verified.
async fn bob_is_verified(&self) -> bool {
    let identity = self.bob_crypto_other_identity().await;
    identity.is_verified()
}
/// Bob's underlying crypto identity, as an `OtherUserIdentity`.
///
/// # Panics
///
/// Panics if Bob has no identity, or if it is not of the "other user" kind.
async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
    let sdk_identity = self.bob_user_identity().await.expect("User identity should exist");
    let crypto_identity = sdk_identity.underlying_identity();
    crypto_identity.other().expect("Identity should be Other, not Own")
}
/// Look up Bob's identity through the client; `None` if the client does not
/// know one.
///
/// # Panics
///
/// Panics if the lookup itself fails.
async fn bob_user_identity(&self) -> Option<UserIdentity> {
    let lookup = self.client.encryption().get_user_identity(&self.bob_user_id).await;
    lookup.expect("Should not fail to get user identity")
}
}
/// Create the default test room by processing a sync response that contains
/// only the `StateTestEvent::Member` state event, and return it.
///
/// # Panics
///
/// Panics if the sync cannot be processed, if the room does not exist
/// afterwards, or if it is not in the joined state.
async fn create_just_me_room(
    client: &Client,
    sync_response_builder: &mut SyncResponseBuilder,
) -> Room {
    let room_update =
        JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(StateTestEvent::Member);
    let sync_response = sync_response_builder.add_joined_room(room_update).build_sync_response();
    client.process_sync(sync_response).await.unwrap();

    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
    assert_eq!(room.state(), RoomState::Joined);
    room
}
/// Create the default test room with `other_user_id` already joined, and
/// return it.
///
/// The processed sync response contains the `StateTestEvent::Member` state
/// event plus a custom member event giving `other_user_id` the `Join`
/// membership.
///
/// # Panics
///
/// Panics if the sync cannot be processed, if the room does not exist or is
/// not joined afterwards, or if the other member is missing or not joined.
async fn create_room_with_other_member(
    builder: &mut SyncResponseBuilder,
    client: &Client,
    other_user_id: &UserId,
) -> Room {
    let other_member_event =
        StateTestEvent::Custom(sync_response_member(other_user_id, MembershipState::Join));
    let room_update = JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
        .add_state_event(StateTestEvent::Member)
        .add_state_event(other_member_event);
    let sync_response = builder.add_joined_room(room_update).build_sync_response();
    client.process_sync(sync_response).await.unwrap();

    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
    assert_eq!(room.state(), RoomState::Joined);

    // Sanity check: the room really knows the other user as a joined member.
    let other_member = room
        .get_member_no_sync(other_user_id)
        .await
        .expect("Should not fail to get member")
        .expect("Member should exist");
    assert_eq!(*other_member.membership(), MembershipState::Join);

    room
}
/// Start a mock server with the members and secret-storage mocks mounted, and
/// create a logged-in client pointed at the server's URI.
async fn create_client_and_server() -> (Client, MockServer) {
    let mock_server = MockServer::start().await;

    // Mount the mocks before the client is created.
    mock_members_request(&mock_server).await;
    mock_secret_storage_default_key(&mock_server).await;

    let client = logged_in_client(Some(mock_server.uri())).await;
    (client, mock_server)
}
/// Mount a mock that answers `GET` requests for a room's members with a 200
/// response carrying the `test_json::members::MEMBERS` body.
///
/// The mock only matches requests with the `authorization: Bearer 1234`
/// header.
async fn mock_members_request(server: &MockServer) {
    let members_response =
        ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS);

    Mock::given(method("GET"))
        .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
        .and(header("authorization", "Bearer 1234"))
        .respond_with(members_response)
        .mount(server)
        .await;
}
/// Mount a mock that answers `GET` requests for a user's
/// `m.secret_storage.default_key` account data with a 200 response and an
/// empty JSON object.
///
/// The mock only matches requests with the `authorization: Bearer 1234`
/// header.
async fn mock_secret_storage_default_key(server: &MockServer) {
    let empty_response = ResponseTemplate::new(200).set_body_json(json!({}));

    Mock::given(method("GET"))
        .and(path_regex(
            r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
        ))
        .and(header("authorization", "Bearer 1234"))
        .respond_with(empty_response)
        .mount(server)
        .await;
}
fn sync_response_member(
user_id: &UserId,
membership: MembershipState,
) -> serde_json::Value {
json!({
"content": {
"membership": membership.to_string(),
},
"event_id": format!(
"$aa{}bb:localhost",
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
),
"origin_server_ts": 1472735824,
"sender": "@example:localhost",
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 1234
}
})
}
}
}