View Javadoc

1   package net.obsearch.index.bucket.sleek;
2   
3   import java.io.IOException;
4   import java.nio.ByteBuffer;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.ListIterator;
8   
9   import org.junit.Test;
10  
11  import net.obsearch.Index;
12  import net.obsearch.OB;
13  import net.obsearch.OperationStatus;
14  import net.obsearch.Status;
15  import net.obsearch.asserts.OBAsserts;
16  import net.obsearch.constants.ByteConstants;
17  import net.obsearch.exception.IllegalIdException;
18  import net.obsearch.exception.OBException;
19  import net.obsearch.filter.Filter;
20  import net.obsearch.index.bucket.BucketContainer;
21  import net.obsearch.index.bucket.impl.BucketObjectDouble;
22  import net.obsearch.ob.OBDouble;
23  import net.obsearch.query.AbstractOBQuery;
24  import net.obsearch.query.OBQueryDouble;
25  import net.obsearch.stats.Statistics;
26  import net.obsearch.utils.bytes.ByteConversion;
27  
28  /*
29   OBSearch: a distributed similarity search engine This project is to
30   similarity search what 'bit-torrent' is to downloads. 
31   Copyright (C) 2009 Arnoldo Jose Muller Molina
32  
33   This program is free software: you can redistribute it and/or modify
34   it under the terms of the GNU General Public License as published by
35   the Free Software Foundation, either version 3 of the License, or
36   (at your option) any later version.
37  
38   This program is distributed in the hope that it will be useful,
39   but WITHOUT ANY WARRANTY; without even the implied warranty of
40   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
41   GNU General Public License for more details.
42  
43   You should have received a copy of the GNU General Public License
44   along with this program.  If not, see <http://www.gnu.org/licenses/>.
45   */
46  
47  /**
48   * AbstractSleekBucket is designed to hold objects that belong to one bucket.
49   * The bucket will hold ALL the buckets. Optionally, objects inside the bucket
50   * can be used as pivotCount. Format: <object count> <pivot 1> <pivot 2> ...
51   * <pivot n> <distance to pivot 1> <distance to pivot 2> ... <distance to pivot
52   * n> <obj2> .... This bucket design focuses on pivotCount local to a partition
53   * instead of global pivotCount. Also, the storage and reading of the container
54   * is responsability of the index. The class does not access directly the index
55   * unlike the bucket.* classes we now have something that partitions the data
56   * properly.
57   * 
58   * @author Arnoldo Jose Muller Molina
59   */
60  
61  public class SleekBucketDouble<O extends OBDouble> implements
62  		BucketContainer<O, BucketObjectDouble<O>, OBQueryDouble<O>> {
63  
64  	/**
65  	 * Number of objects in the bucket.
66  	 */
67  	private int count = 0;
68  	/**
69  	 * Number of pivotCount in the bucket.
70  	 */
71  	private final static int pivotCount = 0;
72  
73  	/**
74  	 * The list of pivot objects;
75  	 */
76  	private List<BucketObjectDouble<O>> pivots;
77  
78  	/**
79  	 * The list of objects.
80  	 */
81  	private List<BucketObjectDouble<O>> objects;
82  
83  	/**
84  	 * Type of the object that is being stored.
85  	 */
86  	private Class<O> type;
87  	
88  	/**
89  	 * Flag used to know when the data has been modified.
90  	 */
91  	private boolean modified = false;
92  
93  	/**
94  	 * Positive means that records are fixed. Negative means the maximum size of
95  	 * the records in this bucket. Zero means that no objects have been added.
96  	 * Mode is only updated before writing a bucket.
97  	 */
98  	private int mode = 0;
99  
100 	/**
101 	 * Create an empty, new bucket.
102 	 * 
103 	 * @param pivotCount
104 	 *            number of pivotCount to use.
105 	 */
106 	public SleekBucketDouble(Class<O> type, int pivots) {
107 		//this.pivotCount = pivots;
108 		this.type = type;
109 		this.pivots = new ArrayList<BucketObjectDouble<O>>(pivotCount);
110 		objects = new ArrayList<BucketObjectDouble<O>>(count);
111 	}
112 
113 
114 	public List<BucketObjectDouble<O>> getObjects(){
115 			return objects;
116 	}
117 
118 	/**
119 	 * Create a new SleekBucket based on the given data.
120 	 * 
121 	 * @param pivotCount
122 	 * @param data
123 	 * @throws IOException
124 	 * @throws OBException
125 	 * @throws IllegalAccessException
126 	 * @throws InstantiationException
127 	 * @throws IOException 
128 	 */
129 	public SleekBucketDouble(Class<O> type, int pivots, byte[] data)
130 			throws InstantiationException, IllegalAccessException, OBException, IOException
131 			 {
132 		this(type, pivots);
133 		parseData(pivots, data);
134 	}
135 
136 	/**
137 	 * Parses the data from the given byte array.
138 	 * 
139 	 * @param data
140 	 * @throws IllegalAccessException
141 	 * @throws InstantiationException
142 	 * @throws IOException
143 	 * @throws OBException
144 	 */
145 	public void parseData(int pivotCount, byte[] data)
146 			throws InstantiationException, IllegalAccessException, OBException,
147 			IOException {
148 		pivots = new ArrayList<BucketObjectDouble<O>>(pivotCount);
149 		ByteBuffer buf = ByteConversion.createByteBuffer(data);
150 		// read the number of objects included in the bucket.
151 		count = buf.getInt();
152 		mode = buf.getInt();
153 		objects = new ArrayList<BucketObjectDouble<O>>(count);
154 		// get the pivots!
155 		int i = 0;
156 		while (i < Math.min(pivotCount , count)) {			
157 			pivots.add(readBucketObject(0, buf));
158 			i++;
159 		}
160 		// now we can start getting objects.
161 		// continue loading objects
162 		while (i < count) {			
163 			objects.add(readBucketObject(pivotCount, buf));
164 			i++;
165 		}
166 		assert count == (pivots.size() + objects.size());	
167 		assert (! buf.hasRemaining()) : "Remaining: " + buf.remaining() + " Position: " + buf.position() + " capacity: " + buf.capacity() + " count: " + count;
168 		
169 	}
170 	
171 	/**
172 	 * Read the next object in the given buffer
173 	 * @param pivots number of pivots to use
174 	 * @return A new BucketObject
175 	 * @throws IOException 
176 	 * @throws OBException 
177 	 * @throws IllegalAccessException 
178 	 * @throws InstantiationException 
179 	 */
180 	private BucketObjectDouble<O> readBucketObject(int pivots, ByteBuffer buf) throws OBException, IOException, InstantiationException, IllegalAccessException{
181 		O obj = type.newInstance();
182 		BucketObjectDouble<O> bucket = new BucketObjectDouble<O>();
183 		bucket.read(buf, pivots);
184 		byte[] objectRawData = getNextObjectChunk(buf);
185 		obj.load(objectRawData);
186 		bucket.setObject(obj);
187 		return bucket;
188 	}
189 
190 	private ByteConstants getAppropiate(int size) {
191 		if (size <= Byte.MAX_VALUE) {
192 			return ByteConstants.Byte;
193 		} else if (size <= Short.MAX_VALUE) {
194 			return ByteConstants.Short;
195 		} else if (size <= Integer.MAX_VALUE) {
196 			return ByteConstants.Int;
197 		} else {
198 			return null;
199 		}
200 	}
201 
202 	public boolean equals(Object o) {
203 		SleekBucketDouble<O> another = (SleekBucketDouble<O>) o;
204 		if(another.count != this.count){
205 			return false;
206 		}
207 		try {
208 			for (BucketObjectDouble<O> p : pivots) {
209 				if (! another.pivots.contains(p)) {
210 					return false;
211 				}
212 			}
213 			
214 			for(BucketObjectDouble<O> b : objects){
215 				if (! another.objects.contains(b)) {
216 					return false;
217 				}
218 			}
219 			
220 			return true;
221 
222 		} catch (Exception e) {
223 			throw new IllegalArgumentException(e);
224 		}
225 	}
226 
227 	private byte[] getNextObjectChunk(ByteBuffer buf) {
228 
229 		byte[] res;
230 		int size;
231 		if (mode > 0) {
232 			size = mode;
233 		} else {
234 			assert mode != 0;
235 			ByteConstants mo = getAppropiate(Math.abs(mode));
236 			if (mo == ByteConstants.Byte) {
237 				size = buf.get();
238 			} else if (mo == ByteConstants.Short) {
239 				size = buf.getShort();
240 			} else if (mo == ByteConstants.Int) {
241 				size = buf.getInt();
242 			} else {
243 					assert false ;
244 				return null;
245 			}
246 
247 		}
248 		res = new byte[size];
249 		buf.get(res);
250 		return res;
251 	}
252 
253 	private void putNextObjectChunk(ByteBuffer buf, byte[] data) {
254 
255 		int size = data.length;
256 		if (mode > 0) {
257 			assert data.length == mode;
258 		} else {
259 			assert mode != 0;
260 			ByteConstants mo = getAppropiate(Math.abs(mode));
261 			if (mo == ByteConstants.Byte) {
262 				assert size <= Byte.MAX_VALUE;
263 				buf.put((byte) size);				
264 			} else if (mo == ByteConstants.Short) {
265 				assert size <= Short.MAX_VALUE;
266 				buf.putShort((short) size);
267 			} else if (mo == ByteConstants.Int) {
268 				assert size <= Integer.MAX_VALUE;
269 				buf.putInt(size);
270 			} else {
271 				assert false;
272 			}
273 
274 		}
275 		buf.put(data);
276 
277 	}
278 
279 	private void removeAll() {
280 		pivots.clear();
281 		objects.clear();
282 		count = 0;
283 	}
284 	
285 	/**
286 	 * Remove the object if it is found in the list
287 	 * @param list
288 	 * @param object
289 	 * @return the object removed or null otherwise
290 	 */
291 	private BucketObjectDouble<O> removeObject(List<BucketObjectDouble<O>> list, O object){
292 		ListIterator<BucketObjectDouble<O>> it = list.listIterator();
293 		while (it.hasNext()) {
294 			BucketObjectDouble<O> b = it.next();
295 			if (b.getObject().equals(object)) {
296 				it.remove();
297 				return b;
298 			}
299 		}
300 		return null;
301 	}
302 
303 	@Override
304 	public OperationStatus delete(BucketObjectDouble<O> bucket, O object)
305 			throws OBException, IllegalIdException, IllegalAccessException,
306 			InstantiationException {
307 		OperationStatus result = new OperationStatus();
308 		result.setStatus(Status.NOT_EXISTS);
309 		// if we have to remove a pivot we have to remove all objects,
310 		// shuffle them and re-insert them again.
311 		// start with the pivots.
312 		BucketObjectDouble<O> removed = removeObject(pivots, object);
313 		if (removed != null) {
314 			
315 			// removed one of the pivots, we have to re-build the container.
316 			result.setStatus(Status.OK);
317 			result.setId(removed.getId());
318 			assert this.existsObjects(object, pivots) == null;
319 			List<BucketObjectDouble<O>> objs = new ArrayList<BucketObjectDouble<O>>(count);
320 			for (BucketObjectDouble<O> p : pivots) {
321 				objs.add(p);
322 			}
323 			for (BucketObjectDouble<O> b : objects) {
324 				objs.add(b);
325 			}
326 
327 			removeAll(); // empty the bucket.
328 			for (BucketObjectDouble<O> o : objs) {
329 				insertBulk(o, o.getObject());
330 			}
331 		} else {
332 			// bucket.
333 			removed = removeObject(objects, object);
334 			if(removed != null){
335 				result.setStatus(Status.OK);
336 				result.setId(removed.getId());
337 			}
338 		}
339 		if(result.getStatus() == Status.OK){
340 			this.modified = true;
341 		}
342 		count = pivots.size() + objects.size();
343 		assert count == (pivots.size() + objects.size());
344 		return result;
345 	}
346 	
347 	public boolean isModified(){
348 		return modified;
349 	}
350 
351 	@Override
352 	public OperationStatus exists(BucketObjectDouble<O> bucket, O object)
353 			throws OBException, IllegalIdException, IllegalAccessException,
354 			InstantiationException {
355 		OperationStatus res = new OperationStatus();
356 		BucketObjectDouble<O> inPivots = existsObjects(object, pivots);
357 		BucketObjectDouble<O> inObjects = existsObjects(object, objects);
358 		if (inPivots != null || inObjects != null) {
359 			res.setStatus(Status.EXISTS);
360 			if(inPivots != null){
361 				res.setId(inPivots.getId());
362 			}else if(inObjects != null){
363 				res.setId(inObjects.getId());
364 			}
365 		} else {
366 			res.setStatus(Status.NOT_EXISTS);
367 		}
368 		return res;
369 	}
370 	/**
371 	 * Check if the given object exists on the given list
372 	 * @param object the object to search
373 	 * @return the bucket if it exists otherwise null
374 	 */
375 	private BucketObjectDouble<O> existsObjects(O object, List<BucketObjectDouble<O>> list){
376 		for(BucketObjectDouble<O> o : list){
377 			if( o.getObject().equals(object)){
378 				return o;
379 			}
380 		}
381 		return null;
382 	}
383 
384 	@Override
385 	public int getPivots() {
386 		return pivotCount;
387 	}
388 
389 	@Override
390 	public OperationStatus insert(BucketObjectDouble<O> bucket, O object)
391 			throws OBException, IllegalIdException, IllegalAccessException,
392 			InstantiationException {
393 		OperationStatus exists = exists(bucket, object);
394 		if (exists.getStatus() == Status.EXISTS) {
395 			return exists;
396 		} else {
397 			return insertBulk(bucket, object);
398 		}
399 
400 	}
401 
402 	@Override
403 	public OperationStatus insertBulk(BucketObjectDouble<O> bucket, O object)
404 			throws OBException, IllegalIdException, IllegalAccessException,
405 			InstantiationException {
406 		OperationStatus res = new OperationStatus();
407 		if (pivots.size() < pivotCount) { // not enough pivots? we add it to the
408 			// pivots.
409 			//OBAsserts.chkAssert(bucket.getObject().equals(object), "Both objects should be the same");
410 			bucket.setObject(object);
411 			
412 			pivots.add(bucket);
413 			res.setStatus(Status.OK);
414 		} else {
415 			// enough pivots, we calculate the pivot vector
416 			objects.add(createBucket(bucket, object));
417 			res.setStatus(Status.OK);
418 		}
419 		modified = true;
420 		if(res.getStatus() == Status.OK){
421 			// inserted, update id
422 			res.setId(bucket.getId());
423 		}
424 		count++;
425 		assert count == pivots.size() + objects.size();
426 		return res;
427 	}
428 
429 	private BucketObjectDouble<O> createBucket(BucketObjectDouble<O> bucket, O object) throws OBException {
430 		double[] pivotVector = new double[pivotCount];
431 		int i = 0;
432 		while (i < pivotCount) {
433 			pivotVector[i] = object.distance(pivots.get(i).getObject());
434 			i++;
435 		}
436 		bucket.setSmapVector(pivotVector);
437 		return bucket;
438 	}
439 
440 	@Override
441 	public void search(AbstractOBQuery<O> q, BucketObjectDouble<O> bucket,
442 			Filter<O> filter, Statistics stats) throws IllegalAccessException,
443 			OBException, InstantiationException, IllegalIdException {
444 		search((OBQueryDouble) q, bucket, filter, stats);
445 	}
446 
447 	@Override
448 	public void search(OBQueryDouble<O> query, BucketObjectDouble<O> bucket,
449 			ByteBuffer b, Filter<O> filter, Statistics stats)
450 			throws IllegalAccessException, OBException, InstantiationException,
451 			IllegalIdException {
452 		throw new IllegalArgumentException();
453 	}
454 
455 	@Override
456 	public void search(OBQueryDouble<O> query, BucketObjectDouble<O> bucket,
457 			Filter<O> filter, Statistics stats) throws IllegalAccessException,
458 			OBException, InstantiationException, IllegalIdException {
459 		// must add also the pivots.
460 		
461 		int i = 0;
462 		double[] pivotVector = new double[pivotCount];
463 		while (i < pivotCount) {
464 			BucketObjectDouble<O> p = pivots.get(i); // get the pivot.
465 			double distance = query.getObject().distance(p.getObject());
466 			query.add(p.getId(), p.getObject(), distance);
467 			pivotVector[i] = distance;
468 			stats.incDistanceCount();
469 			i++;
470 		}
471 		this.createBucket(bucket, query.getObject());		
472 		BucketObjectDouble<O> b = new BucketObjectDouble<O>(pivotVector, -1,
473 				query.getObject());
474 		// now we can match the remaining of the objects.
475 		for (BucketObjectDouble<O> db : objects) {
476 			double lowerBound;
477 			if(pivotCount > 0){
478 				lowerBound = b.lInf(db);
479 			}else{
480 				lowerBound = 0;
481 			}
482 			stats.incSmapCount();
483 			if (query.isCandidate(lowerBound) && (filter == null || filter.accept(db.getObject(), query.getObject()))) {
484 				double distance = query.getObject().distance(db.getObject());
485 				stats.incDistanceCount();
486 				query.add(db.getId(), db.getObject(), distance);
487 			}
488 		}
489 	}
490 
491 	private int objectsCount(){
492 		return objects.size();
493 	}
494 	
495 	private int pivotsCount(){
496 		return pivots.size();
497 	}
498 	
499 	/**
500 	 * Serialize the bucket into a stream of bytes.
501 	 * 
502 	 * @return The list of
503 	 * @throws IOException
504 	 * @throws OBException
505 	 */
506 	public byte[] serialize() throws OBException {
507 		//OBAsserts.chkAssert(size() > 0, "Do not serialize an empty bucket");
508 		
509 		ArrayList<byte[]> serializedPivots = new ArrayList<byte[]>(pivotsCount());
510 		ArrayList<byte[]> serializedObjects = new ArrayList<byte[]>(objectsCount());
511 		int objectBytes = 0;
512 		// serialize pivots
513 		int minSize = Integer.MAX_VALUE;
514 		int maxSize = Integer.MIN_VALUE;
515 		try{
516 		for (BucketObjectDouble<O> p : pivots) {
517 			byte[] t = p.getObject().store();
518 			objectBytes += t.length;
519 			serializedPivots.add(t);
520 			minSize = Math.min(t.length, minSize);
521 			maxSize = Math.max(t.length, maxSize);
522 		}
523 		// serialize all other objects.
524 		for (BucketObjectDouble<O> p : objects) {
525 			byte[] t = p.getObject().store();
526 			objectBytes += t.length;
527 			serializedObjects.add(t);
528 			minSize = Math.min(t.length, minSize);
529 			maxSize = Math.max(t.length, maxSize);
530 		}
531 		}catch(IOException e){
532 			throw new OBException(e);
533 		}
534 		// calculate the mode of the index.
535 		if (minSize == maxSize) {
536 			mode = maxSize;
537 		} else {
538 			mode = -1 * maxSize;
539 		}
540 		int miniHeaders;
541 		if (mode > 0) {
542 			miniHeaders = 0; // no space required.
543 		} else {
544 			miniHeaders = getAppropiate(Math.abs(mode)).getSize()
545 					* size();
546 		}
547 		int bufferSize = HEADER_SIZE + objectBytes
548 				+ (objectsCount() * pivotCount * DISTANCE_SIZE)
549 				+ miniHeaders + (Index.ID_SIZE * size());
550 		ByteBuffer buf = ByteConversion.createByteBuffer(bufferSize);
551 		buf.putInt(size()); // write size
552 		buf.putInt(mode);
553 
554 		// write the pivots:
555 
556 		
557 		
558 		int i = 0;
559 		while (i < serializedPivots.size()) {
560 			BucketObjectDouble<O> b = pivots.get(i);
561 			putObject(b, serializedPivots.get(i), buf);
562 			i++;
563 		}
564 
565 		// write the rest of the objects.
566 		i = 0;
567 		while (i < serializedObjects.size()) {
568 			BucketObjectDouble<O> b = objects.get(i);
569 			putObject(b, serializedObjects.get(i), buf);
570 			i++;
571 		}
572 		assert count == pivots.size() + objects.size();
573 		//assert buf.remaining() == 0 : "Remaining: " + buf.remaining();
574 		return buf.array();
575 	}
576 	
577 	/**
578 	 * Write the given bucket and object data into buf
579 	 * @param b bucket
580 	 * @param object object data
581 	 * @param buf the buffer in which we will write everything
582 	 */
583 	private void putObject(BucketObjectDouble<O> b, byte[] object, ByteBuffer buf) throws OBException{
584 		// write the bucket
585 		OBAsserts.chkAssert(object.length > 0, "Cannot store empty objects");
586 		b.write(buf);
587 		// now we can write the object
588 		putNextObjectChunk(buf, object);
589 	}
590 
591 	/**
592 	 * Header size
593 	 */
594 	final int HEADER_SIZE = ByteConstants.Int.getSize() * 2;
595 	final int DISTANCE_SIZE = ByteConstants.Double.getSize();
596 
597 	@Override
598 	public void setKey(byte[] key) {
599 		throw new IllegalArgumentException();
600 	}
601 
602 	@Override
603 	public void setPivots(int pivots) {
604 			//this.pivotCount = pivots;
605 	}
606 
607 	@Override
608 	public int size()  {
609 		return count;
610 	}
611 	
612 	public String toString(){
613 		return "Size: " + size() + " pivots: " + pivotCount + " mode: " + mode;
614 	}
615 
616 }
617