1 package net.obsearch.storage.tc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import net.obsearch.storage.OBStore;
23
24 import hep.aida.bin.StaticBin1D;
25
26 import java.io.File;
27 import java.nio.ByteBuffer;
28 import java.util.Iterator;
29 import java.util.NoSuchElementException;
30 import java.util.Arrays;
31
32 import tokyocabinet.BDB;
33 import tokyocabinet.DBM;
34 import tokyocabinet.FDB;
35 import tokyocabinet.HDB;
36
37 import net.obsearch.Index;
38 import net.obsearch.Status;
39 import net.obsearch.asserts.OBAsserts;
40 import net.obsearch.exception.OBException;
41 import net.obsearch.exception.OBStorageException;
42 import net.obsearch.index.utils.ByteArrayComparator;
43 import net.obsearch.storage.CloseIterator;
44 import net.obsearch.storage.OBStorageConfig;
45 import net.obsearch.storage.OBStore;
46 import net.obsearch.storage.Tuple;
47 import net.obsearch.storage.TupleBytes;
48 import net.obsearch.utils.bytes.ByteConversion;
49 import net.obsearch.storage.OBStoreFactory;
50 import net.obsearch.storage.OBStorageConfig.IndexType;
51
52 public abstract class AbstractTCOBStorage<T extends Tuple> implements
53 OBStore<T> {
54
55 protected StaticBin1D stats = new StaticBin1D();
56
57
58
59
60 private DBM db;
61
62
63
64
65 private HDB hdb;
66
67
68
69
70 private String name;
71
72
73
74
75 protected OBStoreFactory fact;
76
77
78
79
80 private final int SEQUENCE_RECORD = 0;
81
82
83
84
85 private FDB metadata;
86
87 private long currentId = 0;
88
89
90
91
92 private boolean doingCursor = false;
93
94 private OBStorageConfig storageConf;
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 public AbstractTCOBStorage(String name, DBM db, OBStoreFactory fact,
111 OBStorageConfig storageConf) throws OBStorageException, OBException {
112 this.db = db;
113 this.name = name;
114 this.fact = fact;
115 OBStorageConfig conf = new OBStorageConfig();
116
117 conf.setRecordSize(Index.ID_SIZE);
118 metadata = new FDB();
119 metadata.open(fact.getFactoryLocation() + File.separator + name
120 + "_META", FDB.OWRITER | FDB.OCREAT | FDB.OREADER);
121 currentId = getSequence();
122 if(storageConf.getIndexType() == IndexType.HASH){
123 this.hdb = (HDB)db;
124 }
125 this.storageConf = storageConf;
126 }
127
128 public Object getStats() throws OBException{
129 return null;
130 }
131
132
133
134
135
136
137 private long getSequence() {
138 byte[] rec = metadata.get(fact.serializeInt(SEQUENCE_RECORD));
139 if (rec == null) {
140 return 0;
141 } else {
142 return fact.deSerializeLong(rec);
143 }
144
145 }
146
147 private void putSequence(long id) {
148 metadata
149 .put(fact.serializeInt(SEQUENCE_RECORD), fact.serializeLong(id));
150 }
151
152 public OBStoreFactory getFactory() {
153 return fact;
154 }
155
156 public void close() throws OBStorageException {
157 boolean res;
158 String code;
159 if (db instanceof FDB) {
160 FDB dbx = (FDB) db;
161 res = dbx.close();
162 code = lastErrorString();
163 } else if (db instanceof HDB) {
164 res = ((HDB) db).close();
165 code = lastErrorString();
166 } else if (db instanceof BDB) {
167 res = ((BDB) db).close();
168 code = lastErrorString();
169 } else {
170 throw new OBStorageException("Invalid method");
171 }
172 if (!res) {
173 throw new OBStorageException("Unable to close the database: "
174 + code);
175 }
176 this.putSequence(currentId);
177 OBAsserts.chkAssertStorage(metadata.close(),
178 "Could not close metadata DB");
179 }
180
181 private int lastErrorCode() {
182 int res = Integer.MIN_VALUE;
183 if (db instanceof FDB) {
184 res = ((FDB) db).ecode();
185 } else if (db instanceof HDB) {
186 res = ((HDB) db).ecode();
187 } else if (db instanceof BDB) {
188 res = ((BDB) db).ecode();
189 }
190 return res;
191 }
192
193 public void optimize() throws OBStorageException{
194 boolean res = false;
195 if (db instanceof FDB) {
196 res = ((FDB) db).optimize();
197 } else if (db instanceof HDB) {
198 res = ((HDB) db).optimize();
199 } else if (db instanceof BDB) {
200 res = ((BDB) db).optimize();
201 }
202 if(! res){
203 throw new OBStorageException(this.lastErrorString());
204 }
205 }
206
207 private String lastErrorString() {
208 String res = "";
209 if (db instanceof FDB) {
210 res = ((FDB) db).errmsg();
211 } else if (db instanceof HDB) {
212 res = ((HDB) db).errmsg();
213 } else if (db instanceof BDB) {
214 res = ((BDB) db).errmsg();
215 }
216 return res;
217 }
218
219 public net.obsearch.OperationStatus delete(byte[] key)
220 throws OBStorageException {
221 net.obsearch.OperationStatus r = new net.obsearch.OperationStatus();
222 if (db.out(key)) {
223 r.setStatus(Status.OK);
224 } else {
225 r.setStatus(Status.NOT_EXISTS);
226 r.setMsg(this.lastErrorString());
227 }
228 return r;
229 }
230
231 public void deleteAll() throws OBStorageException {
232
233 boolean res = false;
234 if (db instanceof FDB) {
235 res = ((FDB) db).vanish();
236 } else if (db instanceof HDB) {
237 res = ((HDB) db).vanish();
238 } else if (db instanceof BDB) {
239 res = ((BDB) db).vanish();
240 }
241
242 if (!res) {
243 throw new OBStorageException("Could not truncate the database: "
244 + this.lastErrorString());
245 }
246 }
247
248 public String getName() {
249 return this.name;
250 }
251
252 public byte[] getValue(byte[] key) throws IllegalArgumentException,
253 OBStorageException {
254 if (storageConf.isDuplicates()) {
255 throw new IllegalArgumentException();
256 }
257
258 byte[] value = db.get(key);
259 return value;
260
261 }
262
263 public net.obsearch.OperationStatus put(byte[] key, byte[] value)
264 throws OBStorageException {
265 checkFixedRecord(value);
266 net.obsearch.OperationStatus res = new net.obsearch.OperationStatus();
267 boolean ok = false;
268 if(this.storageConf.getIndexType() == IndexType.HASH){
269 ok = hdb.putasync(key, value);
270 }else{
271 ok = db.put(key, value);
272 }
273
274 if (ok) {
275 res.setStatus(Status.OK);
276 } else {
277 res.setMsg(this.lastErrorString());
278 res.setStatus(Status.ERROR);
279 }
280 return res;
281 }
282
283 private void checkFixedRecord(byte[] value) throws OBStorageException{
284 OBAsserts
285 .chkAssertStorage(
286 storageConf.getIndexType() != IndexType.FIXED_RECORD
287 || (storageConf.getIndexType() == IndexType.FIXED_RECORD && value.length == storageConf
288 .getRecordSize()),
289 "Record size does not match the size for this index");
290 }
291
292 public net.obsearch.OperationStatus putIfNew(byte[] key, byte[] value)
293 throws OBStorageException {
294 checkFixedRecord(value);
295 net.obsearch.OperationStatus res = new net.obsearch.OperationStatus();
296 if (db.putkeep(key, value)) {
297 res.setStatus(Status.OK);
298 } else {
299 res.setMsg(this.lastErrorString());
300 if(this.lastErrorCode() == BDB.EKEEP){
301 res.setStatus(Status.EXISTS);
302 }else{
303 res.setStatus(Status.ERROR);
304 }
305 }
306 return res;
307 }
308
309 public boolean allowsDuplicatedData() {
310 return storageConf.isDuplicates();
311 }
312
313 public CloseIterator<TupleBytes> processRange(byte[] low, byte[] high)
314 throws OBStorageException {
315 throw new IllegalArgumentException();
316 }
317
318 public CloseIterator<TupleBytes> processRangeNoDup(byte[] low, byte[] high)
319 throws OBStorageException {
320 throw new IllegalArgumentException();
321 }
322
323 public CloseIterator<TupleBytes> processRangeReverse(byte[] low, byte[] high)
324 throws OBStorageException {
325 throw new IllegalArgumentException();
326 }
327
328 public CloseIterator<TupleBytes> processRangeReverseNoDup(byte[] low,
329 byte[] high) throws OBStorageException {
330 throw new IllegalArgumentException();
331 }
332
333 public CloseIterator<byte[]> processAllKeys() throws OBStorageException {
334 return new ByteArrayKeyIterator();
335 }
336
337
338
339
340
341
342
343 protected abstract class CursorIterator<T> implements CloseIterator<T> {
344
345 protected byte[] nextKey;
346
347 protected byte[] nextValue;
348
349 protected byte[] lastReturnedKey;
350
351 protected CursorIterator() throws OBStorageException {
352 if (doingCursor) {
353 throw new OBStorageException("Only one cursor at a time");
354 }
355 doingCursor = true;
356 db.iterinit();
357
358 loadNext();
359
360 }
361
362 public boolean hasNext() {
363 return nextKey != null;
364 }
365
366
367
368
369
370
371 protected void loadNext() throws NoSuchElementException {
372 nextKey = db.iternext();
373 if (nextKey != null) {
374 nextValue = db.get(nextKey);
375 } else {
376
377 doingCursor = false;
378 }
379 }
380
381 protected T createT(byte[] key, byte[] value) {
382 return createTuple(key, value);
383 }
384
385
386
387
388
389
390
391
392
393
394
395 protected abstract T createTuple(byte[] key, byte[] value);
396
397 public T next() {
398 T res = createT(nextKey, nextValue);
399 lastReturnedKey = nextKey;
400 loadNext();
401 return res;
402
403 }
404
405 public void closeCursor() throws OBException {
406 doingCursor = false;
407 }
408
409 public void remove() {
410 if (lastReturnedKey != null) {
411 db.out(lastReturnedKey);
412 }
413
414 }
415
416 }
417
418 public long size() throws OBStorageException {
419
420 long res = db.rnum();
421
422 return res;
423
424 }
425
426
427
428
429
430
431 public long nextId() throws OBStorageException {
432 synchronized (metadata) {
433 long res = currentId;
434 this.currentId++;
435 return res;
436 }
437 }
438
439 @Override
440 public StaticBin1D getReadStats() {
441 return this.stats;
442 }
443
444 @Override
445 public void setReadStats(StaticBin1D stats) {
446 this.stats = stats;
447 }
448
449 protected class ByteArrayIterator extends CursorIterator<TupleBytes> {
450
451 protected ByteArrayIterator() throws OBStorageException {
452 super();
453 }
454
455 @Override
456 protected TupleBytes createTuple(byte[] key, byte[] value) {
457 return new TupleBytes(key, value);
458 }
459 }
460
461 protected class ByteArrayKeyIterator extends CursorIterator<byte[]> {
462
463 protected ByteArrayKeyIterator() throws OBStorageException {
464 super();
465 }
466
467 @Override
468 protected byte[] createTuple(byte[] key, byte[] value) {
469 return key;
470 }
471
472 protected void loadNext() throws NoSuchElementException {
473 nextKey = db.iternext();
474 }
475
476 }
477
478 }