h2/proto/streams/
counts.rs1use super::*;
2
3#[derive(Debug)]
4pub(super) struct Counts {
5 peer: peer::Dyn,
8
9 max_send_streams: usize,
11
12 num_send_streams: usize,
14
15 max_recv_streams: usize,
17
18 num_recv_streams: usize,
20
21 max_local_reset_streams: usize,
23
24 num_local_reset_streams: usize,
26
27 max_remote_reset_streams: usize,
29
30 num_remote_reset_streams: usize,
32
33 max_local_error_reset_streams: Option<usize>,
38
39 num_local_error_reset_streams: usize,
42}
43
44impl Counts {
45 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
47 Counts {
48 peer,
49 max_send_streams: config.initial_max_send_streams,
50 num_send_streams: 0,
51 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
52 num_recv_streams: 0,
53 max_local_reset_streams: config.local_reset_max,
54 num_local_reset_streams: 0,
55 max_remote_reset_streams: config.remote_reset_max,
56 num_remote_reset_streams: 0,
57 max_local_error_reset_streams: config.local_max_error_reset_streams,
58 num_local_error_reset_streams: 0,
59 }
60 }
61
62 pub fn next_send_stream_will_reach_capacity(&self) -> bool {
67 self.max_send_streams <= (self.num_send_streams + 1)
68 }
69
70 pub fn peer(&self) -> peer::Dyn {
72 self.peer
73 }
74
75 pub fn has_streams(&self) -> bool {
76 self.num_send_streams != 0 || self.num_recv_streams != 0
77 }
78
79 pub fn can_inc_num_local_error_resets(&self) -> bool {
81 if let Some(max) = self.max_local_error_reset_streams {
82 max > self.num_local_error_reset_streams
83 } else {
84 true
85 }
86 }
87
88 pub fn inc_num_local_error_resets(&mut self) {
89 assert!(self.can_inc_num_local_error_resets());
90
91 self.num_local_error_reset_streams += 1;
93 }
94
95 pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
96 self.max_local_error_reset_streams
97 }
98
99 pub fn can_inc_num_recv_streams(&self) -> bool {
101 self.max_recv_streams > self.num_recv_streams
102 }
103
104 pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
110 assert!(self.can_inc_num_recv_streams());
111 assert!(!stream.is_counted);
112
113 self.num_recv_streams += 1;
115 stream.is_counted = true;
116 }
117
118 pub fn can_inc_num_send_streams(&self) -> bool {
120 self.max_send_streams > self.num_send_streams
121 }
122
123 pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
129 assert!(self.can_inc_num_send_streams());
130 assert!(!stream.is_counted);
131
132 self.num_send_streams += 1;
134 stream.is_counted = true;
135 }
136
137 pub fn can_inc_num_reset_streams(&self) -> bool {
139 self.max_local_reset_streams > self.num_local_reset_streams
140 }
141
142 pub fn inc_num_reset_streams(&mut self) {
148 assert!(self.can_inc_num_reset_streams());
149
150 self.num_local_reset_streams += 1;
151 }
152
153 pub(crate) fn max_remote_reset_streams(&self) -> usize {
154 self.max_remote_reset_streams
155 }
156
157 pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
160 self.max_remote_reset_streams > self.num_remote_reset_streams
161 }
162
163 pub(crate) fn inc_num_remote_reset_streams(&mut self) {
169 assert!(self.can_inc_num_remote_reset_streams());
170
171 self.num_remote_reset_streams += 1;
172 }
173
174 pub(crate) fn dec_num_remote_reset_streams(&mut self) {
175 assert!(self.num_remote_reset_streams > 0);
176
177 self.num_remote_reset_streams -= 1;
178 }
179
180 pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
181 match settings.max_concurrent_streams() {
182 Some(val) => self.max_send_streams = val as usize,
183 None if is_initial => self.max_send_streams = usize::MAX,
184 None => {}
185 }
186 }
187
188 pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195 where
196 F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197 {
198 let is_pending_reset = stream.is_pending_reset_expiration();
200
201 let ret = f(self, &mut stream);
203
204 self.transition_after(stream, is_pending_reset);
205
206 ret
207 }
208
209 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211 tracing::trace!(
212 "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213 pending_send_empty={:?}; buffered_send_data={}; \
214 num_recv={}; num_send={}",
215 stream.id,
216 stream.state,
217 stream.is_closed(),
218 stream.pending_send.is_empty(),
219 stream.buffered_send_data,
220 self.num_recv_streams,
221 self.num_send_streams
222 );
223
224 if stream.is_closed() {
225 if !stream.is_pending_reset_expiration() {
226 stream.unlink();
227 if is_reset_counted {
228 self.dec_num_reset_streams();
229 }
230 }
231
232 if !stream.state.is_scheduled_reset() && stream.is_counted {
233 tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234 self.dec_num_streams(&mut stream);
236 }
237 }
238
239 if stream.is_released() {
241 stream.remove();
242 }
243 }
244
245 pub(crate) fn max_send_streams(&self) -> usize {
248 self.max_send_streams
249 }
250
251 pub(crate) fn max_recv_streams(&self) -> usize {
254 self.max_recv_streams
255 }
256
257 fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258 assert!(stream.is_counted);
259
260 if self.peer.is_local_init(stream.id) {
261 assert!(self.num_send_streams > 0);
262 self.num_send_streams -= 1;
263 stream.is_counted = false;
264 } else {
265 assert!(self.num_recv_streams > 0);
266 self.num_recv_streams -= 1;
267 stream.is_counted = false;
268 }
269 }
270
271 fn dec_num_reset_streams(&mut self) {
272 assert!(self.num_local_reset_streams > 0);
273 self.num_local_reset_streams -= 1;
274 }
275}
276
277impl Drop for Counts {
278 fn drop(&mut self) {
279 use std::thread;
280
281 if !thread::panicking() {
282 debug_assert!(!self.has_streams());
283 }
284 }
285}