1 module hunt.cache.rocksdb;
2 
3 // dfmt off
4 version (WITH_HUNT_ROCKSDB)  : 
5 // dfmt on
6 
7 import hunt.cache.cache;
8 import hunt.cache.nullable;
9 import hunt.cache.store;
10 
11 import std.string;
12 import core.time;
13 import core.stdc.time;
14 import core.stdc.string;
15 import core.thread;
16 import std.file;
17 
18 import rocksdb;
19 
20 class RocksdbCache {
21 	this(string dir) {
22 
23 		create(dir);
24 	}
25 
26 	~this() {
27 		_rocksdb.close();
28 	}
29 
30 	Nullable!V get(V)(string key) {
31 		synchronized (this) {
32 			auto data = _rocksdb.get(cast(ubyte[]) key);
33 
34 			return get_inter!V(data);
35 
36 		}
37 	}
38 
39 	Nullable!V[string] getall(V)(string[] key) {
40 		synchronized (this) {
41 			Nullable!V[string] mapv;
42 			ubyte[][] datas = _rocksdb.multiGet(cast(ubyte[][]) key);
43 			foreach (i, d; datas) {
44 				mapv[key[i]] = get_inter!V(d);
45 			}
46 			return mapv;
47 		}
48 	}
49 
50 	bool containsKey(string key) {
51 		synchronized (this) {
52 			auto data = _rocksdb.get(cast(ubyte[]) key);
53 			if (data == null)
54 				return false;
55 			if (check_is_expired(data)) {
56 				_rocksdb.remove(cast(ubyte[]) key);
57 				return false;
58 			}
59 			return true;
60 		}
61 	}
62 
63 	void put(V)(string key, V v, uint expired = 0) {
64 		synchronized (this) {
65 			_rocksdb.put(cast(ubyte[]) key,
66 					generator_expired(expired) ~ cast(ubyte[]) SerializeToByte!V(v));
67 		}
68 	}
69 
70 	// rocksdb no putifaabsent , so this function not atomic.
71 	bool putifAbsent(V)(string key, V v) {
72 		synchronized (this) {
73 			auto data = _rocksdb.get(cast(ubyte[]) key);
74 			if (data == null || check_is_expired(data)) {
75 				put(key, v);
76 				return true;
77 			}
78 			return false;
79 		}
80 	}
81 
82 	void putAll(V)(V[string] maps, uint expired) {
83 		synchronized (this) {
84 			string[] datas;
85 			if (maps.length == 0)
86 				return;
87 			auto expired_data = generator_expired(expired);
88 			_rocksdb.withBatch((batch) {
89 				foreach (k, v; maps)
90 					batch.put(cast(ubyte[]) k, expired_data ~ cast(ubyte[]) SerializeToByte(v));
91 			});
92 		}
93 	}
94 
95 	bool remove(string key) {
96 		synchronized (this) {
97 			// rocksdb's remove api not return the value.
98 			auto data = _rocksdb.get(cast(ubyte[]) key);
99 			if (data == null) {
100 				return false;
101 			}
102 
103 			if (check_is_expired(data)) {
104 				_rocksdb.remove(cast(ubyte[]) key);
105 				return false;
106 			}
107 
108 			_rocksdb.remove(cast(ubyte[]) key);
109 			return true;
110 		}
111 
112 	}
113 
114 	void removeAll(string[] keys) {
115 		synchronized (this) {
116 			foreach (k; keys) {
117 				_rocksdb.remove(cast(ubyte[]) k);
118 			}
119 		}
120 	}
121 
122 	void clear() {
123 		_rocksdb.close();
124 		std.file.rmdirRecurse(_dir);
125 		create(_dir);
126 
127 	}
128 
129 protected:
130 
131 	void create(string dir) {
132 		auto opts = new DBOptions;
133 		opts.createIfMissing = true;
134 		opts.errorIfExists = false;
135 
136 		_rocksdb = new Database(opts, dir);
137 		_dir = dir;
138 	}
139 
140 	Nullable!V get_inter(V)(ubyte[] data) {
141 		if (data == null)
142 			return Nullable!V.init;
143 
144 		if (check_is_expired(data)) {
145 			_rocksdb.remove(cast(ubyte[]) data);
146 			return Nullable!V.init;
147 		}
148 
149 		return DeserializeToObject!V(cast(byte[]) data[8 .. $]);
150 	}
151 
152 	ubyte[] generator_expired(uint expired) {
153 		byte[8] byExpired;
154 		if (expired == 0) {
155 			return cast(ubyte[]) byExpired.idup;
156 		} else {
157 			ulong stamp = time(null) + expired;
158 			memcpy(byExpired.ptr, cast(void*)&stamp, byExpired.sizeof);
159 		}
160 		return cast(ubyte[]) byExpired.idup;
161 	}
162 
163 	ulong get_expired(ubyte[] data) {
164 		ulong stamp;
165 		memcpy(&stamp, data.ptr, 8);
166 		return stamp;
167 	}
168 
169 	bool check_is_expired(ubyte[] data) {
170 		ulong stamp = time(null);
171 		ulong expired = get_expired(data);
172 		if (expired > 0 && expired < stamp) {
173 			return true;
174 		}
175 		return false;
176 	}
177 
178 	Database _rocksdb;
179 	string _dir;
180 
181 }