View Javadoc

1   /*
2    * Copyright 2006-2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.sourceforge.domian.repository;
17  
18  
19  import java.io.File;
20  import java.io.Serializable;
21  import java.lang.ref.WeakReference;
22  import java.lang.reflect.Constructor;
23  import java.lang.reflect.InvocationHandler;
24  import java.lang.reflect.InvocationTargetException;
25  import java.lang.reflect.Method;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.Map;
32  import java.util.NoSuchElementException;
33  import java.util.Set;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  
37  import org.apache.commons.lang.NotImplementedException;
38  import org.apache.commons.lang.Validate;
39  
40  import net.sourceforge.domian.entity.Entity;
41  import net.sourceforge.domian.specification.CompositeSpecification;
42  import net.sourceforge.domian.specification.Specification;
43  import net.sourceforge.domian.util.StopWatch;
44  import net.sourceforge.domian.util.concurrent.locks.SemaphoreSynchronizer;
45  import net.sourceforge.domian.util.concurrent.locks.Synchronizer;
46  
47  import static java.lang.Boolean.FALSE;
48  import static java.lang.Boolean.TRUE;
49  import static java.util.Collections.emptySet;
50  import static net.sourceforge.domian.specification.SpecificationFactory.allEntities;
51  import static net.sourceforge.domian.specification.SpecificationFactory.allObjects;
52  import static net.sourceforge.domian.specification.SpecificationFactory.createSpecificationFor;
53  import static net.sourceforge.domian.specification.SpecificationUtils.typeSafeIsSatisfiedBy;
54  import static net.sourceforge.domian.util.InstrumentationUtils.DEBUG_LEVEL_INDENTATION;
55  import static net.sourceforge.domian.util.InstrumentationUtils.TRACE_LEVEL_INDENTATION;
56  import static net.sourceforge.domian.util.ReflectionUtils.canCastFrom_To;
57  import static org.apache.commons.lang.StringUtils.isNotBlank;
58  
59  
60  /**
61   * {@link InvocationHandler} implementation for <i>partition repositories</i>.
62   * All repositories either consisting of, or belonging to a {@link PartitionRepository} may use this repository wrapper.
63   *
64   * @author Eirik Torske
65   * @since 0.4
66   */
67  public class PartitionRepositoryInvocationHandler<T extends Entity>
68          extends AbstractDomianCoreRepository<T>
69          implements InvocationHandler, PartitionRepository<T>, PersistentRepository<T> {
70  
71      /*
72       * TODO v0.5.x: try a flattened Map<Specification, PartitionRepository> (only in root partition) for very quick indexed (exact) partition retrieval
73       * TODO v0.5.x: try a Map<Entity, Set<PartitionRepository>> (only in root partition) for reflecting member changes. All members must be recursively indexed here.
74       */
75  
76      /**
77       * A synchronizer used to control concurrent and exclusive access to repository methods.
78       * E.g. all methods dealing with partitioning are run in an exclusive manner.
79       */
80      protected final Synchronizer synchronizer;
81  
82      /** A internal registry keeping track of all active iterators. */
83      protected final IteratorRegistry iteratorRegistry = new IteratorRegistry();
84  
85      protected boolean executeOperationsExclusivelyOnly;
86      protected boolean repositoryDelegateIsAlreadyPartitioned = false;
87  
88      protected Specification<? super T> superPartitionSpecification = null;
89      protected PartitionRepository<? super T> superPartitionRepository = null;
90  
91      protected Specification<T> partitionSpecification;
92      protected Repository<T> repositoryDelegate;
93  
94      /** All sub-partition repositories, indexed by a unique specification. */
95      protected final Map<Specification<? extends T>, PartitionRepository> subPartitions =
96              new ConcurrentHashMap<Specification<? extends T>, PartitionRepository>();
97  
98  
99      PartitionRepositoryInvocationHandler(final Repository<T> repositoryDelegate,
100                                          final Synchronizer synchronizer,
101                                          final Boolean executeOperationsExclusivelyOnly) {
102         Validate.notNull(repositoryDelegate, "Partition repository parameter cannot be null");
103         Validate.notNull(executeOperationsExclusivelyOnly, "Exclusive-operations-only flag parameter cannot be null");
104 
105         if (repositoryDelegate instanceof PartitionRepository) {
106             log.warn("Partitioning already partitioned repository " + repositoryDelegate.getClass().getName());
107             this.repositoryDelegateIsAlreadyPartitioned = true;
108         }
109         this.repositoryDelegate = repositoryDelegate;
110         this.synchronizer = (synchronizer == null) ? new SemaphoreSynchronizer() : synchronizer;
111         this.executeOperationsExclusivelyOnly = executeOperationsExclusivelyOnly;
112 
113         ((AbstractDomianCoreRepository) this.repositoryDelegate).setSynchronizer(this.synchronizer);
114     }
115 
116 
117     @Override
118     public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
119         try {
120             return method.invoke(this, args);
121 
122         } catch (InvocationTargetException e) {
123             throw e.getCause();
124         }
125     }
126 
127 
128     @Override
129     public Class<T> getType() {
130         return this.partitionSpecification.getType();
131     }
132 
133 
134     @Override
135     public PartitionRepository<? super T> getRootRepository() {
136         PartitionRepository<? super T> rootPartitionRepository = this;
137         while (true) {
138             try {
139                 PartitionRepository<? super T> superPartitionedRepository = rootPartitionRepository.getSuperPartitionRepository();
140                 if (superPartitionedRepository == null) {
141                     return rootPartitionRepository;
142                 } else {
143                     rootPartitionRepository = superPartitionedRepository;
144                 }
145             } catch (UnsupportedOperationException e) {
146                 return rootPartitionRepository;
147             }
148         }
149     }
150 
151 
152     public Specification<? super T> getSuperPartitionSpecification() {
153         return this.superPartitionSpecification;
154     }
155 
156 
157     @Override
158     public void setSuperPartitionSpecification(final Specification<? super T> specification) {
159         this.superPartitionSpecification = specification;
160     }
161 
162 
163     @Override
164     public PartitionRepository<? super T> getSuperPartitionRepository() {
165         return this.superPartitionRepository;
166     }
167 
168 
169     @Override
170     public void setSuperPartitionRepository(final PartitionRepository<? super T> partitionRepository) {
171         this.superPartitionRepository = partitionRepository;
172     }
173 
174 
175     @Override
176     public Specification<T> getPartitionSpecification() {
177         return this.partitionSpecification;
178     }
179 
180 
181     @Override
182     public void setPartitionSpecification(final Specification<T> partitionSpecification) {
183         this.partitionSpecification = partitionSpecification;
184     }
185 
186 
187     @Override
188     public Repository<T> getTargetRepository() {
189         return this.repositoryDelegate;
190     }
191 
192 
193     @Override
194     public Boolean isRootPartition() {
195         // Because partition repos might be the same instance, e.g. for HibernateRepository
196         return this.superPartitionSpecification == null;
197     }
198 
199 
200     @Override
201     public Boolean isLeafPartition() {
202         return this.subPartitions.isEmpty();
203     }
204 
205 
206     protected boolean repositoryOperationsNeedToBeExecutedExclusively() {
207         return this.executeOperationsExclusivelyOnly ||
208                (getPersistenceDefinition() != null && getPersistenceDefinition().isFileBasedOnly());
209     }
210 
211 
212     protected boolean repositoryExemptFromSyncronization() {
213         return this.repositoryDelegate.getClass().equals(HashSetRepository.class);
214     }
215 
216 
217     protected PartitionRepository wireUpPartition(Specification superPartitionSpecification,
218                                                   final PartitionRepository superPartitionRepository,
219                                                   final Specification partitionSpecification,
220                                                   final PartitionRepository partitionRepository) {
221         if (superPartitionSpecification == null) {
222             // Default super spec, just to make isRootPartition to work...
223             superPartitionSpecification = allObjects();
224         }
225         partitionRepository.setSuperPartitionSpecification(superPartitionSpecification);
226         partitionRepository.setSuperPartitionRepository(superPartitionRepository);
227         partitionRepository.setPartitionSpecification(partitionSpecification);
228         return partitionRepository;
229     }
230 
231 
232     protected <V> V conditionalSynchronizedExecutionOf(final Callable<V> operation) throws Exception {
233         if (repositoryExemptFromSyncronization()) {
234             return operation.call(); // No synchronization at all (local, single-threaded, fast mode)
235         }
236         if (repositoryOperationsNeedToBeExecutedExclusively()) {
237             return this.synchronizer.callExclusively(operation);
238         }
239         return this.synchronizer.callConcurrently(operation);
240     }
241 
242 
243     ///////////////////////////////////////////////////////////
244     // Repository operations                                 //
245     ///////////////////////////////////////////////////////////
246 
247     protected void handlePartitionRepositoryException(final Exception e, final String errorMessage) {
248         if (e instanceof IllegalArgumentException) {
249             throw (IllegalArgumentException) e;
250         }
251         throw new RepositoryException(this.getClass().getName() + "." + errorMessage + " failed", e);
252     }
253 
254 
255     @Override
256     public <V extends T> Iterator<V> iterateAllEntitiesSpecifiedBy(final Specification<V> specification) {
257         Validate.notNull(specification, "Specification parameter cannot be null");
258         try {
259             return conditionalSynchronizedExecutionOf(new IterateAllEntitiesSpecifiedBy<V>(specification));
260 
261         } catch (Exception e) {
262             handlePartitionRepositoryException(e, "iterateAllEntitiesSpecifiedBy(Specification)");
263             return new Iterator<V>() {
264                 @Override
265                 public boolean hasNext() {
266                     return false;
267                 }
268                 @Override
269                 public V next() {
270                     throw new NoSuchElementException();
271                 }
272                 @Override
273                 public void remove() {
274                     throw new IllegalStateException();
275                 }
276             };
277         }
278     }
279 
280 
281     /** Special count method without sharpening original specifications to fit partition specifications. */
282     protected <V extends T> Long countAllEntitiesWithoutSpecificationSharpening(final Specification<V> specification) {
283         return (long) findAllEntitiesSpecifiedBy(specification, FALSE).size();
284     }
285 
286 
287     // Hmm, must be overridden from AbstractSpecification to hit this findAllEntitiesSpecifiedBy() method... 
288     @Override
289     public <V extends T> V findSingleEntitySpecifiedBy(final Specification<V> specification) {
290         Validate.notNull(specification, "Specification parameter cannot be null");
291         final Collection<V> selection = findAllEntitiesSpecifiedBy(specification);
292         if (selection.size() > 1) {
293             throw new IllegalArgumentException("More than one entity were found when only one was expected");
294         }
295         if (selection.size() == 1) {
296             return selection.iterator().next();
297         }
298         return null;
299     }
300 
301 
302     @Override
303     public <V extends T> Collection<V> findAllEntitiesSpecifiedBy(final Specification<V> specification) {
304         return findAllEntitiesSpecifiedBy(specification, TRUE);
305     }
306 
307 
308     protected <V extends T> Collection<V> findAllEntitiesSpecifiedBy(final Specification<V> specification, final Boolean sharpenOriginalSpecificationsToFitPartitionSpecification) {
309         Validate.notNull(specification, "Specification parameter cannot be null");
310         try {
311             final Collection<V> result;
312             result = conditionalSynchronizedExecutionOf(new FindAllEntitiesSpecifiedBy<V>(specification, sharpenOriginalSpecificationsToFitPartitionSpecification));
313             /*
314             if (sharpenOriginalSpecificationsToFitPartitionSpecification) {
315                 result = conditionalSynchronizedExecutionOf(new FindAllEntitiesSpecifiedBy<V>(specification, TRUE));
316             } else {
317                 result = conditionalSynchronizedExecutionOf(new FindAllEntitiesSpecifiedBy<V>(specification, FALSE));
318             }
319             */
320             //if (result.isEmpty()) { // && !this.isRootPartition()) {
321             /* Do a flat search from the repository root partition in case of missing partitioning */
322             //    return ((PartitionRepositoryInvocationHandler) this.getRootRepository()).doFlatSearch(specification);
323             //}
324             return result;
325 
326         } catch (Exception e) {
327             handlePartitionRepositoryException(e, "findAllEntitiesSpecifiedBy(Specification)");
328             return emptySet();
329         }
330     }
331 
332 
333     /* Not recommended...
334     protected <V extends T> Collection<V> doFlatSearch(final Specification<V> specification) {
335         final Collection<V> result = new HashSet<V>();
336         final Map<Specification<? extends T>, PartitionRepository> partitions = new HashMap<Specification<? extends T>, PartitionRepository>();
337         collectAllPartitions(partitions);
338         for (final Map.Entry<Specification<? extends T>, PartitionRepository> partition : partitions.entrySet()) {
339             final Collection<V> partitionOnlyEntities = partition.getValue().getPartitionOnlyEntities();
340             for (final V entity : partitionOnlyEntities) {
341                 if (specification.isSatisfiedBy(entity)) {
342                     result.add(entity);
343                 }
344             }
345         }
346         return result;
347     }
348     */
349 
350 
351     @Override
352     public <V extends T> void put(final V entity) {
353         try {
354             conditionalSynchronizedExecutionOf(new Put<V>(entity));
355 
356         } catch (Exception e) {
357             handlePartitionRepositoryException(e, "put(Entity)");
358         }
359     }
360 
361 
362     @Override
363     public <V extends T> void update(final V entity) {
364         this.synchronizer.callExclusively(new Callable<Void>() {
365             @Override
366             public Void call() {
367                 repositoryDelegate.update(entity);
368                 repartition(entity);
369                 return null;
370             }
371         });
372     }
373 
374 
375     @Override
376     public <V extends T> Long removeAllEntitiesSpecifiedBy(final Specification<V> specification) {
377         Validate.notNull(specification, "Specification parameter cannot be null");
378         try {
379             return conditionalSynchronizedExecutionOf(new RemoveAllEntitiesSpecifiedBy<V>(specification));
380 
381         } catch (Exception e) {
382             handlePartitionRepositoryException(e, "removeAllEntitiesSpecifiedBy(Specification)");
383             return 0L;
384         }
385     }
386 
387 
388     @Override
389     public <V extends T> Boolean remove(final V entity) {
390         try {
391             return conditionalSynchronizedExecutionOf(new Remove<V>(entity));
392 
393         } catch (Exception e) {
394             handlePartitionRepositoryException(e, "remove(Entity)");
395             return FALSE;
396         }
397     }
398 
399 
400     @Override
401     public Boolean isPartitioningNatively() {
402         return this.repositoryDelegate.isPartitioningNatively();
403     }
404 
405 
406     @Override
407     public Boolean isIndexingEntitiesRecursively() {
408         return this.repositoryDelegate.isIndexingEntitiesRecursively();
409     }
410 
411 
412     ///////////////////////////////////////////////////////////
413     // Persistent repository operations                      //
414     ///////////////////////////////////////////////////////////
415 
416     /** <i>This method applies to this partition only!</i> */
417     @Override
418     public File getRepositoryDirectory() {
419         if (this.repositoryDelegate instanceof PersistentRepository) {
420             return ((PersistentRepository) this.repositoryDelegate).getRepositoryDirectory();
421         } else {
422             return null;
423         }
424     }
425 
426 
427     /** <i>This method applies to this partition only!</i> */
428     @Override
429     public String getRepositoryId() {
430         if (this.repositoryDelegate instanceof PersistentRepository) {
431             return ((PersistentRepository) this.repositoryDelegate).getRepositoryId();
432         } else {
433             return null;
434         }
435     }
436 
437 
438     /** <i>This method applies to this partition only!</i> */
439     @Override
440     public PersistenceDefinition getPersistenceDefinition() {
441         if (this.repositoryDelegate instanceof PersistentRepository) {
442             return ((PersistentRepository) this.repositoryDelegate).getPersistenceDefinition();
443         } else {
444             return null;
445         }
446     }
447 
448 
449     /** <i>This method applies to this partition only!</i> */
450     @Override
451     public String getFormat() {
452         if (this.repositoryDelegate instanceof PersistentRepository) {
453             return ((PersistentRepository) this.repositoryDelegate).getFormat();
454         } else {
455             return null;
456         }
457     }
458 
459 
460     @Override
461     public void load() {
462         if (this.repositoryDelegate instanceof PersistentRepository) {
463             ((PersistentRepository) this.repositoryDelegate).load();
464         }
465         /* No, persistent repo below non-persistent repo will not be invoked
466         for (final Repository subPartitonRepository : this.subPartitions.values()) {
467             if (subPartitonRepository instanceof PersistentRepository) {
468                 ((PersistentRepository) subPartitonRepository).load();
469             }
470         }
471         */
472         for (final Repository subPartitonRepository : getAllPartitions().values()) {
473             if (subPartitonRepository instanceof PersistentRepository) {
474                 ((PersistentRepository) subPartitonRepository).load();
475             }
476         }
477     }
478 
479 
480     @Override
481     public void persist() {
482         if (this.repositoryDelegate instanceof PersistentRepository) {
483             ((PersistentRepository) this.repositoryDelegate).persist();
484         }
485         for (final Repository subPartitonRepository : getAllPartitions().values()) {
486             if (subPartitonRepository instanceof PersistentRepository) {
487                 ((PersistentRepository) subPartitonRepository).persist();
488             }
489         }
490     }
491 
492 
493     @Override
494     public EntityPersistenceMetaData getMetaDataFor(T entity) {
495         throw new NotImplementedException();
496     }
497 
498 
499     @Override
500     public void close() {
501         if (this.repositoryDelegate instanceof PersistentRepository) {
502             ((PersistentRepository) this.repositoryDelegate).close();
503         }
504         for (final Repository subPartitonRepository : getAllPartitions().values()) {
505             if (subPartitonRepository instanceof PersistentRepository) {
506                 ((PersistentRepository) subPartitonRepository).close();
507             }
508         }
509     }
510 
511 
512     ///////////////////////////////////////////////////////////
513     // More partitioned repository operations                //
514     ///////////////////////////////////////////////////////////
515 
516     protected PartitionRepository getCorrectPartitionRepository(final PartitionRepository originalPartitionRepository,
517                                                                 final PartitionRepository toBeReusedPartitionRepository) {
518         return (originalPartitionRepository == null) ? toBeReusedPartitionRepository : originalPartitionRepository;
519     }
520 
521 
522     protected <V extends T> PartitionRepository<V> addPartition(final Specification<V> partitionSpecification,
523                                                                 final PartitionRepository<? extends V> partitionRepository) {
524         final PartitionRepository<? extends V> reusedPartitionRepository = ((AbstractDomianCoreRepository<? extends V>) this.repositoryDelegate).makePartition();
525 
526         if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository) instanceof PersistentRepository
527             && this.repositoryDelegate instanceof PersistentRepository) {
528 
529             PersistentRepository repositoryDelegate = (PersistentRepository) this.repositoryDelegate;
530             PersistentRepository partitionRepositoryDelegate = (PersistentRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository);
531             //if (!partitionRepositoryDelegate.isIndexingEntitiesRecursively()) { // -> means common repo for all partition at the moment... WHAT!? this is not accurate anymore is it!?
532             if (!(getTargetRepository() == reusedPartitionRepository.getTargetRepository())) {
533                 if (repositoryDelegate.getRepositoryId().equals(partitionRepositoryDelegate.getRepositoryId())) {
534                     throw new IllegalArgumentException("Partition repository parameter has the same repository id (name) as this repository ['" + repositoryDelegate.getRepositoryId() + "']");
535                 }
536                 /*} else {
537                 log.info("Partition repository '" + partitionRepositoryDelegate.getRepositoryId() + "' will be the same repository instance as parent partition '" + repositoryDelegate.getRepositoryId() + "'");
538                 */
539             }
540         }
541         if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().equals(HashSetRepository.class)) {
542             if (!(this.repositoryDelegate.getClass().equals(HashSetRepository.class))) {
543                 if (this.repositoryDelegate instanceof PersistentRepository) {
544                     log.warn("Adding non-concurrent partition [type=" + getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().getName() + "] " +
545                              "to overall concurrent repository [type=" + this.repositoryDelegate.getClass().getName() + ", id=" + ((PersistentRepository) this.repositoryDelegate).getRepositoryId() + "]." +
546                              " This partitioned repository will not be thread-safe!");
547                 } else {
548                     log.warn("Adding non-concurrent partition [type=" + getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().getName() + "] " +
549                              "to overall concurrent repository [type=" + this.repositoryDelegate.getClass().getName() + "]." +
550                              " This partitioned repository will not be thread-safe!");
551                 }
552             }
553         }
554         try {
555             return this.synchronizer.callExclusively(new Callable<PartitionRepository<V>>() {
556                 @Override
557                 public PartitionRepository<V> call() {
558                     StopWatch stopWatch = null;
559                     if (log.isInfoEnabled()) {
560                         stopWatch = new StopWatch().start();
561                     }
562 
563                     boolean put = false;
564                     PartitionRepository subPartitionRepository = null;
565                     PartitionRepository subPartitionRepositoryOnThisLevel = null;
566 
567                     ((AbstractDomianCoreRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository()).setSynchronizer(synchronizer);
568 
569                     /* For each of this repository's partitioned sub-repositories: */
570                     for (final Iterator<Specification<? extends T>> subPartitionSpecificationIterator = subPartitions.keySet().iterator();
571                          subPartitionSpecificationIterator.hasNext();) {
572 
573                         final Specification subPartitionSpecification = subPartitionSpecificationIterator.next();
574 
575                         /* Is the new specification equal to an existing partition? If so, replace it and migrate its entities (existing partitions are removed) */
576                         if (partitionSpecification.equals(subPartitionSpecification)) {
577                             subPartitionRepositoryOnThisLevel = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
578                                                                                 PartitionRepositoryInvocationHandler.this,
579                                                                                 partitionSpecification,
580                                                                                 getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
581 
582                             /* 1. get partition repo to be replaced */
583                             final PartitionRepository subPartitionRepositoryToBeReplaced = subPartitions.get(subPartitionSpecification);
584 
585                             /* 2. [DENIED] copy references for all its sub partitions to the new repo (No, don't do this) */
586 
587                             /* 3. copy references for all its entities to the new repo, include all subpartition entities */
588                             subPartitionRepositoryOnThisLevel.putAll(subPartitionRepositoryToBeReplaced.find(allEntities()));
589 
590                             /* 4. remove partitition repo to be replaced */
591                             subPartitions.remove(subPartitionSpecification);
592                             subPartitions.put(subPartitionSpecification, subPartitionRepositoryOnThisLevel);
593                             put = true;
594 
595                             /* Is the new specification a special case of an existing partition? */
596                         } else if (partitionSpecification.isSpecialCaseOf(subPartitionSpecification)) {
597                             subPartitionRepository = subPartitions.get(subPartitionSpecification).addPartitionFor(partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
598                             put = true;
599 
600                             /* Is the new specification a generalization of an existing partition? */
601                         } else if (partitionSpecification.isGeneralizationOf(subPartitionSpecification)) {
602                             if (subPartitionRepositoryOnThisLevel == null) {
603                                 subPartitionRepositoryOnThisLevel = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
604                                                                                     PartitionRepositoryInvocationHandler.this,
605                                                                                     partitionSpecification,
606                                                                                     getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
607                             }
608                             final PartitionRepository subPartitionRepositoryToBeRelocated = wireUpPartition(partitionSpecification,
609                                                                                                             subPartitionRepositoryOnThisLevel,
610                                                                                                             subPartitionSpecification,
611                                                                                                             subPartitions.get(subPartitionSpecification));
612                             subPartitionRepositoryOnThisLevel.addPartitionFor(subPartitionSpecification, subPartitionRepositoryToBeRelocated);
613                             for (final Iterator entityIterator = getPartitionOnlyEntities().iterator(); entityIterator.hasNext();) {
614                                 final Object entity = entityIterator.next();
615                                 if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
616                                     subPartitionRepositoryOnThisLevel.put((Entity) entity);
617                                     entityIterator.remove();
618                                 }
619                             }
620                             subPartitionSpecificationIterator.remove();
621 
622                             /* ...or finally, is it a siebling? */
623                         } else {
624                             final PartitionRepository subPartitionWithPossibleIntersectingEntitiesOrSubPartitions = subPartitions.get(subPartitionSpecification);
625                             if (subPartitionRepository == null) {
626                                 subPartitionRepository = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
627                                                                          PartitionRepositoryInvocationHandler.this,
628                                                                          partitionSpecification,
629                                                                          getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
630                             }
631                             /* Check for intersecting entities */
632                             for (final Object entity : subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitionOnlyEntities()) {
633                                 if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
634                                     subPartitionRepository.put((Entity) entity);
635                                 }
636                             }
637                             /* Check for intersecting sub partitions */
638                             for (final Object subSpecObject : subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitions().keySet()) {
639                                 final Specification subSpec = (Specification) subSpecObject;
640                                 if (subSpec.isSpecialCaseOf(partitionSpecification)) {
641                                     subPartitionRepository.addPartitionFor(subSpec, (PartitionRepository) subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitions().get(subSpec));
642                                 }
643                             }
644                         }
645                     }
646                     if (subPartitionRepository == null) {
647                         subPartitionRepository = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(), PartitionRepositoryInvocationHandler.this, partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
648                     }
649                     if (!put) {
650                         subPartitions.put(partitionSpecification, subPartitionRepository);
651                         //if (!PartitionRepositoryInvocationHandler.this.isIndexingEntitiesRecursively()) { // -> means common repo for all partition at the moment... EHAT!? this is not accurate anymore is it!?
652                         if (!(PartitionRepositoryInvocationHandler.this.getTargetRepository() == subPartitionRepository.getTargetRepository())) {
653                             for (final Entity entity : getPartitionOnlyEntities()) {
654                                 if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
655                                     PartitionRepositoryInvocationHandler.this.removePartitionOnlyEntity((V) entity);
656                                     subPartitionRepository.put(entity);
657                                 }
658                             }
659                         }
660                         iteratorRegistry.updateIteratorsWith(partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
661 
662                     }
663                     if (log.isInfoEnabled()) {
664                         String repositoryId = "[not persistent repository]";
665                         if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository) instanceof PersistentRepository) {
666                             repositoryId = "'" + ((PersistentRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository)).getRepositoryId() + "'";
667                         }
668                         log.info("Repository partition " + repositoryId + " added in " + stopWatch.elapsedTimeToString() + " [partition specification type=" + partitionSpecification.getType().getName() + "]");
669                     }
670                     return subPartitionRepository;
671                 }
672             });
673 
674         } catch (Exception e) {
675             handlePartitionRepositoryException(e, "addPartitionFor(Specification)");
676             return new NullRepository<V>().makePartition();
677         }
678     }
679 
680 
681     @Override
682     public <V extends T> PartitionRepository<V> addPartitionFor(final Specification<V> partitionSpecification) {
683         return addPartitionFor(partitionSpecification, (String) null);
684     }
685 
686 
687     @Override
688     public <V extends T> PartitionRepository<V> addPartitionFor(final Specification<V> partitionSpecification, final String repositoryId) {
689         Validate.notNull(partitionSpecification, "Partition specification parameter cannot be null");
690         final Object newRepositoryInstance;
691         final Repository<T> targetRepository = this.getTargetRepository();
692         final Class<? extends Repository> targetRepositoryType = targetRepository.getClass();
693         try {
694             if (canCastFrom_To(targetRepositoryType, PersistentRepository.class)) {
695                 Validate.isTrue(isNotBlank(repositoryId), "Repository ID cannot be blank for persistent repositories");
696                 final Constructor repositoryConstructor = targetRepositoryType.getConstructor(String.class, Synchronizer.class);
697                 synchronized (this.synchronizer) {
698                     newRepositoryInstance = repositoryConstructor.newInstance(repositoryId, this.synchronizer);
699                 }
700             } else {
701                 newRepositoryInstance = targetRepositoryType.newInstance();
702             }
703 
704         } catch (NoSuchMethodException e1) {
705             throw new IllegalArgumentException("Repository implementation " + targetRepositoryType +
706                                                " lacks a public Constructor(java.lang.String repositoryId, net.sourceforge.domian.util.concurrent.locks.Synchronizer repositorySynchronizer)");
707         } catch (Exception e2) {
708             handlePartitionRepositoryException(e2, "addPartitionFor(Specification, String)");
709             return new NullRepository<V>().makePartition();
710         }
711         return addPartition(partitionSpecification, ((AbstractDomianCoreRepository<V>) newRepositoryInstance).makePartition());
712     }
713 
714 
715     @Override
716     public <V extends T> PartitionRepository<V> addPartitionFor(Specification<V> partitionSpecification, Repository<? super V> partitionRepository) {
717         Validate.notNull(partitionSpecification, "Partition specification parameter cannot be null");
718         Validate.notNull(partitionRepository, "Partition repository parameter cannot be null");
719         if (partitionRepository instanceof PartitionRepository) {
720             return addPartition(partitionSpecification, (PartitionRepository<V>) partitionRepository);
721         }
722         return addPartition(partitionSpecification, ((AbstractDomianCoreRepository<V>) partitionRepository).makePartition());
723     }
724 
725 
726     @Override
727     public <V extends T> Boolean repartition(final V entity) {
728         Validate.notNull(entity, "Entity parameter cannot be null");
729         try {
730             return this.synchronizer.callExclusively(new Callable<Boolean>() {
731                 @SuppressWarnings("unchecked")
732                 @Override
733                 public Boolean call() {
734                     StopWatch stopWatch = null;
735                     if (log.isDebugEnabled() && isRootPartition()) {
736                         stopWatch = new StopWatch().start();
737                     }
738                     boolean repartitioned = false;
739 
740                     if (entityExists_PossiblyInWrongPartition(entity)) {
741                         // Entity DOES reside in this partition!
742 
743                         if ((partitionSpecification == null && subPartitions.isEmpty()) || // root partition with no sub partitions
744                             (partitionSpecification != null && partitionSpecification.isSatisfiedBy(entity))) { // regular satisfyable partition
745                             // Entity SHOULD CONTINUE to reside in this partition!
746                             repartitioned = true;
747 
748                             // Flag to indicate whether the entity is moved to a specialized partition
749                             boolean specialized = false;
750 
751                             // Check if any sub-partition is a specialization of this partition, if so - move the entity there
752                             for (final Specification partitionSpecification : subPartitions.keySet()) {
753                                 if (partitionSpecification.isSatisfiedBy(entity)) {
754                                     final PartitionRepository partitionRepo = subPartitions.get(partitionSpecification);
755                                     partitionRepo.put(entity);
756                                     specialized = true;
757                                 }
758                             }
759                             if (specialized) {
760                                 repositoryDelegate.remove(entity);
761                             }
762                         } else {
763                             // Entity SHOULD NOT reside in this partition!
764                             repositoryDelegate.remove(entity);
765                             // ...with the exception if the entity resides in root partition and no subpartitions are satifyable
766                             // In that case the entity is re-added in the same partition
767                             getRootRepository().put(entity);
768                             repartitioned = true;
769                         }
770                         // To conclude - performing repartitioning in all sub-partitions...
771                         for (final PartitionRepository partitionedRepository : subPartitions.values()) {
772                             if (partitionedRepository.repartition(entity)) {
773                                 repartitioned = true;
774                             }
775                         }
776 
777                     } else {
778                         // Entity DOES NOT reside in this partition!
779 
780                         // Performing repartitioning in all sub-partitions...
781                         for (final PartitionRepository partitionedRepository : subPartitions.values()) {
782                             if (partitionedRepository.repartition(entity)) {
783                                 repartitioned = true;
784                             }
785                         }
786                         if (!repartitioned) {
787                             // Entity does NOT belong in any subpartitions - does it belong in this partition?
788                             if (partitionSpecification != null && partitionSpecification.isSatisfiedBy(entity)) {
789                                 // Entity should reside in this partition
790                                 repositoryDelegate.put(entity);
791                                 repartitioned = true;
792                             }
793                         }
794                     }
795                     if (log.isDebugEnabled() && isRootPartition()) {
796                         log.debug(DEBUG_LEVEL_INDENTATION + "Entity repartitioned in " + stopWatch.elapsedTimeToString() + " [" + entity + "]");
797                     }
798                     return repartitioned;
799                 }
800             });
801 
802         } catch (Exception e) {
803             handlePartitionRepositoryException(e, "repartition(Entity)");
804             return FALSE;
805         }
806     }
807 
808 
809     @Override
810     public void repartition() {
811         try {
812             this.synchronizer.callExclusively((Callable) new Callable<Void>() {
813                 @Override
814                 public Void call() {
815                     StopWatch stopWatch = null;
816                     if (log.isInfoEnabled()) {
817                         stopWatch = new StopWatch().start();
818                     }
819                     final Iterator<? extends T> entityIterator = iterate(allObjects());
820                     while (entityIterator.hasNext()) {
821                         final T entity = entityIterator.next();
822                         repartition(entity);
823                         if (log.isTraceEnabled()) {
824                             log.trace(TRACE_LEVEL_INDENTATION + "Entity repartitioned in " + stopWatch.lapTimeToString() + " [entity=" + entity + "]");
825                         }
826                     }
827                     if (log.isInfoEnabled()) {
828                         log.info("Repository repartitioned in " + stopWatch.elapsedTimeToString());
829                     }
830                     return null;
831                 }
832             });
833 
834         } catch (Exception e) {
835             handlePartitionRepositoryException(e, "repartition()");
836         }
837     }
838 
839 
840     protected Boolean entityExists_PossiblyInWrongPartition(final T entity) {
841         return countAllEntitiesWithoutSpecificationSharpening(createUniqueSpecificationFor(entity)) > 0;
842     }
843 
844 
845     @Override
846     public Map<Specification<? extends T>, PartitionRepository> getPartitions() {
847         return new HashMap<Specification<? extends T>, PartitionRepository>(this.subPartitions);
848     }
849 
850 
851     @Override
852     public Map<Specification<? extends T>, PartitionRepository> getAllPartitions() {
853         final Map<Specification<? extends T>, PartitionRepository> allPartitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
854         collectAllPartitions(allPartitionMap);
855         return allPartitionMap;
856     }
857 
858 
859     @Override
860     public void collectAllPartitions(final Map<Specification<? extends T>, PartitionRepository> partitionMap) {
861         for (final Map.Entry<Specification<? extends T>, PartitionRepository> subPartititon : this.subPartitions.entrySet()) {
862             partitionMap.put(subPartititon.getKey(), subPartititon.getValue());
863             subPartititon.getValue().collectAllPartitions(partitionMap);
864         }
865     }
866 
867 
868     @Override
869     public <R extends Repository> void collectAllPartitionsWithRepositorySatisfying(final Specification<R> specification,
870                                                                                     final Map<Specification<? extends T>, PartitionRepository> partitionMap) {
871         collectAllPartitions(partitionMap);
872         final Iterator<R> partitionedRepoIterator = (Iterator<R>) partitionMap.values().iterator();
873         while (partitionedRepoIterator.hasNext()) {
874             final R partitionedRepo = partitionedRepoIterator.next();
875             if (!specification.isSatisfiedBy(partitionedRepo)) {
876                 partitionedRepoIterator.remove();
877             }
878         }
879     }
880 
881 
882     @Override
883     public <V extends T> PartitionRepository findPartitionFor(final Specification<V> partitionSpecification) {
884         // TODO v0.5.x - use a map<spec, repo> residing in root-partition for direct retrieval of partitions...
885         for (final Specification subPartitionSpecification : this.subPartitions.keySet()) {
886             if (partitionSpecification.isSpecialCaseOf(subPartitionSpecification)) {
887                 return this.subPartitions.get(subPartitionSpecification).findPartitionFor(partitionSpecification);
888             }
889         }
890         return this;
891     }
892 
893 
894     @Override
895     public Set<T> getPartitionOnlyEntities() {
896         if (this.repositoryDelegateIsAlreadyPartitioned) {
897             return ((PartitionRepository) this.repositoryDelegate).getPartitionOnlyEntities();
898 
899         } else {
900             if (this.isRootPartition()) {
901                 /* No, this is wrong...
902                 if (this.isPartitioningNatively()) {
903                     return emptySet();
904                 } else { */
905                 return new HashSet<T>(this.repositoryDelegate.find((Specification<T>) allEntities()));
906                 //}
907 
908             } else {
909                 final Specification<T> allOfThisPartitionType = createSpecificationFor(this.partitionSpecification.getType());
910                 final Specification<T> partitionAwareSpecification = new PartitionAwareSpecification(allOfThisPartitionType, this.partitionSpecification);
911                 return new HashSet<T>(this.repositoryDelegate.find(partitionAwareSpecification));
912             }
913         }
914     }
915 
916     protected <V extends T> void removePartitionOnlyEntity(final V entity) {
917         if (this.repositoryDelegateIsAlreadyPartitioned) {
918             if (((PartitionRepository) this.repositoryDelegate).getPartitionOnlyEntities().contains(entity)) {
919                 this.repositoryDelegate.remove(entity);
920             }
921         } else {
922             this.repositoryDelegate.remove(entity);
923         }
924     }
925 
926 
927     ///////////////////////////////////////////////////////////
928     // Partitioned repository operations as Callable classes //
929     ///////////////////////////////////////////////////////////
930 
931     private class IterateAllEntitiesSpecifiedBy<V extends T> implements Callable<Iterator<V>> {
932         private final PartitionAwareSpecification<V> partitionAwareSpecification;
933 
934         private IterateAllEntitiesSpecifiedBy(final Specification<V> specification) {
935             this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
936         }
937 
938         @Override
939         public Iterator<V> call() throws Exception {
940             // Original specification of wanted entities is outside/disjoint with partition specification boundaries
941             if (this.partitionAwareSpecification.isOutsidePartitionBoundaries()) {
942                 return Collections.<V>emptySet().iterator();
943             }
944             final PartitionRepositoryIterator<T, V> iterator = new PartitionRepositoryIterator<T, V>(this.partitionAwareSpecification, PartitionRepositoryInvocationHandler.this);
945             iteratorRegistry.addIterator(iterator);
946             return iterator;
947         }
948     }
949 
950 
951     /*
952     private class FindAllEntitiesWithoutSpecificationSharpening<V extends T> implements Callable<Collection<V>> {
953        private final Specification<V> specification;
954 
955        private FindAllEntitiesWithoutSpecificationSharpening(final Specification<V> specification) {
956           this.specification = specification;
957        }
958 
959        public Collection<V> call() throws Exception {
960           final PartitionRepository<V> partition = findPartition(specification);
961           if (partition.isLeafPartition() && specification.equals(partition.getPartitionSpecification())) {
962               final Collection<V> subSelection = new HashSet<V>();
963               for (final Object entity : partition.getPartitionOnlyEntities()) {
964                   if (typeSafeIsSatisfiedBy(specification, entity)) {
965                       subSelection.add(specification.getType().cast(entity));
966                   }
967               }
968               return subSelection;
969 
970           } else {
971               final Collection<V> selection = repositoryDelegate.findAllEntitiesSpecifiedBy(specification);
972               for (final PartitionRepository partitionRepository : getPartitions().values()) {
973                   final Collection<V> subSelection = partitionRepository.findAllEntitiesSpecifiedBy(specification);
974                   selection.addAll(subSelection);
975               }
976               return selection;
977           }
978        }
979     }
980     */
981 
982 
983     private class FindAllEntitiesSpecifiedBy<V extends T> implements Callable<Collection<V>> {
984         private Boolean modifyOriginalSpecificationsToFitPartitionSpecification;
985         private Specification<V> partitionAwareSpecification;
986 
987         private FindAllEntitiesSpecifiedBy(final Specification<V> specification) {
988             this(specification, TRUE);
989         }
990 
991         private FindAllEntitiesSpecifiedBy(final Specification<V> specification, final Boolean modifyOriginalSpecificationsToFitPartitionSpecification) {
992             this.modifyOriginalSpecificationsToFitPartitionSpecification = modifyOriginalSpecificationsToFitPartitionSpecification;
993             if (this.modifyOriginalSpecificationsToFitPartitionSpecification) {
994                 this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
995             } else {
996                 this.partitionAwareSpecification = specification;
997             }
998         }
999 
1000         @Override
1001         public Collection<V> call() throws Exception {
1002             // Original specification of wanted entities is outside/disjoint with partition specification boundaries
1003             if (this.modifyOriginalSpecificationsToFitPartitionSpecification &&
1004                 ((PartitionAwareSpecification) this.partitionAwareSpecification).isOutsidePartitionBoundaries()) {
1005                 return Collections.emptySet();
1006             }
1007             /*
1008             TODO: investigate this option... take more advantage of the partition-aware spec?
1009             final PartitionRepository<V> partition = findPartitionFor(this.partitionAwareSpecification);
1010             if (partition.isLeafPartition() && this.partitionAwareSpecification.equals(partition.getPartitionSpecification())) {
1011                 // Damn, need to test for satisfaction due to possible lack of repartitioning
1012                 final Collection<V> subSelection = new HashSet<V>();
1013                 for (final V entity : partition.getPartitionOnlyEntities()) {
1014                     if (this.partitionAwareSpecification.isSatisfiedBy(entity)) {
1015                         subSelection.add(this.partitionAwareSpecification.getType().cast(entity));
1016                     }
1017                 }
1018                 return subSelection;
1019 
1020             } else {*/
1021             final Collection<V> selection = repositoryDelegate.findAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1022             for (final PartitionRepository partitionRepository : getPartitions().values()) {
1023                 final Collection<V> subSelection = partitionRepository.findAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1024                 selection.addAll(subSelection);
1025             }
1026             return selection;
1027             //}
1028         }
1029     }
1030 
1031 
1032     private class Put<V extends T> implements Callable<Void> {
1033         private final V entity;
1034 
1035         private Put(final V entity) {
1036             this.entity = entity;
1037         }
1038 
1039         @Override
1040         public Void call() throws Exception {
1041             boolean put = false;
1042             for (final Specification<? extends T> partitionSpecification : subPartitions.keySet()) {
1043                 if (typeSafeIsSatisfiedBy(partitionSpecification, this.entity)) {
1044                     subPartitions.get(partitionSpecification).put(this.entity);
1045                     put = true;
1046                 }
1047             }
1048             if (!put) {
1049                 if (partitionSpecification == null || partitionSpecification.isSatisfiedBy(this.entity)) {
1050                     repositoryDelegate.put(this.entity);
1051                 } else {
1052                     if (isRootPartition()) {
1053                         throw new IllegalArgumentException("No suitable partition exists for " + this.entity);
1054                     }
1055                     log.warn("Not finding any suitable partition to put entity at this partition sub-tree, trying upper branches");
1056                     getRootRepository().put(this.entity);
1057                 }
1058             }
1059             return null;
1060         }
1061     }
1062 
1063 
1064     private class RemoveAllEntitiesSpecifiedBy<V extends T> implements Callable<Long> {
1065         private final PartitionAwareSpecification<V> partitionAwareSpecification;
1066 
1067         private RemoveAllEntitiesSpecifiedBy(final Specification<V> specification) {
1068             this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
1069         }
1070 
1071         @Override
1072         public Long call() throws Exception {
1073             // Original specification of wanted entities is outside/disjoint with partition specification boundaries
1074             if (this.partitionAwareSpecification.isOutsidePartitionBoundaries()) {
1075                 return 0L;
1076             }
1077             long numberOfRemovedEntities = 0;
1078             final PartitionRepository partitionRepository = findPartitionFor(this.partitionAwareSpecification);
1079             final Collection allPartitionEntities = partitionRepository.getPartitionOnlyEntities();
1080             for (final Object entity : allPartitionEntities) {
1081                 if (typeSafeIsSatisfiedBy(this.partitionAwareSpecification, entity)) {
1082                     partitionRepository.remove((Entity) entity);
1083                     ++numberOfRemovedEntities;
1084                 }
1085             }
1086             for (final Object subPartitionRepositoryObject : partitionRepository.getPartitions().values()) {
1087                 final PartitionRepository subPartitionRepository = (PartitionRepository) subPartitionRepositoryObject;
1088                 numberOfRemovedEntities += subPartitionRepository.removeAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1089             }
1090             return numberOfRemovedEntities;
1091         }
1092     }
1093 
1094 
1095     private class Remove<V extends T> implements Callable<Boolean> {
1096         private final V entity;
1097 
1098         private Remove(final V entity) {
1099             this.entity = entity;
1100         }
1101 
1102         @Override
1103         public Boolean call() throws Exception {
1104             boolean removed = false;
1105             for (final Specification partitionSpecification : subPartitions.keySet()) {
1106                 if (typeSafeIsSatisfiedBy(partitionSpecification, this.entity)) {
1107                     subPartitions.get(partitionSpecification).remove(this.entity);
1108                     removed = true;
1109                     break;
1110                 }
1111             }
1112             if (!removed) {
1113                 removed = repositoryDelegate.remove(this.entity);
1114             }
1115             return removed;
1116         }
1117     }
1118 
1119 
1120     ///////////////////////////////////////////////////////////
1121     // Other inner classes                                   //
1122     ///////////////////////////////////////////////////////////
1123 
1124     /** Inner iterator class for {@link net.sourceforge.domian.repository.PartitionRepository}. */
1125     private static class PartitionRepositoryIterator<T extends Entity, V extends T> extends InMemoryRepository.InMemoryRepositoryIterator<V, T> {
1126 
1127         private PartitionRepository<T> currentRepositoryPartition;
1128 
1129         private final Map<Specification<? extends T>, PartitionRepository> partitionMap;
1130         private Iterator<PartitionRepository> partitionedRepoIterator;
1131 
1132         /** Duplicate detection of iterated entities. */
1133         private Set<Serializable> iteratedEntityRegistry = new HashSet<Serializable>();
1134 
1135         /** Extend the flattended partition list with one more, originating from a <code>PartitionRepository.addPartition()</code> invocation. */
1136         private void addPartition(final PartitionRepository partition) {
1137             this.partitionMap.put(partition.getPartitionSpecification(), partition);
1138             this.partitionedRepoIterator = this.partitionMap.values().iterator();
1139         }
1140 
1141         private PartitionRepositoryIterator(final Specification<V> specification, final PartitionRepository<T> repo) {
1142             this.specification = specification;
1143             this.currentRepositoryPartition = repo.findPartitionFor(specification);
1144             if (this.currentRepositoryPartition == null) {
1145                 this.currentRepositoryPartition = repo;
1146                 this.partitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
1147                 this.currentRepositoryPartition.collectAllPartitions(this.partitionMap);
1148                 this.partitionedRepoIterator = this.partitionMap.values().iterator();
1149             } else {
1150                 this.partitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
1151                 this.currentRepositoryPartition.collectAllPartitions(this.partitionMap);
1152                 this.partitionedRepoIterator = this.partitionMap.values().iterator();
1153                 this.entityIterator = this.currentRepositoryPartition.getPartitionOnlyEntities().iterator();
1154             }
1155             findNextEntity();
1156         }
1157 
1158         @Override
1159         protected void findNextEntity() {
1160             this.nextEntity = null;
1161             while (this.nextEntity == null && this.entityIterator != null && this.entityIterator.hasNext()) {
1162                 final Object entity = this.entityIterator.next();
1163                 if (typeSafeIsSatisfiedBy(this.specification, entity)) {
1164                     final V entityOfCorrectType = this.specification.getType().cast(entity);
1165                     if (this.specification.isSatisfiedBy(entityOfCorrectType)) {
1166                         if (!this.iteratedEntityRegistry.contains(entityOfCorrectType.getEntityId())) {
1167                             this.nextEntity = entityOfCorrectType;
1168                             this.iteratedEntityRegistry.add(this.nextEntity.getEntityId());
1169                         }
1170                     }
1171                 }
1172             }
1173             if (this.nextEntity == null && (this.entityIterator == null || !this.entityIterator.hasNext())) {
1174                 /* Move to the next partition */
1175                 while (this.nextEntity == null && this.partitionedRepoIterator.hasNext()) {
1176                     this.currentRepositoryPartition = this.partitionedRepoIterator.next();
1177                     this.entityIterator = this.currentRepositoryPartition.getPartitionOnlyEntities().iterator();
1178                     while (this.nextEntity == null && this.entityIterator.hasNext()) {
1179                         final Object entity = this.entityIterator.next();
1180                         if (typeSafeIsSatisfiedBy(this.specification, entity)) {
1181                             final V entityOfCorrectType = this.specification.getType().cast(entity);
1182                             if (this.specification.isSatisfiedBy(entityOfCorrectType)) {
1183                                 if (!this.iteratedEntityRegistry.contains(entityOfCorrectType.getEntityId())) {
1184                                     this.nextEntity = entityOfCorrectType;
1185                                     this.iteratedEntityRegistry.add(this.nextEntity.getEntityId());
1186                                     break;
1187                                 }
1188                             }
1189                         }
1190                     }
1191                 }
1192             }
1193         }
1194 
1195         @SuppressWarnings("unchecked")
1196         @Override
1197         public void remove() {
1198             if (!this.nextIsInvoked || this.removeIsInvoked) {
1199                 throw new IllegalStateException();
1200             }
1201             /* @SuppressWarnings("unchecked") -> "I solemnly swear that this statement will not throw any ClassCastException!" -eirik torske- */
1202             /* V is a subtype of T */
1203             this.currentRepositoryPartition.remove((T) this.currentEntity);
1204             this.removeIsInvoked = true;
1205         }
1206 
1207     }
1208 
1209 
1210     /** Class for keeping track of all repository iterators. */
1211     private static class IteratorRegistry {
1212 
1213         private final HashSet<WeakReference<PartitionRepositoryIterator>> activeIteratorSet =
1214                 new HashSet<WeakReference<PartitionRepositoryIterator>>();
1215 
1216         void addIterator(final PartitionRepositoryIterator iterator) {
1217             this.activeIteratorSet.add(new WeakReference<PartitionRepositoryIterator>(iterator));
1218         }
1219 
1220         void updateIteratorsWith(final Specification partitionSpecification, final PartitionRepository subPartition) {
1221             for (final WeakReference<PartitionRepositoryIterator> activeIteratorReference : this.activeIteratorSet) {
1222                 final PartitionRepositoryIterator activeIterator = activeIteratorReference.get();
1223                 if (activeIterator == null) {
1224                     /* Remove unused iterators */
1225                     this.activeIteratorSet.remove(activeIteratorReference);
1226                 } else {
1227                     final Specification currentPartitionRepoSpecn = activeIterator.currentRepositoryPartition.getPartitionSpecification();
1228                     if (currentPartitionRepoSpecn != null && currentPartitionRepoSpecn.isGeneralizationOf(partitionSpecification)) {
1229                         activeIterator.addPartition(subPartition);
1230                     }
1231                 }
1232             }
1233         }
1234     }
1235 
1236 
1237     class PartitionAwareSpecification<V extends T> implements Specification<V> {
1238         private boolean specificationIsDisjoint = false;
1239         //private boolean specificationIsDisjointOrIntersecting = false;
1240         //private boolean specificationWithinPartitionBoundaries = false;
1241         Specification<V> customSpecification;
1242         Specification<V> partitionAwareSpecification;
1243 
1244         PartitionAwareSpecification(final Specification<V> customSpecification, final Specification<T> partitionSpecification) {
1245             this.customSpecification = customSpecification;
1246 
1247             if (partitionSpecification == null) {
1248                 this.partitionAwareSpecification = this.customSpecification;
1249 
1250             } else if (customSpecification.equals(partitionSpecification)) {
1251                 this.partitionAwareSpecification = this.customSpecification;
1252 
1253             } else if (customSpecification.isGeneralizationOf((Specification<? extends V>) partitionSpecification)) {
1254                 // No, trouble with specification typing
1255                 //this.partitionAwareSpecification = (Specification<V>) SpecificationFactory.createAlwaysTrueSpecification();
1256 
1257                 // What's the point really - well, for repos with common instance for all partitions
1258                 this.partitionAwareSpecification = (Specification<V>) createSpecificationFor(partitionSpecification.getType());
1259 
1260                 // Too yielding/"ettergivende"
1261                 //this.partitionAwareSpecification = this.customSpecification;
1262 
1263                 //this.partitionAwareSpecification = (Specification<V>) partitionSpecification;
1264 
1265             } else if (customSpecification.isSpecialCaseOf(partitionSpecification)) {
1266                 this.partitionAwareSpecification = this.customSpecification;
1267 
1268             } else if (customSpecification.isDisjointWith(partitionSpecification)) {
1269                 // Too rigid!
1270                 //this.partitionAwareSpecification = SpecificationFactory.createAlwaysFalseSpecification();
1271 
1272                 this.partitionAwareSpecification = this.customSpecification;
1273                 this.specificationIsDisjoint = true;
1274 
1275             } else { //if (customSpecification.isIntersectionOf(partitionSpecification))
1276 
1277                 // No, awaiting introducing conjunction here - is it really necessary/desirable?
1278                 //this.partitionAwareSpecification = customSpecification.and(partitionSpecification);
1279 
1280                 this.partitionAwareSpecification = this.customSpecification;
1281             }
1282         }
1283 
1284         @Override
1285         public Class<V> getType() {
1286             return this.partitionAwareSpecification.getType();
1287         }
1288 
1289         @Override
1290         public Boolean isSatisfiedBy(final V candidate) {
1291             return this.partitionAwareSpecification.isSatisfiedBy(candidate);
1292         }
1293 
1294         @Override
1295         public Boolean isGeneralizationOf(final Specification<? extends V> specification) {
1296             return this.partitionAwareSpecification.isGeneralizationOf(specification);
1297         }
1298 
1299         @Override
1300         public Boolean isSpecialCaseOf(final Specification<? super V> specification) {
1301             return this.partitionAwareSpecification.isSpecialCaseOf(specification);
1302         }
1303 
1304         @Override
1305         public CompositeSpecification<V> and(final Specification<? super V> otherSpecification) {
1306             return this.partitionAwareSpecification.and(otherSpecification);
1307         }
1308 
1309         @Override
1310         public CompositeSpecification<V> or(final Specification<? super V> otherSpecification) {
1311             return this.partitionAwareSpecification.or(otherSpecification);
1312         }
1313 
1314         @Override
1315         public <F> CompositeSpecification<V> where(final String accessibleObjectName, final Specification<F> accessibleObjectSpecification) {
1316             throw new NotImplementedException();
1317         }
1318 
1319         @Override
1320         public Boolean isDisjointWith(final Specification otherSpecification) {
1321             return this.partitionAwareSpecification.isDisjointWith(otherSpecification);
1322         }
1323 
1324         private boolean isWithinPartitionBoundaries() {
1325             return !isOutsidePartitionBoundaries();
1326         }
1327 
1328         private boolean isOutsidePartitionBoundaries() {
1329             return this.specificationIsDisjoint;
1330         }
1331 
1332         @Override
1333         public int hashCode() {
1334             return this.partitionAwareSpecification.hashCode();
1335         }
1336 
1337         @Override
1338         @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
1339         public boolean equals(final Object otherObject) {
1340             return this.partitionAwareSpecification.equals(otherObject);
1341         }
1342     }
1343 }