1use super::*;
2
3use indexmap::{self, IndexMap};
4
5use std::convert::Infallible;
6use std::fmt;
7use std::marker::PhantomData;
8use std::ops;
9
/// Storage for `Stream` values.
///
/// The streams themselves live in a slab; `ids` maps each `StreamId` to the
/// slab slot that holds the matching stream.
#[derive(Debug)]
pub(super) struct Store {
    /// Backing storage for the stream values.
    slab: slab::Slab<Stream>,
    /// Lookup table from stream ID to the stream's position in `slab`.
    ids: IndexMap<StreamId, SlabIndex>,
}
16
/// A `Key` paired with a mutable borrow of the `Store` it refers into.
///
/// Dereferences to the `Stream` that `key` designates.
pub(super) struct Ptr<'a> {
    /// Which entry of `store` this pointer designates.
    key: Key,
    /// The store the entry lives in.
    store: &'a mut Store,
}
22
/// Identifies one entry of a `Store`.
///
/// Indexing a `Store` with a key panics unless the slab slot at `index` is
/// occupied by a stream whose `id` equals `stream_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Key {
    /// Position of the stream inside the store's slab.
    index: SlabIndex,
    /// ID of the stream the key was created for; compared against the stored
    /// stream's `id` on every lookup.
    stream_id: StreamId,
}
31
/// A slab position stored as `u32`; it is widened back to `usize` whenever
/// the slab is accessed.
///
/// NOTE(review): insertion narrows the slab's `usize` key with `as u32`;
/// presumably the number of streams can never exceed the `u32` range —
/// confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlabIndex(u32);
36
/// A singly linked queue of stream keys.
///
/// Only the head and tail keys are stored here; the links between entries
/// and the "is queued" flag are read and written on the streams themselves
/// through the `Next` implementation `N`.
#[derive(Debug)]
pub(super) struct Queue<N> {
    /// Head and tail of the queue, or `None` when the queue is empty.
    indices: Option<store::Indices>,
    /// Ties the queue to its `Next` implementation without storing a value.
    _p: PhantomData<N>,
}
42
/// Accessors for the link and "queued" flag that a `Queue<N>` keeps on each
/// `Stream`.
///
/// `Queue` never touches a stream's link or flag directly; it always goes
/// through these associated functions.
pub(super) trait Next {
    /// Returns the key stored in `stream`'s link, if any.
    ///
    /// `Queue` expects this to be `None` for a stream that is about to be
    /// pushed and for the last remaining entry when it is popped.
    fn next(stream: &Stream) -> Option<Key>;

    /// Stores `key` in `stream`'s link.
    fn set_next(stream: &mut Stream, key: Option<Key>);

    /// Returns the key stored in `stream`'s link; `Queue::pop` uses the
    /// result as the new head.
    ///
    /// NOTE(review): the name suggests the link is cleared as well —
    /// confirm against the implementors.
    fn take_next(stream: &mut Stream) -> Option<Key>;

    /// Returns whether `stream` is currently flagged as queued.
    fn is_queued(stream: &Stream) -> bool;

