1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sourceforge.domian.repository;
17
18
19 import java.io.File;
20 import java.io.Serializable;
21 import java.lang.ref.WeakReference;
22 import java.lang.reflect.Constructor;
23 import java.lang.reflect.InvocationHandler;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.NoSuchElementException;
33 import java.util.Set;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36
37 import org.apache.commons.lang.NotImplementedException;
38 import org.apache.commons.lang.Validate;
39
40 import net.sourceforge.domian.entity.Entity;
41 import net.sourceforge.domian.specification.CompositeSpecification;
42 import net.sourceforge.domian.specification.Specification;
43 import net.sourceforge.domian.util.StopWatch;
44 import net.sourceforge.domian.util.concurrent.locks.SemaphoreSynchronizer;
45 import net.sourceforge.domian.util.concurrent.locks.Synchronizer;
46
47 import static java.lang.Boolean.FALSE;
48 import static java.lang.Boolean.TRUE;
49 import static java.util.Collections.emptySet;
50 import static net.sourceforge.domian.specification.SpecificationFactory.allEntities;
51 import static net.sourceforge.domian.specification.SpecificationFactory.allObjects;
52 import static net.sourceforge.domian.specification.SpecificationFactory.createSpecificationFor;
53 import static net.sourceforge.domian.specification.SpecificationUtils.typeSafeIsSatisfiedBy;
54 import static net.sourceforge.domian.util.InstrumentationUtils.DEBUG_LEVEL_INDENTATION;
55 import static net.sourceforge.domian.util.InstrumentationUtils.TRACE_LEVEL_INDENTATION;
56 import static net.sourceforge.domian.util.ReflectionUtils.canCastFrom_To;
57 import static org.apache.commons.lang.StringUtils.isNotBlank;
58
59
60
61
62
63
64
65
66
/**
 * Repository implementation that layers specification-keyed sub-partitions on top of
 * a delegate {@link Repository}, and doubles as a reflection {@link InvocationHandler}.
 * <p>
 * Calls arriving through {@link #invoke(Object, Method, Object[])} are dispatched
 * reflectively to this object itself. Repository operations are routed through a
 * shared {@link Synchronizer}; persistence-related operations are forwarded to the
 * delegate only when it is a {@link PersistentRepository}.
 *
 * @param <T> the entity type handled by this partition
 */
