View Javadoc

1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.sourceforge.domian.util.concurrent.locks;
17  
18  
19  import static java.lang.Boolean.FALSE;
20  import static java.lang.Boolean.TRUE;
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.Semaphore;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.slf4j.Logger;
26  import static org.slf4j.LoggerFactory.getLogger;
27  
28  import static net.sourceforge.domian.util.InstrumentationUtils.TRACE_LEVEL_INDENTATION;
29  import static net.sourceforge.domian.util.InstrumentationUtils.buildThreadNumberAndMessage;
30  
31  
32  /**
33   * A {@link Synchronizer} implementation using two {@link Semaphore} instances.
34   *
35   * @author Eirik Torske
36   * @since 0.4
37   */
38  public final class SemaphoreSynchronizer implements Synchronizer {
39  
40      private static final Logger log = getLogger(SemaphoreSynchronizer.class);
41  
42      private static final int MAX_NUMBER_OF_CONCURRENT_PERMITS = Integer.MAX_VALUE;
43      private static final boolean FAIR = TRUE;
44      private static final boolean NON_FAIR = FALSE;
45  
46      /** Semaphore managing permits and blocking queue for concurrent-mode blocks. */
47      private final Semaphore concurrentAccessSemaphore = new Semaphore(MAX_NUMBER_OF_CONCURRENT_PERMITS, NON_FAIR);
48  
49      /** Semaphore managing one single permit and blocking queue for exclusive-mode blocks. */
50      private final Semaphore exclusiveAccessSemaphore = new Semaphore(1, FAIR);
51  
52      private static final AtomicBoolean hasAcquiredExecutionPermitAtomicBoolean = new AtomicBoolean(FALSE);
53  
54      /**
55       * Permits must only be acquired once per thread.
56       * This is needed to prevent deadlock for nested invocations.
57       */
58      private static final ThreadLocal<Boolean> hasAcquiredExecutionPermitThreadLocal =
59              new ThreadLocal<Boolean>() {
60                  @Override
61                  protected Boolean initialValue() {
62                      return hasAcquiredExecutionPermitAtomicBoolean.get();
63                  }
64              };
65  
66      private static Boolean thisThreadHasAlreadyAcquiredExecutionPermit() {
67          return hasAcquiredExecutionPermitThreadLocal.get();
68      }
69  
70      private static void setThisThreadHasAcquiredExecutionPermitTo(final Boolean booleanValue) {
71          hasAcquiredExecutionPermitThreadLocal.set(booleanValue);
72      }
73  
74  
75      public <R extends Runnable> void runConcurrently(final R runnable) {
76          if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
77              runnable.run();
78  
79          } else {
80              this.concurrentAccessSemaphore.acquireUninterruptibly();
81              if (log.isTraceEnabled()) {
82                  log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runConcurrently", "concurrentAccessSemaphore permit acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
83              }
84              setThisThreadHasAcquiredExecutionPermitTo(TRUE);
85              try {
86                  runnable.run();
87  
88              } finally {
89                  setThisThreadHasAcquiredExecutionPermitTo(FALSE);
90                  this.concurrentAccessSemaphore.release();
91                  if (log.isTraceEnabled()) {
92                      log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runConcurrently", "concurrentAccessSemaphore permit released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
93                  }
94              }
95          }
96      }
97  
98  
99      public <T, C extends Callable<T>> T callConcurrently(final C callable) {
100         try {
101             if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
102                 return callable.call();
103 
104             } else {
105                 this.concurrentAccessSemaphore.acquireUninterruptibly();
106                 if (log.isTraceEnabled()) {
107                     log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callConcurrently", "concurrentAccessSemaphore permit acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
108                 }
109                 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
110                 try {
111                     return callable.call();
112 
113                 } finally {
114                     setThisThreadHasAcquiredExecutionPermitTo(FALSE);
115                     this.concurrentAccessSemaphore.release();
116                     if (log.isTraceEnabled()) {
117                         log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callConcurrently", "concurrentAccessSemaphore permit released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
118                     }
119                 }
120             }
121 
122         } catch (Throwable t) {
123             /* TODO: illegal package reference direction
124             if (t instanceof RepositoryException) {
125                 throw (RepositoryException) t;
126             } */
127             throw new RuntimeException(t);
128         }
129     }
130 
131 
132     public <R extends Runnable> void runExclusively(final R runnable) {
133         if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
134             runnable.run();
135 
136         } else {
137             /* Acquire exclusive access permit */
138             this.exclusiveAccessSemaphore.acquireUninterruptibly();
139             if (log.isTraceEnabled()) {
140                 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "exclusiveAccessSemaphore permit acquired, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
141             }
142 
143             /* Acquire all concurrent access permits denying access for all runnable invocations, but let already running threads finish */
144             this.concurrentAccessSemaphore.acquireUninterruptibly(MAX_NUMBER_OF_CONCURRENT_PERMITS);
145             if (log.isTraceEnabled()) {
146                 log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "all concurrentMethodSemaphore permits acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
147             }
148 
149             setThisThreadHasAcquiredExecutionPermitTo(TRUE);
150             try {
151                 runnable.run();
152 
153             } finally {
154                 setThisThreadHasAcquiredExecutionPermitTo(FALSE);
155 
156                 /* Release all concurrent access permits */
157                 this.concurrentAccessSemaphore.release(MAX_NUMBER_OF_CONCURRENT_PERMITS);
158                 if (log.isTraceEnabled()) {
159                     log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "all concurrentAccessSemaphore permits released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
160                 }
161 
162                 /* Release exclusive access permit */
163                 this.exclusiveAccessSemaphore.release();
164                 if (log.isTraceEnabled()) {
165                     log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "runExclusively", "exclusiveAccessSemaphore permit released, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
166                 }
167             }
168         }
169     }
170 
171 
172     public <T, C extends Callable<T>> T callExclusively(final C callable) {
173         try {
174             if (thisThreadHasAlreadyAcquiredExecutionPermit()) {
175                 return callable.call();
176 
177             } else {
178                 /* Acquire exclusive access permit */
179                 this.exclusiveAccessSemaphore.acquire();
180                 if (log.isTraceEnabled()) {
181                     log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "exclusiveAccessSemaphore permit acquired, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
182                 }
183 
184                 /* Acquire all concurrent access permits denying access for all callable invocations, but let already running threads finish */
185                 this.concurrentAccessSemaphore.acquire(MAX_NUMBER_OF_CONCURRENT_PERMITS);
186                 if (log.isTraceEnabled()) {
187                     log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "all concurrentMethodSemaphore permits acquired, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
188                 }
189 
190                 setThisThreadHasAcquiredExecutionPermitTo(TRUE);
191                 try {
192                     return callable.call();
193 
194                 } finally {
195                     setThisThreadHasAcquiredExecutionPermitTo(FALSE);
196 
197                     /* Release all concurrent access permits */
198                     if (log.isTraceEnabled()) {
199                         log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "releasing all concurrentAccessSemaphore permits, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
200                     }
201                     this.concurrentAccessSemaphore.release(MAX_NUMBER_OF_CONCURRENT_PERMITS);
202                     if (this.concurrentAccessSemaphore.availablePermits() < 10) {
203                         log.info("Low number of permits detected, available permits=" + this.concurrentAccessSemaphore.availablePermits());
204                         log.info("Draining permits!");
205                         this.concurrentAccessSemaphore.drainPermits();
206                         log.info("Permits drained! Available permits now=" + this.concurrentAccessSemaphore.availablePermits());
207                     }
208                     if (log.isTraceEnabled()) {
209                         log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "all concurrentAccessSemaphore permits released, available permits=" + this.concurrentAccessSemaphore.availablePermits()));
210                     }
211 
212                     /* Release exclusive access permit */
213                     this.exclusiveAccessSemaphore.release();
214                     if (log.isTraceEnabled()) {
215                         log.trace(TRACE_LEVEL_INDENTATION + buildThreadNumberAndMessage(this, "callExclusively", "exclusiveAccessSemaphore permit released, available permits=" + this.exclusiveAccessSemaphore.availablePermits()));
216                     }
217                 }
218             }
219 
220         } catch (Throwable t) {
221             /* TODO: illegal package reference direction
222             if (t instanceof RepositoryException) {
223                 throw (RepositoryException) t;
224             } */
225             throw new RuntimeException(t);
226         }
227     }
228 }