use std::{fmt::Display, sync::Arc};
use eyeball_im::ObservableVectorTransaction;
use ruma::MilliSecondsSinceUnixEpoch;
use tracing::{error, event_enabled, instrument, trace, warn, Level};
use super::{
controller::TimelineMetadata, util::timestamp_to_date, TimelineItem, TimelineItemKind,
VirtualTimelineItem,
};
pub(super) struct DayDividerAdjuster {
ops: Vec<DayDividerOperation>,
consumed: bool,
}
impl Drop for DayDividerAdjuster {
fn drop(&mut self) {
if !std::thread::panicking() && !self.consumed {
error!("a DayDividerAdjuster has not been consumed with run()");
}
}
}
impl Default for DayDividerAdjuster {
fn default() -> Self {
Self {
ops: Default::default(),
consumed: true,
}
}
}
struct PrevItemDesc<'a> {
item_index: usize,
item: &'a Arc<TimelineItem>,
insert_op_at: usize,
}
impl DayDividerAdjuster {
pub fn mark_used(&mut self) {
self.consumed = false;
}
#[instrument(skip_all)]
pub fn run(
&mut self,
items: &mut ObservableVectorTransaction<'_, Arc<TimelineItem>>,
meta: &mut TimelineMetadata,
) {
let mut prev_item: Option<PrevItemDesc<'_>> = None;
let mut latest_event_ts = None;
for (i, item) in items.iter().enumerate() {
match item.kind() {
TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(ts)) => {
if !self.handle_day_divider(i, *ts, prev_item.as_ref().map(|desc| desc.item)) {
prev_item = Some(PrevItemDesc {
item_index: i,
item,
insert_op_at: self.ops.len(),
});
}
}
TimelineItemKind::Event(event) => {
let ts = event.timestamp();
self.handle_event(i, ts, prev_item, latest_event_ts);
prev_item =
Some(PrevItemDesc { item_index: i, item, insert_op_at: self.ops.len() });
latest_event_ts = Some(ts);
}
TimelineItemKind::Virtual(VirtualTimelineItem::ReadMarker) => {
}
}
}
for (i, item) in items.iter().enumerate().rev() {
if item.is_day_divider() {
if !self
.ops
.iter()
.any(|op| matches!(op, DayDividerOperation::Remove(j) if i == *j))
{
trace!("removing trailing day divider @ {i}");
let index =
self.ops.iter().position(|op| op.index() > i).unwrap_or(self.ops.len());
self.ops.insert(index, DayDividerOperation::Remove(i));
}
}
if item.is_event() {
break;
}
}
let initial_state =
if event_enabled!(Level::TRACE) { Some(items.iter().cloned().collect()) } else { None };
self.process_ops(items, meta);
if let Some(report) = self.check_invariants(items, initial_state) {
warn!("Errors encountered when checking invariants.");
warn!("{report}");
#[cfg(any(debug_assertions, test))]
panic!("There was an error checking date separator invariants");
}
self.consumed = true;
}
#[inline]
fn handle_day_divider(
&mut self,
i: usize,
ts: MilliSecondsSinceUnixEpoch,
prev_item: Option<&Arc<TimelineItem>>,
) -> bool {
let Some(prev_item) = prev_item else {
return false;
};
match prev_item.kind() {
TimelineItemKind::Event(event) => {
if is_same_date_as(event.timestamp(), ts) {
trace!("removing day divider following event with same timestamp @ {i}");
self.ops.push(DayDividerOperation::Remove(i));
return true;
}
}
TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(_)) => {
trace!("removing duplicate day divider @ {i}");
self.ops.push(DayDividerOperation::Remove(i));
return true;
}
TimelineItemKind::Virtual(VirtualTimelineItem::ReadMarker) => {
}
}
false
}
#[inline]
fn handle_event(
&mut self,
i: usize,
ts: MilliSecondsSinceUnixEpoch,
prev_item_desc: Option<PrevItemDesc<'_>>,
latest_event_ts: Option<MilliSecondsSinceUnixEpoch>,
) {
let Some(PrevItemDesc { item_index, insert_op_at, item }) = prev_item_desc else {
trace!("inserting the first day divider @ {}", i);
self.ops.push(DayDividerOperation::Insert(i, ts));
return;
};
match item.kind() {
TimelineItemKind::Event(prev_event) => {
let prev_ts = prev_event.timestamp();
if !is_same_date_as(prev_ts, ts) {
trace!("inserting day divider @ {} between two events with different dates", i);
self.ops.push(DayDividerOperation::Insert(i, ts));
}
}
TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(prev_ts)) => {
let event_date = timestamp_to_date(ts);
if timestamp_to_date(*prev_ts) != event_date {
if let Some(last_event_ts) = latest_event_ts {
if timestamp_to_date(last_event_ts) == event_date {
trace!("removed day divider @ {item_index} between two events that have the same date");
self.ops.insert(insert_op_at, DayDividerOperation::Remove(item_index));
return;
}
}
trace!("replacing day divider @ {item_index} with new timestamp from event");
self.ops.insert(insert_op_at, DayDividerOperation::Replace(item_index, ts));
}
}
TimelineItemKind::Virtual(VirtualTimelineItem::ReadMarker) => {
}
}
}
fn process_ops(
&self,
items: &mut ObservableVectorTransaction<'_, Arc<TimelineItem>>,
meta: &mut TimelineMetadata,
) {
let mut offset = 0i64;
let mut max_i = 0;
for op in &self.ops {
match *op {
DayDividerOperation::Insert(i, ts) => {
assert!(i >= max_i, "trying to insert at {i} < max_i={max_i}");
let at = (i64::try_from(i).unwrap() + offset)
.min(i64::try_from(items.len()).unwrap());
assert!(at >= 0);
let at = at as usize;
let item = meta.new_timeline_item(VirtualTimelineItem::DayDivider(ts));
if at == items.len() {
items.push_back(item);
} else if at == 0 {
items.push_front(item);
} else {
items.insert(at, item);
}
offset += 1;
max_i = i;
}
DayDividerOperation::Replace(i, ts) => {
assert!(i >= max_i, "trying to replace at {i} < max_i={max_i}");
let at = i64::try_from(i).unwrap() + offset;
assert!(at >= 0);
let at = at as usize;
let replaced = &items[at];
if !replaced.is_day_divider() {
error!("we replaced a non day-divider @ {i}: {:?}", replaced.kind());
}
let unique_id = replaced.unique_id();
let item = TimelineItem::new(
VirtualTimelineItem::DayDivider(ts),
unique_id.to_owned(),
);
items.set(at, item);
max_i = i;
}
DayDividerOperation::Remove(i) => {
assert!(i >= max_i, "trying to replace at {i} < max_i={max_i}");
let at = i64::try_from(i).unwrap() + offset;
assert!(at >= 0);
let removed = items.remove(at as usize);
if !removed.is_day_divider() {
error!("we removed a non day-divider @ {i}: {:?}", removed.kind());
}
offset -= 1;
max_i = i;
}
}
}
}
fn check_invariants<'a, 'o>(
&mut self,
items: &'a ObservableVectorTransaction<'o, Arc<TimelineItem>>,
initial_state: Option<Vec<Arc<TimelineItem>>>,
) -> Option<DayDividerInvariantsReport<'a, 'o>> {
let mut report = DayDividerInvariantsReport {
initial_state,
errors: Vec::new(),
operations: std::mem::take(&mut self.ops),
final_state: items,
};
if let Some(item) = items.get(0) {
if item.is_read_marker() {
if let Some(next_item) = items.get(1) {
if !next_item.is_day_divider() {
report.errors.push(DayDividerInsertError::FirstItemNotDayDivider);
}
}
} else if !item.is_day_divider() {
report.errors.push(DayDividerInsertError::FirstItemNotDayDivider);
}
}
{
let mut prev_was_day_divider = false;
for (i, item) in items.iter().enumerate() {
if item.is_day_divider() {
if prev_was_day_divider {
report.errors.push(DayDividerInsertError::DuplicateDayDivider { at: i });
}
prev_was_day_divider = true;
} else {
prev_was_day_divider = false;
}
}
};
if let Some(last) = items.last() {
if last.is_day_divider() {
report.errors.push(DayDividerInsertError::TrailingDayDivider);
}
}
{
let mut prev_event_ts = None;
let mut prev_day_divider_ts = None;
for (i, item) in items.iter().enumerate() {
if let Some(ev) = item.as_event() {
let ts = ev.timestamp();
if let Some(prev_ts) = prev_event_ts {
if !is_same_date_as(prev_ts, ts) {
report.errors.push(
DayDividerInsertError::MissingDayDividerBetweenEvents { at: i },
);
}
}
if let Some(prev_ts) = prev_day_divider_ts {
if !is_same_date_as(prev_ts, ts) {
report.errors.push(
DayDividerInsertError::InconsistentDateAfterPreviousDayDivider {
at: i,
},
);
}
} else {
report
.errors
.push(DayDividerInsertError::MissingDayDividerBeforeEvent { at: i });
}
prev_event_ts = Some(ts);
} else if let TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(ts)) =
item.kind()
{
if let Some(prev_ts) = prev_day_divider_ts {
if is_same_date_as(prev_ts, *ts) {
report
.errors
.push(DayDividerInsertError::DuplicateDayDivider { at: i });
}
}
prev_event_ts = None;
prev_day_divider_ts = Some(*ts);
}
}
}
if let Some(state) = &report.initial_state {
if state.iter().any(|item| item.is_read_marker())
&& !report.final_state.iter().any(|item| item.is_read_marker())
{
report.errors.push(DayDividerInsertError::ReadMarkerDisappeared);
}
}
if report.errors.is_empty() {
None
} else {
Some(report)
}
}
}
#[derive(Debug)]
enum DayDividerOperation {
Insert(usize, MilliSecondsSinceUnixEpoch),
Replace(usize, MilliSecondsSinceUnixEpoch),
Remove(usize),
}
impl DayDividerOperation {
fn index(&self) -> usize {
match self {
DayDividerOperation::Insert(i, _)
| DayDividerOperation::Replace(i, _)
| DayDividerOperation::Remove(i) => *i,
}
}
}
#[inline]
fn is_same_date_as(lhs: MilliSecondsSinceUnixEpoch, rhs: MilliSecondsSinceUnixEpoch) -> bool {
timestamp_to_date(lhs) == timestamp_to_date(rhs)
}
struct DayDividerInvariantsReport<'a, 'o> {
initial_state: Option<Vec<Arc<TimelineItem>>>,
operations: Vec<DayDividerOperation>,
final_state: &'a ObservableVectorTransaction<'o, Arc<TimelineItem>>,
errors: Vec<DayDividerInsertError>,
}
impl Display for DayDividerInvariantsReport<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn write_items(
f: &mut std::fmt::Formatter<'_>,
items: &[Arc<TimelineItem>],
) -> std::fmt::Result {
for (i, item) in items.iter().enumerate() {
if let TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(ts)) = item.kind()
{
writeln!(f, "#{i} --- {}", ts.0)?;
} else if let Some(event) = item.as_event() {
writeln!(
f,
"#{i} {}: {}",
event
.event_id()
.map_or_else(|| "(no event id)".to_owned(), |id| id.to_string()),
event.timestamp().0
)?;
} else {
writeln!(f, "#{i} (other virtual item)")?;
}
}
Ok(())
}
if let Some(initial_state) = &self.initial_state {
writeln!(f, "Initial state:")?;
write_items(f, initial_state)?;
writeln!(f, "\nOperations to apply:")?;
for op in &self.operations {
match *op {
DayDividerOperation::Insert(i, ts) => writeln!(f, "insert @ {i}: {}", ts.0)?,
DayDividerOperation::Replace(i, ts) => writeln!(f, "replace @ {i}: {}", ts.0)?,
DayDividerOperation::Remove(i) => writeln!(f, "remove @ {i}")?,
}
}
writeln!(f, "\nFinal state:")?;
write_items(f, self.final_state.iter().cloned().collect::<Vec<_>>().as_slice())?;
writeln!(f)?;
}
for err in &self.errors {
writeln!(f, "{err}")?;
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
enum DayDividerInsertError {
#[error("The first item isn't a day divider")]
FirstItemNotDayDivider,
#[error("Duplicate day divider @ {at}.")]
DuplicateDayDivider { at: usize },
#[error("The last item is a day divider.")]
TrailingDayDivider,
#[error("Missing day divider between events @ {at}")]
MissingDayDividerBetweenEvents { at: usize },
#[error("Missing day divider before event @ {at}")]
MissingDayDividerBeforeEvent { at: usize },
#[error("Event @ {at} and the previous day divider aren't targeting the same date")]
InconsistentDateAfterPreviousDayDivider { at: usize },
#[error("The read marker has been removed")]
ReadMarkerDisappeared,
}
#[cfg(test)]
mod tests {
use assert_matches2::assert_let;
use eyeball_im::ObservableVector;
use ruma::{owned_event_id, owned_user_id, uint, MilliSecondsSinceUnixEpoch};
use super::DayDividerAdjuster;
use crate::timeline::{
controller::TimelineMetadata,
event_item::{EventTimelineItemKind, RemoteEventTimelineItem},
util::timestamp_to_date,
EventTimelineItem, TimelineItemContent, VirtualTimelineItem,
};
fn event_with_ts(timestamp: MilliSecondsSinceUnixEpoch) -> EventTimelineItem {
let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
event_id: owned_event_id!("$1"),
transaction_id: None,
read_receipts: Default::default(),
is_own: false,
is_highlighted: false,
encryption_info: None,
original_json: None,
latest_edit_json: None,
origin: crate::timeline::event_item::RemoteEventOrigin::Sync,
});
EventTimelineItem::new(
owned_user_id!("@alice:example.org"),
crate::timeline::TimelineDetails::Pending,
timestamp,
TimelineItemContent::RedactedMessage,
event_kind,
Default::default(),
false,
)
}
fn test_metadata() -> TimelineMetadata {
TimelineMetadata::new(
owned_user_id!("@a:b.c"),
ruma::RoomVersionId::V11,
None,
None,
Some(false),
)
}
#[test]
fn test_no_trailing_day_divider() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
let timestamp_next_day =
MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap());
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert_let!(Some(item) = iter.next());
assert!(item.is_day_divider());
assert_let!(Some(item) = iter.next());
assert!(item.is_remote_event());
assert_let!(Some(item) = iter.next());
assert!(item.is_read_marker());
assert!(iter.next().is_none());
}
#[test]
fn test_read_marker_in_between_event_and_day_divider() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
let timestamp_next_day =
MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap());
assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day));
let event = event_with_ts(timestamp);
txn.push_back(meta.new_timeline_item(event.clone()));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp_next_day)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
txn.push_back(meta.new_timeline_item(event));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().unwrap().is_read_marker());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().is_none());
}
#[test]
fn test_read_marker_in_between_day_dividers() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
let timestamp_next_day =
MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap());
assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().unwrap().is_read_marker());
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().is_none());
}
#[test]
fn test_remove_all_day_dividers() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
let timestamp_next_day =
MilliSecondsSinceUnixEpoch((42 + 3600 * 24 * 1000).try_into().unwrap());
assert_ne!(timestamp_to_date(timestamp), timestamp_to_date(timestamp_next_day));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp_next_day)));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().is_none());
}
#[test]
fn test_event_read_marker_spurious_day_divider() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().unwrap().is_read_marker());
assert!(iter.next().is_none());
}
#[test]
fn test_multiple_trailing_day_dividers() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::DayDivider(timestamp)));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_read_marker());
assert!(iter.next().is_none());
}
#[test]
fn test_start_with_read_marker() {
let mut items = ObservableVector::new();
let mut txn = items.transaction();
let mut meta = test_metadata();
let timestamp = MilliSecondsSinceUnixEpoch(uint!(42));
txn.push_back(meta.new_timeline_item(VirtualTimelineItem::ReadMarker));
txn.push_back(meta.new_timeline_item(event_with_ts(timestamp)));
let mut adjuster = DayDividerAdjuster::default();
adjuster.run(&mut txn, &mut meta);
txn.commit();
let mut iter = items.iter();
assert!(iter.next().unwrap().is_read_marker());
assert!(iter.next().unwrap().is_day_divider());
assert!(iter.next().unwrap().is_remote_event());
assert!(iter.next().is_none());
}
}