1 package net.obsearch.distance;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5
6 import org.apache.log4j.Logger;
7
8 import net.obsearch.exception.OBException;
9 import net.obsearch.filter.Filter;
10 import net.obsearch.ob.OBInt;
11 import net.obsearch.query.OBQueryInt;
12
13 public final class OBDistanceCalculatorInt<O extends OBInt> {
14
15 private static final transient Logger logger = Logger
16 .getLogger(OBDistanceCalculatorInt.class);
17
18 private boolean[] available;
19 private Exception e = null;
20 private Exec<O>[] execs;
21 private Semaphore sem;
22 private final int threadCount;
23
24 public OBDistanceCalculatorInt(int threads) {
25 available = new boolean[threads];
26
27 execs = new Exec[threads];
28
29 int i = 0;
30 while (i < available.length) {
31 available[i] = true;
32 execs[i] = new Exec<O>(i);
33 new Thread(execs[i], "Dist-" + i).start();
34 i++;
35 }
36 this.threadCount = threads;
37 sem = new Semaphore(threads);
38 }
39
40
41
42
43
44
45
46
47
48 public void process(long idObj, O obj, O q, OBQueryInt<O> query,
49 Filter<O> filter) throws OBException {
50 if (e != null) {
51 throw new OBException(e);
52 }
53 sem.acquireUninterruptibly();
54
55
56 int i = 0;
57 while (i < threadCount) {
58 if (available[i]) {
59 break;
60 }
61 i++;
62 }
63
64 Exec<O> e = execs[i];
65 e.init(idObj, obj, q, query, filter);
66 e.go();
67
68 }
69
70 private final class Exec<OB extends OBInt> implements Runnable {
71 private OB obj;
72 private OB q;
73 private OBQueryInt<OB> queryResult;
74 private long idObj;
75 private int threadId;
76 private Semaphore control;
77 private Filter<OB> filter;
78
79 public Exec(int threadId) {
80 this.threadId = threadId;
81 control = new Semaphore(0);
82 }
83
84 private void go() {
85 control.release();
86 }
87
88 public void init(long idObj, OB obj, OB q, OBQueryInt<OB> query,
89 Filter<OB> filter) {
90 this.idObj = idObj;
91 this.obj = obj;
92 this.q = q;
93 this.queryResult = query;
94 this.filter = filter;
95 }
96
97 @Override
98 public void run() {
99 try {
100 while (true) {
101 control.acquireUninterruptibly();
102 if (filter == null || filter.accept(obj, q)) {
103 int realDistance = obj.distance(q);
104 if (realDistance <= queryResult.getDistance()) {
105 queryResult.add(idObj, obj, realDistance);
106 }
107 }
108 available[threadId] = true;
109 sem.release();
110 }
111 } catch (Exception ex) {
112 logger.fatal(ex);
113 synchronized (available) {
114 e = ex;
115 }
116
117 }
118 }
119 }
120
121 }