1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sourceforge.domian.util.concurrent.locks;
17
18
19 import static java.lang.Boolean.FALSE;
20 import static java.lang.Boolean.TRUE;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.slf4j.Logger;
26 import static org.slf4j.LoggerFactory.getLogger;
27
28 import static net.sourceforge.domian.util.InstrumentationUtils.TRACE_LEVEL_INDENTATION;
29 import static net.sourceforge.domian.util.InstrumentationUtils.buildThreadNumberAndMessage;
30
31
32
33
34
35
36
37
38 public final class SemaphoreSynchronizer implements Synchronizer {
39
40 private static final Logger log = getLogger(SemaphoreSynchronizer.class);
41
42 private static final int MAX_NUMBER_OF_CONCURRENT_PERMITS = Integer.MAX_VALUE;
43 private static final boolean FAIR = TRUE;
44 private static final boolean NON_FAIR = FALSE;
45
46
47 private final Semaphore concurrentAccessSemaphore = new Semaphore(MAX_NUMBER_OF_CONCURRENT_PERMITS, NON_FAIR);
48
49
50 private final Semaphore exclusiveAccessSemaphore = new Semaphore(1, FAIR);
51
52 private static final AtomicBoolean hasAcquiredExecutionPermitAtomicBoolean = new AtomicBoolean(FALSE);
53
54
55
56
57
58 private static final ThreadLocal<Boolean> hasAcquiredExecutionPermitThreadLocal =
59 new ThreadLocal<Boolean>() {
60 @Override
61 protected Boolean initialValue() {
62 return hasAcquiredExecutionPermitAtomicBoolean.get();
63 }
64 };
65
66 private static Boolean thisThreadHasAlreadyAcquiredExecutionPermit() {
67 return hasAcquiredExecutionPermitThreadLocal.get();
68 }
69
70 private static void setThisThreadHasAcquiredExecutionPermitTo(final Boolean booleanValue) {
71 hasAcquiredExecutionPermitThreadLocal.set(booleanValue);
72 }
73
74
75 public <R extends Runnable> void runConcurrently(final R runnable) {
76 if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
77 runnable.run();
78
79 } else {
80 this.concurrentAccessSemaphore.acquireUninterruptibly();
81 if (log.isTraceEnabled()) {
82 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runConcurrently", "concurrentAccessSemaphore permit acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
83 }
84 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
85 try {
86 runnable.run();
87
88 } finally {
89 setThisThreadHasAcquiredExecutionPermitTo(FALSE);
90 this.concurrentAccessSemaphore.release();
91 if (log.isTraceEnabled()) {
92 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runConcurrently", "concurrentAccessSemaphore permit released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
93 }
94 }
95 }
96 }
97
98
99 public <T, C extends Callable<T>> T callConcurrently(final C callable) {
100 try {
101 if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
102 return callable.call();
103
104 } else {
105 this.concurrentAccessSemaphore.acquireUninterruptibly();
106 if (log.isTraceEnabled()) {
107 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callConcurrently", "concurrentAccessSemaphore permit acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
108 }
109 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
110 try {
111 return callable.call();
112
113 } finally {
114 setThisThreadHasAcquiredExecutionPermitTo(FALSE);
115 this.concurrentAccessSemaphore.release();
116 if (log.isTraceEnabled()) {
117 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callConcurrently", "concurrentAccessSemaphore permit released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
118 }
119 }
120 }
121
122 } catch (Throwable t) {
123
124
125
126
127 throw new RuntimeException(t);
128 }
129 }
130
131
132 public <R extends Runnable> void runExclusively(final R runnable) {
133 if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
134 runnable.run();
135
136 } else {
137
138 this.exclusiveAccessSemaphore.acquireUninterruptibly();
139 if (log.isTraceEnabled()) {
140 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "exclusiveAccessSemaphore permit acquired, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
141 }
142
143
144 this.concurrentAccessSemaphore.acquireUninterruptibly(MAX_NUMBER_OF_CONCURRENT_PERMITS);
145 if (log.isTraceEnabled()) {
146 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "all concurrentMethodSemaphore permits acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
147 }
148
149 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
150 try {
151 runnable.run();
152
153 } finally {
154 setThisThreadHasAcquiredExecutionPermitTo(FALSE);
155
156
157 this.concurrentAccessSemaphore.release(MAX_NUMBER_OF_CONCURRENT_PERMITS);
158 if (log.isTraceEnabled()) {
159 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "all concurrentAccessSemaphore permits released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
160 }
161
162
163 this.exclusiveAccessSemaphore.release();
164 if (log.isTraceEnabled()) {
165 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "exclusiveAccessSemaphore permit released, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
166 }
167 }
168 }
169 }
170
171
172 public <T, C extends Callable<T>> T callExclusively(final C callable) {
173 try {
174 if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
175 return callable.call();
176
177 } else {
178
179 this.exclusiveAccessSemaphore.acquire();
180 if (log.isTraceEnabled()) {
181 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "exclusiveAccessSemaphore permit acquired, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
182 }
183
184
185 this.concurrentAccessSemaphore.acquire(MAX_NUMBER_OF_CONCURRENT_PERMITS);
186 if (log.isTraceEnabled()) {
187 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "all concurrentMethodSemaphore permits acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
188 }
189
190 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
191 try {
192 return callable.call();
193
194 } finally {
195 setThisThreadHasAcquiredExecutionPermitTo(FALSE);
196
197
198 if (log.isTraceEnabled()) {
199 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "releasing all concurrentAccessSemaphore permits, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
200 }
201 this.concurrentAccessSemaphore.release(MAX_NUMBER_OF_CONCURRENT_PERMITS);
202 if (this.concurrentAccessSemaphore.availablePermits() < 10) {
203 log.info("Low number of permits detected, available permits=" + this.concurrentAccessSemaphore.availablePermits());
204 log.info("Draining permits!");
205 this.concurrentAccessSemaphore.drainPermits();
206 log.info("Permits drained! Available permits now=" + this.concurrentAccessSemaphore.availablePermits());
207 }
208 if (log.isTraceEnabled()) {
209 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "all concurrentAccessSemaphore permits released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
210 }
211
212
213 this.exclusiveAccessSemaphore.release();
214 if (log.isTraceEnabled()) {
215 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "exclusiveAccessSemaphore permit released, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
216 }
217 }
218 }
219
220 } catch (Throwable t) {
221
222
223
224
225 throw new RuntimeException(t);
226 }
227 }
228 }