test3.d 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. alias uint8 = ubyte;
  2. import std.stdio;
  3. import std.concurrency : receive, receiveOnly, receiveTimeout, send, spawn, thisTid, Tid;
  4. import core.time : Duration, dur;
  5. import core.thread : Thread;
  6. Tid worker_ticker;
  7. Tid[4] workers_tids;
  8. struct Msg1{
  9. uint8 command;
  10. }
  11. struct Msg2{
  12. Tid sender;
  13. string client_id;
  14. }
  15. struct Msg21{
  16. uint8 command;
  17. string client_id;
  18. }
  19. struct Msg3{
  20. Tid sender;
  21. uint8 command;
  22. string client_id;
  23. }
  24. void test3_worker(uint8 n){
  25. writeln("started worker = ", n);
  26. bool go_work = true;
  27. // https://dlang.org/phobos/std_concurrency.html#.receive
  28. while(go_work){
  29. //Thread.sleep(dur!"seconds"(120));
  30. receive(
  31. (Msg1 m) => writeln("got Msg1: command = ", m.command),
  32. (Msg2 m){ writeln("got Msg2: from ", m.sender, ", client_id = ", m.client_id); send(m.sender, "Msg2 = ok"); },
  33. (Msg21 m) => writeln("got Msg21: command = ", m.command, ", client_id = ", m.client_id),
  34. (Msg3 m){ writeln("got Msg3: from ", m.sender, ", command ", m.command); send(m.sender, "Msg3 = ok"); go_work = false; }
  35. );
  36. }
  37. writeln("stopped worker = ", n); // unreachable
  38. }
  39. void test3_ticker(){
  40. writeln("started ticker");
  41. bool go_work = true;
  42. uint8 counter = 0;
  43. while(go_work){
  44. //receive(
  45. receiveTimeout(dur!"seconds"(1),
  46. (uint8 msg){ if(msg == 0){ writeln("ticker stop command received"); go_work = false; } }
  47. );
  48. counter += 1;
  49. if(counter % 5 == 0){
  50. counter = 0;
  51. writeln("tick (5 seconds passed)");
  52. }
  53. Thread.sleep(dur!"seconds"(1));
  54. }
  55. }
  56. void test3_spawner(){
  57. worker_ticker = spawn(&test3_ticker);
  58. uint8 i = 1;
  59. workers_tids[0] = spawn(&test3_worker, i);
  60. string answer;
  61. //Thread.sleep(dur!"seconds"(30));
  62. Thread.sleep(dur!"seconds"(3));
  63. send(workers_tids[0], Msg1(42)); // async command ?
  64. send(workers_tids[0], Msg2(thisTid, "client_123"));
  65. answer = receiveOnly!string();
  66. writeln("answer = ", answer);
  67. send(workers_tids[0], Msg21(5, "client_456"));
  68. send(workers_tids[0], Msg3(thisTid, 99, "client_789"));
  69. answer = receiveOnly!string();
  70. writeln("answer = ", answer);
  71. Thread.sleep(dur!"seconds"(25));
  72. uint8 stop_command = 0;
  73. send(worker_ticker, stop_command); // send stop command for ticker worker
  74. writeln("the end");
  75. }
  76. /*
  77. // message passing (sync + responce) + ticker with stop command + app ok stop
  78. > make run
  79. ./vtest2
  80. started ticker
  81. started worker = 1
  82. got Msg1: command = 42
  83. got Msg2: from Tid(7ff9edfa48f0), client_id = client_123
  84. answer = Msg2 = ok
  85. got Msg21: command = 5, client_id = client_456
  86. got Msg3: from Tid(7ff9edfa48f0), command 99
  87. stopped worker = 1
  88. answer = Msg3 = ok
  89. tick (5 seconds passed)
  90. tick (5 seconds passed)
  91. the end
  92. ticker stop command received
  93. tick (5 seconds passed)
  94. hello here!
  95. */