multiplexing.d 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. module vibe.http.internal.http2.multiplexing;
  2. import vibe.container.hashmap;
  3. import vibe.container.internal.utilallocator;
  4. import vibe.core.sync;
  5. import vibe.core.log;
  6. import vibe.core.net;
  7. import vibe.core.core : yield;
  8. import std.exception;
  9. import std.container : RedBlackTree;
  10. /** Stream multiplexing in HTTP/2
  11. * References: https://tools.ietf.org/html/rfc7540#section-5
  12. *
  13. * The purposes of stream registration into a multiplexer are the following:
  14. * 1. Check correctness of HTTP/2 frames received, following the rules defined
  15. * in the HTTP/2 RFC (https://tools.ietf.org/html/rfc7540)
  16. * 2. Implement stream prioritization / dependency:
  17. * https://tools.ietf.org/html/rfc7540#section-5.3
  18. * 3. Hold data structures which are supposed to mantain the state of a connection,
  19. * since HTTP/2 opens only 1 tcp connection on which multiple frames can be sent.
  20. */
  21. /* ======================================================= */
  22. /* ================ STREAM MANAGEMENT =================== */
  23. /* ======================================================= */
  24. /// register a stream on a MUX
  25. auto registerStream(Mux)(ref Mux multiplexer, const uint sid) @trusted
  26. {
  27. return multiplexer.register(sid);
  28. }
  29. /// close a stream on a MUX
  30. auto closeStream(Mux)(ref Mux multiplexer, const uint sid) @trusted
  31. {
  32. return multiplexer.close(sid);
  33. }
  34. /// check if stream is OPEN (meaning, currently registered and active)
  35. auto isOpenStream(Mux)(ref Mux multiplexer, const uint sid) @trusted
  36. {
  37. return multiplexer.isOpen(sid);
  38. }
  39. /// connection preface (SETTINGS) can be received only ONCE
  40. auto isConnectionPreface(Mux)(ref Mux multiplexer) @trusted
  41. {
  42. return multiplexer.isConnPreface();
  43. }
  44. /* ======================================================= */
  45. /* ================= FLOW CONTROL ======================== */
  46. /* ======================================================= */
  47. /** Per-connection window
  48. * Valid for EVERY stream in MUX[idx]
  49. */
  50. auto connectionWindow(Mux)(ref Mux multiplexer) @trusted
  51. {
  52. return multiplexer.connWindow;
  53. }
  54. /// Update the connection window value
  55. auto updateConnectionWindow(Mux)(ref Mux multiplexer, const ulong newWin) @trusted
  56. {
  57. return multiplexer.updateConnWindow(newWin);
  58. }
  59. /** Per-stream window
  60. * Valid for stream `sid` in MUX[idx]
  61. */
  62. auto streamConnectionWindow(Mux)(ref Mux multiplexer, const uint sid) @trusted
  63. {
  64. return multiplexer.streamConnWindow(sid);
  65. }
  66. /// Update the stream connection window value
  67. auto updateStreamConnectionWindow(Mux)(ref Mux multiplexer, const uint sid, const ulong newWin) @trusted
  68. {
  69. return multiplexer.updateStreamConnWindow(sid, newWin);
  70. }
  71. /** A TaskCondition is used to synchronize DATA frame sending
  72. * this enforces flow control on every outgoing DATA frame
  73. * So that the client-established connection/stream window
  74. * is not exceeded.
  75. * Each connection (MUX) has its own condition.
  76. */
  77. void waitCondition(Mux)(ref Mux multiplexer, const uint sid) @trusted
  78. {
  79. multiplexer.wait(sid);
  80. }
  81. /// signal the waiting task(s) that a change
  82. /// in the connection window has occourred
  83. void notifyCondition(Mux)(ref Mux multiplexer) @trusted
  84. {
  85. multiplexer.notify();
  86. }
  87. /// check if waiting tasks are enqueued for this connection
  88. uint checkCondition(Mux)(ref Mux multiplexer, const uint sid) @trusted
  89. {
  90. return multiplexer.checkCond(sid);
  91. }
  92. /// signal that the DATA dispatch is over
  93. /// task is no longer enqueued
  94. void doneCondition(Mux)(ref Mux multiplexer, const uint sid) @trusted
  95. {
  96. multiplexer.endWait(sid);
  97. }
  98. /** Underlying multiplexer data structure
  99. * Uses a TaskMutex to perform sensitive operations
  100. * since multiple streams might be operating on the same
  101. * connection (MUX)
  102. */
  103. struct HTTP2Multiplexer {
  104. /// used to register open streams, which must be unique
  105. private alias H2Queue = RedBlackTree!uint;
  106. private {
  107. IAllocator m_alloc;
  108. H2Queue m_open; // set of open streams
  109. uint m_closed; // index of the last closed stream
  110. uint m_last; // index of last open stream
  111. uint m_max; // maximum number of streams open at the same time
  112. uint m_countOpen; // current number of open streams (in m_open)
  113. TaskMutex m_lock;
  114. TaskCondition m_cond;
  115. uint[uint] m_waiting;
  116. ulong m_wsize;
  117. ulong[uint] m_streamWSize;
  118. bool m_connPreface = true;
  119. }
  120. @disable this();
  121. this(Alloc)(Alloc alloc, const uint max, const ulong wsize, const uint tsize=4096) @trusted
  122. nothrow {
  123. m_alloc = alloc;
  124. try {
  125. m_lock = alloc.make!TaskMutex();
  126. m_cond = alloc.make!TaskCondition(m_lock);
  127. m_open = alloc.make!H2Queue();
  128. } catch (Exception e) assert(false, e.msg);
  129. m_last = 0;
  130. m_max = max;
  131. m_wsize = wsize;
  132. }
  133. /** The methods from here downwards
  134. * are not supposed to be used directly,
  135. * but through the documented wrappers above.
  136. */
  137. @property void wait(const uint sid) @trusted
  138. {
  139. synchronized(m_lock) {
  140. if(!(sid in m_waiting)) m_waiting[sid] = 0;
  141. else m_waiting[sid]++;
  142. m_cond.wait();
  143. }
  144. }
  145. @property void endWait(const uint sid) @trusted
  146. {
  147. synchronized(m_lock) {
  148. if(!(sid in m_waiting)) m_waiting[sid] = 0;
  149. else m_waiting[sid]--;
  150. }
  151. }
  152. @property void notify() @trusted
  153. {
  154. m_cond.notify();
  155. }
  156. @property uint checkCond(const uint sid) @safe
  157. {
  158. if(!(sid in m_waiting)) return 0;
  159. return m_waiting[sid] > 0 && isOpen(sid);
  160. }
  161. @property ulong connWindow() @safe
  162. {
  163. return m_wsize;
  164. }
  165. @property ulong streamConnWindow(const uint sid) @safe
  166. {
  167. if(!(sid in m_streamWSize)) return 0;
  168. return m_streamWSize[sid];
  169. }
  170. @property bool isConnPreface() @safe
  171. {
  172. // can only be true once per connection
  173. auto b = m_connPreface;
  174. m_lock.performLocked!({
  175. m_connPreface = false;
  176. });
  177. return b;
  178. }
  179. // register a new open stream
  180. bool register(const uint sid) @safe
  181. {
  182. if(sid == 0) return true; // success, but sid=0 is not registered
  183. if(m_countOpen + 1 > m_max) return false; // PROTOCOL_ERROR: too many open streams
  184. if(sid <= m_last && sid != 0) return false; // Stream ID must be greater than previously
  185. // registered ones
  186. m_lock.performLocked!({
  187. m_countOpen++;
  188. m_open.insert(sid);
  189. m_last = sid;
  190. m_streamWSize[sid] = m_wsize;
  191. });
  192. return true;
  193. }
  194. // close an open stream
  195. bool close(const uint sid) @safe
  196. {
  197. if(!(sid in m_open)) return false; //Cannot close a stream which is not open
  198. if(sid in m_waiting && m_waiting[sid]) return false; //Cannot close a stream which is blocked
  199. m_lock.performLocked!({
  200. m_countOpen--;
  201. m_open.removeKey(sid);
  202. m_streamWSize.remove(sid);
  203. });
  204. return true;
  205. }
  206. // open streams are present in m_open
  207. bool isOpen(const uint sid) @safe
  208. {
  209. return sid in m_open;
  210. }
  211. bool updateConnWindow(const ulong newWin) @safe
  212. {
  213. if(newWin > ulong.max || newWin < 0) return false;
  214. logDebug("MUX: CONTROL FLOW WINDOW: from %d to %d bytes", m_wsize, newWin);
  215. m_lock.performLocked!({
  216. m_wsize = newWin;
  217. });
  218. return true;
  219. }
  220. bool updateStreamConnWindow(const uint sid, const ulong newWin) @safe
  221. {
  222. if(newWin > ulong.max || newWin < 0) return false;
  223. if(sid == 0) return true;
  224. logDebug("MUX: CONTROL FLOW WINDOW: stream %d from %d to %d bytes",
  225. sid, (sid in m_streamWSize) ? m_streamWSize[sid] : m_wsize, newWin);
  226. m_lock.performLocked!({
  227. m_streamWSize[sid] = newWin;
  228. });
  229. return true;
  230. }
  231. }