h2/proto/streams/
store.rs

1use super::*;
2
3use indexmap::{self, IndexMap};
4
5use std::convert::Infallible;
6use std::fmt;
7use std::marker::PhantomData;
8use std::ops;
9
/// Storage for streams.
///
/// Stream state lives in `slab`; `ids` maps each `StreamId` to the slab slot
/// that holds the corresponding stream.
#[derive(Debug)]
pub(super) struct Store {
    /// Backing storage for the `Stream` values.
    slab: slab::Slab<Stream>,
    /// Lookup table from stream ID to the stream's slot in `slab`.
    ids: IndexMap<StreamId, SlabIndex>,
}
16
/// "Pointer" to an entry in the store.
///
/// Pairs a `Key` with a mutable borrow of the `Store` it refers into, and
/// dereferences to the `Stream` stored under that key.
pub(super) struct Ptr<'a> {
    /// Identifies the entry this pointer refers to.
    key: Key,
    /// The store the entry lives in.
    store: &'a mut Store,
}
22
/// References an entry in the store.
///
/// A key is a small `Copy` value that does not borrow the store. Indexing a
/// `Store` with a key whose slot is empty, or now holds a different stream,
/// panics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Key {
    /// Slot of the stream in the store's slab.
    index: SlabIndex,
    /// Keep the stream ID in the key as an ABA guard, since slab indices
    /// could be re-used with a new stream.
    stream_id: StreamId,
}
31
// A slab slot index, stored in 32 bits.
//
// We can never have more than `StreamId::MAX` streams in the store,
// so we can save a smaller index (u32 vs usize).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlabIndex(u32);
36
/// A queue of streams, linked through the streams themselves.
///
/// The queue only records the keys of its first and last stream; the links
/// between streams and the "is queued" flag are read and written through the
/// `Next` implementation `N`.
#[derive(Debug)]
pub(super) struct Queue<N> {
    /// Head and tail of the list, or `None` when the queue is empty.
    indices: Option<store::Indices>,
    /// Ties the queue to its `Next` implementation without storing one.
    _p: PhantomData<N>,
}
42
/// Accessors for the per-stream link and flag used by one `Queue`.
///
/// Implementations live outside this file; the descriptions below reflect how
/// `Queue` uses each method.
pub(super) trait Next {
    /// Returns the key of the stream that follows `stream`, if any.
    fn next(stream: &Stream) -> Option<Key>;

    /// Sets the key of the stream that follows `stream`.
    fn set_next(stream: &mut Stream, key: Option<Key>);

    /// Returns the key of the stream that follows `stream`.
    ///
    /// Presumably also clears the link on `stream` -- confirm against the
    /// implementations.
    fn take_next(stream: &mut Stream) -> Option<Key>;

    /// Returns whether `stream` is currently marked as queued.
    fn is_queued(stream: &Stream) -> bool;

