1 module hunt.cache.adapter.RocksdbAdapter; 2 3 // dfmt off 4 version (WITH_HUNT_ROCKSDB) : 5 // dfmt on 6 7 import hunt.cache.adapter.Adapter; 8 import hunt.cache.Store; 9 import hunt.cache.CacheOptions; 10 11 import core.time; 12 import core.stdc.time; 13 import core.stdc.string; 14 import core.thread; 15 16 import std.file; 17 import std.string; 18 import hunt.cache.Nullable; 19 20 import rocksdb; 21 22 class RocksdbAdapter : Adapter 23 { 24 this(CacheOptions.rocksdb config) 25 { 26 create(config.file); 27 } 28 29 ~this() 30 { 31 _rocksdb.close(); 32 } 33 34 Nullable!V get(V) (string key) 35 { 36 synchronized (this) { 37 auto data = _rocksdb.get(cast(ubyte[]) key); 38 39 return get_inter!V(data); 40 41 } 42 } 43 44 Nullable!V[string] getAll(V) (string[] key) 45 { 46 synchronized (this) { 47 Nullable!V[string] mapv; 48 ubyte[][] datas = _rocksdb.multiGet(cast(ubyte[][]) key); 49 foreach (i, d; datas) { 50 mapv[key[i]] = get_inter!V(d); 51 } 52 return mapv; 53 } 54 } 55 56 bool hasKey(string key) 57 { 58 synchronized (this) 59 { 60 auto data = _rocksdb.get(cast(ubyte[]) key); 61 if (data == null) 62 return false; 63 if (check_is_expired(data)) { 64 _rocksdb.remove(cast(ubyte[]) key); 65 return false; 66 } 67 return true; 68 } 69 } 70 71 void set(V) (string key, V v, uint expired = 0) 72 { 73 synchronized (this) { 74 _rocksdb.put(cast(ubyte[]) key, 75 generator_expired(expired) ~ cast(ubyte[]) SerializeToByte!V(v)); 76 } 77 } 78 79 // rocksdb no putifaabsent, so this function not atomic. 80 bool setIfAbsent(V) (string key, V v) 81 { 82 synchronized (this) { 83 auto data = _rocksdb.get(cast(ubyte[]) key); 84 if (data == null || check_is_expired(data)) { 85 put(key, v); 86 return true; 87 } 88 return false; 89 } 90 } 91 92 void set(V) (V[string] maps, uint expired) 93 { 94 synchronized (this) { 95 string[] datas; 96 if (maps.length == 0) 97 return; 98 auto expired_data = generator_expired(expired); 99 _rocksdb.withBatch((batch) { 100 foreach (k, v; maps) 101 batch.put(cast(ubyte[]) k, expired_data ~ cast(ubyte[]) SerializeToByte(v)); 102 }); 103 } 104 } 105 106 bool remove(string key) 107 { 108 synchronized (this) { 109 // rocksdb's remove api not return the value. 110 auto data = _rocksdb.get(cast(ubyte[]) key); 111 if (data == null) { 112 return false; 113 } 114 115 if (check_is_expired(data)) { 116 _rocksdb.remove(cast(ubyte[]) key); 117 return false; 118 } 119 120 _rocksdb.remove(cast(ubyte[]) key); 121 return true; 122 } 123 } 124 125 void remove(string[] keys) 126 { 127 synchronized (this) { 128 foreach (k; keys) { 129 _rocksdb.remove(cast(ubyte[]) k); 130 } 131 } 132 } 133 134 void clear() 135 { 136 _rocksdb.close(); 137 std.file.rmdirRecurse(_dir); 138 create(_dir); 139 } 140 141 protected: 142 143 void create(string dir) { 144 auto opts = new DBOptions; 145 opts.createIfMissing = true; 146 opts.errorIfExists = false; 147 148 _rocksdb = new Database(opts, dir); 149 _dir = dir; 150 } 151 152 Nullable!V get_inter(V) (ubyte[] data) { 153 if (data == null) 154 return Nullable!V.init; 155 156 if (check_is_expired(data)) { 157 _rocksdb.remove(cast(ubyte[]) data); 158 return Nullable!V.init; 159 } 160 161 return DeserializeToObject!V(cast(byte[]) data[8 .. $]); 162 } 163 164 ubyte[] generator_expired(uint expired) { 165 byte[8] byExpired; 166 if (expired == 0) { 167 return cast(ubyte[]) byExpired.idup; 168 } else { 169 ulong stamp = time(null) + expired; 170 memcpy(byExpired.ptr, cast(void*)&stamp, byExpired.sizeof); 171 } 172 return cast(ubyte[]) byExpired.idup; 173 } 174 175 ulong get_expired(ubyte[] data) { 176 ulong stamp; 177 memcpy(&stamp, data.ptr, 8); 178 return stamp; 179 } 180 181 bool check_is_expired(ubyte[] data) { 182 ulong stamp = time(null); 183 ulong expired = get_expired(data); 184 if (expired > 0 && expired < stamp) { 185 return true; 186 } 187 return false; 188 } 189 190 Database _rocksdb; 191 string _dir; 192 }