use std::cmp::Ordering;
use eyeball_im::{
VectorDiff, VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream,
};
use futures_core::Stream;
use imbl::Vector;
use super::{
ops::{
VecVectorDiffFamily, VectorDiffContainerFamily, VectorDiffContainerOps, VectorDiffFamily,
},
EmptyCountStream, EmptyLimitStream, Filter, FilterMap, Head, Skip, Sort, SortBy, SortByKey,
Tail,
};
/// Trait for stream item types that carry vector diffs.
///
/// In this file it is implemented for a single [`VectorDiff<T>`] and for a
/// `Vec<VectorDiff<T>>`. [`VectorObserverExt`] requires the items of the
/// observed stream to implement this trait.
pub trait VectorDiffContainer:
VectorDiffContainerOps<Self::Element, Family = <Self as VectorDiffContainer>::Family>
{
/// The element type the contained diffs refer to.
type Element: Clone + 'static;
// Type-level link back to `Self`: the family's `Member<Self::Element>` must
// be exactly `Self`. Hidden from the rendered documentation.
#[doc(hidden)]
type Family: VectorDiffContainerFamily<Member<Self::Element> = Self>;
}
/// A single [`VectorDiff`] is a diff container with element type `T`.
impl<T> VectorDiffContainer for VectorDiff<T>
where
    T: Clone + 'static,
{
    type Family = VectorDiffFamily;
    type Element = T;
}
/// A `Vec` of [`VectorDiff`]s is a diff container with element type `T`.
impl<T> VectorDiffContainer for Vec<VectorDiff<T>>
where
    T: Clone + 'static,
{
    type Family = VecVectorDiffFamily;
    type Element = T;
}
/// Extension trait adding a conversion into [`BatchedVectorSubscriber`].
///
/// In this file it is implemented for [`VectorSubscriber<T>`].
pub trait VectorSubscriberExt<T> {
/// Consumes `self` and returns a [`BatchedVectorSubscriber`] for the same
/// element type.
fn batched(self) -> BatchedVectorSubscriber<T>;
}
impl<T> VectorSubscriberExt<T> for VectorSubscriber<T> {
    /// Moves this subscriber into a new [`BatchedVectorSubscriber`] wrapper.
    fn batched(self) -> BatchedVectorSubscriber<T> {
        let inner = self;
        BatchedVectorSubscriber { inner }
    }
}
/// Wrapper around a [`VectorSubscriber`], created by
/// [`VectorSubscriberExt::batched`].
///
/// Its [`VectorObserver`] implementation hands out a
/// [`VectorSubscriberBatchedStream`] instead of the [`VectorSubscriberStream`]
/// that a plain [`VectorSubscriber`] yields.
#[derive(Debug)]
pub struct BatchedVectorSubscriber<T> {
// The wrapped subscriber; consumed when the observer is split into parts.
inner: VectorSubscriber<T>,
}
/// Abstraction over values that can be split into a [`Vector`] and a stream.
///
/// In this file it is implemented for [`VectorSubscriber`],
/// [`BatchedVectorSubscriber`] and `(Vector<T>, S)` tuples.
/// [`VectorObserverExt`] supplies adapter methods for implementers whose
/// stream items are [`VectorDiffContainer`]s.
pub trait VectorObserver<T>: Sized {
// The stream half handed out by `into_parts`; hidden from rendered docs.
#[doc(hidden)]
type Stream: Stream;
// Consumes `self` and returns the vector together with the stream; hidden
// from rendered docs.
#[doc(hidden)]
fn into_parts(self) -> (Vector<T>, Self::Stream);
}
/// A plain subscriber splits into its values and a [`VectorSubscriberStream`].
impl<T> VectorObserver<T> for VectorSubscriber<T>
where
    T: Clone + 'static,
{
    type Stream = VectorSubscriberStream<T>;

    fn into_parts(self) -> (Vector<T>, Self::Stream) {
        let (values, stream) = self.into_values_and_stream();
        (values, stream)
    }
}
/// A batched subscriber splits into its values and a
/// [`VectorSubscriberBatchedStream`].
impl<T> VectorObserver<T> for BatchedVectorSubscriber<T>
where
    T: Clone + 'static,
{
    type Stream = VectorSubscriberBatchedStream<T>;

    fn into_parts(self) -> (Vector<T>, Self::Stream) {
        // Unwrap the newtype, then let the inner subscriber do the split.
        let Self { inner } = self;
        inner.into_values_and_batched_stream()
    }
}
/// A `(Vector<T>, S)` pair is already in "parts" form, so it is returned as-is.
impl<T, S> VectorObserver<T> for (Vector<T>, S)
where
    S: Stream,
    S::Item: VectorDiffContainer,
{
    type Stream = S;

    fn into_parts(self) -> (Vector<T>, Self::Stream) {
        let (values, stream) = self;
        (values, stream)
    }
}
/// Adapter-constructing convenience methods for [`VectorObserver`]s.
///
/// Every method consumes `self`, splits it with `into_parts` and forwards both
/// halves (plus the method's own arguments) to a constructor of the adapter
/// named in the return type. See the individual adapter types for their exact
/// semantics.
pub trait VectorObserverExt<T>: VectorObserver<T>
where
    T: Clone + 'static,
    <Self::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
{
    /// Wraps this observer in a [`Filter`] adapter driven by the predicate `f`.
    ///
    /// Returns the result of `Filter::new`: a vector plus the adapter stream.
    fn filter<F>(self, f: F) -> (Vector<T>, Filter<Self::Stream, F>)
    where
        F: Fn(&T) -> bool,
    {
        let (values, diffs) = self.into_parts();
        Filter::new(values, diffs, f)
    }

    /// Wraps this observer in a [`FilterMap`] adapter driven by `f`, which maps
    /// a `T` to an optional `U`.
    ///
    /// Returns the result of `FilterMap::new`: a `Vector<U>` plus the adapter
    /// stream.
    fn filter_map<U, F>(self, f: F) -> (Vector<U>, FilterMap<Self::Stream, F>)
    where
        U: Clone,
        F: Fn(T) -> Option<U>,
    {
        let (values, diffs) = self.into_parts();
        FilterMap::new(values, diffs, f)
    }

    /// Wraps this observer in a [`Head`] adapter with the fixed `limit`.
    ///
    /// Returns the result of `Head::new`; see [`Head`] for details.
    fn head(self, limit: usize) -> (Vector<T>, Head<Self::Stream, EmptyLimitStream>) {
        let (values, diffs) = self.into_parts();
        Head::new(values, diffs, limit)
    }

    /// Wraps this observer in a [`Head`] adapter whose limit comes from
    /// `limit_stream`.
    ///
    /// Unlike [`head`][Self::head], only the adapter stream is returned (the
    /// result of `Head::dynamic`); there is no accompanying vector.
    fn dynamic_head<L>(self, limit_stream: L) -> Head<Self::Stream, L>
    where
        L: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Head::dynamic(values, diffs, limit_stream)
    }

    /// Wraps this observer in a [`Head`] adapter given both an `initial_limit`
    /// and a `limit_stream`.
    ///
    /// Returns the result of `Head::dynamic_with_initial_limit`.
    fn dynamic_head_with_initial_value<L>(
        self,
        initial_limit: usize,
        limit_stream: L,
    ) -> (Vector<T>, Head<Self::Stream, L>)
    where
        L: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Head::dynamic_with_initial_limit(values, diffs, initial_limit, limit_stream)
    }

    /// Wraps this observer in a [`Tail`] adapter with the fixed `limit`.
    ///
    /// Returns the result of `Tail::new`; see [`Tail`] for details.
    fn tail(self, limit: usize) -> (Vector<T>, Tail<Self::Stream, EmptyLimitStream>) {
        let (values, diffs) = self.into_parts();
        Tail::new(values, diffs, limit)
    }

    /// Wraps this observer in a [`Tail`] adapter whose limit comes from
    /// `limit_stream`.
    ///
    /// Unlike [`tail`][Self::tail], only the adapter stream is returned (the
    /// result of `Tail::dynamic`); there is no accompanying vector.
    fn dynamic_tail<L>(self, limit_stream: L) -> Tail<Self::Stream, L>
    where
        L: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Tail::dynamic(values, diffs, limit_stream)
    }

    /// Wraps this observer in a [`Tail`] adapter given both an `initial_limit`
    /// and a `limit_stream`.
    ///
    /// Returns the result of `Tail::dynamic_with_initial_limit`.
    fn dynamic_tail_with_initial_value<L>(
        self,
        initial_limit: usize,
        limit_stream: L,
    ) -> (Vector<T>, Tail<Self::Stream, L>)
    where
        L: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Tail::dynamic_with_initial_limit(values, diffs, initial_limit, limit_stream)
    }

    /// Wraps this observer in a [`Skip`] adapter with the fixed `count`.
    ///
    /// Returns the result of `Skip::new`; see [`Skip`] for details.
    fn skip(self, count: usize) -> (Vector<T>, Skip<Self::Stream, EmptyCountStream>) {
        let (values, diffs) = self.into_parts();
        Skip::new(values, diffs, count)
    }

    /// Wraps this observer in a [`Skip`] adapter whose count comes from
    /// `count_stream`.
    ///
    /// Unlike [`skip`][Self::skip], only the adapter stream is returned (the
    /// result of `Skip::dynamic`); there is no accompanying vector.
    fn dynamic_skip<C>(self, count_stream: C) -> Skip<Self::Stream, C>
    where
        C: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Skip::dynamic(values, diffs, count_stream)
    }

    /// Wraps this observer in a [`Skip`] adapter given both an `initial_count`
    /// and a `count_stream`.
    ///
    /// Returns the result of `Skip::dynamic_with_initial_count`.
    fn dynamic_skip_with_initial_count<C>(
        self,
        initial_count: usize,
        count_stream: C,
    ) -> (Vector<T>, Skip<Self::Stream, C>)
    where
        C: Stream<Item = usize>,
    {
        let (values, diffs) = self.into_parts();
        Skip::dynamic_with_initial_count(values, diffs, initial_count, count_stream)
    }

    /// Wraps this observer in a [`Sort`] adapter; requires `T: Ord`.
    ///
    /// Returns the result of `Sort::new`: a vector plus the adapter stream.
    fn sort(self) -> (Vector<T>, Sort<Self::Stream>)
    where
        T: Ord,
    {
        let (values, diffs) = self.into_parts();
        Sort::new(values, diffs)
    }

    /// Wraps this observer in a [`SortBy`] adapter using the comparison
    /// function `compare`.
    ///
    /// Returns the result of `SortBy::new`: a vector plus the adapter stream.
    fn sort_by<F>(self, compare: F) -> (Vector<T>, SortBy<Self::Stream, F>)
    where
        F: Fn(&T, &T) -> Ordering,
    {
        let (values, diffs) = self.into_parts();
        SortBy::new(values, diffs, compare)
    }

    /// Wraps this observer in a [`SortByKey`] adapter using the key extraction
    /// function `key_fn`, whose keys must be `Ord`.
    ///
    /// Returns the result of `SortByKey::new`: a vector plus the adapter
    /// stream.
    fn sort_by_key<F, K>(self, key_fn: F) -> (Vector<T>, SortByKey<Self::Stream, F>)
    where
        F: Fn(&T) -> K,
        K: Ord,
    {
        let (values, diffs) = self.into_parts();
        SortByKey::new(values, diffs, key_fn)
    }
}
/// Blanket implementation: every [`VectorObserver`] whose stream items are
/// [`VectorDiffContainer`]s over `T` gets the extension methods for free.
impl<T, O> VectorObserverExt<T> for O
where
    O: VectorObserver<T>,
    T: Clone + 'static,
    <<O as VectorObserver<T>>::Stream as Stream>::Item: VectorDiffContainer<Element = T>,
{}