View Javadoc

1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.sourceforge.domian.repository;
17  
18  
19  import java.io.File;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.FutureTask;
24  import static java.util.concurrent.TimeUnit.MILLISECONDS;
25  import java.util.concurrent.TimeoutException;
26  
27  import static org.apache.commons.lang.StringUtils.uncapitalize;
28  import static org.apache.commons.lang.SystemUtils.FILE_SEPARATOR;
29  
30  import net.sourceforge.domian.entity.Entity;
31  import net.sourceforge.domian.repository.AbstractDomianCoreRepository;
32  import net.sourceforge.domian.repository.HumanReadableFormatRepository;
33  import net.sourceforge.domian.repository.PersistentRepository;
34  import net.sourceforge.domian.repository.RepositoryException;
35  import static net.sourceforge.domian.util.InstrumentationUtils.buildMessageWithStackTrace;
36  import static net.sourceforge.domian.util.InstrumentationUtils.buildThreadNumberAndMessage;
37  import net.sourceforge.domian.util.concurrent.locks.Synchronizer;
38  import static net.sourceforge.domian.util.concurrent.locks.Synchronizer.MODE.CONCURRENT;
39  import static net.sourceforge.domian.util.concurrent.locks.Synchronizer.MODE.EXCLUSIVE;
40  
41  
42  /**
43   * Common functionality for all Domian XStream-based repositories,
44   * e.g. an extensive set of {@link net.sourceforge.domian.util.concurrent.locks.Synchronizer} convenience methods.
45   * <p/>
46   * File encoding UTF-8 is defined in this class.
47   *
48   * @author Eirik Torske
49   * @since 0.4
50   */
51  abstract class AbstractXStreamXmlFileRepository<T extends Entity> extends AbstractDomianCoreRepository<T> implements HumanReadableFormatRepository<T> {
52  
53      protected static final String XSTREAM_XML_FILE_SUFFIX = ".xstream-1.3.1.xml";
54  
55      private static final int ALL_STACKTRACE_LINES = 60;
56  
57      /** For file-based repositories the <code>repositoryRootPath</code> is the absolute path for all repositories of the same type. */
58      protected String repositoryRootPath;
59  
60      /**
61       * The <i>repository-ID</i> is the name of the repository.
62       * It should be unique within a running <i>system</i>.
63       * <p/>
64       * For file-based repositories the <code>${repositoryRootPath}/${repositoryId}</code> forms the absolute folder path in which the repository data resides.
65       */
66      protected String repositoryId;
67  
68      public String getRepositoryRootPath() {
69          return this.repositoryRootPath;
70      }
71  
72      @Override
73      public String getRepositoryId() {
74          return this.repositoryId;
75      }
76  
77      public String getRepositoryPathString() {
78          return getRepositoryRootPath() + FILE_SEPARATOR + getRepositoryId();
79      }
80  
81      @Override
82      public File getRepositoryDirectory() {
83          return new File(getRepositoryPathString());
84      }
85  
86      /**
87       * When using the <code>*WithRetry</code> methods,
88       * this value defines the maximum number of times an operation will be retried.
89       * <p/>
90       * The default value is 2.
91       */
92      private Integer maxNumberOfRetries = 2;
93  
94      /**
95       * When using the <code>*WithRetry</code> methods,
96       * this value defines the timeout for all retried operations.
97       * <p/>
98       * The default value is 500.
99       */
100     private Integer retryTimeoutInMilliseconds = 500;
101 
102     public Integer getMaxNumberOfRetries() {
103         return maxNumberOfRetries;
104     }
105 
106     public void setMaxNumberOfRetries(final Integer maxNumberOfRetries) {
107         this.maxNumberOfRetries = maxNumberOfRetries;
108     }
109 
110     public Integer getRetryTimeoutInMilliseconds() {
111         return retryTimeoutInMilliseconds;
112     }
113 
114     public void setRetryTimeoutInMilliseconds(final Integer retryTimeoutInMilliseconds) {
115         this.retryTimeoutInMilliseconds = retryTimeoutInMilliseconds;
116     }
117 
118     /* Default serialization in XStream {@link com.thoughtworks.xstream.persistence.AbstractFilePersistenceStrategy}. */
119     @Override
120     public String getEncoding() {
121         return "UTF-8";
122     }
123 
124     /** @return the absolute path to the repository location based on root path and the repository ID */
125     protected String getRepositoryPath(final String repositoryRootPath, final PersistentRepository<T> repository) {
126         return repositoryRootPath + FILE_SEPARATOR + repository.getRepositoryId();
127     }
128 
129     ///////////////////////////////////////////////
130     // Synchronizer convenience methods
131     // TODO v0.5.x: move to some common helper class
132     ///////////////////////////////////////////////
133 
134     /** Added for completeness. */
135     protected void run(final Runnable runnable) {
136         runnable.run();
137     }
138 
139     /** Added for completeness. */
140     protected void call(final Callable<T> callable) throws Exception {
141         callable.call();
142     }
143 
144     /**
145      * Convenience method for running a {@link Runnable} in a concurrent manner.
146      *
147      * @see net.sourceforge.domian.util.concurrent.locks.Synchronizer
148      */
149     protected void runConcurrently(final Runnable runnable) {
150         this.synchronizer.runConcurrently(runnable);
151     }
152 
153     /**
154      * Convenience method for running a {@link java.util.concurrent.Callable} in a concurrent manner.
155      *
156      * @see net.sourceforge.domian.util.concurrent.locks.Synchronizer
157      */
158     protected <T> T callConcurrently(final Callable<T> callable) {
159         return this.synchronizer.callConcurrently(callable);
160     }
161 
162     /**
163      * Convenience method for running a {@link java.util.concurrent.Callable} in an exclusively manner.
164      *
165      * @see net.sourceforge.domian.util.concurrent.locks.Synchronizer
166      */
167     protected <T> T callExclusively(final Callable<T> callable) {
168         return this.synchronizer.callExclusively(callable);
169     }
170 
171     /**
172      * Convenience method for running a {@link java.util.concurrent.Callable} in a concurrent manner.
173      * If an exception is thrown, the {@link java.util.concurrent.Callable} will be immediately <i>retried</i>.
174      * The nature of the retries are further defined by the <code>maxNumberOfRetries</code> and
175      * <code>retryTimeoutInMilliseconds</code> member values.
176      */
177     protected <T> T callConcurrentlyWithRetry(final Callable<T> callable) {
178         try {
179             return callConcurrently(callable);
180 
181         } catch (Exception e) {
182             return retry(callable, e, CONCURRENT);
183         }
184     }
185 
186     /**
187      * Convenience method for running a {@link java.util.concurrent.Callable} in an exclusively manner.
188      * If an exception is thrown, the {@link java.util.concurrent.Callable} will be immediately queued for <i>retrial</i>.
189      * The nature of the retries are further defined by the <code>maxNumberOfRetries</code> and
190      * <code>retryTimeoutInMilliseconds</code> member values.
191      */
192     protected <T> T callExclusivelyWithRetry(final Callable<T> callable) {
193         try {
194             return callExclusively(callable);
195 
196         } catch (Exception e) {
197             return retry(callable, e, EXCLUSIVE);
198         }
199     }
200 
201     protected <T> T callConcurrentlyInNewThread(final Callable<T> callable) throws ExecutionException, TimeoutException, InterruptedException {
202         final Future<T> future = new FutureTask<T>(callable);
203         this.synchronizer.runConcurrently((FutureTask) future);
204         return future.get(this.retryTimeoutInMilliseconds, MILLISECONDS);
205     }
206 
207     protected <T> T callExclusivelyInNewThread(final Callable<T> callable) throws ExecutionException, TimeoutException, InterruptedException {
208         final Future<T> future = new FutureTask<T>(callable);
209         this.synchronizer.runExclusively((FutureTask) future);
210         return future.get(this.retryTimeoutInMilliseconds, MILLISECONDS);
211     }
212 
213     /**
214      * Executes the given {@link Runnable} in a fresh and independent thread.
215      * <i>This method does not block the original thread.</i>
216      * The {@link net.sourceforge.domian.util.concurrent.locks.Synchronizer} mode is <i>concurrent</i>.
217      * (An asynchronous version of {@link net.sourceforge.domian.util.concurrent.locks.Synchronizer} in <i>exclusive</i> mode has no meaning,
218      * as it is a "stop-the-world" kind of mode.)
219      */
220     protected void runAsynchronously(final Runnable runnable) {
221         runConcurrently(new FutureTask<T>(runnable, null));
222     }
223 
224     /**
225      * Executes the given {@link java.util.concurrent.Callable} in a fresh thread, with time-out set to <code>retryTimeoutInMilliseconds</code>.
226      * The {@link java.util.concurrent.Callable} will be retried <code>maxNumberOfRetries</code> times.
227      * <p/>
228      * This method will use an uncapitalized version of the given callable's simple class name as method name.
229      */
230     private <T> T retry(final Callable<T> callable, final Exception retryReason, final Synchronizer.MODE synchronizedMode) {
231         return retry(uncapitalize(callable.getClass().getSimpleName()), callable, retryReason, synchronizedMode);
232     }
233 
234     /**
235      * Executes the given {@link java.util.concurrent.Callable} in a fresh thread, with time-out set to <code>retryTimeoutInMilliseconds</code>.
236      * The {@link java.util.concurrent.Callable} will be retried <code>maxNumberOfRetries</code> times.
237      */
238     private <T> T retry(final String methodName, final Callable<T> callable, final Exception reasonForRetry, final Synchronizer.MODE synchronizedMode) {
239         if (reasonForRetry.getCause() instanceof com.thoughtworks.xstream.converters.ConversionException) {
240             if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 1a")); }
241             log.warn(buildThreadNumberAndMessage("XStream conversion failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), 0, 9)));
242             if (log.isDebugEnabled()) {
243                 log.debug(buildThreadNumberAndMessage("XStream conversion failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), ALL_STACKTRACE_LINES, 9)));
244             }
245 
246         } else if (reasonForRetry.getCause() instanceof com.thoughtworks.xstream.io.StreamException) {
247             if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 1b")); }
248             log.warn(buildThreadNumberAndMessage("XStream i/o failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), 0, 0)));
249             if (log.isDebugEnabled()) {
250                 log.debug(buildThreadNumberAndMessage("XStream i/o failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), ALL_STACKTRACE_LINES, 0)));
251             }
252 
253         } else {
254             if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 2")); }
255             log.warn(buildThreadNumberAndMessage("XStream failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), 0, 0)));
256             if (log.isDebugEnabled()) {
257                 log.debug(buildThreadNumberAndMessage("XStream failure while working with persistent data: " + buildMessageWithStackTrace(reasonForRetry, reasonForRetry.toString(), ALL_STACKTRACE_LINES, 0)));
258             }
259         }
260         log.warn(buildThreadNumberAndMessage(this.getClass().getName() + "." + methodName + "() will be retried [max " + this.maxNumberOfRetries + " times, with time-out set to " + this.retryTimeoutInMilliseconds + " ms]"));
261         int numberOfRetries = 0;
262         while (numberOfRetries <= this.maxNumberOfRetries) {
263             if (log.isWarnEnabled()) {
264                 log.warn(buildThreadNumberAndMessage(this.getClass().getName() + "." + methodName + "() retry #" + (numberOfRetries + 1)));
265             }
266             try {
267                 switch (synchronizedMode) {
268                     case CONCURRENT:
269                         return callConcurrentlyInNewThread(callable);
270                     case EXCLUSIVE:
271                         return callExclusivelyInNewThread(callable);
272                     default:
273                         throw new IllegalStateException(synchronizedMode + " is not a valid net.sourceforge.domian.util.Synchronizer mode");
274                 }
275 
276             } catch (Exception e) {
277                 ++numberOfRetries;
278                 if (e instanceof TimeoutException) {
279                     if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 5")); }
280                     log.warn(buildThreadNumberAndMessage("XStream timeout after " + this.retryTimeoutInMilliseconds + " ms while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), 0, 9)));
281                     if (log.isDebugEnabled()) {
282                         log.debug(buildThreadNumberAndMessage("XStream timeout after " + this.retryTimeoutInMilliseconds + " ms while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), ALL_STACKTRACE_LINES, 9)));
283                     }
284 
285                 } else if (reasonForRetry.getCause() instanceof com.thoughtworks.xstream.converters.ConversionException) {
286                     if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 3a")); }
287                     log.warn(buildThreadNumberAndMessage("XStream conversion failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), 0, 9)));
288                     if (log.isDebugEnabled()) {
289                         log.debug(buildThreadNumberAndMessage("XStream conversion failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), ALL_STACKTRACE_LINES, 9)));
290                     }
291 
292                 } else if (reasonForRetry.getCause() instanceof com.thoughtworks.xstream.io.StreamException) {
293                     if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 3b")); }
294                     log.warn(buildThreadNumberAndMessage("XStream i/o failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), 0, 0)));
295                     if (log.isDebugEnabled()) {
296                         log.debug(buildThreadNumberAndMessage("XStream i/o failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), ALL_STACKTRACE_LINES, 0)));
297                     }
298 
299                 } else {
300                     if (log.isDebugEnabled()) { log.debug(buildThreadNumberAndMessage("loggelinje nummer 4")); }
301                     log.warn(buildThreadNumberAndMessage("XStream failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), 0, 0)));
302                     if (log.isDebugEnabled()) {
303                         log.debug(buildThreadNumberAndMessage("XStream failure while working with persistent data: " + buildMessageWithStackTrace(e, e.toString(), ALL_STACKTRACE_LINES, 0)));
304                     }
305                 }
306 
307                 if (numberOfRetries >= this.maxNumberOfRetries) {
308                     throw new RepositoryException(this.getClass().getName() + "." + methodName + "() failed! Max number of retries [" + this.maxNumberOfRetries + "] reached, aborting operation!");
309                 }
310             }
311         }
312         throw new RepositoryException(this.getClass().getName() + "." + methodName + "() failed", reasonForRetry);
313     }
314 }