Просмотр исходного кода

mv mutex example to separated example

221V 1 неделя назад
Родитель
Сommit
74d6817288
3 измененных файлов с 165 добавлено и 91 удалено
  1. 2 0
      vtest/public/index.html
  2. 5 91
      vtest/source/app.d
  3. 158 0
      vtest/source/mutex_test.d

+ 2 - 0
vtest/public/index.html

@@ -9,6 +9,7 @@
 
 <header>
   <h1>Websocket test</h1>
+  <p>change <b>/ws</b> to <b>/ws_mutex</b> for mutex example<br>use <b>disconnect();</b> and <b>connect();</b> in browser console</p>
   <div id="status"></div>
 </header>
 
@@ -59,6 +60,7 @@ var connected = document.getElementById('connected');
 var content = document.getElementById('content');
 var output = document.getElementById('output');
 
+//server.value = 'ws://' + window.location.host + '/ws_mutex';
 server.value = 'ws://' + window.location.host + '/ws';
 connected.style.display = 'none';
 content.style.display = 'none';

+ 5 - 91
vtest/source/app.d

@@ -15,6 +15,8 @@ import ws_bert_login : ws_bert_handle, login_test; // login - logged - logout --
 
 import memcached_test : memcached_test; // memcached example
 
+import mutex_test : init_mutex, ws_mutex_handle; // mutex example
+
 
 import std.string;
 import std.array;
@@ -24,7 +26,6 @@ import std.variant : Variant;
 
 
 import std.datetime : SysTime, Clock;
-//import core.sync.mutex : Mutex;
 import std.concurrency : spawn;
 import vibe.core.concurrency;
 import vibe.core.core : Task, sleep;
