|
@@ -0,0 +1,437 @@
|
|
|
+
|
|
|
+module memcached4d;
|
|
|
+
|
|
|
+// TODO:
|
|
|
+// documentation
|
|
|
+// unit tests
|
|
|
+//
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+Memcached client for the D programming language.
|
|
|
+memcached is a distributed caching system (http://www.memcached.org)
|
|
|
+
|
|
|
+The basic idea is this: if you need to share/cache objects between applications you dump them into memcache in a serialized form.
|
|
|
+the data can be read back from other programs - even from other programming language - provided that the reader knows how to deserialize the data.
|
|
|
+a common way to serialize data is json. Memcached4d uses vibe serialization library to dump your data to json, but you can provide your own serialization method
|
|
|
+by implementing a JSON toJson method in your objects.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+A similar tool - with a lot more features is Redis - you may have a look at vibe.db.redis
|
|
|
+
|
|
|
+
|
|
|
+usage
|
|
|
+ auto cache = memcachedConnect('127.0.0.1');
|
|
|
+ auto cache = memcachedConnect('127.0.0.1:11211');
|
|
|
+ auto cache = memcachedConnect( ['127.0.0.1', '127.0.0.1'] ); // you can connect the the same server multiplie times
|
|
|
+ auto cache = memcachedConnect( '127.0.0.1, 127.0.0.1' );
|
|
|
+ auto cache = memcachedConnect( '127.0.0.1:11211, 127.0.0.1:11212' );
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if( cache.store("str_var", "lorem ipsum") == RETURN_STATE.SUCCESS ) {
|
|
|
+ writeln("stored successfully");
|
|
|
+ writeln( " get back the stored data : {", cache.get!string("str_var") , "}" );
|
|
|
+ }else {
|
|
|
+ writeln("not stored")
|
|
|
+ }
|
|
|
+
|
|
|
+*/
|
|
|
+
|
|
|
+
|
|
|
+import std.stdio;
|
|
|
+import std.string,
|
|
|
+ std.conv,
|
|
|
+ std.digest.md;
|
|
|
+
|
|
|
+static import std.array;
|
|
|
+
|
|
|
+import vibe.core.net,
|
|
|
+ vibe.stream.operations,
|
|
|
+ vibe.data.serialization,
|
|
|
+ vibe.data.json;
|
|
|
+
|
|
|
+import std.traits : isNumeric, isBoolean, isSomeString, Unqual;
|
|
|
+
|
|
|
+
|
|
|
+struct MemcachedServer {
|
|
|
+ string host;
|
|
|
+ ushort port = 11211;
|
|
|
+ TCPConnection conn;
|
|
|
+ this(string hostString){
|
|
|
+ if( hostString.indexOf(':') != -1){
|
|
|
+ auto parts = split(hostString, ':');
|
|
|
+ this.host = strip(parts[0]);
|
|
|
+ this.port = strip(parts[1]).to!ushort;
|
|
|
+ } else {
|
|
|
+ this.host = strip(hostString);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+alias MemcachedServer[] MemcachedServers;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * Use this to connectect to a memcached cluster
|
|
|
+ *
|
|
|
+ *
|
|
|
+*/
|
|
|
+class MemcachedClusterClient : MemcachedClient {
|
|
|
+
|
|
|
+
|
|
|
+ protected MemcachedServers servers;
|
|
|
+
|
|
|
+ this(MemcachedServers servers) {
|
|
|
+ this.servers = servers;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ override void connect(string key) {
|
|
|
+ auto server = getServer(key);
|
|
|
+ this.conn = server.conn;
|
|
|
+ if (!conn || !conn.connected) {
|
|
|
+ try conn = connectTCP(server.host, server.port);
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new Exception(format("Failed to connect to memcached server at %s:%s.", server.host, server.port), __FILE__, __LINE__, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ override void disconnect() {
|
|
|
+ foreach(server; servers) {
|
|
|
+ if (server.conn && server.conn.connected)
|
|
|
+ server.conn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ MemcachedServer getServer(string key) {
|
|
|
+ return servers[ determineServer(key) ];
|
|
|
+ }
|
|
|
+
|
|
|
+ int determineServer(string key){
|
|
|
+ return equalWeights(key);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * use this for sharding - when all servers have equal weights *
|
|
|
+ *
|
|
|
+ * hash the key, get the last byte
|
|
|
+ * get the modulo of that last byte to the length of servers
|
|
|
+ */
|
|
|
+ int equalWeights(string key){
|
|
|
+ return md5Of(key)[$-1] % servers.length;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+enum RETURN_STATE { SUCCESS, ERROR, STORED, EXISTS, NOT_FOUND, NOT_STORED };
|
|
|
+
|
|
|
+/**
|
|
|
+ * Use this to connect and query a memcached server
|
|
|
+ *
|
|
|
+ *
|
|
|
+ */
|
|
|
+class MemcachedClient {
|
|
|
+ TCPConnection conn;
|
|
|
+ protected MemcachedServer server;
|
|
|
+
|
|
|
+ this() {} // allow adding a different constructor in subclasses
|
|
|
+
|
|
|
+ this(string host, ushort port = 11211) {
|
|
|
+ server.host = host;
|
|
|
+ server.port = port;
|
|
|
+ conn = server.conn;
|
|
|
+ }
|
|
|
+
|
|
|
+ this(MemcachedServer server){
|
|
|
+ this.server = server;
|
|
|
+ conn = server.conn;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ void connect(string key) {
|
|
|
+ if (!conn || !conn.connected) {
|
|
|
+ try conn = connectTCP(server.host, server.port);
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new Exception(format("Failed to connect to memcached server at %s:%s.", server.host, server.port), __FILE__, __LINE__, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void disconnect() {
|
|
|
+ if( conn && conn.connected ){
|
|
|
+ conn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * convert User data type to a string representation
|
|
|
+ */
|
|
|
+ protected string serialize(T)(T data) {
|
|
|
+ import std.traits : isNumeric, isBoolean, Unqual;
|
|
|
+ import vibe.data.json : serializeToJsonString;
|
|
|
+ alias Unqual!T Unqualified;
|
|
|
+
|
|
|
+ static if (is(Unqualified == string)) {
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+ else static if (isNumeric!Unqualified || isBoolean!Unqualified) {
|
|
|
+ return data.to!string;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // Use Vibe.d's JSON serialization
|
|
|
+ return serializeToJsonString(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * store data with key, make it expire after expires secconds
|
|
|
+ * if expires == 0 - data will not expire
|
|
|
+ *
|
|
|
+ */
|
|
|
+ RETURN_STATE store(T)(string key, T data, int expires = 0) {
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("set %s 0 %s %s\r\n%s\r\n", key, expires, value.length.to!string, value) );
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE add(T)(string key, T data, int expires = 0){
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("add %s 0 %s %s\r\n%s\r\n", key, expires, value.length.to!string, value) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_STORED") { return RETURN_STATE.NOT_STORED ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+ RETURN_STATE replace(T)(string key, T data, int expires = 0){
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("replace %s 0 %s %s\r\n%s\r\n", key, expires, value.length.to!string, value) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_STORED") { return RETURN_STATE.NOT_STORED ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE append(T)(string key, T data){
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("append %s 0 0 %s\r\n%s\r\n", key, value.length.to!string, value) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_STORED") { return RETURN_STATE.NOT_STORED ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+ RETURN_STATE prepend(T)(string key, T data){
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("prepend %s 0 0 %s\r\n%s\r\n", key, value.length.to!string, value) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_STORED") { return RETURN_STATE.NOT_STORED ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE cas(T)(string key, T data, int casId, int expires = 0){
|
|
|
+ connect(key);
|
|
|
+ string value = serialize(data);
|
|
|
+ conn.write( format("cas %s 0 %s %s %s\r\n%s\r\n", key, expires, value.length.to!string, casId, value) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ if(retval == "STORED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_FOUND") { return RETURN_STATE.NOT_FOUND ; }
|
|
|
+ else if(retval == "EXISTS") { return RETURN_STATE.EXISTS ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE remove(string key, int time = 0){
|
|
|
+ connect(key);
|
|
|
+ conn.write( format("delete %s %s\r\n", key, time));
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ if(retval == "DELETED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_FOUND") { return RETURN_STATE.NOT_FOUND ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+ alias remove del;
|
|
|
+
|
|
|
+ RETURN_STATE increment(string key, int inc) {
|
|
|
+ connect(key);
|
|
|
+ conn.write(format("incr %s %s\r\n", key, inc));
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ import std.ascii : isDigit;
|
|
|
+ import std.algorithm : all;
|
|
|
+
|
|
|
+ if (retval.length > 0 && retval[].all!(c => isDigit(c))) {
|
|
|
+ return RETURN_STATE.SUCCESS;
|
|
|
+ }
|
|
|
+ else if(retval == "NOT_FOUND") {
|
|
|
+ return RETURN_STATE.NOT_FOUND;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ return RETURN_STATE.ERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ alias increment incr;
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE decrement(string key, int dec) {
|
|
|
+ connect(key);
|
|
|
+ conn.write(format("decr %s %s\r\n", key, dec));
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ import std.ascii : isDigit;
|
|
|
+ import std.algorithm : all;
|
|
|
+
|
|
|
+ if (retval.length > 0 && retval[].all!(c => isDigit(c))) {
|
|
|
+ return RETURN_STATE.SUCCESS;
|
|
|
+ }
|
|
|
+ else if(retval == "NOT_FOUND") {
|
|
|
+ return RETURN_STATE.NOT_FOUND;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ return RETURN_STATE.ERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ alias decrement decr;
|
|
|
+
|
|
|
+
|
|
|
+ RETURN_STATE touch(string key, int expires){
|
|
|
+ connect(key);
|
|
|
+ conn.write( format("touch %s %s\r\n", key, expires) );
|
|
|
+
|
|
|
+ auto retval = cast(string) conn.readLine();
|
|
|
+ if(retval == "TOUCHED") { return RETURN_STATE.SUCCESS ; }
|
|
|
+ else if(retval == "NOT_FOUND") { return RETURN_STATE.NOT_FOUND ; }
|
|
|
+ else { return RETURN_STATE.ERROR; }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * convert data stored in memcached to the requested type
|
|
|
+ */
|
|
|
+ protected T deserialize(T)(string data) {
|
|
|
+ import std.traits : isNumeric, isBoolean, Unqual;
|
|
|
+ import vibe.data.json : parseJsonString, deserializeJson;
|
|
|
+ alias Unqual!T Unqualified;
|
|
|
+
|
|
|
+ static if (is(Unqualified == string)) {
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+ else static if (isNumeric!Unqualified || isBoolean!Unqualified) {
|
|
|
+ return chomp(data).to!T;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ return parseJsonString(data).deserializeJson!T();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * get back data from the memcached server,
|
|
|
+ * return it as type T
|
|
|
+ *
|
|
|
+ * if the key is not found, it will return an empty string
|
|
|
+ */
|
|
|
+ T get(T)(string key){
|
|
|
+ connect(key);
|
|
|
+ conn.write( std.string.format("get %s \r\n", key ) );
|
|
|
+ auto tmp = std.array.appender!string;
|
|
|
+ do{
|
|
|
+ string ln = cast(string) conn.readLine();
|
|
|
+ if(ln.startsWith("VALUE " ~ key))
|
|
|
+ continue;
|
|
|
+ else if(ln == "END")
|
|
|
+ break;
|
|
|
+
|
|
|
+ tmp.put(ln ~ "\r\n");
|
|
|
+
|
|
|
+ } while( true);
|
|
|
+
|
|
|
+ return deserialize!T( tmp.data );
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * get back data form the memcached server,
|
|
|
+ * return it as type T,
|
|
|
+ *
|
|
|
+ * populate the casId variable with the cas_unique_id
|
|
|
+ *
|
|
|
+ */
|
|
|
+ T gets(T)(string key, out int casId) {
|
|
|
+ connect(key);
|
|
|
+ conn.write( std.string.format("gets %s \r\n", key ) );
|
|
|
+
|
|
|
+ auto tmp = std.array.appender!string;
|
|
|
+ do{
|
|
|
+ string ln = cast(string) conn.readLine();
|
|
|
+ if(ln.startsWith("VALUE " ~ key)) {
|
|
|
+ auto parts = split(ln);
|
|
|
+ casId = parts[ $-1 ].to!int;
|
|
|
+ continue;
|
|
|
+ } else if(ln == "END") {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp.put(ln ~ "\r\n");
|
|
|
+
|
|
|
+ } while( true);
|
|
|
+
|
|
|
+ return deserialize!T( tmp.data );
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * connect to a memcached server or server cluster
|
|
|
+ *
|
|
|
+ *
|
|
|
+ */
|
|
|
+MemcachedClient memcachedConnect(string hostString){
|
|
|
+ if( hostString.indexOf(',') >=0 ) {
|
|
|
+ auto hosts = split(hostString, ',');
|
|
|
+ return memcachedConnect( hosts);
|
|
|
+ }
|
|
|
+ return new MemcachedClient( MemcachedServer(hostString) );
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * connect to a memcached cluster
|
|
|
+ *
|
|
|
+ *
|
|
|
+ */
|
|
|
+MemcachedClusterClient memcachedConnect( string[] hostStrings ){
|
|
|
+ MemcachedServers servers;
|
|
|
+ foreach(host; hostStrings){
|
|
|
+ servers ~= MemcachedServer(host);
|
|
|
+ }
|
|
|
+ return new MemcachedClusterClient(servers);
|
|
|
+}
|
|
|
+
|