public class PartitionRepositoryInvocationHandler<T extends Entity>
        extends AbstractDomianCoreRepository<T>
        implements InvocationHandler, PartitionRepository<T>, PersistentRepository<T> {
70
71
72
73
74
75
76
77
78
79
/** Synchronizer used for every exclusive/concurrent execution; also installed on the delegate by the constructor. */
protected final Synchronizer synchronizer;


/** Registry that iterators handed out by this partition are added to, and that is updated when a partition is added. */
protected final IteratorRegistry iteratorRegistry = new IteratorRegistry();

/** When {@code true}, conditionally synchronized operations are always executed exclusively. */
protected boolean executeOperationsExclusivelyOnly;
/** Set by the constructor when the delegate itself is a {@link PartitionRepository}. */
protected boolean repositoryDelegateIsAlreadyPartitioned = false;

/** Specification of the parent partition; {@code null} is what {@link #isRootPartition()} tests for. */
protected Specification<? super T> superPartitionSpecification = null;
/** Parent partition repository, or {@code null} when none has been set. */
protected PartitionRepository<? super T> superPartitionRepository = null;

/** Specification describing this partition; may be {@code null} until set (callers such as repartition check for that). */
protected Specification<T> partitionSpecification;
/** The repository that actually stores the entities belonging to this partition. */
protected Repository<T> repositoryDelegate;


/** Direct sub-partitions of this partition, keyed by their partition specification. */
protected final Map<Specification<? extends T>, PartitionRepository> subPartitions =
        new ConcurrentHashMap<Specification<? extends T>, PartitionRepository>();
97
98
99 PartitionRepositoryInvocationHandler(final Repository<T> repositoryDelegate,
100 final Synchronizer synchronizer,
101 final Boolean executeOperationsExclusivelyOnly) {
102 Validate.notNull(repositoryDelegate, "Partition repository parameter cannot be null");
103 Validate.notNull(executeOperationsExclusivelyOnly, "Exclusive-operations-only flag parameter cannot be null");
104
105 if (repositoryDelegate instanceof PartitionRepository) {
106 log.warn("Partitioning already partitioned repository " + repositoryDelegate.getClass().getName());
107 this.repositoryDelegateIsAlreadyPartitioned = true;
108 }
109 this.repositoryDelegate = repositoryDelegate;
110 this.synchronizer = (synchronizer == null) ? new SemaphoreSynchronizer() : synchronizer;
111 this.executeOperationsExclusivelyOnly = executeOperationsExclusivelyOnly;
112
113 ((AbstractDomianCoreRepository) this.repositoryDelegate).setSynchronizer(this.synchronizer);
114 }
115
116
117 @Override
118 public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
119 try {
120 return method.invoke(this, args);
121
122 } catch (InvocationTargetException e) {
123 throw e.getCause();
124 }
125 }
126
127
128 @Override
129 public Class<T> getType() {
130 return this.partitionSpecification.getType();
131 }
132
133
134 @Override
135 public PartitionRepository<? super T> getRootRepository() {
136 PartitionRepository<? super T> rootPartitionRepository = this;
137 while (true) {
138 try {
139 PartitionRepository<? super T> superPartitionedRepository = rootPartitionRepository.getSuperPartitionRepository();
140 if (superPartitionedRepository == null) {
141 return rootPartitionRepository;
142 } else {
143 rootPartitionRepository = superPartitionedRepository;
144 }
145 } catch (UnsupportedOperationException e) {
146 return rootPartitionRepository;
147 }
148 }
149 }
150
151
152 public Specification<? super T> getSuperPartitionSpecification() {
153 return this.superPartitionSpecification;
154 }
155
156
157 @Override
158 public void setSuperPartitionSpecification(final Specification<? super T> specification) {
159 this.superPartitionSpecification = specification;
160 }
161
162
163 @Override
164 public PartitionRepository<? super T> getSuperPartitionRepository() {
165 return this.superPartitionRepository;
166 }
167
168
169 @Override
170 public void setSuperPartitionRepository(final PartitionRepository<? super T> partitionRepository) {
171 this.superPartitionRepository = partitionRepository;
172 }
173
174
175 @Override
176 public Specification<T> getPartitionSpecification() {
177 return this.partitionSpecification;
178 }
179
180
181 @Override
182 public void setPartitionSpecification(final Specification<T> partitionSpecification) {
183 this.partitionSpecification = partitionSpecification;
184 }
185
186
187 @Override
188 public Repository<T> getTargetRepository() {
189 return this.repositoryDelegate;
190 }
191
192
193 @Override
194 public Boolean isRootPartition() {
195
196 return this.superPartitionSpecification == null;
197 }
198
199
200 @Override
201 public Boolean isLeafPartition() {
202 return this.subPartitions.isEmpty();
203 }
204
205
206 protected boolean repositoryOperationsNeedToBeExecutedExclusively() {
207 return this.executeOperationsExclusivelyOnly ||
208 (getPersistenceDefinition() != null && getPersistenceDefinition().isFileBasedOnly());
209 }
210
211
212 protected boolean repositoryExemptFromSyncronization() {
213 return this.repositoryDelegate.getClass().equals(HashSetRepository.class);
214 }
215
216
217 protected PartitionRepository wireUpPartition(Specification superPartitionSpecification,
218 final PartitionRepository superPartitionRepository,
219 final Specification partitionSpecification,
220 final PartitionRepository partitionRepository) {
221 if (superPartitionSpecification == null) {
222
223 superPartitionSpecification = allObjects();
224 }
225 partitionRepository.setSuperPartitionSpecification(superPartitionSpecification);
226 partitionRepository.setSuperPartitionRepository(superPartitionRepository);
227 partitionRepository.setPartitionSpecification(partitionSpecification);
228 return partitionRepository;
229 }
230
231
232 protected <V> V conditionalSynchronizedExecutionOf(final Callable<V> operation) throws Exception {
233 if (repositoryExemptFromSyncronization()) {
234 return operation.call();
235 }
236 if (repositoryOperationsNeedToBeExecutedExclusively()) {
237 return this.synchronizer.callExclusively(operation);
238 }
239 return this.synchronizer.callConcurrently(operation);
240 }
241
242
243
244
245
246
247 protected void handlePartitionRepositoryException(final Exception e, final String errorMessage) {
248 if (e instanceof IllegalArgumentException) {
249 throw (IllegalArgumentException) e;
250 }
251 throw new RepositoryException(this.getClass().getName() + "." + errorMessage + " failed", e);
252 }
253
254
255 @Override
256 public <V extends T> Iterator<V> iterateAllEntitiesSpecifiedBy(final Specification<V> specification) {
257 Validate.notNull(specification, "Specification parameter cannot be null");
258 try {
259 return conditionalSynchronizedExecutionOf(new IterateAllEntitiesSpecifiedBy<V>(specification));
260
261 } catch (Exception e) {
262 handlePartitionRepositoryException(e, "iterateAllEntitiesSpecifiedBy(Specification)");
263 return new Iterator<V>() {
264 @Override
265 public boolean hasNext() {
266 return false;
267 }
268 @Override
269 public V next() {
270 throw new NoSuchElementException();
271 }
272 @Override
273 public void remove() {
274 throw new IllegalStateException();
275 }
276 };
277 }
278 }
279
280
281
282 protected <V extends T> Long countAllEntitiesWithoutSpecificationSharpening(final Specification<V> specification) {
283 return (long) findAllEntitiesSpecifiedBy(specification, FALSE).size();
284 }
285
286
287
288 @Override
289 public <V extends T> V findSingleEntitySpecifiedBy(final Specification<V> specification) {
290 Validate.notNull(specification, "Specification parameter cannot be null");
291 final Collection<V> selection = findAllEntitiesSpecifiedBy(specification);
292 if (selection.size() > 1) {
293 throw new IllegalArgumentException("More than one entity were found when only one was expected");
294 }
295 if (selection.size() == 1) {
296 return selection.iterator().next();
297 }
298 return null;
299 }
300
301
302 @Override
303 public <V extends T> Collection<V> findAllEntitiesSpecifiedBy(final Specification<V> specification) {
304 return findAllEntitiesSpecifiedBy(specification, TRUE);
305 }
306
307
308 protected <V extends T> Collection<V> findAllEntitiesSpecifiedBy(final Specification<V> specification, final Boolean sharpenOriginalSpecificationsToFitPartitionSpecification) {
309 Validate.notNull(specification, "Specification parameter cannot be null");
310 try {
311 final Collection<V> result;
312 result = conditionalSynchronizedExecutionOf(new FindAllEntitiesSpecifiedBy<V>(specification, sharpenOriginalSpecificationsToFitPartitionSpecification));
313
314
315
316
317
318
319
320
321
322
323
324 return result;
325
326 } catch (Exception e) {
327 handlePartitionRepositoryException(e, "findAllEntitiesSpecifiedBy(Specification)");
328 return emptySet();
329 }
330 }
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351 @Override
352 public <V extends T> void put(final V entity) {
353 try {
354 conditionalSynchronizedExecutionOf(new Put<V>(entity));
355
356 } catch (Exception e) {
357 handlePartitionRepositoryException(e, "put(Entity)");
358 }
359 }
360
361
362 @Override
363 public <V extends T> void update(final V entity) {
364 this.synchronizer.callExclusively(new Callable<Void>() {
365 @Override
366 public Void call() {
367 repositoryDelegate.update(entity);
368 repartition(entity);
369 return null;
370 }
371 });
372 }
373
374
375 @Override
376 public <V extends T> Long removeAllEntitiesSpecifiedBy(final Specification<V> specification) {
377 Validate.notNull(specification, "Specification parameter cannot be null");
378 try {
379 return conditionalSynchronizedExecutionOf(new RemoveAllEntitiesSpecifiedBy<V>(specification));
380
381 } catch (Exception e) {
382 handlePartitionRepositoryException(e, "removeAllEntitiesSpecifiedBy(Specification)");
383 return 0L;
384 }
385 }
386
387
388 @Override
389 public <V extends T> Boolean remove(final V entity) {
390 try {
391 return conditionalSynchronizedExecutionOf(new Remove<V>(entity));
392
393 } catch (Exception e) {
394 handlePartitionRepositoryException(e, "remove(Entity)");
395 return FALSE;
396 }
397 }
398
399
400 @Override
401 public Boolean isPartitioningNatively() {
402 return this.repositoryDelegate.isPartitioningNatively();
403 }
404
405
406 @Override
407 public Boolean isIndexingEntitiesRecursively() {
408 return this.repositoryDelegate.isIndexingEntitiesRecursively();
409 }
410
411
412
413
414
415
416
417 @Override
418 public File getRepositoryDirectory() {
419 if (this.repositoryDelegate instanceof PersistentRepository) {
420 return ((PersistentRepository) this.repositoryDelegate).getRepositoryDirectory();
421 } else {
422 return null;
423 }
424 }
425
426
427
428 @Override
429 public String getRepositoryId() {
430 if (this.repositoryDelegate instanceof PersistentRepository) {
431 return ((PersistentRepository) this.repositoryDelegate).getRepositoryId();
432 } else {
433 return null;
434 }
435 }
436
437
438
439 @Override
440 public PersistenceDefinition getPersistenceDefinition() {
441 if (this.repositoryDelegate instanceof PersistentRepository) {
442 return ((PersistentRepository) this.repositoryDelegate).getPersistenceDefinition();
443 } else {
444 return null;
445 }
446 }
447
448
449
450 @Override
451 public String getFormat() {
452 if (this.repositoryDelegate instanceof PersistentRepository) {
453 return ((PersistentRepository) this.repositoryDelegate).getFormat();
454 } else {
455 return null;
456 }
457 }
458
459
460 @Override
461 public void load() {
462 if (this.repositoryDelegate instanceof PersistentRepository) {
463 ((PersistentRepository) this.repositoryDelegate).load();
464 }
465
466
467
468
469
470
471
472 for (final Repository subPartitonRepository : getAllPartitions().values()) {
473 if (subPartitonRepository instanceof PersistentRepository) {
474 ((PersistentRepository) subPartitonRepository).load();
475 }
476 }
477 }
478
479
480 @Override
481 public void persist() {
482 if (this.repositoryDelegate instanceof PersistentRepository) {
483 ((PersistentRepository) this.repositoryDelegate).persist();
484 }
485 for (final Repository subPartitonRepository : getAllPartitions().values()) {
486 if (subPartitonRepository instanceof PersistentRepository) {
487 ((PersistentRepository) subPartitonRepository).persist();
488 }
489 }
490 }
491
492
493 @Override
494 public EntityPersistenceMetaData getMetaDataFor(T entity) {
495 throw new NotImplementedException();
496 }
497
498
499 @Override
500 public void close() {
501 if (this.repositoryDelegate instanceof PersistentRepository) {
502 ((PersistentRepository) this.repositoryDelegate).close();
503 }
504 for (final Repository subPartitonRepository : getAllPartitions().values()) {
505 if (subPartitonRepository instanceof PersistentRepository) {
506 ((PersistentRepository) subPartitonRepository).close();
507 }
508 }
509 }
510
511
512
513
514
515
516 protected PartitionRepository getCorrectPartitionRepository(final PartitionRepository originalPartitionRepository,
517 final PartitionRepository toBeReusedPartitionRepository) {
518 return (originalPartitionRepository == null) ? toBeReusedPartitionRepository : originalPartitionRepository;
519 }
520
521
/**
 * Adds a partition for the given specification and fits it into the existing
 * sub-partition structure of this partition.
 * <p>
 * The partition repository actually used is the one chosen by
 * {@link #getCorrectPartitionRepository}: the supplied {@code partitionRepository},
 * or, when that is {@code null}, a partition made from this handler's own delegate.
 * After some pre-checks the structural work runs in one exclusive section of the synchronizer.
 *
 * @param partitionSpecification the specification of the partition to add
 * @param partitionRepository    the repository to use for the partition, or {@code null}
 *                               to reuse a partition made from this handler's delegate
 * @param <V>                    the entity type of the new partition
 * @return the partition repository resulting from the operation; a partition of a
 *         {@code NullRepository} is returned only if the exception handler returns normally
 * @throws IllegalArgumentException when both repositories are persistent and share the same repository id
 */