    /// Sets or clears `stream`'s queued flag.
    fn set_queued(stream: &mut Stream, val: bool);
}
54
/// The two ends of a non-empty `Queue`.
///
/// `head` and `tail` are equal when the queue holds exactly one stream.
#[derive(Debug, Clone, Copy)]
struct Indices {
    /// Key of the stream that `Queue::pop` returns next.
    pub head: Key,
    /// Key of the stream that `Queue::push` appends after.
    pub tail: Key,
}
61
/// Result of `Store::find_entry`: the state of one stream ID in the store's
/// ID map.
pub(super) enum Entry<'a> {
    /// The ID map already has an entry for the stream ID.
    Occupied(OccupiedEntry<'a>),
    /// The ID map has no entry for the stream ID; one can be inserted.
    Vacant(VacantEntry<'a>),
}
66
/// A stream ID that is present in the store's ID map.
pub(super) struct OccupiedEntry<'a> {
    /// The underlying map entry (stream ID -> slab index).
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
}
70
/// A stream ID that is absent from the store's ID map, together with
/// everything needed to insert a stream for it.
pub(super) struct VacantEntry<'a> {
    /// The empty map entry that will receive the new slab index.
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
    /// The slab that will receive the new stream.
    slab: &'a mut slab::Slab<Stream>,
}
75
/// Something that can turn a `Key` into a `Ptr`.
///
/// The implementations in this file (for `Store` and `Ptr`) build the `Ptr`
/// without checking the key; a dangling key is only detected when the
/// returned `Ptr` is dereferenced.
pub(super) trait Resolve {
    /// Returns a `Ptr` for `key` that mutably borrows from `self`.
    fn resolve(&mut self, key: Key) -> Ptr;
}
79
80impl Store {
83 pub fn new() -> Self {
84 Store {
85 slab: slab::Slab::new(),
86 ids: IndexMap::new(),
87 }
88 }
89
90 pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91 let index = match self.ids.get(id) {
92 Some(key) => *key,
93 None => return None,
94 };
95
96 Some(Ptr {
97 key: Key {
98 index,
99 stream_id: *id,
100 },
101 store: self,
102 })
103 }
104
105 pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106 let index = SlabIndex(self.slab.insert(val) as u32);
107 assert!(self.ids.insert(id, index).is_none());
108
109 Ptr {
110 key: Key {
111 index,
112 stream_id: id,
113 },
114 store: self,
115 }
116 }
117
118 pub fn find_entry(&mut self, id: StreamId) -> Entry {
119 use self::indexmap::map::Entry::*;
120
121 match self.ids.entry(id) {
122 Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123 Vacant(e) => Entry::Vacant(VacantEntry {
124 ids: e,
125 slab: &mut self.slab,
126 }),
127 }
128 }
129
130 #[allow(clippy::blocks_in_conditions)]
131 pub(crate) fn for_each<F>(&mut self, mut f: F)
132 where
133 F: FnMut(Ptr),
134 {
135 match self.try_for_each(|ptr| {
136 f(ptr);
137 Ok::<_, Infallible>(())
138 }) {
139 Ok(()) => (),
140 #[allow(unused)]
141 Err(infallible) => match infallible {},
142 }
143 }
144
145 pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146 where
147 F: FnMut(Ptr) -> Result<(), E>,
148 {
149 let mut len = self.ids.len();
150 let mut i = 0;
151
152 while i < len {
153 let (stream_id, index) = {
155 let entry = self.ids.get_index(i).unwrap();
156 (*entry.0, *entry.1)
157 };
158
159 f(Ptr {
160 key: Key { index, stream_id },
161 store: self,
162 })?;
163
164 let new_len = self.ids.len();
166
167 if new_len < len {
168 debug_assert!(new_len == len - 1);
169 len -= 1;
170 } else {
171 i += 1;
172 }
173 }
174
175 Ok(())
176 }
177}
178
179impl Resolve for Store {
180 fn resolve(&mut self, key: Key) -> Ptr {
181 Ptr { key, store: self }
182 }
183}
184
185impl ops::Index<Key> for Store {
186 type Output = Stream;
187
188 fn index(&self, key: Key) -> &Self::Output {
189 self.slab
190 .get(key.index.0 as usize)
191 .filter(|s| s.id == key.stream_id)
192 .unwrap_or_else(|| {
193 panic!("dangling store key for stream_id={:?}", key.stream_id);
194 })
195 }
196}
197
198impl ops::IndexMut<Key> for Store {
199 fn index_mut(&mut self, key: Key) -> &mut Self::Output {
200 self.slab
201 .get_mut(key.index.0 as usize)
202 .filter(|s| s.id == key.stream_id)
203 .unwrap_or_else(|| {
204 panic!("dangling store key for stream_id={:?}", key.stream_id);
205 })
206 }
207}
208
impl Store {
    /// Returns the number of entries in the ID map.
    ///
    /// Only compiled when the `unstable` feature is enabled.
    #[cfg(feature = "unstable")]
    pub fn num_active_streams(&self) -> usize {
        self.ids.len()
    }

