Просмотр исходного кода

try with message passing but locking error

221V 5 дней назад
Родитель
Сommit
b122c7a008
1 измененных файлов с 129 добавлено и 0 удалено
  1. 129 0
      vtest/source/app.d

+ 129 - 0
vtest/source/app.d

@@ -18,6 +18,7 @@ import ws_bert_login : ws_bert_handle, login_test; // login - logged - logout --
 import std.string;
 import std.array;
 import std.algorithm;
+import std.variant : Variant;
 
 
 
@@ -30,6 +31,39 @@ import core.time : Duration, dur;
 
 
 
+/+
+Tid user_state_pid; // with dlang message passing like in Erlang -- err here - do not tick every 30 sec
+
+struct UserState2{
+  string client_id;
+  SysTime last_online_at;
+}
+
+alias UserStateMap2 = UserState2[string];
+UserStateMap2 user_states;
+
+struct Msg1{
+  uint8 command;
+}
+
+/*
+struct Msg2{
+  Tid sender;
+  string client_id;
+}
+*/
+
+struct Msg21{
+  uint8 command;
+  string client_id;
+}
+
+struct Msg3{
+  Tid sender;
+  uint8 command;
+  string client_id;
+}
++/
 
 
 /*
@@ -171,6 +205,83 @@ void startCleanupTask(){ // with mutex
 }
 */
 
+/+
+void startCleanupTask2(){ // with message passing (actors model like in Erlang); worker -- err here - do not tick every 30 sec
+  writeln("startCleanupTask2();");
+  while(true){
+    /*
+    auto msg = receiveOnly!(Tid, uint8, string)();
+    writeln("int: ", msg[1]); // got int
+    writeln("string: ", msg[2]); // got string
+    msg[0].send(thisTid); // send message back
+    */
+    
+    receive(
+      (Msg1 msg){
+        if(msg.command == 0){ // 0 for delete inactive clients
+          writeln("221 ", user_states.keys); // show all keys
+          auto now = Clock.currTime();
+          auto toRemove = user_states.byKey
+            .filter!(k => (now - user_states[k].last_online_at).total!"seconds" > 30) // clean every 30 seconds
+            .array;
+          
+          foreach(client_id; toRemove){
+            user_states.remove(client_id);
+          }
+        }
+        return true;
+      },
+      
+      /*
+      (Msg2 msg){
+        msg.sender
+        msg.client_id
+      },
+      */
+      
+      (Msg21 msg){
+        if(msg.command == 2){ // 2 for add_or_upd client
+          //auto now = Clock.currTime();
+          user_states[msg.client_id] = UserState2( client_id : msg.client_id, last_online_at : Clock.currTime() );
+          //writeln("user_states.length = ", user_states.length);
+          //writeln(user_states.keys); // show all keys
+          //writeln(user_states.values); // show all values
+        }else if(msg.command == 3){ // 3 for delete client
+          user_states.remove(msg.client_id);
+        }
+        return true;
+      },
+      
+      (Msg3 msg){
+        if(msg.command == 1){ // 1 for check is new client
+          //msg.sender.send( (msg.client_id in user_states) !is null ); // is client exists
+          msg.sender.send( (msg.client_id in user_states) is null ); // is new client
+        }
+        //}else if(msg.command == 4){ // 4 for get length
+        //  msg.sender.send(user_states.length);
+        //}
+        return true;
+      },
+      
+      (Variant v){
+        writeln("got unexpected Variant v: ", v);
+        return true;
+      }
+    );
+  }
+}
+
+
+void startCleanupTask21(){ // clean daemon
+  while(true){
+    writeln("do clean!");
+    writeln("273", user_states.keys); // show all keys
+    user_state_pid.send(Msg1(0)); // 0 for delete inactive clients
+    writeln("275", user_states.keys); // show all keys
+    sleep(dur!"seconds"(30)); // clean every 30 seconds
+  }
+}
++/
 
 
 void main(){
@@ -241,6 +352,12 @@ void main(){
   
   //spawn(&startCleanupTask); // clean inactive user state -- with mutex
   
+  /*
+  user_state_pid = spawn(&startCleanupTask2); // worker - clean inactive user state -- with message passing (actors model like in Erlang) -- err here - do not tick every 30 sec
+  spawn(&startCleanupTask21); // daemon for worker
+  //auto clean_daemon_pid = spawn(&startCleanupTask21, thisTid);
+  sleep(dur!"seconds"(5));
+  */
   
   logInfo("Please open http://127.0.0.1:8080/ in your browser.");
   runApplication();
@@ -276,6 +393,18 @@ void ws_handle(scope WebSocket sock){
   userStateManager.addOrUpdate(client_id);
   */
   
+  /*
+  user_state_pid.send(Msg3(thisTid, 1, client_id)); // 1 for check is client exists -- err here - do not tick every 30 sec
+  //enforce(receiveOnly!Tid() == tid);
+  //auto response = receiveOnly!(Tid, bool)(); // response[0] = Tid of worker; response[1] = bool result
+  bool is_new_client = receiveOnly!bool();
+  if(is_new_client){
+    writeln("New client connected: ", client_id);
+  }else{
+    writeln("Reconnection from existing client: ", client_id);
+  }
+  user_state_pid.send(Msg21(2, client_id));
+  */
   
   
   /*