protected <V extends T> PartitionRepository<V> addPartition(final Specification<V> partitionSpecification,
                                                            final PartitionRepository<? extends V> partitionRepository) {
    // Fallback partition made from this handler's own delegate; used when no repository was supplied.
    final PartitionRepository<? extends V> reusedPartitionRepository = ((AbstractDomianCoreRepository<? extends V>) this.repositoryDelegate).makePartition();

    // Pre-check 1: two persistent repositories must not share the same repository id.
    if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository) instanceof PersistentRepository
            && this.repositoryDelegate instanceof PersistentRepository) {

        PersistentRepository repositoryDelegate = (PersistentRepository) this.repositoryDelegate;
        PersistentRepository partitionRepositoryDelegate = (PersistentRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository);

        // NOTE(review): the identity check uses the target of the *reused* partition,
        // not the target of the chosen one - confirm this is intended.
        if (!(getTargetRepository() == reusedPartitionRepository.getTargetRepository())) {
            if (repositoryDelegate.getRepositoryId().equals(partitionRepositoryDelegate.getRepositoryId())) {
                throw new IllegalArgumentException("Partition repository parameter has the same repository id (name) as this repository ['" + repositoryDelegate.getRepositoryId() + "']");
            }



        }
    }
    // Pre-check 2: warn when a HashSetRepository-backed partition is added to a repository of another type.
    if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().equals(HashSetRepository.class)) {
        if (!(this.repositoryDelegate.getClass().equals(HashSetRepository.class))) {
            if (this.repositoryDelegate instanceof PersistentRepository) {
                log.warn("Adding non-concurrent partition [type=" + getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().getName() + "] " +
                        "to overall concurrent repository [type=" + this.repositoryDelegate.getClass().getName() + ", id=" + ((PersistentRepository) this.repositoryDelegate).getRepositoryId() + "]." +
                        " This partitioned repository will not be thread-safe!");
            } else {
                log.warn("Adding non-concurrent partition [type=" + getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository().getClass().getName() + "] " +
                        "to overall concurrent repository [type=" + this.repositoryDelegate.getClass().getName() + "]." +
                        " This partitioned repository will not be thread-safe!");
            }
        }
    }
    try {
        return this.synchronizer.callExclusively(new Callable<PartitionRepository<V>>() {
            @Override
            public PartitionRepository<V> call() {
                // Timing is only collected when it will be logged at INFO level below.
                StopWatch stopWatch = null;
                if (log.isInfoEnabled()) {
                    stopWatch = new StopWatch().start();
                }

                // 'put' records that the new partition has already been placed
                // (replaced an equal one, or delegated to a more general sub-partition).
                boolean put = false;
                PartitionRepository subPartitionRepository = null;
                PartitionRepository subPartitionRepositoryOnThisLevel = null;

                // The new partition's target repository shares this handler's synchronizer.
                ((AbstractDomianCoreRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository).getTargetRepository()).setSynchronizer(synchronizer);


                // Compare the new specification against every existing direct sub-partition.
                for (final Iterator<Specification<? extends T>> subPartitionSpecificationIterator = subPartitions.keySet().iterator();
                     subPartitionSpecificationIterator.hasNext();) {

                    final Specification subPartitionSpecification = subPartitionSpecificationIterator.next();


                    // Case 1: same specification - the new repository replaces the existing
                    // sub-partition after the existing one's entities are copied into it.
                    if (partitionSpecification.equals(subPartitionSpecification)) {
                        subPartitionRepositoryOnThisLevel = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
                                                                            PartitionRepositoryInvocationHandler.this,
                                                                            partitionSpecification,
                                                                            getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));


                        final PartitionRepository subPartitionRepositoryToBeReplaced = subPartitions.get(subPartitionSpecification);




                        subPartitionRepositoryOnThisLevel.putAll(subPartitionRepositoryToBeReplaced.find(allEntities()));


                        subPartitions.remove(subPartitionSpecification);
                        subPartitions.put(subPartitionSpecification, subPartitionRepositoryOnThisLevel);
                        put = true;


                    // Case 2: the new specification is a special case of an existing one -
                    // hand the new partition down to that sub-partition.
                    } else if (partitionSpecification.isSpecialCaseOf(subPartitionSpecification)) {
                        subPartitionRepository = subPartitions.get(subPartitionSpecification).addPartitionFor(partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
                        put = true;


                    // Case 3: the new specification generalizes an existing one - the existing
                    // sub-partition is re-parented beneath the new partition and removed from this level.
                    } else if (partitionSpecification.isGeneralizationOf(subPartitionSpecification)) {
                        if (subPartitionRepositoryOnThisLevel == null) {
                            subPartitionRepositoryOnThisLevel = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
                                                                                PartitionRepositoryInvocationHandler.this,
                                                                                partitionSpecification,
                                                                                getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
                        }
                        final PartitionRepository subPartitionRepositoryToBeRelocated = wireUpPartition(partitionSpecification,
                                                                                                        subPartitionRepositoryOnThisLevel,
                                                                                                        subPartitionSpecification,
                                                                                                        subPartitions.get(subPartitionSpecification));
                        subPartitionRepositoryOnThisLevel.addPartitionFor(subPartitionSpecification, subPartitionRepositoryToBeRelocated);
                        // Copy matching partition-only entities into the new partition.
                        // getPartitionOnlyEntities() returns a set; entityIterator.remove() acts on that set.
                        for (final Iterator entityIterator = getPartitionOnlyEntities().iterator(); entityIterator.hasNext();) {
                            final Object entity = entityIterator.next();
                            if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
                                subPartitionRepositoryOnThisLevel.put((Entity) entity);
                                entityIterator.remove();
                            }
                        }
                        subPartitionSpecificationIterator.remove();


                    // Case 4: the specifications are otherwise unrelated - copy over any entities and
                    // nested sub-partitions of the existing sub-partition that satisfy the new specification.
                    } else {
                        final PartitionRepository subPartitionWithPossibleIntersectingEntitiesOrSubPartitions = subPartitions.get(subPartitionSpecification);
                        if (subPartitionRepository == null) {
                            subPartitionRepository = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(),
                                                                     PartitionRepositoryInvocationHandler.this,
                                                                     partitionSpecification,
                                                                     getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
                        }

                        for (final Object entity : subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitionOnlyEntities()) {
                            if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
                                subPartitionRepository.put((Entity) entity);
                            }
                        }

                        for (final Object subSpecObject : subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitions().keySet()) {
                            final Specification subSpec = (Specification) subSpecObject;
                            if (subSpec.isSpecialCaseOf(partitionSpecification)) {
                                subPartitionRepository.addPartitionFor(subSpec, (PartitionRepository) subPartitionWithPossibleIntersectingEntitiesOrSubPartitions.getPartitions().get(subSpec));
                            }
                        }
                    }
                }
                // No branch above produced a result repository: wire the chosen repository up as a child of this partition.
                if (subPartitionRepository == null) {
                    subPartitionRepository = wireUpPartition(PartitionRepositoryInvocationHandler.this.getPartitionSpecification(), PartitionRepositoryInvocationHandler.this, partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));
                }
                // Not yet placed: register on this level and, unless the new partition shares this
                // handler's target repository, move the matching partition-only entities into it.
                if (!put) {
                    subPartitions.put(partitionSpecification, subPartitionRepository);

                    if (!(PartitionRepositoryInvocationHandler.this.getTargetRepository() == subPartitionRepository.getTargetRepository())) {
                        for (final Entity entity : getPartitionOnlyEntities()) {
                            if (typeSafeIsSatisfiedBy(partitionSpecification, entity)) {
                                PartitionRepositoryInvocationHandler.this.removePartitionOnlyEntity((V) entity);
                                subPartitionRepository.put(entity);
                            }
                        }
                    }
                    // Inform the iterators registered with this partition about the new partition.
                    iteratorRegistry.updateIteratorsWith(partitionSpecification, getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository));

                }
                if (log.isInfoEnabled()) {
                    String repositoryId = "[not persistent repository]";
                    if (getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository) instanceof PersistentRepository) {
                        repositoryId = "'" + ((PersistentRepository) getCorrectPartitionRepository(partitionRepository, reusedPartitionRepository)).getRepositoryId() + "'";
                    }
                    log.info("Repository partition " + repositoryId + " added in " + stopWatch.elapsedTimeToString() + " [partition specification type=" + partitionSpecification.getType().getName() + "]");
                }
                return subPartitionRepository;
            }
        });

    } catch (Exception e) {
        // The handler throws; the return below is reached only if an override returns normally.
        handlePartitionRepositoryException(e, "addPartitionFor(Specification)");
        return new NullRepository<V>().makePartition();
    }
}
679
680
681 @Override
682 public <V extends T> PartitionRepository<V> addPartitionFor(final Specification<V> partitionSpecification) {
683 return addPartitionFor(partitionSpecification, (String) null);
684 }
685
686
687 @Override
688 public <V extends T> PartitionRepository<V> addPartitionFor(final Specification<V> partitionSpecification, final String repositoryId) {
689 Validate.notNull(partitionSpecification, "Partition specification parameter cannot be null");
690 final Object newRepositoryInstance;
691 final Repository<T> targetRepository = this.getTargetRepository();
692 final Class<? extends Repository> targetRepositoryType = targetRepository.getClass();
693 try {
694 if (canCastFrom_To(targetRepositoryType, PersistentRepository.class)) {
695 Validate.isTrue(isNotBlank(repositoryId), "Repository ID cannot be blank for persistent repositories");
696 final Constructor repositoryConstructor = targetRepositoryType.getConstructor(String.class, Synchronizer.class);
697 synchronized (this.synchronizer) {
698 newRepositoryInstance = repositoryConstructor.newInstance(repositoryId, this.synchronizer);
699 }
700 } else {
701 newRepositoryInstance = targetRepositoryType.newInstance();
702 }
703
704 } catch (NoSuchMethodException e1) {
705 throw new IllegalArgumentException("Repository implementation " + targetRepositoryType +
706 " lacks a public Constructor(java.lang.String repositoryId, net.sourceforge.domian.util.concurrent.locks.Synchronizer repositorySynchronizer)");
707 } catch (Exception e2) {
708 handlePartitionRepositoryException(e2, "addPartitionFor(Specification, String)");
709 return new NullRepository<V>().makePartition();
710 }
711 return addPartition(partitionSpecification, ((AbstractDomianCoreRepository<V>) newRepositoryInstance).makePartition());
712 }
713
714
715 @Override
716 public <V extends T> PartitionRepository<V> addPartitionFor(Specification<V> partitionSpecification, Repository<? super V> partitionRepository) {
717 Validate.notNull(partitionSpecification, "Partition specification parameter cannot be null");
718 Validate.notNull(partitionRepository, "Partition repository parameter cannot be null");
719 if (partitionRepository instanceof PartitionRepository) {
720 return addPartition(partitionSpecification, (PartitionRepository<V>) partitionRepository);
721 }
722 return addPartition(partitionSpecification, ((AbstractDomianCoreRepository<V>) partitionRepository).makePartition());
723 }
724
725
/**
 * Re-evaluates where the given entity belongs in the partition structure,
 * inside one exclusive section of the synchronizer.
 *
 * @param entity the entity to repartition; must not be {@code null}
 * @param <V>    the entity type
 * @return {@code true} when this partition or one of its sub-partitions reported a
 *         repartitioning; {@code FALSE} is returned after a failure only if the
 *         exception handler returns normally
 */