@@ -67,65 +68,6 @@ struct Msg3{
 +/
 
 
-/*
-struct UserState{ // with mutex
-  string client_id;
-  SysTime last_online_at;
-  
-  this(string client_id, SysTime last_online_at) pure nothrow @safe{
-    this.client_id = client_id;
-    this.last_online_at = last_online_at;
-  }
-}
-
-alias UserStateMap = UserState[string];
-
-class UserStateManager{ // with mutex
-  private UserStateMap states;
-  private Object lockObj;
-  
-  this(){
-    lockObj = new Object();
-  }
-  
-  bool contains(string client_id){
-    synchronized(lockObj){
-      return (client_id in states) !is null;
-    }
-  }
-  
-  void addOrUpdate(string client_id){
-    synchronized(lockObj){
-      auto now = Clock.currTime();
-      states.remove(client_id);
-      states[client_id] = UserState(client_id, now);
-    }
-  }
-  
-  void cleanup(){
-    synchronized(lockObj){
-      auto now = Clock.currTime();
-      auto toRemove = states.byKey
-        .filter!(k => (now - states[k].last_online_at).total!"seconds" > 30)
-        .array;
-      
-      foreach(client_id; toRemove){
-        states.remove(client_id);
-      }
-    }
-  }
-  
-  size_t length() const{
-    synchronized(lockObj){
-      return states.length;
-    }
-  }
-}
-
-__gshared UserStateManager userStateManager;
-*/
-
-
 
 import std.conv : to;
 
@@ -193,16 +135,6 @@ string test_args_types_mismash2(test_key3 x, test_key4 y){
 }
 
 
-/*
-void startCleanupTask(){ // with mutex
-  while(true){
-    writeln("startCleanupTask();");
-    userStateManager.cleanup();
-    //sleep(dur!"hours"(1)); // every 1 hour = 3600 sec
-    sleep(dur!"seconds"(30)); // every 30 sec
-  }
-}
-*/
 
 /+
 void startCleanupTask2(){ // with message passing (actors model like in Erlang); worker -- err here - do not tick every 30 sec
@@ -309,13 +241,6 @@ void main(){
   
   
   
-  /*
-  userStateManager = new UserStateManager(); // with mutex
-  userStateManager.addOrUpdate("123");
-  */
-  
-  
-  
   auto settings = new HTTPServerSettings;
   //settings.port = 8080;
   //settings.bindAddresses = ["::1", "127.0.0.1"];
@@ -333,6 +258,7 @@ void main(){
   //router.get("/", staticTemplate!"index.html");
   router.get("/", serveStaticFile("public/index.html") ); // static html + ws echo example
   router.get("/ws", handleWebSockets(&ws_handle) ); // static html + ws echo example
+  router.get("/ws_mutex", handleWebSockets(&ws_mutex_handle) ); // static html + ws echo example + mutex example
   router.get("/test", &test); // Mustache template + postgresql pool example
   router.get("/ws_login_test", handleWebSockets(&ws_bert_handle) ); // ws handler begins from "ws_" and next same http page path // login - logged - logout -- via ws with bert
   router.get("/login_test", &login_test); // login - logged - logout -- via ws with bert
@@ -347,10 +273,9 @@ void main(){
   
   
   
-  //writeln("userStateManager.states is null? ", userStateManager.states is null);
-  //sleep(dur!"seconds"(1));
+  init_mutex(); // mutex example
+  
   
-  //spawn(&startCleanupTask); // clean inactive user state -- with mutex
   
   /*
   user_state_pid = spawn(&startCleanupTask2); // worker - clean inactive user state -- with message passing (actors model like in Erlang) -- err here - do not tick every 30 sec
@@ -382,16 +307,6 @@ void ws_handle(scope WebSocket sock){
     throw new StringException("client_id not found");
   }
   
-  /*
-  bool is_new_client = true;
-  if(userStateManager.contains(client_id)){ // used with mutex
-    is_new_client = false;
-    writeln("Reconnection from existing client: ", client_id);
-  }else{
-    writeln("New client connected: ", client_id);
-  }
-  userStateManager.addOrUpdate(client_id);
-  */
   
   /*
   user_state_pid.send(Msg3(thisTid, 1, client_id)); // 1 for check is client exists -- err here - do not tick every 30 sec
@@ -563,7 +478,6 @@ void test(HTTPServerRequest req, HTTPServerResponse res){
   
   
   
-  
   //test_pg_conn_driver();
   test_pg_conn_driver_queries();
   

+ 158 - 0
vtest/source/mutex_test.d

@@ -0,0 +1,158 @@
+
+// mutex example
+
+// http://127.0.0.1:8080/   +   'ws://' + window.location.host + '/ws_mutex';
+
+
+import std.stdio : writeln;
+
+import std.string;
+import std.array;
+import std.algorithm;
+
+import vibe.core.core;
+import vibe.http.router;
+import vibe.http.server;
+import vibe.http.fileserver;
+import vibe.http.websockets;
+import vibe.core.log;
+
+import core.sync.mutex : Mutex;
+import std.datetime : SysTime, Clock;
+import std.concurrency : spawn;
+import vibe.core.concurrency;
+import vibe.core.core : Task, sleep;
+import core.time : Duration, dur;
+
+
+struct UserState{
+  string client_id;
+  SysTime last_online_at;
+  
+  this(string client_id, SysTime last_online_at) pure nothrow @safe{
+    this.client_id = client_id;
+    this.last_online_at = last_online_at;
+  }
+}
+
+alias UserStateMap = UserState[string];
+
+
+class UserStateManager{
+  private UserStateMap states;
+  private Object lockObj;
+  
+  this(){
+    lockObj = new Object();
+  }
+  
+  bool contains(string client_id){
+    synchronized(lockObj){
+      return (client_id in states) !is null;
+    }
+  }
+  
+  void addOrUpdate(string client_id){
+    synchronized(lockObj){
+      auto now = Clock.currTime();
+      states.remove(client_id);
+      states[client_id] = UserState(client_id, now);
+    }
+  }
+  
+  void cleanup(){
+    synchronized(lockObj){
+      auto now = Clock.currTime();
+      auto toRemove = states.byKey
+        .filter!(k => (now - states[k].last_online_at).total!"seconds" > 30)
+        .array;
+      
+      foreach(client_id; toRemove){
+        states.remove(client_id);
+      }
+    }
+  }
+  
+  size_t length() const{
+    synchronized(lockObj){
+      return states.length;
+    }
+  }
+}
+
+__gshared UserStateManager userStateManager;
+
+
+
+void startCleanupTask(){
+  while(true){
+    writeln("startCleanupTask();");
+    userStateManager.cleanup();
+    //writeln("88 userStateManager.states is null? ", userStateManager.states is null);
+    //sleep(dur!"hours"(1)); // every 1 hour = 3600 sec
+    sleep(dur!"seconds"(30)); // every 30 sec
+  }
+}
+
+
+void init_mutex(){
+  userStateManager = new UserStateManager();
+  //userStateManager.addOrUpdate("123");
+  
+  //writeln("99 userStateManager.states is null? ", userStateManager.states is null);
+  //sleep(dur!"seconds"(1));
+  
+  spawn(&startCleanupTask); // clean inactive user state
+}
+
+
+void ws_mutex_handle(scope WebSocket sock){
+  string client_id = "";
+  if("client_id" in sock.request.query){
+    client_id = sock.request.query["client_id"];
+    //writeln("found client_id = ", client_id);
+  }else{
+    //writeln("client_id not found");
+    throw new StringException("client_id not found");
+  }
+  
+  
+  bool is_new_client = true;
+  if(userStateManager.contains(client_id)){
+    is_new_client = false;
+    writeln("Reconnection from existing client: ", client_id);
+  }else{
+    writeln("New client connected: ", client_id);
+  }
+  userStateManager.addOrUpdate(client_id);
+  
+  
+  while(sock.waitForData()){
+    auto msg = sock.receiveText();
+    sock.send(msg ~ " :)");
+  }
+  writeln("after disconnect");
+}
+
+/*
+
+./vtest
+Listening for requests on http://127.0.0.1:8080/
+//99 userStateManager.states is null? true
+Please open http://127.0.0.1:8080/ in your browser.
+startCleanupTask();
+//88 userStateManager.states is null? true
+startCleanupTask();
+//88 userStateManager.states is null? true
+New client connected: zkytkcKzeGaDfvfDs6y5
+after disconnect
+Reconnection from existing client: zkytkcKzeGaDfvfDs6y5
+after disconnect
+startCleanupTask();
+//88 userStateManager.states is null? false
+New client connected: zkytkcKzeGaDfvfDs6y5
+after disconnect
+startCleanupTask();
+//88 userStateManager.states is null? false
+
+*/