View Javadoc

1   package net.obsearch.storage.tc;
2   
3   /*
4    * OBSearch: a distributed similarity search engine This project is to
5    * similarity search what 'bit-torrent' is to downloads. Copyright (C) 2008
6    * Arnoldo Jose Muller Molina
7    * 
8    * This program is free software: you can redistribute it and/or modify it
9    * under the terms of the GNU General Public License as published by the
10   * Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   * 
13   * This program is distributed in the hope that it will be useful, but
14   * WITHOUT ANY WARRANTY; without even the implied warranty of
15   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
16   * Public License for more details.
17   * 
18   * You should have received a copy of the GNU General Public License along
19   * with this program. If not, see <http://www.gnu.org/licenses/>.
20   */
21  
22  import net.obsearch.storage.OBStore;
23  
24  import hep.aida.bin.StaticBin1D;
25  
26  import java.io.File;
27  import java.nio.ByteBuffer;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.Arrays;
31  
32  import tokyocabinet.BDB;
33  import tokyocabinet.DBM;
34  import tokyocabinet.FDB;
35  import tokyocabinet.HDB;
36  
37  import net.obsearch.Index;
38  import net.obsearch.Status;
39  import net.obsearch.asserts.OBAsserts;
40  import net.obsearch.exception.OBException;
41  import net.obsearch.exception.OBStorageException;
42  import net.obsearch.index.utils.ByteArrayComparator;
43  import net.obsearch.storage.CloseIterator;
44  import net.obsearch.storage.OBStorageConfig;
45  import net.obsearch.storage.OBStore;
46  import net.obsearch.storage.Tuple;
47  import net.obsearch.storage.TupleBytes;
48  import net.obsearch.utils.bytes.ByteConversion;
49  import net.obsearch.storage.OBStoreFactory;
50  import net.obsearch.storage.OBStorageConfig.IndexType;
51  
52  public abstract class AbstractTCOBStorage<T extends Tuple> implements
53  		OBStore<T> {
54  
55  	protected StaticBin1D stats = new StaticBin1D();
56  
57  	/**
58  	 * Tokyo cabinet database
59  	 */
60  	private DBM db;
61  	
62  	/**
63  	 * Used for special async operations available on the HDB 
64  	 */
65  	private HDB hdb;
66  
67  	/**
68  	 * Name of the database.
69  	 */
70  	private String name;
71  
72  	/**
73  	 * Factory of this storage device.
74  	 */
75  	protected OBStoreFactory fact;
76  
77  	/**
78  	 * Sequences are stored in this record.
79  	 */
80  	private final int SEQUENCE_RECORD = 0;
81  
82  	/**
83  	 * METADATA info.
84  	 */
85  	private FDB metadata;
86  
87  	private long currentId = 0;
88  
89  	/**
90  	 * Only one cursor operation allowed at once.
91  	 */
92  	private boolean doingCursor = false;
93  
94  	private OBStorageConfig storageConf;
95  
96  	/**
97  	 * Builds a new Storage system by receiving a Berkeley DB database.
98  	 * 
99  	 * @param db
100 	 *            The database to be stored.
101 	 * @param name
102 	 *            Name of the database.
103 	 * @param sequences
104 	 *            Database used to store sequences.
105 	 * @throws OBException
106 	 * @throws OBStorageException
107 	 * @throws DatabaseException
108 	 *             if something goes wrong with the database.
109 	 */
110 	public AbstractTCOBStorage(String name, DBM db, OBStoreFactory fact,
111 			OBStorageConfig storageConf) throws OBStorageException, OBException {
112 		this.db = db;
113 		this.name = name;
114 		this.fact = fact;
115 		OBStorageConfig conf = new OBStorageConfig();
116 		//conf.setFixedSizeIndex(true);
117 		conf.setRecordSize(Index.ID_SIZE);
118 		metadata = new FDB();
119 		metadata.open(fact.getFactoryLocation() + File.separator + name
120 				+ "_META", FDB.OWRITER | FDB.OCREAT | FDB.OREADER);
121 		currentId = getSequence();
122 		if(storageConf.getIndexType() == IndexType.HASH){
123 			this.hdb = (HDB)db;
124 		}
125 		this.storageConf = storageConf;
126 	}
127 	
128 	public Object getStats() throws  OBException{
129 		return null;
130 	}
131 
132 	/**
133 	 * Return the current sequence id.
134 	 * 
135 	 * @return the current sequence id.
136 	 */
137 	private long getSequence() {
138 		byte[] rec = metadata.get(fact.serializeInt(SEQUENCE_RECORD));
139 		if (rec == null) {
140 			return 0;
141 		} else {
142 			return fact.deSerializeLong(rec);
143 		}
144 
145 	}
146 
147 	private void putSequence(long id) {
148 		metadata
149 				.put(fact.serializeInt(SEQUENCE_RECORD), fact.serializeLong(id));
150 	}
151 
152 	public OBStoreFactory getFactory() {
153 		return fact;
154 	}
155 
156 	public void close() throws OBStorageException {
157 		boolean res;
158 		String code;
159 		if (db instanceof FDB) {
160 			FDB dbx = (FDB) db;
161 			res = dbx.close();
162 			code = lastErrorString();
163 		} else if (db instanceof HDB) {
164 			res = ((HDB) db).close();
165 			code = lastErrorString();
166 		} else if (db instanceof BDB) {
167 			res = ((BDB) db).close();
168 			code = lastErrorString();
169 		} else {
170 			throw new OBStorageException("Invalid method");
171 		}
172 		if (!res) {
173 			throw new OBStorageException("Unable to close the database: "
174 					+ code);
175 		}
176 		this.putSequence(currentId);
177 		OBAsserts.chkAssertStorage(metadata.close(),
178 				"Could not close metadata DB");
179 	}
180 
181 	private int lastErrorCode() {
182 		int res = Integer.MIN_VALUE;
183 		if (db instanceof FDB) {
184 			res = ((FDB) db).ecode();
185 		} else if (db instanceof HDB) {
186 			res = ((HDB) db).ecode();
187 		} else if (db instanceof BDB) {
188 			res = ((BDB) db).ecode();
189 		}
190 		return res;
191 	}
192 	
193 	public void optimize() throws OBStorageException{
194 		boolean res = false;
195 		if (db instanceof FDB) {
196 			res = ((FDB) db).optimize();
197 		} else if (db instanceof HDB) {
198 			res = ((HDB) db).optimize();
199 		} else if (db instanceof BDB) {
200 			res = ((BDB) db).optimize();
201 		}
202 		if(! res){
203 			throw new OBStorageException(this.lastErrorString());
204 		}
205 	}
206 
207 	private String lastErrorString() {
208 		String res = "";
209 		if (db instanceof FDB) {
210 			res = ((FDB) db).errmsg();
211 		} else if (db instanceof HDB) {
212 			res = ((HDB) db).errmsg();
213 		} else if (db instanceof BDB) {
214 			res = ((BDB) db).errmsg();
215 		}
216 		return res;
217 	}
218 
219 	public net.obsearch.OperationStatus delete(byte[] key)
220 			throws OBStorageException {
221 		net.obsearch.OperationStatus r = new net.obsearch.OperationStatus();
222 		if (db.out(key)) {
223 			r.setStatus(Status.OK);
224 		} else {
225 			r.setStatus(Status.NOT_EXISTS);
226 			r.setMsg(this.lastErrorString());
227 		}
228 		return r;
229 	}
230 
231 	public void deleteAll() throws OBStorageException {
232 
233 		boolean res = false;
234 		if (db instanceof FDB) {
235 			res = ((FDB) db).vanish();
236 		} else if (db instanceof HDB) {
237 			res = ((HDB) db).vanish();
238 		} else if (db instanceof BDB) {
239 			res = ((BDB) db).vanish();
240 		}
241 
242 		if (!res) {
243 			throw new OBStorageException("Could not truncate the database: "
244 					+ this.lastErrorString());
245 		}
246 	}
247 
248 	public String getName() {
249 		return this.name;
250 	}
251 
252 	public byte[] getValue(byte[] key) throws IllegalArgumentException,
253 			OBStorageException {
254 		if (storageConf.isDuplicates()) {
255 			throw new IllegalArgumentException();
256 		}
257 
258 		byte[] value = db.get(key);
259 		return value;
260 
261 	}
262 
263 	public net.obsearch.OperationStatus put(byte[] key, byte[] value)
264 			throws OBStorageException {
265 		checkFixedRecord(value);
266 		net.obsearch.OperationStatus res = new net.obsearch.OperationStatus();
267 		boolean ok = false;
268 		if(this.storageConf.getIndexType() == IndexType.HASH){
269 			ok = hdb.putasync(key, value);
270 		}else{
271 			ok = db.put(key, value);
272 		}
273 		
274 		if (ok) {
275 			res.setStatus(Status.OK);
276 		} else {
277 			res.setMsg(this.lastErrorString());
278 			res.setStatus(Status.ERROR);
279 		}
280 		return res;
281 	}
282 	
283 	private void checkFixedRecord(byte[] value) throws OBStorageException{
284 		OBAsserts
285 		.chkAssertStorage(
286 				storageConf.getIndexType() != IndexType.FIXED_RECORD
287 						|| (storageConf.getIndexType() == IndexType.FIXED_RECORD && value.length == storageConf
288 								.getRecordSize()),
289 				"Record size does not match the size for this index");
290 	}
291 
292 	public net.obsearch.OperationStatus putIfNew(byte[] key, byte[] value)
293 			throws OBStorageException {
294 		checkFixedRecord(value);
295 		net.obsearch.OperationStatus res = new net.obsearch.OperationStatus();
296 		if (db.putkeep(key, value)) {
297 			res.setStatus(Status.OK);
298 		} else {
299 			res.setMsg(this.lastErrorString());
300 			if(this.lastErrorCode() == BDB.EKEEP){
301 				res.setStatus(Status.EXISTS);
302 			}else{
303 				res.setStatus(Status.ERROR);
304 			}
305 		}
306 		return res;
307 	}
308 
309 	public boolean allowsDuplicatedData() {
310 		return storageConf.isDuplicates();
311 	}
312 
313 	public CloseIterator<TupleBytes> processRange(byte[] low, byte[] high)
314 			throws OBStorageException {
315 		throw new IllegalArgumentException();
316 	}
317 
318 	public CloseIterator<TupleBytes> processRangeNoDup(byte[] low, byte[] high)
319 			throws OBStorageException {
320 		throw new IllegalArgumentException();
321 	}
322 
323 	public CloseIterator<TupleBytes> processRangeReverse(byte[] low, byte[] high)
324 			throws OBStorageException {
325 		throw new IllegalArgumentException();
326 	}
327 
328 	public CloseIterator<TupleBytes> processRangeReverseNoDup(byte[] low,
329 			byte[] high) throws OBStorageException {
330 		throw new IllegalArgumentException();
331 	}
332 
333 	public CloseIterator<byte[]> processAllKeys() throws OBStorageException {
334 		return new ByteArrayKeyIterator();
335 	}
336 
337 	/**
338 	 * Base class used to iterate over cursors. Only supports full search
339 	 * 
340 	 * @param <O>
341 	 *            The type of tuple that will be returned by the iterator.
342 	 */
343 	protected abstract class CursorIterator<T> implements CloseIterator<T> {
344 
345 		protected byte[] nextKey;
346 
347 		protected byte[] nextValue;
348 
349 		protected byte[] lastReturnedKey;
350 
351 		protected CursorIterator() throws OBStorageException {
352 			if (doingCursor) {
353 				throw new OBStorageException("Only one cursor at a time");
354 			}
355 			doingCursor = true;
356 			db.iterinit();
357 
358 			loadNext();
359 
360 		}
361 
362 		public boolean hasNext() {
363 			return nextKey != null;
364 		}
365 
366 		/**
367 		 * Loads data from keyEntry and dataEntry and puts it into next. If we
368 		 * go beyond max, we set next to null so that everybody will work
369 		 * properly.
370 		 */
371 		protected void loadNext() throws NoSuchElementException {
372 			nextKey = db.iternext();
373 			if (nextKey != null) {
374 				nextValue = db.get(nextKey);
375 			} else {
376 				// stop the cursor.
377 				doingCursor = false;
378 			}
379 		}
380 
381 		protected T createT(byte[] key, byte[] value) {
382 			return createTuple(key, value);
383 		}
384 
385 		/**
386 		 * Creates a tuple from the given key and value.
387 		 * 
388 		 * @param key
389 		 *            raw key.
390 		 * @param value
391 		 *            raw value.
392 		 * @return A new tuple of type T created from the raw data key and
393 		 *         value.
394 		 */
395 		protected abstract T createTuple(byte[] key, byte[] value);
396 
397 		public T next() {
398 			T res = createT(nextKey, nextValue);
399 			lastReturnedKey = nextKey;
400 			loadNext();
401 			return res;
402 
403 		}
404 
405 		public void closeCursor() throws OBException {
406 			doingCursor = false;
407 		}
408 
409 		public void remove() {
410 			if (lastReturnedKey != null) {
411 				db.out(lastReturnedKey);
412 			}
413 
414 		}
415 
416 	}
417 
418 	public long size() throws OBStorageException {
419 
420 		long res = db.rnum();
421 
422 		return res;
423 
424 	}
425 
426 	/**
427 	 * Returns the next id from the database (incrementing sequences).
428 	 * 
429 	 * @return The next id that can be inserted.
430 	 */
431 	public long nextId() throws OBStorageException {
432 		synchronized (metadata) {
433 			long res = currentId;
434 			this.currentId++;
435 			return res;
436 		}
437 	}
438 
439 	@Override
440 	public StaticBin1D getReadStats() {
441 		return this.stats;
442 	}
443 
444 	@Override
445 	public void setReadStats(StaticBin1D stats) {
446 		this.stats = stats;
447 	}
448 
449 	protected class ByteArrayIterator extends CursorIterator<TupleBytes> {
450 
451 		protected ByteArrayIterator() throws OBStorageException {
452 			super();
453 		}
454 
455 		@Override
456 		protected TupleBytes createTuple(byte[] key, byte[] value) {
457 			return new TupleBytes(key, value);
458 		}
459 	}
460 
461 	protected class ByteArrayKeyIterator extends CursorIterator<byte[]> {
462 
463 		protected ByteArrayKeyIterator() throws OBStorageException {
464 			super();
465 		}
466 
467 		@Override
468 		protected byte[] createTuple(byte[] key, byte[] value) {
469 			return key;
470 		}
471 
472 		protected void loadNext() throws NoSuchElementException {
473 			nextKey = db.iternext();
474 		}
475 
476 	}
477 
478 }