221V 3 weeks ago
parent
commit
54463f46bb
4 changed files with 151 additions and 4 deletions
  1. 21 2
      vtest2/source/app.d
  2. 2 1
      vtest2/source/test1.d
  3. 1 1
      vtest2/source/test2.d
  4. 127 0
      vtest2/source/test3.d

+ 21 - 2
vtest2/source/app.d

@@ -3,6 +3,7 @@ import std.stdio;
 
 /*
 // test1
+//import std.parallelism : defaultPoolThreads;
 import std.concurrency : spawn;
 import core.time : Duration, dur;
 import core.thread : Thread;
@@ -11,20 +12,38 @@ import test1 : test1_spawner;
 */
 
 
+/*
 // test2
 import std.concurrency : spawn;
 import core.time : Duration, dur;
 import core.thread : Thread;
 
 import test2 : test2_spawner;
+*/
 
 
+/**/
+// test3
+//import std.parallelism : defaultPoolThreads;
+import std.concurrency : spawn, send;
+import core.time : Duration, dur;
+import core.thread : Thread;
+
+import test3 : test3_spawner;
+/**/
+
 
 void main(){
+  //writeln("defaultPoolThreads = ", defaultPoolThreads()); // 19
+  //defaultPoolThreads(8); // changes nothing because all threads works on 1 CPU core, not on multicores..(
+  //writeln("defaultPoolThreads = ", defaultPoolThreads()); // 8
+  
   //spawn(&test1_spawner); // test1 run
-  spawn(&test2_spawner); // test2 run
+  //spawn(&test2_spawner); // test2 run
+  spawn(&test3_spawner); // test3 run
   
-  Thread.sleep(dur!"seconds"(125));
+  //Thread.sleep(dur!"seconds"(125));
+  Thread.sleep(dur!"seconds"(30));
   
   writeln("hello here!");
 }

+ 2 - 1
vtest2/source/test1.d

@@ -2,6 +2,7 @@
 alias uint8  = ubyte;
 
 import std.stdio;
+//import std.parallelism : defaultPoolThreads;
 import std.concurrency : spawn;
 import core.time : Duration, dur;
 import core.thread;
@@ -30,7 +31,7 @@ void test1_spawner(){
 
 
 /*
-// 1 OS process, 10 threads
+// 10 threads (10 OS processes ?) - but with problem for 100_000 threads - but only on one CPU core (
 
 > make run
 ./vtest2

+ 1 - 1
vtest2/source/test2.d

@@ -83,7 +83,7 @@ void test2_spawner(){
 
 
 /*
-// message passing
+// message passing - async send
 
 > make run
 ./vtest2

+ 127 - 0
vtest2/source/test3.d

@@ -0,0 +1,127 @@
+
+alias uint8  = ubyte;
+
+import std.stdio;
+import std.concurrency : receive, receiveOnly, receiveTimeout, send, spawn, thisTid, Tid;
+import core.time : Duration, dur;
+import core.thread : Thread;
+
+Tid worker_ticker;
+Tid[4] workers_tids;
+
+
+struct Msg1{
+  uint8 command;
+}
+
+struct Msg2{
+  Tid sender;
+  string client_id;
+}
+
+struct Msg21{
+  uint8 command;
+  string client_id;
+}
+
+struct Msg3{
+  Tid sender;
+  uint8 command;
+  string client_id;
+}
+
+
+
+void test3_worker(uint8 n){
+  writeln("started worker = ", n);
+  bool go_work = true;
+  
+  // https://dlang.org/phobos/std_concurrency.html#.receive
+  while(go_work){
+    //Thread.sleep(dur!"seconds"(120));
+    receive(
+      (Msg1 m) =>  writeln("got Msg1: command = ", m.command),
+      (Msg2 m){ writeln("got Msg2: from ", m.sender, ", client_id = ", m.client_id); send(m.sender, "Msg2 = ok"); },
+      (Msg21 m) => writeln("got Msg21: command = ", m.command, ", client_id = ", m.client_id),
+      (Msg3 m){ writeln("got Msg3: from ", m.sender, ", command ", m.command); send(m.sender, "Msg3 = ok"); go_work = false; }
+    );
+  }
+  writeln("stopped worker = ", n); // unreachable
+}
+
+
+void test3_ticker(){
+  writeln("started ticker");
+  bool go_work = true;
+  uint8 counter = 0;
+  
+  while(go_work){
+    //receive(
+    receiveTimeout(dur!"seconds"(1),
+      (uint8 msg){ if(msg == 0){ writeln("ticker stop command received"); go_work = false; } }
+    );
+    
+    counter += 1;
+    if(counter % 5 == 0){
+      counter = 0;
+      writeln("tick (5 seconds passed)");
+    }
+    
+    Thread.sleep(dur!"seconds"(1));
+  }
+}
+
+
+void test3_spawner(){
+  worker_ticker = spawn(&test3_ticker);
+  
+  uint8 i = 1;
+  workers_tids[0] = spawn(&test3_worker, i);
+  string answer;
+  
+  //Thread.sleep(dur!"seconds"(30));
+  Thread.sleep(dur!"seconds"(3));
+  
+  send(workers_tids[0], Msg1(42)); // async command ?
+  
+  send(workers_tids[0], Msg2(thisTid, "client_123"));
+  answer = receiveOnly!string();
+  writeln("answer = ", answer);
+  
+  send(workers_tids[0], Msg21(5, "client_456"));
+  send(workers_tids[0], Msg3(thisTid, 99, "client_789"));
+  answer = receiveOnly!string();
+  writeln("answer = ", answer);
+  
+  Thread.sleep(dur!"seconds"(25));
+  
+  uint8 stop_command = 0;
+  send(worker_ticker, stop_command); // send stop command for ticker worker
+  
+  writeln("the end");
+}
+
+
+/*
+// message passing (sync + responce) + ticker with stop command + app ok stop
+
+> make run
+./vtest2
+started ticker
+started worker = 1
+got Msg1: command = 42
+got Msg2: from Tid(7ff9edfa48f0), client_id = client_123
+answer = Msg2 = ok
+got Msg21: command = 5, client_id = client_456
+got Msg3: from Tid(7ff9edfa48f0), command 99
+stopped worker = 1
+answer = Msg3 = ok
+tick (5 seconds passed)
+tick (5 seconds passed)
+the end
+ticker stop command received
+tick (5 seconds passed)
+hello here!
+
+*/
+