    /// Returns the number of streams held in the slab.
    ///
    /// This can exceed `num_active_streams`: `Ptr::unlink` drops the ID-map
    /// entry without touching the slab, and the slab slot is only freed by
    /// `Ptr::remove`.
    ///
    /// Only compiled when the `unstable` feature is enabled.
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.slab.len()
    }
}
220
// With the `unstable` feature enabled, dropping a `Store` checks (in builds
// with debug assertions) that every stream has been removed from the slab.
#[cfg(feature = "unstable")]
impl Drop for Store {
    fn drop(&mut self) {
        // Skip the check while the thread is already unwinding from a
        // panic — presumably so a failing assertion here cannot panic a
        // second time.
        if std::thread::panicking() {
            return;
        }

        debug_assert!(self.slab.is_empty());
    }
}
236
237impl<N> Queue<N>
240where
241 N: Next,
242{
243 pub fn new() -> Self {
244 Queue {
245 indices: None,
246 _p: PhantomData,
247 }
248 }
249
250 pub fn take(&mut self) -> Self {
251 Queue {
252 indices: self.indices.take(),
253 _p: PhantomData,
254 }
255 }
256
257 pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261 tracing::trace!("Queue::push_back");
262
263 if N::is_queued(stream) {
264 tracing::trace!(" -> already queued");
265 return false;
266 }
267
268 N::set_queued(stream, true);
269
270 debug_assert!(N::next(stream).is_none());
272
273 match self.indices {
275 Some(ref mut idxs) => {
276 tracing::trace!(" -> existing entries");
277
278 let key = stream.key();
280 N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
282 idxs.tail = stream.key();
284 }
285 None => {
286 tracing::trace!(" -> first entry");
287 self.indices = Some(store::Indices {
288 head: stream.key(),
289 tail: stream.key(),
290 });
291 }
292 }
293
294 true
295 }
296
297 pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
301 tracing::trace!("Queue::push_front");
302
303 if N::is_queued(stream) {
304 tracing::trace!(" -> already queued");
305 return false;
306 }
307
308 N::set_queued(stream, true);
309
310 debug_assert!(N::next(stream).is_none());
312
313 match self.indices {
315 Some(ref mut idxs) => {
316 tracing::trace!(" -> existing entries");
317
318 let head_key = stream.resolve(idxs.head).key();
320 N::set_next(stream, Some(head_key));
321
322 idxs.head = stream.key();
324 }
325 None => {
326 tracing::trace!(" -> first entry");
327 self.indices = Some(store::Indices {
328 head: stream.key(),
329 tail: stream.key(),
330 });
331 }
332 }
333
334 true
335 }
336
337 pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338 where
339 R: Resolve,
340 {
341 if let Some(mut idxs) = self.indices {
342 let mut stream = store.resolve(idxs.head);
343
344 if idxs.head == idxs.tail {
345 assert!(N::next(&stream).is_none());
346 self.indices = None;
347 } else {
348 idxs.head = N::take_next(&mut stream).unwrap();
349 self.indices = Some(idxs);
350 }
351
352 debug_assert!(N::is_queued(&stream));
353 N::set_queued(&mut stream, false);
354
355 return Some(stream);
356 }
357
358 None
359 }
360
361 pub fn is_empty(&self) -> bool {
362 self.indices.is_none()
363 }
364
365 pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
366 where
367 R: Resolve,
368 F: Fn(&Stream) -> bool,
369 {
370 if let Some(idxs) = self.indices {
371 let should_pop = f(&store.resolve(idxs.head));
372 if should_pop {
373 return self.pop(store);
374 }
375 }
376
377 None
378 }
379}
380
381impl<'a> Ptr<'a> {
384 pub fn key(&self) -> Key {
386 self.key
387 }
388
389 pub fn store_mut(&mut self) -> &mut Store {
390 self.store
391 }
392
393 pub fn remove(self) -> StreamId {
395 debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
397
398 let stream = self.store.slab.remove(self.key.index.0 as usize);
400 assert_eq!(stream.id, self.key.stream_id);
401 stream.id
402 }
403
404 pub fn unlink(&mut self) {
409 let id = self.key.stream_id;
410 self.store.ids.swap_remove(&id);
411 }
412}
413
414impl<'a> Resolve for Ptr<'a> {
415 fn resolve(&mut self, key: Key) -> Ptr {
416 Ptr {
417 key,
418 store: &mut *self.store,
419 }
420 }
421}
422
423impl<'a> ops::Deref for Ptr<'a> {
424 type Target = Stream;
425
426 fn deref(&self) -> &Stream {
427 &self.store[self.key]
428 }
429}
430
431impl<'a> ops::DerefMut for Ptr<'a> {
432 fn deref_mut(&mut self) -> &mut Stream {
433 &mut self.store[self.key]
434 }
435}
436
impl<'a> fmt::Debug for Ptr<'a> {
    /// Formats the stream this pointer designates rather than the pointer
    /// itself.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // `**self` goes `&Ptr` -> `Ptr` -> `Stream` (through `Deref`), so the
        // call is forwarded to the stream's own `fmt`. Resolving the stream
        // panics if the key is dangling.
        (**self).fmt(fmt)
    }
}
442
443impl<'a> OccupiedEntry<'a> {
446 pub fn key(&self) -> Key {
447 let stream_id = *self.ids.key();
448 let index = *self.ids.get();
449 Key { index, stream_id }
450 }
451}
452
453impl<'a> VacantEntry<'a> {
456 pub fn insert(self, value: Stream) -> Key {
457 let stream_id = value.id;
459 let index = SlabIndex(self.slab.insert(value) as u32);
460
461 self.ids.insert(index);
463
464 Key { index, stream_id }
465 }
466}