module hunt.cache.rocksdb;

// dfmt off
version (WITH_HUNT_ROCKSDB) :
// dfmt on

import hunt.cache.cache;
import hunt.cache.nullable;
import hunt.cache.store;

import std.string;
import core.time;
import core.stdc.time;
import core.stdc.string;
import core.thread;
import std.file;

import rocksdb;

/**
 * Key/value cache persisted in a RocksDB database directory.
 *
 * Every stored record is laid out as
 *
 *     [ 8-byte expiry stamp ][ serialized value ]
 *
 * where the stamp is an absolute time in seconds as returned by `time(null)`,
 * copied in the machine's native byte order. A stamp of 0 means the entry
 * never expires. Expired entries are removed lazily, when they are next read.
 *
 * All public operations run under `synchronized (this)`.
 */
class RocksdbCache {

    /**
     * Opens the database located at `dir`, creating it when it is missing.
     *
     * Params:
     *     dir = directory that holds the RocksDB files.
     */
    this(string dir) {
        create(dir);
    }

    /// Closes the underlying database.
    // NOTE(review): when this destructor is run by the garbage collector,
    // `_rocksdb` may already have been finalized - confirm that calling
    // `close()` from here is safe for the rocksdb binding in use.
    ~this() {
        _rocksdb.close();
    }

    /**
     * Looks up a single entry.
     *
     * Params:
     *     key = cache key.
     *
     * Returns: the deserialized value, or `Nullable!V.init` when the key is
     *          absent or its entry has expired (the expired entry is removed).
     */
    Nullable!V get(V)(string key) {
        synchronized (this) {
            auto data = _rocksdb.get(cast(ubyte[]) key);
            return get_inter!V(data, key);
        }
    }

    /**
     * Looks up several entries with one `multiGet` call.
     *
     * Params:
     *     key = the keys to fetch.
     *
     * Returns: a map from each requested key to its value; absent or expired
     *          keys map to `Nullable!V.init`.
     */
    Nullable!V[string] getall(V)(string[] key) {
        synchronized (this) {
            Nullable!V[string] mapv;
            ubyte[][] datas = _rocksdb.multiGet(cast(ubyte[][]) key);
            foreach (i, d; datas) {
                // Pass the key along so an expired record is removed under
                // its own key.
                mapv[key[i]] = get_inter!V(d, key[i]);
            }
            return mapv;
        }
    }

    /**
     * Tells whether a live (present and not expired) entry exists for `key`.
     * An expired entry is removed as a side effect.
     */
    bool containsKey(string key) {
        synchronized (this) {
            auto rawKey = cast(ubyte[]) key;
            auto data = _rocksdb.get(rawKey);
            if (data.length == 0)
                return false;
            if (check_is_expired(data)) {
                _rocksdb.remove(rawKey);
                return false;
            }
            return true;
        }
    }

    /**
     * Stores `v` under `key`, replacing any previous entry.
     *
     * Params:
     *     key     = cache key.
     *     v       = value to serialize and store.
     *     expired = lifetime in seconds from now; 0 means "never expires".
     */
    void put(V)(string key, V v, uint expired = 0) {
        synchronized (this) {
            _rocksdb.put(cast(ubyte[]) key,
                    generator_expired(expired) ~ cast(ubyte[]) SerializeToByte!V(v));
        }
    }

    /**
     * Stores `v` under `key` only when no live entry exists for it.
     *
     * RocksDB offers no put-if-absent primitive, so this is a read followed
     * by a write: it is atomic only with respect to other calls made through
     * this object, which share the same monitor.
     *
     * Returns: `true` when the value was written, `false` when a live entry
     *          was already present.
     */
    bool putifAbsent(V)(string key, V v) {
        synchronized (this) {
            auto data = _rocksdb.get(cast(ubyte[]) key);
            if (data.length == 0 || check_is_expired(data)) {
                put(key, v);
                return true;
            }
            return false;
        }
    }

    /**
     * Stores every pair of `maps` in a single write batch.
     *
     * Params:
     *     maps    = key/value pairs to store; an empty map is a no-op.
     *     expired = lifetime in seconds applied to all entries; 0 means
     *               "never expires".
     */
    void putAll(V)(V[string] maps, uint expired) {
        synchronized (this) {
            if (maps.length == 0)
                return;
            // The expiry header is identical for every entry of the batch.
            auto expired_data = generator_expired(expired);
            _rocksdb.withBatch((batch) {
                foreach (k, v; maps)
                    batch.put(cast(ubyte[]) k, expired_data ~ cast(ubyte[]) SerializeToByte(v));
            });
        }
    }

