test3.d 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. alias uint8 = ubyte;
  2. import std.stdio;
  3. import std.concurrency : receive, receiveOnly, receiveTimeout, send, spawn, thisTid, Tid;
  4. import core.time : Duration, dur;
  5. import core.thread : Thread;
  6. Tid worker_ticker;
  7. Tid[4] workers_tids;
  8. struct Msg1{
  9. uint8 command;
  10. }
  11. struct Msg2{
  12. Tid sender;
  13. string client_id;
  14. }
  15. struct Msg21{
  16. uint8 command;
  17. string client_id;
  18. }
  19. struct Msg3{
  20. Tid sender;
  21. uint8 command;
  22. string client_id;
  23. }
  24. void test3_worker(uint8 n){
  25. writeln("started worker = ", n);
  26. bool go_work = true;
  27. // https://git.4dev.win/221V/dlang_book/src/master/book/13-параллельные-вычисления
  28. // https://dlang.org/phobos/std_concurrency.html#.receive
  29. while(go_work){
  30. //Thread.sleep(dur!"seconds"(120));
  31. receive(
  32. (Msg1 m) => writeln("got Msg1: command = ", m.command),
  33. (Msg2 m){ writeln("got Msg2: from ", m.sender, ", client_id = ", m.client_id); send(m.sender, "Msg2 = ok"); },
  34. (Msg21 m) => writeln("got Msg21: command = ", m.command, ", client_id = ", m.client_id),
  35. (Msg3 m){ writeln("got Msg3: from ", m.sender, ", command ", m.command); send(m.sender, "Msg3 = ok"); go_work = false; }
  36. );
  37. }
  38. writeln("stopped worker = ", n); // unreachable
  39. }
  40. void test3_ticker(){
  41. writeln("started ticker");
  42. bool go_work = true;
  43. uint8 counter = 0;
  44. while(go_work){
  45. //receive(
  46. receiveTimeout(dur!"seconds"(1),
  47. (uint8 msg){ if(msg == 0){ writeln("ticker stop command received"); go_work = false; } }
  48. );
  49. counter += 1;
  50. if(counter % 5 == 0){
  51. counter = 0;
  52. writeln("tick (5 seconds passed)");
  53. }
  54. Thread.sleep(dur!"seconds"(1));
  55. }
  56. }
  57. void test3_spawner(){
  58. worker_ticker = spawn(&test3_ticker);
  59. uint8 i = 1;
  60. workers_tids[0] = spawn(&test3_worker, i);
  61. string answer;
  62. //Thread.sleep(dur!"seconds"(30));
  63. Thread.sleep(dur!"seconds"(3));
  64. send(workers_tids[0], Msg1(42)); // async command ?
  65. send(workers_tids[0], Msg2(thisTid, "client_123"));
  66. answer = receiveOnly!string();
  67. writeln("answer = ", answer);
  68. send(workers_tids[0], Msg21(5, "client_456"));
  69. send(workers_tids[0], Msg3(thisTid, 99, "client_789"));
  70. answer = receiveOnly!string();
  71. writeln("answer = ", answer);
  72. Thread.sleep(dur!"seconds"(25));
  73. uint8 stop_command = 0;
  74. send(worker_ticker, stop_command); // send stop command for ticker worker
  75. writeln("the end");
  76. }
  77. /*
  78. // message passing (sync + responce) + ticker with stop command + app ok stop
  79. > make run
  80. ./vtest2
  81. started ticker
  82. started worker = 1
  83. got Msg1: command = 42
  84. got Msg2: from Tid(7ff9edfa48f0), client_id = client_123
  85. answer = Msg2 = ok
  86. got Msg21: command = 5, client_id = client_456
  87. got Msg3: from Tid(7ff9edfa48f0), command 99
  88. stopped worker = 1
  89. answer = Msg3 = ok
  90. tick (5 seconds passed)
  91. tick (5 seconds passed)
  92. the end
  93. ticker stop command received
  94. tick (5 seconds passed)
  95. hello here!
  96. */