websockets.d 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291
  1. /**
  2. Implements WebSocket support and fallbacks for older browsers.
  3. Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
  4. Copyright: © 2012-2014 Sönke Ludwig
  5. License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
  6. Authors: Jan Krüger
  7. */
  8. module vibe.http.websockets;
  9. ///
  10. @safe unittest {
  11. void handleConn(scope WebSocket sock)
  12. {
  13. // simple echo server
  14. while (sock.connected) {
  15. auto msg = sock.receiveText();
  16. sock.send(msg);
  17. }
  18. }
  19. void startServer()
  20. {
  21. import vibe.http.router;
  22. auto router = new URLRouter;
  23. router.get("/ws", handleWebSockets(&handleConn));
  24. // Start HTTP server using listenHTTP()...
  25. }
  26. }
  27. import vibe.core.core;
  28. import vibe.core.log;
  29. import vibe.core.net;
  30. import vibe.core.sync;
  31. import vibe.stream.operations;
  32. import vibe.http.server;
  33. import vibe.http.client;
  34. import vibe.core.connectionpool;
  35. import core.time;
  36. import std.algorithm: equal, splitter;
  37. import std.array;
  38. import std.base64;
  39. import std.conv;
  40. import std.exception;
  41. import std.bitmanip;
  42. import std.digest.sha;
  43. import std.string;
  44. import std.functional;
  45. import std.uuid;
  46. import std.base64;
  47. import std.digest.sha;
  48. import std.uni: asLowerCase;
  49. import vibe.crypto.cryptorand;
  50. @safe:
  51. alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
  52. /// Exception thrown by $(D vibe.http.websockets).
  53. class WebSocketException: Exception
  54. {
  55. @safe pure nothrow:
  56. ///
  57. this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
  58. {
  59. super(msg, file, line, next);
  60. }
  61. ///
  62. this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
  63. {
  64. super(msg, next, file, line);
  65. }
  66. }
  67. /** Establishes a WebSocket connection at the specified endpoint.
  68. */
  69. WebSocket connectWebSocketEx(URL url,
  70. scope void delegate(scope HTTPClientRequest) @safe request_modifier,
  71. const(HTTPClientSettings) settings = defaultSettings)
  72. @safe {
  73. const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
  74. url.schema = use_tls ? "https" : "http";
  75. auto rng = secureRNG();
  76. auto challengeKey = generateChallengeKey(rng);
  77. auto answerKey = computeAcceptKey(challengeKey);
  78. auto res = requestHTTP(url, (scope req){
  79. req.method = HTTPMethod.GET;
  80. req.headers["Upgrade"] = "websocket";
  81. req.headers["Connection"] = "Upgrade";
  82. req.headers["Sec-WebSocket-Version"] = "13";
  83. req.headers["Sec-WebSocket-Key"] = challengeKey;
  84. request_modifier(req);
  85. }, settings);
  86. enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
  87. auto key = "sec-websocket-accept" in res.headers;
  88. enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
  89. enforce(*key == answerKey, "Response has wrong accept key");
  90. auto conn = res.switchProtocol("websocket");
  91. return new WebSocket(conn, rng, res);
  92. }
  93. /// ditto
  94. void connectWebSocketEx(URL url,
  95. scope void delegate(scope HTTPClientRequest) @safe request_modifier,
  96. scope WebSocketHandshakeDelegate del,
  97. const(HTTPClientSettings) settings = defaultSettings)
  98. @safe {
  99. const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
  100. url.schema = use_tls ? "https" : "http";
  101. /*scope*/auto rng = secureRNG();
  102. auto challengeKey = generateChallengeKey(rng);
  103. auto answerKey = computeAcceptKey(challengeKey);
  104. requestHTTP(url,
  105. (scope req) {
  106. req.method = HTTPMethod.GET;
  107. req.headers["Upgrade"] = "websocket";
  108. req.headers["Connection"] = "Upgrade";
  109. req.headers["Sec-WebSocket-Version"] = "13";
  110. req.headers["Sec-WebSocket-Key"] = challengeKey;
  111. request_modifier(req);
  112. },
  113. (scope res) {
  114. enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
  115. auto key = "sec-websocket-accept" in res.headers;
  116. enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
  117. enforce(*key == answerKey, "Response has wrong accept key");
  118. res.switchProtocol("websocket", (scope conn) @trusted {
  119. scope ws = new WebSocket(conn, rng, res);
  120. del(ws);
  121. if (ws.connected) ws.close();
  122. });
  123. },
  124. settings
  125. );
  126. }
  127. /// ditto
  128. WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
  129. @safe {
  130. return connectWebSocketEx(url, (scope req) {}, settings);
  131. }
  132. /// ditto
  133. void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
  134. @safe {
  135. connectWebSocketEx(url, (scope req) {}, del, settings);
  136. }
  137. /// ditto
  138. void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
  139. @system {
  140. connectWebSocket(url, (scope ws) nothrow {
  141. try del(ws);
  142. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  143. }, settings);
  144. }
  145. /// Scheduled for deprecation - use a `@safe` callback instead.
  146. void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
  147. @system {
  148. connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
  149. }
  150. /// Scheduled for deprecation - use a `nothrow` callback instead.
  151. void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
  152. @safe {
  153. connectWebSocket(url, (scope ws) nothrow {
  154. try del(ws);
  155. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  156. }, settings);
  157. }
  158. /**
  159. Establishes a web socket conection and passes it to the $(D on_handshake) delegate.
  160. */
  161. void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
  162. @safe {
  163. auto pUpgrade = "Upgrade" in req.headers;
  164. auto pConnection = "Connection" in req.headers;
  165. auto pKey = "Sec-WebSocket-Key" in req.headers;
  166. //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
  167. auto pVersion = "Sec-WebSocket-Version" in req.headers;
  168. auto isUpgrade = false;
  169. if( pConnection ) {
  170. auto connectionTypes = splitter(*pConnection, ",");
  171. foreach( t ; connectionTypes ) {
  172. if( t.strip().asLowerCase().equal("upgrade") ) {
  173. isUpgrade = true;
  174. break;
  175. }
  176. }
  177. }
  178. string req_error;
  179. if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
  180. else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
  181. else if (!pVersion || *pVersion != "13") req_error = "Only version 13 of the WebSocket protocol is supported.";
  182. else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header.";
  183. if (req_error.length) {
  184. logDebug("Browser sent invalid WebSocket request: %s", req_error);
  185. res.statusCode = HTTPStatus.badRequest;
  186. res.writeBody(req_error);
  187. return;
  188. }
  189. auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
  190. res.headers["Sec-WebSocket-Accept"] = accept;
  191. res.headers["Connection"] = "Upgrade";
  192. ConnectionStream conn = res.switchProtocol("websocket");
  193. // NOTE: silencing scope warning here - WebSocket references the scoped
  194. // req/res objects throughout its lifetime, which has a narrower scope
  195. scope socket = () @trusted { return new WebSocket(conn, req, res); } ();
  196. try {
  197. on_handshake(socket);
  198. } catch (Exception e) {
  199. logDiagnostic("WebSocket handler failed: %s", e.msg);
  200. }
  201. socket.close();
  202. }
  203. /// Scheduled for deprecation - use a `@safe` callback instead.
  204. void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
  205. @system {
  206. handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
  207. }
  208. /// Scheduled for deprecation - use a `nothrow` callback instead.
  209. void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
  210. {
  211. handleWebSocket((scope ws) nothrow {
  212. try on_handshake(ws);
  213. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  214. }, req, res);
  215. }
  216. /// ditto
  217. void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
  218. @system {
  219. handleWebSocket((scope ws) nothrow {
  220. try on_handshake(ws);
  221. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  222. }, req, res);
  223. }
  224. /**
  225. Returns a HTTP request handler that establishes web socket conections.
  226. */
  227. HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
  228. @safe {
  229. return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
  230. }
  231. /// ditto
  232. HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
  233. @safe {
  234. void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
  235. @safe {
  236. auto pUpgrade = "Upgrade" in req.headers;
  237. auto pConnection = "Connection" in req.headers;
  238. auto pKey = "Sec-WebSocket-Key" in req.headers;
  239. //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
  240. auto pVersion = "Sec-WebSocket-Version" in req.headers;
  241. auto isUpgrade = false;
  242. if( pConnection ) {
  243. auto connectionTypes = splitter(*pConnection, ",");
  244. foreach( t ; connectionTypes ) {
  245. if( t.strip().asLowerCase().equal("upgrade") ) {
  246. isUpgrade = true;
  247. break;
  248. }
  249. }
  250. }
  251. if( !(isUpgrade &&
  252. pUpgrade && icmp(*pUpgrade, "websocket") == 0 &&
  253. pKey &&
  254. pVersion && *pVersion == "13") )
  255. {
  256. logDebug("Browser sent invalid WebSocket request.");
  257. res.statusCode = HTTPStatus.badRequest;
  258. res.writeVoidBody();
  259. return;
  260. }
  261. auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
  262. res.headers["Sec-WebSocket-Accept"] = accept;
  263. res.headers["Connection"] = "Upgrade";
  264. res.switchProtocol("websocket", (scope conn) {
  265. // TODO: put back 'scope' once it is actually enforced by DMD
  266. /*scope*/ auto socket = new WebSocket(conn, req, res);
  267. try on_handshake(socket);
  268. catch (Exception e) {
  269. logDiagnostic("WebSocket handler failed: %s", e.msg);
  270. }
  271. socket.close();
  272. });
  273. }
  274. return &callback;
  275. }
  276. /// Scheduled for deprecation - use a `@safe` callback instead.
  277. HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
  278. @system {
  279. return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
  280. }
  281. /// Scheduled for deprecation - use a `@safe` callback instead.
  282. HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
  283. @system {
  284. return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
  285. }
  286. /// Scheduled for deprecation - use a `nothrow` callback instead.
  287. HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
  288. {
  289. return handleWebSockets(delegate (scope ws) nothrow {
  290. try on_handshake(ws);
  291. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  292. });
  293. }
  294. /// ditto
  295. HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
  296. {
  297. return handleWebSockets(delegate (scope ws) nothrow {
  298. try on_handshake(ws);
  299. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  300. });
  301. }
  302. /// ditto
  303. HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
  304. @system {
  305. return handleWebSockets(delegate (scope ws) nothrow {
  306. try on_handshake(ws);
  307. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  308. });
  309. }
  310. /// ditto
  311. HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
  312. @system {
  313. return handleWebSockets(delegate (scope ws) nothrow {
  314. try on_handshake(ws);
  315. catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
  316. });
  317. }
  318. /**
  319. * Provides the reason that a websocket connection has closed.
  320. *
  321. * Further documentation for the WebSocket and it's codes can be found from:
  322. * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
  323. *
  324. * ---
  325. *
  326. * void echoSocket(scope WebSocket sock)
  327. * {
  328. * import std.datetime : seconds;
  329. *
  330. * while(sock.waitForData(3.seconds))
  331. * {
  332. * string msg = sock.receiveText;
  333. * logInfo("Got a message: %s", msg);
  334. * sock.send(msg);
  335. * }
  336. *
  337. * if(sock.connected)
  338. * sock.close(WebSocketCloseReason.policyViolation, "timeout");
  339. * }
  340. * ---
  341. */
  342. enum WebSocketCloseReason : short
  343. {
  344. none = 0,
  345. normalClosure = 1000,
  346. goingAway = 1001,
  347. protocolError = 1002,
  348. unsupportedData = 1003,
  349. noStatusReceived = 1005,
  350. abnormalClosure = 1006,
  351. invalidFramePayloadData = 1007,
  352. policyViolation = 1008,
  353. messageTooBig = 1009,
  354. internalError = 1011,
  355. serviceRestart = 1012,
  356. tryAgainLater = 1013,
  357. badGateway = 1014,
  358. tlsHandshake = 1015
  359. }
  360. string closeReasonString(WebSocketCloseReason reason) @nogc @safe
  361. {
  362. import std.math : floor;
  363. //round down to the nearest thousand to get category
  364. switch(cast(short)(cast(float)reason / 1000f).floor)
  365. {
  366. case 0:
  367. return "Reserved and Unused";
  368. case 1:
  369. switch(reason)
  370. {
  371. case 1000:
  372. return "Normal Closure";
  373. case 1001:
  374. return "Going Away";
  375. case 1002:
  376. return "Protocol Error";
  377. case 1003:
  378. return "Unsupported Data";
  379. case 1004:
  380. return "RESERVED";
  381. case 1005:
  382. return "No Status Recvd";
  383. case 1006:
  384. return "Abnormal Closure";
  385. case 1007:
  386. return "Invalid Frame Payload Data";
  387. case 1008:
  388. return "Policy Violation";
  389. case 1009:
  390. return "Message Too Big";
  391. case 1010:
  392. return "Missing Extension";
  393. case 1011:
  394. return "Internal Error";
  395. case 1012:
  396. return "Service Restart";
  397. case 1013:
  398. return "Try Again Later";
  399. case 1014:
  400. return "Bad Gateway";
  401. case 1015:
  402. return "TLS Handshake";
  403. default:
  404. return "RESERVED";
  405. }
  406. case 2:
  407. return "Reserved for extensions";
  408. case 3:
  409. return "Available for frameworks and libraries";
  410. case 4:
  411. return "Available for applications";
  412. default:
  413. return "UNDEFINED - Nasal Demons";
  414. }
  415. }
  416. unittest
  417. {
  418. assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused");
  419. assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused");
  420. assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
  421. assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
  422. assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
  423. assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
  424. assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
  425. assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
  426. assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
  427. assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons");
  428. //check the other spec cases
  429. for(short i = 1000; i < 1017; i++)
  430. {
  431. if(i == 1004 || i > 1015)
  432. {
  433. assert(
  434. (cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
  435. "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
  436. );
  437. }
  438. else
  439. assert(
  440. (cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
  441. "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
  442. );
  443. }
  444. }
  445. /**
  446. * Represents a single _WebSocket connection.
  447. *
  448. * ---
  449. * int main (string[] args)
  450. * {
  451. * auto taskHandle = runTask(() => connectToWS());
  452. * return runApplication(&args);
  453. * }
  454. *
  455. * void connectToWS ()
  456. * {
  457. * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
  458. * auto ws = connectWebSocket(ws_url);
  459. * logInfo("WebSocket connected");
  460. *
  461. * while (ws.waitForData())
  462. * {
  463. * auto txt = ws.receiveText;
  464. * logInfo("Received: %s", txt);
  465. * }
  466. * logFatal("Connection lost!");
  467. * }
  468. * ---
  469. */
  470. final class WebSocket {
  471. @safe:
  472. private {
  473. ConnectionStream m_conn;
  474. bool m_sentCloseFrame = false;
  475. IncomingWebSocketMessage m_nextMessage = null;
  476. HTTPServerRequest m_request;
  477. HTTPServerResponse m_serverResponse;
  478. HTTPClientResponse m_clientResponse;
  479. Task m_reader;
  480. Task m_ownerTask;
  481. InterruptibleTaskMutex m_readMutex, m_writeMutex;
  482. InterruptibleTaskCondition m_readCondition;
  483. Timer m_pingTimer;
  484. uint m_lastPingIndex;
  485. bool m_pongReceived;
  486. short m_closeCode;
  487. const(char)[] m_closeReason;
  488. /// The entropy generator to use
  489. /// If null, it means this is a server socket (client sockets are always given an RNG).
  490. RandomNumberStream m_rng;
  491. }
  492. scope:
  493. /**
  494. * Private constructor, called from `connectWebSocket`.
  495. *
  496. * Params:
  497. * conn = Underlying connection string
  498. * request = HTTP request used to establish the connection
  499. * rng = Source of entropy to use. If null, assume we're a server socket
  500. * client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
  501. */
  502. private this(ConnectionStream conn, HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
  503. {
  504. m_ownerTask = Task.getThis();
  505. m_conn = conn;
  506. m_request = request;
  507. m_clientResponse = client_res;
  508. m_serverResponse = server_res;
  509. assert(m_conn);
  510. m_rng = rng;
  511. m_writeMutex = new InterruptibleTaskMutex;
  512. m_readMutex = new InterruptibleTaskMutex;
  513. m_readCondition = new InterruptibleTaskCondition(m_readMutex);
  514. m_readMutex.performLocked!({
  515. // NOTE: Silencing scope warning here - m_reader MUST be stopped
  516. // before the end of the lifetime of the WebSocket object,
  517. // which is done in the mandatory call to close().
  518. // The same goes for m_pingTimer below.
  519. m_reader = () @trusted { return runTask(&startReader); } ();
  520. if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
  521. m_pongReceived = true;
  522. m_pingTimer = () @trusted { return setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); } ();
  523. }
  524. });
  525. }
  526. private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
  527. {
  528. this(conn, null, null, rng, client_res);
  529. }
  530. private this(ConnectionStream conn, HTTPServerRequest request, HTTPServerResponse res)
  531. {
  532. this(conn, request, res, null, null);
  533. }
  534. /**
  535. Determines if the WebSocket connection is still alive and ready for sending.
  536. Note that for determining the ready state for $(EM reading), you need
  537. to use $(D waitForData) instead, because both methods can return
  538. different values while a disconnect is in proress.
  539. See_also: $(D waitForData)
  540. */
  541. @property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }
  542. /**
  543. Returns the close code sent by the remote end.
  544. Note if the connection was never opened, is still alive, or was closed
  545. locally this value will be 0. If no close code was given by the remote
  546. end in the close frame, the value will be 1005. If the connection was
  547. not closed cleanly by the remote end, this value will be 1006.
  548. */
  549. @property short closeCode() { return m_closeCode; }
  550. /**
  551. Returns the close reason sent by the remote end.
  552. Note if the connection was never opened, is still alive, or was closed
  553. locally this value will be an empty string.
  554. */
  555. @property const(char)[] closeReason() { return m_closeReason; }
  556. /**
  557. The HTTP request that established the web socket connection.
  558. */
  559. @property inout(HTTPServerRequest) request() inout { return m_request; }
  560. /**
  561. Checks if data is readily available for read.
  562. */
  563. @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; }
  564. /** Waits until either a message arrives or until the connection is closed.
  565. This function can be used in a read loop to cleanly determine when to stop reading.
  566. */
  567. bool waitForData()
  568. {
  569. if (m_nextMessage) return true;
  570. m_readMutex.performLocked!({
  571. while (connected && m_nextMessage is null)
  572. m_readCondition.wait();
  573. });
  574. return m_nextMessage !is null;
  575. }
  576. /// ditto
  577. bool waitForData(Duration timeout)
  578. {
  579. import std.datetime;
  580. if (m_nextMessage) return true;
  581. immutable limit_time = Clock.currTime(UTC()) + timeout;
  582. m_readMutex.performLocked!({
  583. while (connected && m_nextMessage is null && timeout > 0.seconds) {
  584. m_readCondition.wait(timeout);
  585. timeout = limit_time - Clock.currTime(UTC());
  586. }
  587. });
  588. return m_nextMessage !is null;
  589. }
  590. /**
  591. Sends a text message.
  592. On the JavaScript side, the text will be available as message.data (type string).
  593. Throws:
  594. A `WebSocketException` is thrown if the connection gets closed
  595. before or during the transfer of the message.
  596. */
  597. void send(scope const(char)[] data)
  598. {
  599. send(
  600. (scope message) { message.write(cast(const ubyte[])data); },
  601. FrameOpcode.text);
  602. }
  603. /**
  604. Sends a binary message.
  605. On the JavaScript side, the text will be available as message.data (type Blob).
  606. Throws:
  607. A `WebSocketException` is thrown if the connection gets closed
  608. before or during the transfer of the message.
  609. */
  610. void send(in ubyte[] data)
  611. {
  612. send((scope message){ message.write(data); }, FrameOpcode.binary);
  613. }
  614. /**
  615. Sends a message using an output stream.
  616. Throws:
  617. A `WebSocketException` is thrown if the connection gets closed
  618. before or during the transfer of the message.
  619. */
  620. void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
  621. {
  622. m_writeMutex.performLocked!({
  623. enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
  624. /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
  625. scope(exit) message.finalize();
  626. sender(message);
  627. });
  628. }
  629. /**
  630. Actively closes the connection.
  631. Params:
  632. code = Numeric code indicating a termination reason.
  633. reason = Message describing why the connection was terminated.
  634. */
  635. void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
  636. {
  637. import std.algorithm.comparison : min;
  638. if(reason !is null && reason.length == 0)
  639. reason = (cast(WebSocketCloseReason)code).closeReasonString;
  640. //control frame payloads are limited to 125 bytes
  641. version(assert)
  642. assert(reason.length <= 123);
  643. else
  644. reason = reason[0 .. min($, 123)];
  645. if (connected) {
  646. try {
  647. send((scope msg) {
  648. m_sentCloseFrame = true;
  649. if (code != 0) {
  650. msg.write(std.bitmanip.nativeToBigEndian(code));
  651. msg.write(cast(const ubyte[])reason);
  652. }
  653. }, FrameOpcode.close);
  654. } catch (Exception e) {
  655. logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
  656. }
  657. }
  658. if (m_pingTimer) m_pingTimer.stop();
  659. if (Task.getThis() == m_ownerTask) {
  660. m_writeMutex.performLocked!({
  661. if (m_clientResponse) {
  662. m_clientResponse.disconnect();
  663. m_clientResponse = HTTPClientResponse.init;
  664. }
  665. if (m_serverResponse) {
  666. m_serverResponse.finalize();
  667. m_serverResponse = HTTPServerResponse.init;
  668. }
  669. });
  670. m_reader.join();
  671. () @trusted { destroy(m_conn); } ();
  672. m_conn = ConnectionStream.init;
  673. }
  674. }
  675. /**
  676. Receives a new message and returns its contents as a newly allocated data array.
  677. Params:
  678. strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise.
  679. Throws: WebSocketException if the connection is closed or
  680. if $(D strict == true) and the frame received is not the right type
  681. */
  682. ubyte[] receiveBinary(bool strict = true)
  683. {
  684. ubyte[] ret;
  685. receive((scope message){
  686. enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
  687. "Expected a binary message, got "~message.frameOpcode.to!string());
  688. ret = message.readAll();
  689. });
  690. return ret;
  691. }
  692. /// ditto
  693. string receiveText(bool strict = true)
  694. {
  695. string ret;
  696. receive((scope message){
  697. enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
  698. "Expected a text message, got "~message.frameOpcode.to!string());
  699. ret = message.readAllUTF8();
  700. });
  701. return ret;
  702. }
  703. /**
  704. Receives a new message using an InputStream.
  705. Throws: WebSocketException if the connection is closed.
  706. */
  void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
  {
      m_readMutex.performLocked!({
          // Block until the reader task has published a message. The
          // connection state is re-checked before every wait so that a
          // closed socket surfaces as an exception instead of waiting again.
          for (;;) {
              if (m_nextMessage !is null) break;
              enforce!WebSocketException(connected, "Connection closed while reading message.");
              m_readCondition.wait();
          }

          // Hand the message to the caller. Note that the slot is only
          // released if the callback returns normally; if it throws, the
          // message stays pending.
          receiver(m_nextMessage);

          // Release the slot and wake the reader task, which waits for the
          // slot to become empty before reading the next message.
          m_nextMessage = null;
          m_readCondition.notifyAll();
      });
  }
  // Reader loop: reads incoming messages from `m_conn` one at a time and
  // dispatches on their opcode until the stream is empty, a close frame is
  // received or an exception is thrown. On exit the connection is closed and
  // all tasks waiting on `m_readCondition` are woken up.
  private void startReader()
  nothrow {
      try m_readMutex.performLocked!({}); //Wait until initialization
      catch (Exception e) {
          // Could not even acquire the read mutex: give up, close the
          // connection and report an abnormal closure to any waiters.
          logException(e, "WebSocket reader task failed to wait for initialization");
          try m_conn.close();
          catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
          m_closeCode = WebSocketCloseReason.abnormalClosure;
          try m_readCondition.notifyAll();
          catch (Exception e) assert(false, e.msg);
          return;
      }
      try {
          loop:
          while (!m_conn.empty) {
              // The previous data message must have been consumed before a new one is read
              assert(!m_nextMessage);
              // Constructing the message reads its first frame from the connection
              /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);
              switch (msg.frameOpcode) {
                  default: throw new WebSocketException("unknown frame opcode");
                  case FrameOpcode.ping:
                      // Answer with a pong carrying the same payload
                      send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
                      break;
                  case FrameOpcode.pong:
                      // test if pong matches previous ping
                      // (the payload must be exactly the little-endian index of the last ping sent)
                      if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
                          logDebugV("Received PONG that doesn't match previous ping.");
                          break;
                      }
                      logDebugV("Received matching PONG.");
                      m_pongReceived = true;
                      break;
                  case FrameOpcode.close:
                      logDebug("Got closing frame (%s)", m_sentCloseFrame);
                      // If no close code was passed, we default to 1005
                      this.m_closeCode = WebSocketCloseReason.noStatusReceived;
                      // If provided in the frame, attempt to parse the close code/reason
                      // (big-endian 16-bit code, optionally followed by the reason text)
                      if (msg.peek().length >= short.sizeof) {
                          this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);
                          if (msg.peek().length > short.sizeof) {
                              this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
                          }
                      }
                      // Send our own close frame unless one has already been sent
                      if(!m_sentCloseFrame) close();
                      logDebug("Terminating connection (%s)", m_sentCloseFrame);
                      break loop;
                  case FrameOpcode.text:
                  case FrameOpcode.binary:
                  case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
                      // Publish the message for `receive` and wait until it
                      // has been taken (the slot is reset to null) before
                      // reading further from the connection.
                      m_readMutex.performLocked!({
                          m_nextMessage = msg;
                          m_readCondition.notifyAll();
                          while (m_nextMessage) m_readCondition.wait();
                      });
                      break;
              }
          }
      } catch (Exception e) {
          // Any read/protocol error ends the loop; cleanup happens below
          logDiagnostic("Error while reading websocket message: %s", e.msg);
          logDiagnostic("Closing connection.");
      }
      // If no close code was passed, e.g. this was an unclean termination
      // of our websocket connection, set the close code to 1006.
      if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;
      try m_conn.close();
      catch (Exception e) logException(e, "Failed to close WebSocket connection");
      // Wake up anyone still blocked in `receive` so they can observe the closed connection
      try m_readCondition.notifyAll();
      catch (Exception e) assert(false, e.msg);
  }
  // Sends a ping frame carrying the next ping index, or closes the
  // connection if the previous ping has not been answered yet.
  // NOTE(review): presumably invoked periodically by the ping timer - confirm against the caller.
  private void sendPing()
  nothrow {
      try {
          if (m_pongReceived) {
              // Previous ping was acknowledged: arm the flag again and send
              // a new ping whose payload is the incremented ping index.
              m_pongReceived = false;
              send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
              logDebugV("Ping sent");
          } else {
              // No pong since the last ping: treat the peer as gone.
              logDebug("Pong skipped. Closing connection.");
              close();
              // Wake up tasks blocked on the read condition so they notice the closed connection
              try m_readCondition.notifyAll();
              catch (Exception notify_error) assert(false, notify_error.msg);
          }
      } catch (Exception e) {
          logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
      }
  }
  804. }
  805. /**
  806. Represents a single outgoing _WebSocket message as an OutputStream.
  807. */
  final class OutgoingWebSocketMessage : OutputStream {
  @safe:
      private {
          // Source of masking keys; frames are masked only when this is non-null
          RandomNumberStream m_rng;
          // Underlying connection the frames are written to
          Stream m_conn;
          // Opcode of the next frame to send. Starts as the message type and
          // switches to `continuation` once the first frame has been written.
          FrameOpcode m_frameOpcode;
          // Pending payload, preceded by `Frame.maxHeaderSize` bytes reserved for the header
          Appender!(ubyte[]) m_buffer;
          bool m_finalized = false;
      }

      private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
      {
          assert(conn !is null);
          m_conn = conn;
          m_frameOpcode = frameOpcode;
          m_rng = rng;
      }

      // The signature of `OutputStream.write` depends on the stream interface version
      static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
          override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
      } else {
          override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
      }

      // Keep the remaining `OutputStream.write` overloads visible on this class
      alias write = OutputStream.write;

      /// Buffers `bytes`; nothing is sent before `flush` or `finalize` is called.
      private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
      {
          assert(!m_finalized);

          // Reserve room in front of the payload so that the frame header can
          // later be written in place, directly before the payload bytes.
          if (!m_buffer.data.length) {
              ubyte[Frame.maxHeaderSize] header_padding;
              m_buffer.put(header_padding[]);
          }

          m_buffer.put(bytes);
          return bytes.length;
      }

      /// Sends the buffered data, if any, as a non-final frame.
      void flush()
      {
          assert(!m_finalized);
          if (m_buffer.data.length > 0)
              sendFrame(false);
      }

      /// Sends the final frame of the message. Further calls are no-ops.
      void finalize()
      {
          if (m_finalized) return;
          m_finalized = true;
          sendFrame(true);
      }

      /**
       * Writes the buffered payload as a single frame to the connection.
       *
       * Params:
       *   fin = Whether this is the last frame of the message.
       */
      private void sendFrame(bool fin)
      {
          // Make sure the header padding exists even for an empty payload
          if (!m_buffer.data.length)
              write(null, IOMode.once);

          assert(m_buffer.data.length >= Frame.maxHeaderSize);

          Frame frame;
          frame.fin = fin;
          frame.opcode = m_frameOpcode;
          frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];

          // The header is written right-aligned into the reserved padding so
          // that header and payload form one contiguous slice.
          auto hsize = frame.getHeaderSize(m_rng !is null);
          auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
          frame.writeHeader(msg[0 .. hsize], m_rng);
          m_conn.write(msg);
          m_conn.flush();
          m_buffer.clear();

          // RFC 6455, section 5.4: only the first frame of a fragmented
          // message carries the message opcode, every following fragment
          // must use the continuation opcode.
          m_frameOpcode = FrameOpcode.continuation;
      }
  }
  870. /**
  871. Represents a single incoming _WebSocket message as an InputStream.
  872. */
  final class IncomingWebSocketMessage : InputStream {
  @safe:
      private {
          RandomNumberStream m_rng;
          // Connection the frames are read from
          Stream m_conn;
          // Frame currently being consumed; its payload shrinks as data is read
          Frame m_currentFrame;
      }

      private this(Stream conn, RandomNumberStream rng)
      {
          assert(conn !is null);
          m_rng = rng;
          m_conn = conn;
          // Fetch the first frame right away so that opcode and payload are available
          skipFrame();
      }

      /// True if the current frame has no unread payload left.
      @property bool empty() const { return m_currentFrame.payload.length == 0; }

      /// Number of unread payload bytes in the current frame.
      @property ulong leastSize() const { return m_currentFrame.payload.length; }

      /// Always reports data as available.
      @property bool dataAvailableForRead() { return true; }

      /// The frame type for this message.
      @property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

      /// Returns the unread payload of the current frame without consuming it.
      const(ubyte)[] peek() { return m_currentFrame.payload; }

      /**
       * Discards the current frame and retrieves the next websocket frame of
       * the stream.
       *
       * This function is helpful if one wishes to process the message frame
       * by frame, or to minimize memory allocation, as `peek` only returns
       * the data of the current frame and `read` requires a pre-allocated
       * buffer.
       *
       * Returns:
       *   `false` if the current frame is the final one, `true` if a new
       *   frame was read.
       */
      bool skipFrame()
      {
          if (!m_currentFrame.fin) {
              m_currentFrame = Frame.readFrame(m_conn);
              return true;
          }
          return false;
      }

      /**
       * Copies payload data into `dst`, moving on to the following frames of
       * the message as needed.
       *
       * Returns: The number of bytes written to `dst`.
       * Throws: WebSocketException if more data is requested than is available.
       */
      size_t read(scope ubyte[] dst, IOMode mode)
      {
          import std.algorithm : min;

          size_t total = 0;
          while (dst.length > 0) {
              enforce!WebSocketException(!empty , "cannot read from empty stream");
              enforce!WebSocketException(leastSize > 0, "no data available" );

              // Copy as much of the current frame as fits into the destination
              immutable chunk = cast(size_t)min(leastSize, dst.length);
              dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
              dst = dst[chunk .. $];
              m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
              total += chunk;

              // Current frame still has data: keep filling from it
              if (leastSize != 0) continue;

              // Frame exhausted: non-blocking modes stop here instead of
              // fetching the next frame from the connection.
              if (mode == IOMode.immediate) break;
              if (mode == IOMode.once && total > 0) break;
              this.skipFrame();
          }
          return total;
      }

      // Keep the remaining `InputStream.read` overloads visible on this class
      alias read = InputStream.read;
  }
  /// Magic string defined by the RFC for challenging the server during upgrade.
  /// It is hashed together with the challenge key to form the accept key.
  private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
  936. /**
  937. * The Opcode is 4 bits, as defined in Section 5.2
  938. *
  939. * Values are defined in section 11.8
  940. * Currently only 6 values are defined, however the opcode is defined as
  941. * taking 4 bits.
  942. */
  public enum FrameOpcode : ubyte {
      /// Continuation of a fragmented message
      continuation = 0x0,
      /// Text data frame
      text = 0x1,
      /// Binary data frame
      binary = 0x2,
      /// Connection close control frame
      close = 0x8,
      /// Ping control frame
      ping = 0x9,
      /// Pong control frame
      pong = 0xA
  }
  // Guards against adding a member that would not fit into the 4-bit opcode field
  static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
  /**
   * A single WebSocket frame (FIN flag, opcode and payload) together with the
   * routines to serialize its header and to read a frame from a stream.
   */
  private struct Frame {
  @safe:
      /// Largest possible header: 2 fixed bytes + 8 extended length bytes + 4 masking key bytes.
      enum maxHeaderSize = 14;
      /// Largest payload a control frame may carry (RFC 6455, section 5.5).
      enum maxControlPayloadSize = 125;

      bool fin;
      FrameOpcode opcode;
      ubyte[] payload;

      /**
       * Returns the size in bytes of the header needed for the current payload.
       *
       * The WebSocket RFC defines a variable-length payload length field:
       * - If the length is <= 125, it is stored in the 7 least significant
       *   bits of the second header byte (the first bit is reserved for
       *   MASK), resulting in a 2 bytes header.
       * - If the length is <= 65_535 (so it fits in 2 bytes), a magic value
       *   of 126 is stored in the aforementioned 7 bits and the actual length
       *   is stored in the next two bytes, resulting in a 4 bytes header.
       * - If the length is > 65_535, a magic value of 127 is used for the
       *   7-bit field and the next 8 bytes hold the length, resulting in a
       *   10 bytes header.
       *
       * Params:
       *   mask = Whether the frame is masked, which adds the 4 bytes of the
       *          masking key to the header.
       *
       * Returns: One of 2, 4 or 10, plus 4 if `mask` is set.
       */
      size_t getHeaderSize(bool mask)
      {
          size_t ret = 1;
          if (payload.length < 126) ret += 1;
          else if (payload.length < 65536) ret += 3;
          else ret += 9;
          if (mask) ret += 4;
          return ret;
      }

      /**
       * Writes the frame header to `dst` and, if masking is requested, masks
       * `payload` in place.
       *
       * Params:
       *   dst = Destination buffer, must be at least `getHeaderSize` bytes long.
       *   sys_rng = Source of the masking key. If `null`, the frame is sent
       *             unmasked and `payload` is left untouched.
       */
      void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
      {
          // First byte: FIN flag in the top bit, opcode in the lower 4 bits
          ubyte firstByte = cast(ubyte)opcode;
          if (fin) firstByte |= 0x80;
          dst[0] = firstByte;
          dst = dst[1 .. $];

          // Second byte: MASK flag in the top bit, 7-bit length (or magic value) below
          auto b1 = sys_rng ? 0x80 : 0x00;

          if (payload.length < 126) {
              dst[0] = cast(ubyte)(b1 | payload.length);
              dst = dst[1 .. $];
          } else if (payload.length < 65536) {
              dst[0] = cast(ubyte) (b1 | 126);
              dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
              dst = dst[3 .. $];
          } else {
              dst[0] = cast(ubyte) (b1 | 127);
              dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
              dst = dst[9 .. $];
          }

          if (sys_rng) {
              // Draw a fresh masking key into the header and XOR the payload with it
              sys_rng.read(dst[0 .. 4]);
              for (size_t i = 0; i < payload.length; i++)
                  payload[i] ^= dst[i % 4];
          }
      }

      /**
       * Reads one complete frame from `stream` and removes the masking from
       * its payload, if any.
       *
       * Throws: WebSocketException if the length field is invalid, or if a
       *   control frame is fragmented or carries more than 125 bytes of
       *   payload.
       */
      static Frame readFrame(InputStream stream)
      {
          Frame frame;
          // Scratch buffer, reused for the fixed header, the extended length and the masking key
          ubyte[8] data;

          stream.read(data[0 .. 2]);
          frame.fin = (data[0] & 0x80) != 0;
          frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

          bool masked = !!(data[1] & 0b1000_0000);

          //parsing length
          ulong length = data[1] & 0b0111_1111;
          if (length == 126) {
              stream.read(data[0 .. 2]);
              length = bigEndianToNative!ushort(data[0 .. 2]);
          } else if (length == 127) {
              stream.read(data);
              length = bigEndianToNative!ulong(data);

              // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
              // interpreted as a 64-bit unsigned integer (the most significant
              // bit MUST be 0)
              enforce!WebSocketException(!(length >> 63),
                  "Received length has a non-zero most significant bit");
          }

          logDebug("Read frame: %s %s %s length=%d",
                   frame.opcode,
                   frame.fin ? "final frame" : "continuation",
                   masked ? "masked" : "not masked",
                   length);

          // RFC 6455, 5.5: control frames are the opcodes with the most
          // significant bit set. They MUST NOT be fragmented and MUST have a
          // payload length of 125 bytes or less. Rejecting them here also
          // keeps a peer from forcing large allocations via ping/pong/close.
          if ((cast(ubyte)frame.opcode & 0x08) != 0) {
              enforce!WebSocketException(frame.fin,
                  "Received a fragmented control frame");
              enforce!WebSocketException(length <= maxControlPayloadSize,
                  "Received a control frame with a payload larger than 125 bytes");
          }

          // Masking key is 32 bits / uint
          if (masked)
              stream.read(data[0 .. 4]);

          // Read payload
          // TODO: Provide a way to limit the size read, easy
          // DOS for server code here (rejectedsoftware/vibe.d#1496).
          enforce!WebSocketException(length <= size_t.max);
          frame.payload = new ubyte[](cast(size_t)length);
          stream.read(frame.payload);

          //de-masking
          if (masked)
              foreach (size_t i; 0 .. cast(size_t)length)
                  frame.payload[i] = frame.payload[i] ^ data[i % 4];

          return frame;
      }
  }
  // Checks header size computation and header serialization of `Frame` at the
  // boundaries of the three payload length encodings, masked and unmasked.
  unittest {
      import std.algorithm.searching : all;

      // Deterministic "random" source: every generated byte is 13
      final class DummyRNG : RandomNumberStream {
      @safe:
          @property bool empty() { return false; }
          @property ulong leastSize() { return ulong.max; }
          @property bool dataAvailableForRead() { return true; }
          const(ubyte)[] peek() { return null; }
          size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
          alias read = RandomNumberStream.read;
      }

      auto rng = new DummyRNG;
      ubyte[14] hdrbuf;
      Frame f;

      // Verifies the unmasked and the masked header for a zeroed payload of
      // the given length. `expected` is the full unmasked header; the masked
      // one only differs by the MASK bit and the trailing 4 byte key.
      void check(size_t payload_length, size_t unmasked_size, const(ubyte)[] expected)
      {
          immutable masked_size = unmasked_size + 4;
          f.payload = new ubyte[](payload_length);

          // without masking
          assert(f.getHeaderSize(false) == unmasked_size);
          hdrbuf[] = 0;
          f.writeHeader(hdrbuf[0 .. unmasked_size], null);
          assert(hdrbuf[0 .. unmasked_size] == expected);

          // with masking
          auto masked_expected = expected.dup;
          masked_expected[1] |= 0x80;
          assert(f.getHeaderSize(true) == masked_size);
          hdrbuf[] = 0;
          f.writeHeader(hdrbuf[0 .. masked_size], rng);
          assert(hdrbuf[0 .. unmasked_size] == masked_expected);
          assert(hdrbuf[unmasked_size .. masked_size].all!(b => b == 13));
      }

      // largest length that fits into the 7-bit field
      check(125, 2, [0, 125]);
      // smallest length using the 16-bit extended field
      check(126, 4, [0, 126, 0, 126]);
      // largest length using the 16-bit extended field
      check(65535, 4, [0, 126, 255, 255]);
      // smallest length using the 64-bit extended field
      check(65536, 10, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
  }
  1117. /**
  1118. * Generate a challenge key for the protocol upgrade phase.
  1119. */
  private string generateChallengeKey(RandomNumberStream rng)
  {
      // The key is 16 bytes drawn from the given generator, Base64 encoded
      ubyte[16] nonce;
      rng.read(nonce);
      return Base64.encode(nonce);
  }
  /**
   * Computes the accept key for a given challenge key: the Base64 encoded
   * SHA-1 digest of the challenge key followed by the WebSocket GUID.
   */
  private string computeAcceptKey(string challengekey)
  {
      SHA1 sha;
      sha.start();
      // Order matters: challenge key first, then the magic GUID
      sha.put(challengekey.representation);
      sha.put(s_webSocketGuid.representation);

      auto digest = sha.finish();
      return Base64.encode(digest).to!string;
  }