1 module hunt.cache.adapter.RocksdbAdapter;
2 
3 // dfmt off
4 version (WITH_HUNT_ROCKSDB)  : 
5 // dfmt on
6 
7 import hunt.cache.adapter.Adapter;
8 import hunt.cache.Store;
9 import hunt.cache.CacheOptions;
10 
11 import core.time;
12 import core.stdc.time;
13 import core.stdc.string;
14 import core.thread;
15 
16 import std.file;
17 import std.string;
18 import hunt.cache.Nullable;
19 
20 import rocksdb;
21 
/**
 * Cache adapter that keeps its entries in a RocksDB database.
 *
 * Every stored value is prefixed with an 8-byte header holding the expiry
 * time as a `ulong` (copied with `memcpy`, i.e. in native byte order),
 * expressed in seconds as returned by the C `time()` function. A header of
 * zero means the entry never expires. Expired entries are deleted lazily,
 * when a read notices them.
 *
 * All public operations take the object's monitor via `synchronized (this)`.
 */
class RocksdbAdapter : Adapter
{
    /// Opens the database located at `config.file`, creating it if missing.
    this(CacheOptions.rocksdb config)
    {
        create(config.file);
    }

    ~this()
    {
        // `_rocksdb` is still null if create() threw inside the constructor,
        // or if clear() failed after closing the previous handle.
        if (_rocksdb !is null)
            _rocksdb.close();
    }

    /**
     * Looks up `key`.
     *
     * Returns: the deserialized value, or `Nullable!V.init` when the key is
     * absent or its entry has expired (an expired entry is removed).
     */
    Nullable!V get(V) (string key)
    {
        synchronized (this) {
            auto data = _rocksdb.get(cast(ubyte[]) key);
            return get_inter!V(data, key);
        }
    }

    /**
     * Looks up several keys with a single `multiGet` call.
     *
     * Returns: a map from each requested key to the result `get` would give.
     */
    Nullable!V[string] getAll(V) (string[] key)
    {
        synchronized (this) {
            Nullable!V[string] mapv;
            // NOTE(review): relies on multiGet returning one result per key,
            // in the same order as the keys were passed - confirm in binding.
            ubyte[][] datas = _rocksdb.multiGet(cast(ubyte[][]) key);
            foreach (i, d; datas) {
                mapv[key[i]] = get_inter!V(d, key[i]);
            }
            return mapv;
        }
    }

    /**
     * Tells whether a live (non-expired) entry exists for `key`.
     * An expired entry is removed as a side effect.
     */
    bool hasKey(string key)
    {
        synchronized (this)
        {
            auto data = _rocksdb.get(cast(ubyte[]) key);
            if (data.length == 0)
                return false;
            if (check_is_expired(data)) {
                _rocksdb.remove(cast(ubyte[]) key);
                return false;
            }
            return true;
        }
    }

    /**
     * Stores `v` under `key`.
     *
     * Params:
     *   key     = entry key
     *   v       = value, serialized with `SerializeToByte`
     *   expired = lifetime in seconds from now; 0 means "never expires"
     */
    void set(V) (string key, V v, uint expired = 0)
    {
        synchronized (this) {
            _rocksdb.put(cast(ubyte[]) key,
                    generator_expired(expired) ~ cast(ubyte[]) SerializeToByte!V(v));
        }
    }

    /**
     * Stores `v` under `key` only when no live entry exists for it.
     *
     * RocksDB has no put-if-absent primitive, so this is a get followed by a
     * put. It is serialized against other calls on this adapter by the object
     * monitor, but it is not atomic at the database level.
     *
     * Returns: true if the value was written, false if a live entry existed.
     */
    bool setIfAbsent(V) (string key, V v)
    {
        synchronized (this) {
            auto data = _rocksdb.get(cast(ubyte[]) key);
            if (data.length == 0 || check_is_expired(data)) {
                // The monitor is already held here; the nested
                // synchronized block in set() re-enters it.
                set!V(key, v);
                return true;
            }
            return false;
        }
    }

    /**
     * Stores every pair of `maps` in one write batch, all with the same
     * lifetime.
     *
     * Params:
     *   maps    = key/value pairs to store; an empty map is a no-op
     *   expired = lifetime in seconds from now; 0 means "never expires"
     */
    void set(V) (V[string] maps, uint expired)
    {
        synchronized (this) {
            if (maps.length == 0)
                return;
            // The header is computed once so all entries share one deadline.
            auto expired_data = generator_expired(expired);
            _rocksdb.withBatch((batch) {
                foreach (k, v; maps)
                    batch.put(cast(ubyte[]) k, expired_data ~ cast(ubyte[]) SerializeToByte(v));
            });
        }
    }

