123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- // mutex example
- // http://127.0.0.1:8080/ + 'ws://' + window.location.host + '/ws_mutex';
- import std.stdio : writeln;
- import std.string;
- import std.array;
- import std.algorithm;
- import vibe.core.core;
- import vibe.http.router;
- import vibe.http.server;
- import vibe.http.fileserver;
- import vibe.http.websockets;
- import vibe.core.log;
- import core.sync.mutex : Mutex;
- import std.datetime : SysTime, Clock;
- import std.concurrency : spawn;
- import vibe.core.concurrency;
- import vibe.core.core : Task, sleep;
- import core.time : Duration, dur;
- struct UserState{
- string client_id;
- SysTime last_online_at;
-
- this(string client_id, SysTime last_online_at) pure nothrow @safe{
- this.client_id = client_id;
- this.last_online_at = last_online_at;
- }
- }
- alias UserStateMap = UserState[string];
- class UserStateManager{
- private UserStateMap states;
- private Object lockObj;
-
- this(){
- lockObj = new Object();
- }
-
- bool contains(string client_id){
- synchronized(lockObj){
- return (client_id in states) !is null;
- }
- }
-
- void addOrUpdate(string client_id){
- synchronized(lockObj){
- auto now = Clock.currTime();
- states.remove(client_id);
- states[client_id] = UserState(client_id, now);
- }
- }
-
- void cleanup(){
- synchronized(lockObj){
- auto now = Clock.currTime();
- auto toRemove = states.byKey
- .filter!(k => (now - states[k].last_online_at).total!"seconds" > 30)
- .array;
-
- foreach(client_id; toRemove){
- states.remove(client_id);
- }
- }
- }
-
- size_t length() const{
- synchronized(lockObj){
- return states.length;
- }
- }
- }
- __gshared UserStateManager userStateManager;
- void startCleanupTask(){
- while(true){
- writeln("startCleanupTask();");
- userStateManager.cleanup();
- //writeln("88 userStateManager.states is null? ", userStateManager.states is null);
- //sleep(dur!"hours"(1)); // every 1 hour = 3600 sec
- sleep(dur!"seconds"(30)); // every 30 sec
- }
- }
- void init_mutex(){
- userStateManager = new UserStateManager();
- //userStateManager.addOrUpdate("123");
-
- //writeln("99 userStateManager.states is null? ", userStateManager.states is null);
- //sleep(dur!"seconds"(1));
-
- spawn(&startCleanupTask); // clean inactive user state
- }
- void ws_mutex_handle(scope WebSocket sock){
- string client_id = "";
- if("client_id" in sock.request.query){
- client_id = sock.request.query["client_id"];
- //writeln("found client_id = ", client_id);
- }else{
- //writeln("client_id not found");
- throw new StringException("client_id not found");
- }
-
-
- bool is_new_client = true;
- if(userStateManager.contains(client_id)){
- is_new_client = false;
- writeln("Reconnection from existing client: ", client_id);
- }else{
- writeln("New client connected: ", client_id);
- }
- userStateManager.addOrUpdate(client_id);
-
-
- while(sock.waitForData()){
- auto msg = sock.receiveText();
- sock.send(msg ~ " :)");
- }
- writeln("after disconnect");
- }
- /*
- ./vtest
- Listening for requests on http://127.0.0.1:8080/
- //99 userStateManager.states is null? true
- Please open http://127.0.0.1:8080/ in your browser.
- startCleanupTask();
- //88 userStateManager.states is null? true
- startCleanupTask();
- //88 userStateManager.states is null? true
- New client connected: zkytkcKzeGaDfvfDs6y5
- after disconnect
- Reconnection from existing client: zkytkcKzeGaDfvfDs6y5
- after disconnect
- startCleanupTask();
- //88 userStateManager.states is null? false
- New client connected: zkytkcKzeGaDfvfDs6y5
- after disconnect
- startCleanupTask();
- //88 userStateManager.states is null? false
- */
|