app.d 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import std.stdio;
  2. import std.conv;
  3. import std.range;
  4. import std.datetime : SysTime, Clock;
  5. import core.sync.mutex : Mutex;
  6. import std.concurrency;
  7. import slf4d;
  8. import handy_httpd;
  9. import handy_httpd.components.websocket;
  10. import handy_httpd.handlers.path_handler;
  11. import handy_httpd.handlers.file_resolving_handler;
  12. struct UserState{
  13. string client_id;
  14. SysTime last_online_at;
  15. }
  16. UserState[string] userStates;
  17. Mutex userStatesMutex;
  18. class MyWebSocketRequestHandler : HttpRequestHandler{
  19. private Mutex userStatesMutex;
  20. this(Mutex mutex){
  21. this.userStatesMutex = mutex;
  22. writeln("DEBUG: MyWebSocketRequestHandler created");
  23. }
  24. //WebSocketMessageHandler factory;
  25. //this(WebSocketMessageHandler factory){
  26. // this.factory = factory;
  27. //}
  28. void handle(ref HttpRequestContext ctx){
  29. writeln("DEBUG: MyWebSocketRequestHandler.handle(...) called");
  30. bool isWebSocketUpgrade = false;
  31. string client_id = "";
  32. //writeln("ctx.request.headers = ", ctx.request.headers);
  33. //writeln("ctx.request = ", ctx.request);
  34. //writeln("ctx.request.queryParams = ", ctx.request.queryParams);
  35. //writeln("ctx.request.queryParams[\"client_id\"] = ", ctx.request.queryParams["client_id"]);
  36. if(ctx.request.queryParams.contains("client_id")){
  37. client_id = ctx.request.queryParams["client_id"];
  38. isWebSocketUpgrade = true;
  39. }
  40. //writeln("ctx.request.url = ", ctx.request.url);
  41. /*
  42. auto secWsKey = ctx.request.headers.getFirst("Sec-WebSocket-Key");
  43. if( (!(client_id.empty)) && (secWsKey !is null) && (secWsKey.get().length > 0) ){
  44. isWebSocketUpgrade = true;
  45. }
  46. */
  47. /*
  48. if(!isWebSocketUpgrade){
  49. throw new HttpStatusException(HttpStatus.UPGRADE_REQUIRED);
  50. }
  51. if(client_id.empty){
  52. throw new HttpStatusException(HttpStatus.BAD_REQUEST, "Missing X-User-ID header");
  53. }
  54. */
  55. if(!isWebSocketUpgrade || client_id.empty){
  56. throw new HttpStatusException(HttpStatus.BAD_REQUEST, "Invalid WebSocket handshake");
  57. }
  58. writeln("DEBUG: Valid WS upgrade for client: ", client_id);
  59. auto handler = new MyWebSocketHandler(client_id, this.userStatesMutex);
  60. ctx.server.getWebSocketManager().registerConnection(ctx, handler);
  61. }
  62. }
  63. class MyWebSocketHandler : WebSocketMessageHandler{
  64. private string client_id;
  65. private Mutex userStatesMutex;
  66. this(){
  67. this.client_id = "not_found";
  68. this.userStatesMutex = null;
  69. writeln("DEBUG: MyWebSocketHandler created with client_id: ", this.client_id);
  70. }
  71. this(string client_id, Mutex mutex){
  72. this.client_id = client_id.idup;
  73. this.userStatesMutex = mutex;
  74. writeln("DEBUG: MyWebSocketHandler created with client_id: ", this.client_id);
  75. }
  76. override void onConnectionEstablished(WebSocketConnection conn, in HttpRequest request){
  77. if(this.userStatesMutex is null){
  78. writeln("ERROR: this.userStatesMutex not initialized!");
  79. //assert(false);
  80. return;
  81. }
  82. this.userStatesMutex.lock();
  83. scope(exit) this.userStatesMutex.unlock();
  84. if(client_id in userStates){
  85. writeln("Reconnect detected for client: ", client_id);
  86. }else{
  87. writeln("New connection for client: ", client_id);
  88. userStates[client_id] = UserState(client_id, Clock.currTime());
  89. //existingState = new UserState();
  90. //existingState.client_id = client_id.idup;
  91. //existingState.connectedAt = Clock.currTime();
  92. }
  93. writeln("DEBUG: onConnectionEstablished(...) called for client: ", client_id);
  94. infoF!"Connection established with id %s"(conn.id); // id 8a4429e7-19b8-4dc4-bca6-14f64a471392
  95. conn.sendTextMessage("Hey yourself!");
  96. }
  97. override void onTextMessage(WebSocketTextMessage msg){
  98. infoF!"Got TEXT: %s"(msg.payload);
  99. writeln("Message from client: ", client_id);
  100. //writeln("Message from client: ", userStates[client_id]);
  101. //msg.conn.sendTextMessage("Hey yourself!");
  102. msg.conn.sendTextMessage(msg.payload ~ " :)");
  103. }
  104. override void onCloseMessage(WebSocketCloseMessage msg){
  105. /*
  106. this.userStatesMutex.lock();
  107. scope(exit) this.userStatesMutex.unlock();
  108. if(client_id in userStates){
  109. userStates.remove(client_id);
  110. }
  111. */
  112. infoF!"Closed: %d, %s"(msg.statusCode, msg.message);
  113. }
  114. }
  115. void main(string[] args){
  116. ServerConfig cfg;
  117. if(args.length > 1){
  118. cfg.port = args[1].to!ushort;
  119. }
  120. userStatesMutex = new Mutex();
  121. cfg.workerPoolSize = 3;
  122. cfg.enableWebSockets = true; // Important! Websockets won't work unless `enableWebSockets` is set to true!
  123. //WebSocketHandler handler = new WebSocketHandler(new MyWebSocketHandler());
  124. //auto handler = new WebSocketHandler(new MyWebSocketHandler("123"));
  125. auto handler = new MyWebSocketRequestHandler(userStatesMutex);
  126. //new HttpServer(new FileResolvingHandler("public"), cfg).start();
  127. PathHandler pathHandler = new PathHandler()
  128. .addMapping(Method.GET, "/ws", handler)
  129. .addMapping("/**", new FileResolvingHandler("public"));
  130. new HttpServer(pathHandler, cfg).start();
  131. }