    /**
     * Removes the entry stored under `key`.
     *
     * Returns: true only if a live (non-expired) entry was removed; false if
     * the key was absent or its entry had already expired (the expired entry
     * is still deleted).
     */
    bool remove(string key)
    {
        synchronized (this) {
            // The remove API does not report whether a value existed, so the
            // entry is read first.
            auto data = _rocksdb.get(cast(ubyte[]) key);
            if (data.length == 0) {
                return false;
            }

            immutable wasExpired = check_is_expired(data);
            _rocksdb.remove(cast(ubyte[]) key);
            return !wasExpired;
        }
    }

    /// Removes every key in `keys`; absent keys are not reported.
    void remove(string[] keys)
    {
        synchronized (this) {
            foreach (k; keys) {
                _rocksdb.remove(cast(ubyte[]) k);
            }
        }
    }

    /**
     * Drops all entries by closing the database, deleting its directory and
     * opening a fresh database at the same path.
     */
    void clear()
    {
        // Hold the monitor like every other operation, so no reader or writer
        // can touch the handle while it is being closed and replaced.
        synchronized (this) {
            _rocksdb.close();
            // Forget the closed handle so the destructor does not close it a
            // second time if deleting or reopening the directory throws.
            _rocksdb = null;
            std.file.rmdirRecurse(_dir);
            create(_dir);
        }
    }

protected:

    /// Opens the database at `dir` (created if missing) and remembers the path.
    void create(string dir) {
        auto opts = new DBOptions;
        opts.createIfMissing = true;
        opts.errorIfExists = false;

        _rocksdb = new Database(opts, dir);
        _dir = dir;
    }

    /**
     * Decodes a raw stored record.
     *
     * Params:
     *   data = bytes read from the database (expiry header + payload)
     *   key  = key the record was read from; when given, an expired record
     *          is deleted from the database under this key
     *
     * Returns: the deserialized payload, or `Nullable!V.init` when `data` is
     * empty, shorter than the expiry header, or expired.
     */
    Nullable!V get_inter(V) (ubyte[] data, string key = null) {
        // Covers "not found" (empty) as well as records too short to hold
        // the header, which cannot have been written by this adapter.
        if (data.length < EXPIRY_HEADER_SIZE)
            return Nullable!V.init;

        if (check_is_expired(data)) {
            // Delete by key: `data` is the stored value, not the key.
            if (key.length != 0)
                _rocksdb.remove(cast(ubyte[]) key);
            return Nullable!V.init;
        }

        return DeserializeToObject!V(cast(byte[]) data[EXPIRY_HEADER_SIZE .. $]);
    }

    /**
     * Builds the 8-byte expiry header.
     *
     * Params:
     *   expired = lifetime in seconds from now; 0 yields an all-zero header
     *
     * Returns: a freshly allocated, mutable 8-byte array.
     */
    ubyte[] generator_expired(uint expired) {
        auto header = new ubyte[](EXPIRY_HEADER_SIZE); // zero-initialised
        if (expired != 0) {
            ulong stamp = cast(ulong) time(null) + expired;
            memcpy(header.ptr, &stamp, EXPIRY_HEADER_SIZE);
        }
        return header;
    }

    /**
     * Reads the expiry timestamp from the front of a stored record.
     *
     * Returns: the timestamp, or 0 ("never expires") when `data` is too short
     * to contain a header.
     */
    ulong get_expired(ubyte[] data) {
        if (data.length < EXPIRY_HEADER_SIZE)
            return 0;

        ulong stamp;
        memcpy(&stamp, data.ptr, EXPIRY_HEADER_SIZE);
        return stamp;
    }

    /// True when the record has a non-zero expiry that lies before the current time.
    bool check_is_expired(ubyte[] data) {
        ulong now = time(null);
        ulong deadline = get_expired(data);
        return deadline > 0 && deadline < now;
    }

    /// Size in bytes of the expiry header stored in front of every value.
    enum size_t EXPIRY_HEADER_SIZE = ulong.sizeof;

    Database _rocksdb;
    string _dir;
}