    /// Marks `stream` as queued (`true`) or not queued (`false`).
    fn set_queued(stream: &mut Stream, val: bool);
}
54
/// The two ends of a linked list of streams.
#[derive(Debug, Clone, Copy)]
struct Indices {
    /// Key of the first stream in the list.
    pub head: Key,
    /// Key of the last stream in the list.
    pub tail: Key,
}
61
/// Result of looking up a stream ID with `Store::find_entry`.
pub(super) enum Entry<'a> {
    /// The ID is already present in the store.
    Occupied(OccupiedEntry<'a>),
    /// The ID is not present; a stream may be inserted under it.
    Vacant(VacantEntry<'a>),
}
66
/// A view of a stream ID that is already present in the store.
pub(super) struct OccupiedEntry<'a> {
    /// The underlying occupied entry in the ID map.
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
}
70
/// A view of a stream ID that is not yet present in the store.
pub(super) struct VacantEntry<'a> {
    /// The underlying vacant entry in the ID map.
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
    /// The slab a newly inserted stream is stored in.
    slab: &'a mut slab::Slab<Stream>,
}
75
/// Something that can turn a `Key` back into a `Ptr`.
pub(super) trait Resolve {
    /// Returns a `Ptr` for `key`, borrowing from `self`.
    fn resolve(&mut self, key: Key) -> Ptr;
}
79
80// ===== impl Store =====
81
82impl Store {
83    pub fn new() -> Self {
84        Store {
85            slab: slab::Slab::new(),
86            ids: IndexMap::new(),
87        }
88    }
89
90    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91        let index = match self.ids.get(id) {
92            Some(key) => *key,
93            None => return None,
94        };
95
96        Some(Ptr {
97            key: Key {
98                index,
99                stream_id: *id,
100            },
101            store: self,
102        })
103    }
104
105    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106        let index = SlabIndex(self.slab.insert(val) as u32);
107        assert!(self.ids.insert(id, index).is_none());
108
109        Ptr {
110            key: Key {
111                index,
112                stream_id: id,
113            },
114            store: self,
115        }
116    }
117
118    pub fn find_entry(&mut self, id: StreamId) -> Entry {
119        use self::indexmap::map::Entry::*;
120
121        match self.ids.entry(id) {
122            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123            Vacant(e) => Entry::Vacant(VacantEntry {
124                ids: e,
125                slab: &mut self.slab,
126            }),
127        }
128    }
129
130    #[allow(clippy::blocks_in_conditions)]
131    pub(crate) fn for_each<F>(&mut self, mut f: F)
132    where
133        F: FnMut(Ptr),
134    {
135        match self.try_for_each(|ptr| {
136            f(ptr);
137            Ok::<_, Infallible>(())
138        }) {
139            Ok(()) => (),
140            #[allow(unused)]
141            Err(infallible) => match infallible {},
142        }
143    }
144
145    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146    where
147        F: FnMut(Ptr) -> Result<(), E>,
148    {
149        let mut len = self.ids.len();
150        let mut i = 0;
151
152        while i < len {
153            // Get the key by index, this makes the borrow checker happy
154            let (stream_id, index) = {
155                let entry = self.ids.get_index(i).unwrap();
156                (*entry.0, *entry.1)
157            };
158
159            f(Ptr {
160                key: Key { index, stream_id },
161                store: self,
162            })?;
163
164            // TODO: This logic probably could be better...
165            let new_len = self.ids.len();
166
167            if new_len < len {
168                debug_assert!(new_len == len - 1);
169                len -= 1;
170            } else {
171                i += 1;
172            }
173        }
174
175        Ok(())
176    }
177}
178
179impl Resolve for Store {
180    fn resolve(&mut self, key: Key) -> Ptr {
181        Ptr { key, store: self }
182    }
183}
184
185impl ops::Index<Key> for Store {
186    type Output = Stream;
187
188    fn index(&self, key: Key) -> &Self::Output {
189        self.slab
190            .get(key.index.0 as usize)
191            .filter(|s| s.id == key.stream_id)
192            .unwrap_or_else(|| {
193                panic!("dangling store key for stream_id={:?}", key.stream_id);
194            })
195    }
196}
197
198impl ops::IndexMut<Key> for Store {
199    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
200        self.slab
201            .get_mut(key.index.0 as usize)
202            .filter(|s| s.id == key.stream_id)
203            .unwrap_or_else(|| {
204                panic!("dangling store key for stream_id={:?}", key.stream_id);
205            })
206    }
207}
208
impl Store {
    /// Returns the number of entries in the stream ID map.
    #[cfg(feature = "unstable")]
    pub fn num_active_streams(&self) -> usize {
        self.ids.len()
    }

