Browse Source

send ws token + mutex - clear worker + fix ws after disconnect

221V 3 days ago
parent
commit
565169ffef
3 changed files with 163 additions and 10 deletions
  1. 25 1
      vtest/public/index.html
  2. 130 2
      vtest/source/app.d
  3. 8 7
      vtest/source/ws_bert_login.d

+ 25 - 1
vtest/public/index.html

@@ -30,6 +30,27 @@
 </main>
 
 <script>
+async function generateCookie(){
+  const idLength = 15; // 15 bytes = 120 bits
+  const randomBytes = new Uint8Array(idLength);
+  window.crypto.getRandomValues(randomBytes); // crypto strong random bytes
+  
+  const hashBuffer = await window.crypto.subtle.digest("SHA-256", randomBytes.buffer); // sha-256 hash
+  const idBytes = new Uint8Array(hashBuffer.slice(0, 15)); // take first 15 bytes of hash
+  const base64 = btoa(String.fromCharCode.apply(null, idBytes)) // base64 encode (url safe)
+    .replace(/\+/g, '-')
+    .replace(/\//g, '_')
+    .replace(/=+$/, '');
+  return base64;
+}
+
+var cookie_user_id = localStorage.getItem('user_id');
+if(!(cookie_user_id)){
+  generateCookie().then(value => { localStorage.setItem('user_id', value); cookie_user_id = value; });
+}
+
+
+
 var websocket;
 var server = document.getElementById('server');
 var message = document.getElementById('message');
@@ -43,7 +64,10 @@ connected.style.display = 'none';
 content.style.display = 'none';
 
 function connect(){
-  wsHost = server.value;
+  //wsHost = server.value;
+  wsHost = server.value + '?client_id=' + cookie_user_id;
+  console.log('cookie_user_id = ', cookie_user_id);
+  console.log('wsHost = ', wsHost);
   websocket = new WebSocket(wsHost);
   showScreen('<b>Connecting to: ' +  wsHost + '</b>');
   websocket.onopen = function(evt){ onOpen(evt) };

+ 130 - 2
vtest/source/app.d

@@ -15,7 +15,74 @@ import ws_bert_login : ws_bert_handle, login_test; // login - logged - logout --
 
 
 import std.string;
-//import std.array;
+import std.array;
+import std.algorithm;
+
+
+import std.datetime : SysTime, Clock; // , dur;
+//import core.sync.mutex : Mutex;
+import std.concurrency : spawn;
+import vibe.core.concurrency;
+import vibe.core.core : Task, sleep;
+import core.time : Duration, dur;
+
+
+struct UserState{
+  string client_id;
+  SysTime last_online_at;
+  
+  this(string client_id, SysTime last_online_at) pure nothrow @safe{
+    this.client_id = client_id;
+    this.last_online_at = last_online_at;
+  }
+}
+
+alias UserStateMap = UserState[string];
+
+class UserStateManager{
+  private UserStateMap states;
+  private Object lockObj;
+  
+  this(){
+    lockObj = new Object();
+  }
+  
+  bool contains(string client_id){
+    synchronized(lockObj){
+      return (client_id in states) !is null;
+    }
+  }
+  
+  void addOrUpdate(string client_id){
+    synchronized(lockObj){
+      auto now = Clock.currTime();
+      states.remove(client_id);
+      states[client_id] = UserState(client_id, now);
+    }
+  }
+  
+  void cleanup(){
+    synchronized(lockObj){
+      auto now = Clock.currTime();
+      auto toRemove = states.byKey
+        .filter!(k => (now - states[k].last_online_at).total!"seconds" > 30)
+        .array;
+      
+      foreach(client_id; toRemove){
+        states.remove(client_id);
+      }
+    }
+  }
+  
+  size_t length() const{
+    synchronized(lockObj){
+      return states.length;
+    }
+  }
+}
+
+__gshared UserStateManager userStateManager;
+
 
 
 import memcached4d;
@@ -87,6 +154,16 @@ string test_args_types_mismash2(test_key3 x, test_key4 y){
 
 
 
+void startCleanupTask(){
+  while(true){
+    writeln("startCleanupTask();");
+    userStateManager.cleanup();
+    //sleep(dur!"hours"(1)); // every 1 hour = 3600 sec
+    sleep(dur!"seconds"(30)); // every 30 sec
+  }
+}
+
+
 
 void main(){
   read_settings_toml(); // read settings, settings validation
@@ -113,6 +190,12 @@ void main(){
   }
   
   
+  
+  userStateManager = new UserStateManager();
+  userStateManager.addOrUpdate("123");
+  
+  
+  
   auto settings = new HTTPServerSettings;
   //settings.port = 8080;
   //settings.bindAddresses = ["::1", "127.0.0.1"];
@@ -141,6 +224,14 @@ void main(){
     listener.stopListening();
   }
   
+  
+  
+  //writeln("userStateManager.states is null? ", userStateManager.states is null);
+  //sleep(dur!"seconds"(1));
+  spawn(&startCleanupTask); // clean inactive user state
+  
+  
+  
   logInfo("Please open http://127.0.0.1:8080/ in your browser.");
   runApplication();
 }
@@ -148,10 +239,47 @@ void main(){
 
 void ws_handle(scope WebSocket sock){
   // simple echo server + :)
-  while(sock.connected){
+  
+  //writeln("sock = ", sock); // vibe.http.websockets.WebSocket
+  //writeln("sock.request = ", sock.request); // GET /ws?client_id=YaHoAnZo3JPYOwX7yn35 HTTP/1.1
+  //writeln("sock.request.requestPath = ", sock.request.requestPath); // /ws
+  //writeln("sock.request.queryString = ", sock.request.queryString); // client_id=YaHoAnZo3JPYOwX7yn35
+  //writeln("sock.request.query = ", sock.request.query); // ["client_id": "YaHoAnZo3JPYOwX7yn35"]
+  
+  string client_id = "";
+  if("client_id" in sock.request.query){
+    client_id = sock.request.query["client_id"];
+    //writeln("found client_id = ", client_id);
+  }else{
+    //writeln("client_id not found");
+    throw new StringException("client_id not found");
+  }
+  
+  bool is_new_client = true;
+  if(userStateManager.contains(client_id)){
+    is_new_client = false;
+    writeln("Reconnection from existing client: ", client_id);
+  }else{
+    writeln("New client connected: ", client_id);
+  }
+  userStateManager.addOrUpdate(client_id);
+  
+  /*
+  try{
+    while(sock.connected){
+      auto msg = sock.receiveText();
+      sock.send(msg ~ " :)");
+    }
+  }catch(Exception ex){
+    writeln("disconnected client_id = ", client_id);
+    writeln("Error: ", ex.msg); // Error: Connection closed while reading message.
+  }
+  */
+  while(sock.waitForData()){
     auto msg = sock.receiveText();
     sock.send(msg ~ " :)");
   }
+  writeln("after disconnect"); // now shows
 }
 
 /*

+ 8 - 7
vtest/source/ws_bert_login.d

@@ -72,20 +72,21 @@ void ws_bert_handle(scope WebSocket sock){
   //}
   // simple echo server + :)
   
-  //string client_id = req.attributes.get("client_id", "");
-  //writeln("96 client_id = ", client_id);
-  
   // https://vibed.org/api/vibe.http.websockets/WebSocket
   // https://vibed.org/api/vibe.http.websockets/WebSocket.request
   // https://vibed.org/api/vibe.http.server/HTTPServerRequest
   //writeln("sock.request = ", sock.request); // GET /ws_login_test HTTP/1.1
   //writeln("sock.request.headers = ", sock.request.headers); // ["Host": "127.0.0.1:8080", "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0", "Accept": "*/*", "Accept-Language": "uk-UA,uk;q=0.8,en-US;q=0.5,en;q=0.3", "Accept-Encoding": "gzip, deflate, br, zstd", "Sec-WebSocket-Version": "13", "Origin": "http://127.0.0.1:8080", "Sec-WebSocket-Extensions": "permessage-deflate", "Sec-WebSocket-Key": "NW+zQGtSdkuSRHWuU/VevA==", "DNT": "1", "Sec-GPC": "1", "Connection": "keep-alive, Upgrade", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "websocket", "Sec-Fetch-Site": "same-origin", "Pragma": "no-cache", "Cache-Control": "no-cache", "Upgrade": "websocket"]
   
-  //writeln("sock.request.context = ", sock.request.context); // []
-  //writeln("sock.request.params = ", sock.request.params); // []
-  //string client_id = req.params.get("client_id", "");
+  //writeln("sock.request = ", sock.request);
+  //writeln("sock.request.query = ", sock.request.query); // ["client_id": "YaHoAnZo3JPYOwX7yn35"]
+  
+  //string client_id = "";
+  //if("client_id" in sock.request.query){
+  //  client_id = sock.request.query["client_id"];
+  //}
   
-  while(sock.connected){
+  while(sock.waitForData()){
     //auto msg = sock.receiveText();
     //sock.send(msg ~ " :)");
     auto msg = sock.receiveBinary();