    /**
     * Removes the entry stored under `key`.
     *
     * Returns: `true` when a live entry was removed; `false` when the key was
     *          absent or its entry had already expired (an expired entry is
     *          still deleted).
     */
    bool remove(string key) {
        synchronized (this) {
            // RocksDB's remove API does not report whether the key existed,
            // so the record is read first.
            auto rawKey = cast(ubyte[]) key;
            auto data = _rocksdb.get(rawKey);
            if (data.length == 0)
                return false;

            immutable bool wasLive = !check_is_expired(data);
            _rocksdb.remove(rawKey);
            return wasLive;
        }
    }

    /// Removes every key of `keys`, whether or not it exists.
    void removeAll(string[] keys) {
        synchronized (this) {
            foreach (k; keys) {
                _rocksdb.remove(cast(ubyte[]) k);
            }
        }
    }

    /**
     * Drops all entries by closing the database, deleting its directory and
     * opening a fresh database at the same path.
     */
    void clear() {
        // Taken under the monitor like every other operation, so no other
        // call can use the database handle while it is being replaced.
        synchronized (this) {
            _rocksdb.close();
            std.file.rmdirRecurse(_dir);
            create(_dir);
        }
    }

protected:

    /// Number of bytes at the front of every record that hold the expiry stamp.
    enum size_t EXPIRY_HEADER_SIZE = ulong.sizeof;

    /// Opens (or creates) the database at `dir` and remembers the path.
    void create(string dir) {
        auto opts = new DBOptions;
        opts.createIfMissing = true;
        opts.errorIfExists = false;

        _rocksdb = new Database(opts, dir);
        _dir = dir;
    }

    /**
     * Decodes a raw record into a value.
     *
     * Params:
     *     data = raw record as read from the database (may be empty).
     *     key  = key the record was read from. When given, an expired record
     *            is removed under this key; when `null`, nothing is removed.
     *
     * Returns: the deserialized value, or `Nullable!V.init` when `data` is
     *          empty or expired.
     */
    Nullable!V get_inter(V)(ubyte[] data, string key = null) {
        if (data.length == 0)
            return Nullable!V.init;

        if (check_is_expired(data)) {
            // The record must be removed by its key, not by its value bytes.
            if (key !is null)
                _rocksdb.remove(cast(ubyte[]) key);
            return Nullable!V.init;
        }

        return DeserializeToObject!V(cast(byte[]) data[EXPIRY_HEADER_SIZE .. $]);
    }

    /**
     * Builds the 8-byte expiry header for a new record.
     *
     * Params:
     *     expired = lifetime in seconds from now; 0 yields an all-zero header,
     *               which means "never expires".
     */
    ubyte[] generator_expired(uint expired) {
        // `new` zero-initializes the buffer, which is the "no expiry" header.
        ubyte[] header = new ubyte[EXPIRY_HEADER_SIZE];
        if (expired != 0) {
            ulong stamp = time(null) + expired;
            memcpy(header.ptr, &stamp, stamp.sizeof);
        }
        return header;
    }

    /**
     * Reads the expiry stamp from the front of a raw record.
     *
     * Returns: the stamp, or 0 when `data` is too short to contain one.
     */
    ulong get_expired(ubyte[] data) {
        ulong stamp;
        // Never read past the end of a short record.
        if (data.length >= EXPIRY_HEADER_SIZE)
            memcpy(&stamp, data.ptr, EXPIRY_HEADER_SIZE);
        return stamp;
    }

    /**
     * Tells whether a raw record has expired.
     *
     * A record too short to hold the expiry header cannot have been written
     * by this class and is reported as expired, so callers treat it as absent.
     */
    bool check_is_expired(ubyte[] data) {
        if (data.length < EXPIRY_HEADER_SIZE)
            return true;

        ulong expired = get_expired(data);
        if (expired == 0)
            return false; // 0 marks an entry without expiry

        ulong now = time(null);
        return expired < now;
    }

    Database _rocksdb;
    string _dir;

}