@Override
public <V extends T> Boolean repartition(final V entity) {
    Validate.notNull(entity, "Entity parameter cannot be null");
    try {
        return this.synchronizer.callExclusively(new Callable<Boolean>() {
            @SuppressWarnings("unchecked")
            @Override
            public Boolean call() {
                // Timing is only collected for the root partition, and only when DEBUG is enabled.
                StopWatch stopWatch = null;
                if (log.isDebugEnabled() && isRootPartition()) {
                    stopWatch = new StopWatch().start();
                }
                boolean repartitioned = false;

                // The lookup is done without sharpening the specification to this partition.
                if (entityExists_PossiblyInWrongPartition(entity)) {


                    // The entity is acceptable here: either this partition has no specification
                    // and no sub-partitions, or its specification is satisfied.
                    if ((partitionSpecification == null && subPartitions.isEmpty()) ||
                            (partitionSpecification != null && partitionSpecification.isSatisfiedBy(entity))) {

                        repartitioned = true;


                        boolean specialized = false;


                        // Note: the loop variable shadows the partitionSpecification field.
                        // The entity is put into every direct sub-partition whose specification it satisfies.
                        for (final Specification partitionSpecification : subPartitions.keySet()) {
                            if (partitionSpecification.isSatisfiedBy(entity)) {
                                final PartitionRepository partitionRepo = subPartitions.get(partitionSpecification);
                                partitionRepo.put(entity);
                                specialized = true;
                            }
                        }
                        // Once stored in a sub-partition, remove it from this partition's delegate.
                        if (specialized) {
                            repositoryDelegate.remove(entity);
                        }
                    } else {

                        // The entity no longer fits this partition: remove it here and
                        // re-insert it through the root repository.
                        repositoryDelegate.remove(entity);


                        getRootRepository().put(entity);
                        repartitioned = true;
                    }

                    // Let every direct sub-partition re-evaluate the entity as well.
                    for (final PartitionRepository partitionedRepository : subPartitions.values()) {
                        if (partitionedRepository.repartition(entity)) {
                            repartitioned = true;
                        }
                    }

                } else {



                    // Not found from this partition: ask the sub-partitions first.
                    for (final PartitionRepository partitionedRepository : subPartitions.values()) {
                        if (partitionedRepository.repartition(entity)) {
                            repartitioned = true;
                        }
                    }
                    if (!repartitioned) {

                        // No sub-partition handled it: store it here if this partition's specification accepts it.
                        if (partitionSpecification != null && partitionSpecification.isSatisfiedBy(entity)) {

                            repositoryDelegate.put(entity);
                            repartitioned = true;
                        }
                    }
                }
                if (log.isDebugEnabled() && isRootPartition()) {
                    log.debug(DEBUG_LEVEL_INDENTATION + "Entity repartitioned in " + stopWatch.elapsedTimeToString() + " [" + entity + "]");
                }
                return repartitioned;
            }
        });

    } catch (Exception e) {
        // The handler throws; the return below is reached only if an override returns normally.
        handlePartitionRepositoryException(e, "repartition(Entity)");
        return FALSE;
    }
}
807
808
809 @Override
810 public void repartition() {
811 try {
812 this.synchronizer.callExclusively((Callable) new Callable<Void>() {
813 @Override
814 public Void call() {
815 StopWatch stopWatch = null;
816 if (log.isInfoEnabled()) {
817 stopWatch = new StopWatch().start();
818 }
819 final Iterator<? extends T> entityIterator = iterate(allObjects());
820 while (entityIterator.hasNext()) {
821 final T entity = entityIterator.next();
822 repartition(entity);
823 if (log.isTraceEnabled()) {
824 log.trace(TRACE_LEVEL_INDENTATION + "Entity repartitioned in " + stopWatch.lapTimeToString() + " [entity=" + entity + "]");
825 }
826 }
827 if (log.isInfoEnabled()) {
828 log.info("Repository repartitioned in " + stopWatch.elapsedTimeToString());
829 }
830 return null;
831 }
832 });
833
834 } catch (Exception e) {
835 handlePartitionRepositoryException(e, "repartition()");
836 }
837 }
838
839
840 protected Boolean entityExists_PossiblyInWrongPartition(final T entity) {
841 return countAllEntitiesWithoutSpecificationSharpening(createUniqueSpecificationFor(entity)) > 0;
842 }
843
844
845 @Override
846 public Map<Specification<? extends T>, PartitionRepository> getPartitions() {
847 return new HashMap<Specification<? extends T>, PartitionRepository>(this.subPartitions);
848 }
849
850
851 @Override
852 public Map<Specification<? extends T>, PartitionRepository> getAllPartitions() {
853 final Map<Specification<? extends T>, PartitionRepository> allPartitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
854 collectAllPartitions(allPartitionMap);
855 return allPartitionMap;
856 }
857
858
859 @Override
860 public void collectAllPartitions(final Map<Specification<? extends T>, PartitionRepository> partitionMap) {
861 for (final Map.Entry<Specification<? extends T>, PartitionRepository> subPartititon : this.subPartitions.entrySet()) {
862 partitionMap.put(subPartititon.getKey(), subPartititon.getValue());
863 subPartititon.getValue().collectAllPartitions(partitionMap);
864 }
865 }
866
867
868 @Override
869 public <R extends Repository> void collectAllPartitionsWithRepositorySatisfying(final Specification<R> specification,
870 final Map<Specification<? extends T>, PartitionRepository> partitionMap) {
871 collectAllPartitions(partitionMap);
872 final Iterator<R> partitionedRepoIterator = (Iterator<R>) partitionMap.values().iterator();
873 while (partitionedRepoIterator.hasNext()) {
874 final R partitionedRepo = partitionedRepoIterator.next();
875 if (!specification.isSatisfiedBy(partitionedRepo)) {
876 partitionedRepoIterator.remove();
877 }
878 }
879 }
880
881
882 @Override
883 public <V extends T> PartitionRepository findPartitionFor(final Specification<V> partitionSpecification) {
884
885 for (final Specification subPartitionSpecification : this.subPartitions.keySet()) {
886 if (partitionSpecification.isSpecialCaseOf(subPartitionSpecification)) {
887 return this.subPartitions.get(subPartitionSpecification).findPartitionFor(partitionSpecification);
888 }
889 }
890 return this;
891 }
892
893
894 @Override
895 public Set<T> getPartitionOnlyEntities() {
896 if (this.repositoryDelegateIsAlreadyPartitioned) {
897 return ((PartitionRepository) this.repositoryDelegate).getPartitionOnlyEntities();
898
899 } else {
900 if (this.isRootPartition()) {
901
902
903
904
905 return new HashSet<T>(this.repositoryDelegate.find((Specification<T>) allEntities()));
906
907
908 } else {
909 final Specification<T> allOfThisPartitionType = createSpecificationFor(this.partitionSpecification.getType());
910 final Specification<T> partitionAwareSpecification = new PartitionAwareSpecification(allOfThisPartitionType, this.partitionSpecification);
911 return new HashSet<T>(this.repositoryDelegate.find(partitionAwareSpecification));
912 }
913 }
914 }
915
916 protected <V extends T> void removePartitionOnlyEntity(final V entity) {
917 if (this.repositoryDelegateIsAlreadyPartitioned) {
918 if (((PartitionRepository) this.repositoryDelegate).getPartitionOnlyEntities().contains(entity)) {
919 this.repositoryDelegate.remove(entity);
920 }
921 } else {
922 this.repositoryDelegate.remove(entity);
923 }
924 }
925
926
927
928
929
930
931 private class IterateAllEntitiesSpecifiedBy<V extends T> implements Callable<Iterator<V>> {
932 private final PartitionAwareSpecification<V> partitionAwareSpecification;
933
934 private IterateAllEntitiesSpecifiedBy(final Specification<V> specification) {
935 this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
936 }
937
938 @Override
939 public Iterator<V> call() throws Exception {
940
941 if (this.partitionAwareSpecification.isOutsidePartitionBoundaries()) {
942 return Collections.<V>emptySet().iterator();
943 }
944 final PartitionRepositoryIterator<T, V> iterator = new PartitionRepositoryIterator<T, V>(this.partitionAwareSpecification, PartitionRepositoryInvocationHandler.this);
945 iteratorRegistry.addIterator(iterator);
946 return iterator;
947 }
948 }
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983 private class FindAllEntitiesSpecifiedBy<V extends T> implements Callable<Collection<V>> {
984 private Boolean modifyOriginalSpecificationsToFitPartitionSpecification;
985 private Specification<V> partitionAwareSpecification;
986
987 private FindAllEntitiesSpecifiedBy(final Specification<V> specification) {
988 this(specification, TRUE);
989 }
990
991 private FindAllEntitiesSpecifiedBy(final Specification<V> specification, final Boolean modifyOriginalSpecificationsToFitPartitionSpecification) {
992 this.modifyOriginalSpecificationsToFitPartitionSpecification = modifyOriginalSpecificationsToFitPartitionSpecification;
993 if (this.modifyOriginalSpecificationsToFitPartitionSpecification) {
994 this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
995 } else {
996 this.partitionAwareSpecification = specification;
997 }
998 }
999
1000 @Override
1001 public Collection<V> call() throws Exception {
1002
1003 if (this.modifyOriginalSpecificationsToFitPartitionSpecification &&
1004 ((PartitionAwareSpecification) this.partitionAwareSpecification).isOutsidePartitionBoundaries()) {
1005 return Collections.emptySet();
1006 }
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 final Collection<V> selection = repositoryDelegate.findAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1022 for (final PartitionRepository partitionRepository : getPartitions().values()) {
1023 final Collection<V> subSelection = partitionRepository.findAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1024 selection.addAll(subSelection);
1025 }
1026 return selection;
1027
1028 }
1029 }
1030
1031
1032 private class Put<V extends T> implements Callable<Void> {
1033 private final V entity;
1034
1035 private Put(final V entity) {
1036 this.entity = entity;
1037 }
1038
1039 @Override
1040 public Void call() throws Exception {
1041 boolean put = false;
1042 for (final Specification<? extends T> partitionSpecification : subPartitions.keySet()) {
1043 if (typeSafeIsSatisfiedBy(partitionSpecification, this.entity)) {
1044 subPartitions.get(partitionSpecification).put(this.entity);
1045 put = true;
1046 }
1047 }
1048 if (!put) {
1049 if (partitionSpecification == null || partitionSpecification.isSatisfiedBy(this.entity)) {
1050 repositoryDelegate.put(this.entity);
1051 } else {
1052 if (isRootPartition()) {
1053 throw new IllegalArgumentException("No suitable partition exists for " + this.entity);
1054 }
1055 log.warn("Not finding any suitable partition to put entity at this partition sub-tree, trying upper branches");
1056 getRootRepository().put(this.entity);
1057 }
1058 }
1059 return null;
1060 }
1061 }
1062
1063
1064 private class RemoveAllEntitiesSpecifiedBy<V extends T> implements Callable<Long> {
1065 private final PartitionAwareSpecification<V> partitionAwareSpecification;
1066
1067 private RemoveAllEntitiesSpecifiedBy(final Specification<V> specification) {
1068 this.partitionAwareSpecification = new PartitionAwareSpecification<V>(specification, partitionSpecification);
1069 }
1070
1071 @Override
1072 public Long call() throws Exception {
1073
1074 if (this.partitionAwareSpecification.isOutsidePartitionBoundaries()) {
1075 return 0L;
1076 }
1077 long numberOfRemovedEntities = 0;
1078 final PartitionRepository partitionRepository = findPartitionFor(this.partitionAwareSpecification);
1079 final Collection allPartitionEntities = partitionRepository.getPartitionOnlyEntities();
1080 for (final Object entity : allPartitionEntities) {
1081 if (typeSafeIsSatisfiedBy(this.partitionAwareSpecification, entity)) {
1082 partitionRepository.remove((Entity) entity);
1083 ++numberOfRemovedEntities;
1084 }
1085 }
1086 for (final Object subPartitionRepositoryObject : partitionRepository.getPartitions().values()) {
1087 final PartitionRepository subPartitionRepository = (PartitionRepository) subPartitionRepositoryObject;
1088 numberOfRemovedEntities += subPartitionRepository.removeAllEntitiesSpecifiedBy(this.partitionAwareSpecification);
1089 }
1090 return numberOfRemovedEntities;
1091 }
1092 }
1093
1094
1095 private class Remove<V extends T> implements Callable<Boolean> {
1096 private final V entity;
1097
1098 private Remove(final V entity) {
1099 this.entity = entity;
1100 }
1101
1102 @Override
1103 public Boolean call() throws Exception {
1104 boolean removed = false;
1105 for (final Specification partitionSpecification : subPartitions.keySet()) {
1106 if (typeSafeIsSatisfiedBy(partitionSpecification, this.entity)) {
1107 subPartitions.get(partitionSpecification).remove(this.entity);
1108 removed = true;
1109 break;
1110 }
1111 }
1112 if (!removed) {
1113 removed = repositoryDelegate.remove(this.entity);
1114 }
1115 return removed;
1116 }
1117 }
1118
1119
1120
1121
1122
1123
1124
1125 private static class PartitionRepositoryIterator<T extends Entity, V extends T> extends InMemoryRepository.InMemoryRepositoryIterator<V, T> {
1126
1127 private PartitionRepository<T> currentRepositoryPartition;
1128
1129 private final Map<Specification<? extends T>, PartitionRepository> partitionMap;
1130 private Iterator<PartitionRepository> partitionedRepoIterator;
1131
1132
1133 private Set<Serializable> iteratedEntityRegistry = new HashSet<Serializable>();
1134
1135
1136 private void addPartition(final PartitionRepository partition) {
1137 this.partitionMap.put(partition.getPartitionSpecification(), partition);
1138 this.partitionedRepoIterator = this.partitionMap.values().iterator();
1139 }
1140
1141 private PartitionRepositoryIterator(final Specification<V> specification, final PartitionRepository<T> repo) {
1142 this.specification = specification;
1143 this.currentRepositoryPartition = repo.findPartitionFor(specification);
1144 if (this.currentRepositoryPartition == null) {
1145 this.currentRepositoryPartition = repo;
1146 this.partitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
1147 this.currentRepositoryPartition.collectAllPartitions(this.partitionMap);
1148 this.partitionedRepoIterator = this.partitionMap.values().iterator();
1149 } else {
1150 this.partitionMap = new HashMap<Specification<? extends T>, PartitionRepository>();
1151 this.currentRepositoryPartition.collectAllPartitions(this.partitionMap);
1152 this.partitionedRepoIterator = this.partitionMap.values().iterator();
1153 this.entityIterator = this.currentRepositoryPartition.getPartitionOnlyEntities().iterator();
1154 }
1155 findNextEntity();
1156 }
1157
1158 @Override
1159 protected void findNextEntity() {
1160 this.nextEntity = null;
1161 while (this.nextEntity == null && this.entityIterator != null && this.entityIterator.hasNext()) {
1162 final Object entity = this.entityIterator.next();
1163 if (typeSafeIsSatisfiedBy(this.specification, entity)) {
1164 final V entityOfCorrectType = this.specification.getType().cast(entity);
1165 if (this.specification.isSatisfiedBy(entityOfCorrectType)) {
1166 if (!this.iteratedEntityRegistry.contains(entityOfCorrectType.getEntityId())) {
1167 this.nextEntity = entityOfCorrectType;
1168 this.iteratedEntityRegistry.add(this.nextEntity.getEntityId());
1169 }
1170 }
1171 }
1172 }
1173 if (this.nextEntity == null && (this.entityIterator == null || !this.entityIterator.hasNext())) {
1174
1175 while (this.nextEntity == null && this.partitionedRepoIterator.hasNext()) {
1176 this.currentRepositoryPartition = this.partitionedRepoIterator.next();
1177 this.entityIterator = this.currentRepositoryPartition.getPartitionOnlyEntities().iterator();
1178 while (this.nextEntity == null && this.entityIterator.hasNext()) {
1179 final Object entity = this.entityIterator.next();
1180 if (typeSafeIsSatisfiedBy(this.specification, entity)) {
1181 final V entityOfCorrectType = this.specification.getType().cast(entity);
1182 if (this.specification.isSatisfiedBy(entityOfCorrectType)) {
1183 if (!this.iteratedEntityRegistry.contains(entityOfCorrectType.getEntityId())) {
1184 this.nextEntity = entityOfCorrectType;
1185 this.iteratedEntityRegistry.add(this.nextEntity.getEntityId());
1186 break;
1187 }
1188 }
1189 }
1190 }
1191 }
1192 }
1193 }
1194
1195 @SuppressWarnings("unchecked")
1196 @Override
1197 public void remove() {
1198 if (!this.nextIsInvoked || this.removeIsInvoked) {
1199 throw new IllegalStateException();
1200 }
1201
1202
1203 this.currentRepositoryPartition.remove((T) this.currentEntity);
1204 this.removeIsInvoked = true;
1205 }
1206
1207 }
1208
1209
1210
1211 private static class IteratorRegistry {
1212
1213 private final HashSet<WeakReference<PartitionRepositoryIterator>> activeIteratorSet =
1214 new HashSet<WeakReference<PartitionRepositoryIterator>>();
1215
1216 void addIterator(final PartitionRepositoryIterator iterator) {
1217 this.activeIteratorSet.add(new WeakReference<PartitionRepositoryIterator>(iterator));
1218 }
1219
1220 void updateIteratorsWith(final Specification partitionSpecification, final PartitionRepository subPartition) {
1221 for (final WeakReference<PartitionRepositoryIterator> activeIteratorReference : this.activeIteratorSet) {
1222 final PartitionRepositoryIterator activeIterator = activeIteratorReference.get();
1223 if (activeIterator == null) {
1224
1225 this.activeIteratorSet.remove(activeIteratorReference);
1226 } else {
1227 final Specification currentPartitionRepoSpecn = activeIterator.currentRepositoryPartition.getPartitionSpecification();
1228 if (currentPartitionRepoSpecn != null && currentPartitionRepoSpecn.isGeneralizationOf(partitionSpecification)) {
1229 activeIterator.addPartition(subPartition);
1230 }
1231 }
1232 }
1233 }
1234 }
1235
1236
1237 class PartitionAwareSpecification<V extends T> implements Specification<V> {
1238 private boolean specificationIsDisjoint = false;
1239
1240
1241 Specification<V> customSpecification;
1242 Specification<V> partitionAwareSpecification;
1243
1244 PartitionAwareSpecification(final Specification<V> customSpecification, final Specification<T> partitionSpecification) {
1245 this.customSpecification = customSpecification;
1246
1247 if (partitionSpecification == null) {
1248 this.partitionAwareSpecification = this.customSpecification;
1249
1250 } else if (customSpecification.equals(partitionSpecification)) {
1251 this.partitionAwareSpecification = this.customSpecification;
1252
1253 } else if (customSpecification.isGeneralizationOf((Specification<? extends V>) partitionSpecification)) {
1254
1255
1256
1257
1258 this.partitionAwareSpecification = (Specification<V>) createSpecificationFor(partitionSpecification.getType());
1259
1260
1261
1262
1263
1264
1265 } else if (customSpecification.isSpecialCaseOf(partitionSpecification)) {
1266 this.partitionAwareSpecification = this.customSpecification;
1267
1268 } else if (customSpecification.isDisjointWith(partitionSpecification)) {
1269
1270
1271
1272 this.partitionAwareSpecification = this.customSpecification;
1273 this.specificationIsDisjoint = true;
1274
1275 } else {
1276
1277
1278
1279
1280 this.partitionAwareSpecification = this.customSpecification;
1281 }
1282 }
1283
1284 @Override
1285 public Class<V> getType() {
1286 return this.partitionAwareSpecification.getType();
1287 }
1288
1289 @Override
1290 public Boolean isSatisfiedBy(final V candidate) {
1291 return this.partitionAwareSpecification.isSatisfiedBy(candidate);
1292 }
1293
1294 @Override
1295 public Boolean isGeneralizationOf(final Specification<? extends V> specification) {
1296 return this.partitionAwareSpecification.isGeneralizationOf(specification);
1297 }
1298
1299 @Override
1300 public Boolean isSpecialCaseOf(final Specification<? super V> specification) {
1301 return this.partitionAwareSpecification.isSpecialCaseOf(specification);
1302 }
1303
1304 @Override
1305 public CompositeSpecification<V> and(final Specification<? super V> otherSpecification) {
1306 return this.partitionAwareSpecification.and(otherSpecification);
1307 }
1308
1309 @Override
1310 public CompositeSpecification<V> or(final Specification<? super V> otherSpecification) {
1311 return this.partitionAwareSpecification.or(otherSpecification);
1312 }
1313
1314 @Override
1315 public <F> CompositeSpecification<V> where(final String accessibleObjectName, final Specification<F> accessibleObjectSpecification) {
1316 throw new NotImplementedException();
1317 }
1318
1319 @Override
1320 public Boolean isDisjointWith(final Specification otherSpecification) {
1321 return this.partitionAwareSpecification.isDisjointWith(otherSpecification);
1322 }
1323
1324 private boolean isWithinPartitionBoundaries() {
1325 return !isOutsidePartitionBoundaries();
1326 }
1327
1328 private boolean isOutsidePartitionBoundaries() {
1329 return this.specificationIsDisjoint;
1330 }
1331
1332 @Override
1333 public int hashCode() {
1334 return this.partitionAwareSpecification.hashCode();
1335 }
1336
1337 @Override
1338 @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
1339 public boolean equals(final Object otherObject) {
1340 return this.partitionAwareSpecification.equals(otherObject);
1341 }
1342 }
1343 }