    /// Returns the number of streams held in the slab, which includes streams
    /// that have been unlinked from the ID map but not yet removed.
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.slab.len()
    }
}
220
// Enabled while running the h2 unit/integration tests.
//
// Nothing relies on the store being empty when it is dropped, but checking it
// in the tests helps catch places where streams should have been cleaned up
// and were not (as opposed to, say, the runtime dropping the task for unknown
// reasons).
#[cfg(feature = "unstable")]
impl Drop for Store {
    fn drop(&mut self) {
        // Skip the check while unwinding: a second panic would abort.
        if std::thread::panicking() {
            return;
        }

        debug_assert!(self.slab.is_empty());
    }
}
236
237// ===== impl Queue =====
238
239impl<N> Queue<N>
240where
241    N: Next,
242{
243    pub fn new() -> Self {
244        Queue {
245            indices: None,
246            _p: PhantomData,
247        }
248    }
249
250    pub fn take(&mut self) -> Self {
251        Queue {
252            indices: self.indices.take(),
253            _p: PhantomData,
254        }
255    }
256
257    /// Queue the stream.
258    ///
259    /// If the stream is already contained by the list, return `false`.
260    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261        tracing::trace!("Queue::push_back");
262
263        if N::is_queued(stream) {
264            tracing::trace!(" -> already queued");
265            return false;
266        }
267
268        N::set_queued(stream, true);
269
270        // The next pointer shouldn't be set
271        debug_assert!(N::next(stream).is_none());
272
273        // Queue the stream
274        match self.indices {
275            Some(ref mut idxs) => {
276                tracing::trace!(" -> existing entries");
277
278                // Update the current tail node to point to `stream`
279                let key = stream.key();
280                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
282                // Update the tail pointer
283                idxs.tail = stream.key();
284            }
285            None => {
286                tracing::trace!(" -> first entry");
287                self.indices = Some(store::Indices {
288                    head: stream.key(),
289                    tail: stream.key(),
290                });
291            }
292        }
293
294        true
295    }
296
297    /// Queue the stream
298    ///
299    /// If the stream is already contained by the list, return `false`.
300    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
301        tracing::trace!("Queue::push_front");
302
303        if N::is_queued(stream) {
304            tracing::trace!(" -> already queued");
305            return false;
306        }
307
308        N::set_queued(stream, true);
309
310        // The next pointer shouldn't be set
311        debug_assert!(N::next(stream).is_none());
312
313        // Queue the stream
314        match self.indices {
315            Some(ref mut idxs) => {
316                tracing::trace!(" -> existing entries");
317
318                // Update the provided stream to point to the head node
319                let head_key = stream.resolve(idxs.head).key();
320                N::set_next(stream, Some(head_key));
321
322                // Update the head pointer
323                idxs.head = stream.key();
324            }
325            None => {
326                tracing::trace!(" -> first entry");
327                self.indices = Some(store::Indices {
328                    head: stream.key(),
329                    tail: stream.key(),
330                });
331            }
332        }
333
334        true
335    }
336
337    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338    where
339        R: Resolve,
340    {
341        if let Some(mut idxs) = self.indices {
342            let mut stream = store.resolve(idxs.head);
343
344            if idxs.head == idxs.tail {
345                assert!(N::next(&stream).is_none());
346                self.indices = None;
347            } else {
348                idxs.head = N::take_next(&mut stream).unwrap();
349                self.indices = Some(idxs);
350            }
351
352            debug_assert!(N::is_queued(&stream));
353            N::set_queued(&mut stream, false);
354
355            return Some(stream);
356        }
357
358        None
359    }
360
361    pub fn is_empty(&self) -> bool {
362        self.indices.is_none()
363    }
364
365    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
366    where
367        R: Resolve,
368        F: Fn(&Stream) -> bool,
369    {
370        if let Some(idxs) = self.indices {
371            let should_pop = f(&store.resolve(idxs.head));
372            if should_pop {
373                return self.pop(store);
374            }
375        }
376
377        None
378    }
379}
380
381// ===== impl Ptr =====
382
383impl<'a> Ptr<'a> {
384    /// Returns the Key associated with the stream
385    pub fn key(&self) -> Key {
386        self.key
387    }
388
389    pub fn store_mut(&mut self) -> &mut Store {
390        self.store
391    }
392
393    /// Remove the stream from the store
394    pub fn remove(self) -> StreamId {
395        // The stream must have been unlinked before this point
396        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
397
398        // Remove the stream state
399        let stream = self.store.slab.remove(self.key.index.0 as usize);
400        assert_eq!(stream.id, self.key.stream_id);
401        stream.id
402    }
403
404    /// Remove the StreamId -> stream state association.
405    ///
406    /// This will effectively remove the stream as far as the H2 protocol is
407    /// concerned.
408    pub fn unlink(&mut self) {
409        let id = self.key.stream_id;
410        self.store.ids.swap_remove(&id);
411    }
412}
413
414impl<'a> Resolve for Ptr<'a> {
415    fn resolve(&mut self, key: Key) -> Ptr {
416        Ptr {
417            key,
418            store: &mut *self.store,
419        }
420    }
421}
422
423impl<'a> ops::Deref for Ptr<'a> {
424    type Target = Stream;
425
426    fn deref(&self) -> &Stream {
427        &self.store[self.key]
428    }
429}
430
431impl<'a> ops::DerefMut for Ptr<'a> {
432    fn deref_mut(&mut self) -> &mut Stream {
433        &mut self.store[self.key]
434    }
435}
436
437impl<'a> fmt::Debug for Ptr<'a> {
438    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
439        (**self).fmt(fmt)
440    }
441}
442
443// ===== impl OccupiedEntry =====
444
445impl<'a> OccupiedEntry<'a> {
446    pub fn key(&self) -> Key {
447        let stream_id = *self.ids.key();
448        let index = *self.ids.get();
449        Key { index, stream_id }
450    }
451}
452
453// ===== impl VacantEntry =====
454
455impl<'a> VacantEntry<'a> {
456    pub fn insert(self, value: Stream) -> Key {
457        // Insert the value in the slab
458        let stream_id = value.id;
459        let index = SlabIndex(self.slab.insert(value) as u32);
460
461        // Insert the handle in the ID map
462        self.ids.insert(index);
463
464        Key { index, stream_id }
465    }
466}