View Javadoc

1   package net.obsearch.distance;
2   
3   import java.util.ArrayList;
4   import java.util.concurrent.Semaphore;
5   
6   import org.apache.log4j.Logger;
7   
8   import net.obsearch.exception.OBException;
9   import net.obsearch.filter.Filter;
10  import net.obsearch.ob.OBInt;
11  import net.obsearch.query.OBQueryInt;
12  
13  public final class OBDistanceCalculatorInt<O extends OBInt> {
14  
15  	private static final transient Logger logger = Logger
16  			.getLogger(OBDistanceCalculatorInt.class);
17  
18  	private boolean[] available;
19  	private Exception e = null;
20  	private Exec<O>[] execs;
21  	private Semaphore sem;
22  	private final int threadCount;
23  
24  	public OBDistanceCalculatorInt(int threads) {
25  		available = new boolean[threads];
26  
27  		execs = new Exec[threads];
28  
29  		int i = 0;
30  		while (i < available.length) {
31  			available[i] = true;
32  			execs[i] = new Exec<O>(i);
33  			new Thread(execs[i], "Dist-" + i).start();
34  			i++;
35  		}
36  		this.threadCount = threads;
37  		sem = new Semaphore(threads);
38  	}
39  
40  	/**
41  	 * Process asyncrhonously a and b.
42  	 * 
43  	 * @param a
44  	 * @param b
45  	 * @param query
46  	 * @throws Exception
47  	 */
48  	public void process(long idObj, O obj, O q, OBQueryInt<O> query,
49  			Filter<O> filter) throws OBException {
50  		if (e != null) {
51  			throw new OBException(e);
52  		}
53  		sem.acquireUninterruptibly(); // only work if there are free threads.
54  		// free permit implies that at least one thread is waiting
55  		// to receive orders.
56  		int i = 0;
57  		while (i < threadCount) {
58  			if (available[i]) {
59  				break;
60  			}
61  			i++;
62  		}
63  		// thread i is ready to be used.
64  		Exec<O> e = execs[i];
65  		e.init(idObj, obj, q, query, filter);
66  		e.go();
67  
68  	}
69  
70  	private final class Exec<OB extends OBInt> implements Runnable {
71  		private OB obj;
72  		private OB q;
73  		private OBQueryInt<OB> queryResult;
74  		private long idObj;
75  		private int threadId;
76  		private Semaphore control;
77  		private Filter<OB> filter;
78  
79  		public Exec(int threadId) {
80  			this.threadId = threadId;
81  			control = new Semaphore(0);
82  		}
83  
84  		private void go() {
85  			control.release();
86  		}
87  
88  		public void init(long idObj, OB obj, OB q, OBQueryInt<OB> query,
89  				Filter<OB> filter) {
90  			this.idObj = idObj;
91  			this.obj = obj;
92  			this.q = q;
93  			this.queryResult = query;
94  			this.filter = filter;
95  		}
96  
97  		@Override
98  		public void run() {
99  			try {
100 				while (true) {
101 					control.acquireUninterruptibly();
102 					if (filter == null || filter.accept(obj, q)) {
103 						int realDistance = obj.distance(q);
104 						if (realDistance <= queryResult.getDistance()) {
105 							queryResult.add(idObj, obj, realDistance);
106 						}
107 					}
108 					available[threadId] = true;
109 					sem.release();
110 				}
111 			} catch (Exception ex) {
112 				logger.fatal(ex);
113 				synchronized (available) {
114 					e = ex;
115 				}
116 
117 			}
118 		}
119 	}
120 
121 }