package org.axonframework.integrationtests.cache;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.axonframework.common.caching.Cache;
import org.axonframework.config.Configuration;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.integrationtests.cache.CachedSaga;
import org.axonframework.modelling.saga.repository.CachingSagaStore;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.class */
public abstract class CachingIntegrationTestSuite {
    // Boolean flag with the value false. Judging by its name it is meant to switch off the
    // auto-location of configurer modules — NOTE(review): confirm where it is supposed to be consumed.
    private static final boolean DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES = false;
    // Number of update events contained in a single bulk publication.
    private static final int NUMBER_OF_UPDATES = 4096;
    // Number of publishers that push bulk updates in parallel in the "concurrent" tests.
    private static final int NUMBER_OF_CONCURRENT_PUBLISHERS = 8;
    // Third constructor argument of CachedSaga.SagaCreatedEvent; presumably the number of
    // associations the saga should register — TODO confirm against CachedSaga.
    private static final int NUMBER_OF_ASSOCIATIONS = 42;
    // The started Axon Configuration; assigned in setUp() and used to reach the event bus.
    protected Configuration config;
    // The event processor registered under the name "CachedSagaProcessor"; polled for progress.
    private StreamingEventProcessor sagaProcessor;
    // Listener registered on the cache returned by buildCache("saga").
    private EntryListenerValidator<SagaStore.Entry<CachedSaga>> sagaCacheListener;
    // Listener registered on the cache returned by buildCache("associations").
    private EntryListenerValidator<Set<String>> associationsCacheListener;
    // Names of the sagas created by the tests; the matching association value is name + "-id".
    private static final String[] SAGA_NAMES = {"foo", "bar", "baz", "and", "some", "more"};
    // Poll delay handed to Awaitility before the wait condition is evaluated.
    private static final Duration DEFAULT_DELAY = Duration.ofMillis(25);
    // Upper bounds handed to Awaitility's atMost(...) in the tests below.
    private static final Duration ONE_SECOND = Duration.ofSeconds(1);
    private static final Duration TWO_SECONDS = Duration.ofSeconds(2);
    private static final Duration FOUR_SECONDS = Duration.ofSeconds(4);
    private static final Duration EIGHT_SECONDS = Duration.ofSeconds(8);
    private static final Duration SIXTEEN_SECONDS = Duration.ofSeconds(16);
    private static final Duration THIRTY_TWO_SECONDS = Duration.ofSeconds(32);

    /* loaded from: input_file:org/axonframework/integrationtests/cache/CachingIntegrationTestSuite$EntryListenerValidator.class */
    private static class EntryListenerValidator<V> implements Cache.EntryListener {
        private final ListenerType type;
        private final Set<String> created;
        private final Map<String, V> updates;
        private final Set<String> removed;
        private final Set<String> expired;

        private EntryListenerValidator(ListenerType listenerType) {
            this.created = new ConcurrentSkipListSet();
            this.updates = new ConcurrentHashMap();
            this.removed = new ConcurrentSkipListSet();
            this.expired = new ConcurrentSkipListSet();
            this.type = listenerType;
        }

        public void onEntryCreated(Object obj, Object obj2) {
            String obj3 = obj.toString();
            this.created.add(obj3);
            this.updates.put(obj3, obj2);
        }

        public void onEntryUpdated(Object obj, Object obj2) {
            this.updates.put(obj.toString(), obj2);
        }

        public void onEntryRemoved(Object obj) {
            this.removed.add(obj.toString());
        }

        public void onEntryExpired(Object obj) {
            this.expired.add(obj.toString());
        }

        public void onEntryRead(Object obj, Object obj2) {
        }

        public Object clone() {
            throw new UnsupportedOperationException();
        }

        public Optional<V> get(String str) {
            return Optional.ofNullable(this.updates.get(str));
        }

        public boolean isRemoved(String str) {
            return this.removed.contains(str) || this.expired.contains(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/integrationtests/cache/CachingIntegrationTestSuite$ListenerType.class */
    /**
     * Identifies which cache an {@code EntryListenerValidator} is attached to.
     */
    public enum ListenerType {
        // Used for the listener registered on the saga cache.
        SAGA,
        // Used for the listener registered on the associations cache.
        ASSOCIATIONS
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the saga and associations caches through {@link #buildCache(String)}, attaches a
     * recording listener to each, and starts an Axon configuration in which {@link CachedSaga} is
     * backed by a {@link CachingSagaStore} on top of an {@link InMemorySagaStore}. The saga is handled
     * by a tracking processor named "CachedSagaProcessor".
     *
     * @throws IllegalStateException if the "CachedSagaProcessor" cannot be found after start-up
     */
    @BeforeEach
    public void setUp() {
        Cache sagaCache = buildCache("saga");
        sagaCacheListener = new EntryListenerValidator<>(ListenerType.SAGA);
        sagaCache.registerCacheEntryListener(sagaCacheListener);

        Cache associationsCache = buildCache("associations");
        associationsCacheListener = new EntryListenerValidator<>(ListenerType.ASSOCIATIONS);
        associationsCache.registerCacheEntryListener(associationsCacheListener);

        TrackingEventProcessorConfiguration processorConfiguration =
                TrackingEventProcessorConfiguration.forParallelProcessing(4)
                                                   .andEventAvailabilityTimeout(10L, TimeUnit.MILLISECONDS);

        // The saga configurer lambda is passed inline so its parameter type is inferred from
        // registerSaga(...); a raw Consumer would leave the parameter typed as Object.
        config = DefaultConfigurer.defaultConfiguration(DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES)
                .configureEmbeddedEventStore(storeConfig -> new InMemoryEventStorageEngine())
                .eventProcessing(processing -> processing
                        .usingTrackingEventProcessors()
                        .registerTrackingEventProcessorConfiguration(
                                "CachedSagaProcessor", processorConfig -> processorConfiguration
                        )
                        .registerSaga(CachedSaga.class, sagaConfigurer -> sagaConfigurer.configureSagaStore(
                                sagaStoreConfig -> CachingSagaStore.builder()
                                                                   .delegateSagaStore(new InMemorySagaStore())
                                                                   .sagaCache(sagaCache)
                                                                   .associationsCache(associationsCache)
                                                                   .build()
                        )))
                .start();

        sagaProcessor = config.eventProcessingConfiguration()
                              .eventProcessor("CachedSagaProcessor", StreamingEventProcessor.class)
                              .orElseThrow(() -> new IllegalStateException("CachedSagaProcessor is not present"));
    }

    /**
     * Creates the {@link Cache} implementation under test. Invoked twice per test from
     * {@code setUp()}, once with "saga" and once with "associations"; the returned caches receive an
     * entry listener and are handed to the {@link CachingSagaStore}.
     *
     * @param str the name of the cache to build
     * @return the cache to use for the given name
     */
    public abstract Cache buildCache(String str);

    /**
     * Creates a single saga, publishes one bulk of {@code NUMBER_OF_UPDATES} events to it and ends
     * it, verifying after each step what the cache listeners have recorded.
     */
    @Test
    void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() {
        int createEvents = 1;
        int deleteEvents = 1;
        String sagaName = SAGA_NAMES[0];
        String associationValue = sagaName + "-id";
        String associationCacheKey = sagaAssociationCacheKey(associationValue);

        // Construct the saga...
        publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS));
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(ONE_SECOND)
                  .until(() -> handledEventsUpTo(createEvents));

        // ...and validate that both caches picked it up.
        Optional<Set<String>> associations = associationsCacheListener.get(associationCacheKey);
        Assertions.assertTrue(associations.isPresent());
        Optional<String> firstSagaIdentifier = associations.get().stream().findFirst();
        Assertions.assertTrue(firstSagaIdentifier.isPresent());
        String sagaIdentifier = firstSagaIdentifier.get();

        Optional<SagaStore.Entry<CachedSaga>> createdEntry = sagaCacheListener.get(sagaIdentifier);
        Assertions.assertTrue(createdEntry.isPresent());
        CachedSaga createdSaga = createdEntry.get().saga();
        Assertions.assertEquals(sagaName, createdSaga.getName());
        Assertions.assertTrue(createdSaga.getState().isEmpty());

        // Bulk update the saga...
        publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES);
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(FOUR_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES));

        // ...and validate that every update ended up in the cached state.
        Optional<SagaStore.Entry<CachedSaga>> updatedEntry = sagaCacheListener.get(sagaIdentifier);
        Assertions.assertTrue(updatedEntry.isPresent());
        CachedSaga updatedSaga = updatedEntry.get().saga();
        Assertions.assertEquals(sagaName, updatedSaga.getName());
        Assertions.assertEquals(NUMBER_OF_UPDATES, updatedSaga.getState().size());

        // End the saga...
        publish(new CachedSaga.SagaEndsEvent(associationValue, true));
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(ONE_SECOND)
                  .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES + deleteEvents));

        // ...and validate that both cache entries are gone.
        Assertions.assertTrue(associationsCacheListener.isRemoved(associationCacheKey));
        Assertions.assertTrue(sagaCacheListener.isRemoved(sagaIdentifier));
    }

    /**
     * Creates a single saga, lets {@code NUMBER_OF_CONCURRENT_PUBLISHERS} publishers each push a
     * bulk of {@code NUMBER_OF_UPDATES} events to it in parallel and ends it, verifying after each
     * step what the cache listeners have recorded.
     *
     * @throws ExecutionException   if one of the concurrent publications fails
     * @throws InterruptedException if interrupted while waiting for the publications
     * @throws TimeoutException     if the publications do not finish within 15 seconds
     */
    @Test
    void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException {
        int createEvents = 1;
        int updateEvents = NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS;
        int deleteEvents = 1;
        String sagaName = SAGA_NAMES[0];
        String associationValue = "some-id";
        String associationCacheKey = sagaAssociationCacheKey(associationValue);

        // Construct the saga...
        publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS));
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents));

        // ...and validate that both caches picked it up.
        Optional<Set<String>> associations = associationsCacheListener.get(associationCacheKey);
        Assertions.assertTrue(associations.isPresent());
        Optional<String> firstSagaIdentifier = associations.get().stream().findFirst();
        Assertions.assertTrue(firstSagaIdentifier.isPresent());
        String sagaIdentifier = firstSagaIdentifier.get();

        Optional<SagaStore.Entry<CachedSaga>> createdEntry = sagaCacheListener.get(sagaIdentifier);
        Assertions.assertTrue(createdEntry.isPresent());
        CachedSaga createdSaga = createdEntry.get().saga();
        Assertions.assertEquals(sagaName, createdSaga.getName());
        Assertions.assertTrue(createdSaga.getState().isEmpty());

        // Concurrent bulk updates. The pool is always shut down so its threads do not outlive the test.
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_CONCURRENT_PUBLISHERS);
        try {
            IntStream.range(0, NUMBER_OF_CONCURRENT_PUBLISHERS)
                     .mapToObj(publisher -> CompletableFuture.runAsync(
                             () -> publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES), executor
                     ))
                     .reduce((left, right) -> CompletableFuture.allOf(left, right))
                     .orElse(CompletableFuture.completedFuture(null))
                     .get(15L, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(SIXTEEN_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents));

        // Validate that every update of every publisher ended up in the cached state.
        Optional<SagaStore.Entry<CachedSaga>> updatedEntry = sagaCacheListener.get(sagaIdentifier);
        Assertions.assertTrue(updatedEntry.isPresent());
        CachedSaga updatedSaga = updatedEntry.get().saga();
        Assertions.assertEquals(sagaName, updatedSaga.getName());
        Assertions.assertEquals(updateEvents, updatedSaga.getState().size());

        // End the saga...
        publish(new CachedSaga.SagaEndsEvent(associationValue, true));
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents + deleteEvents));

        // ...and validate that both cache entries are gone.
        Assertions.assertTrue(associationsCacheListener.isRemoved(associationCacheKey));
        Assertions.assertTrue(sagaCacheListener.isRemoved(sagaIdentifier));
    }

    /**
     * Creates one saga per entry of {@code SAGA_NAMES}, publishes one bulk of
     * {@code NUMBER_OF_UPDATES} events to each of them in parallel and ends them all, verifying
     * after each step what the cache listeners have recorded.
     *
     * @throws ExecutionException   if one of the parallel publications fails
     * @throws InterruptedException if interrupted while waiting for the publications
     * @throws TimeoutException     if the publications do not finish within 15 seconds
     */
    @Test
    void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException {
        int createEvents = SAGA_NAMES.length;
        int updateEvents = NUMBER_OF_UPDATES * SAGA_NAMES.length;
        int deleteEvents = SAGA_NAMES.length;

        // Construct the sagas...
        for (String sagaName : SAGA_NAMES) {
            publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS));
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents));

        // ...and validate that both caches picked each of them up.
        for (String sagaName : SAGA_NAMES) {
            Optional<Set<String>> associations =
                    associationsCacheListener.get(sagaAssociationCacheKey(sagaName + "-id"));
            Assertions.assertTrue(associations.isPresent());
            Optional<String> sagaIdentifier = associations.get().stream().findFirst();
            Assertions.assertTrue(sagaIdentifier.isPresent());

            Optional<SagaStore.Entry<CachedSaga>> createdEntry = sagaCacheListener.get(sagaIdentifier.get());
            Assertions.assertTrue(createdEntry.isPresent());
            CachedSaga createdSaga = createdEntry.get().saga();
            Assertions.assertEquals(sagaName, createdSaga.getName());
            Assertions.assertTrue(createdSaga.getState().isEmpty());
        }

        // Bulk update all sagas in parallel. The pool is always shut down so its threads do not
        // outlive the test.
        ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length);
        try {
            Arrays.stream(SAGA_NAMES)
                  .map(name -> CompletableFuture.runAsync(
                          () -> publishBulkUpdatesTo(name + "-id", NUMBER_OF_UPDATES), executor
                  ))
                  .reduce((left, right) -> CompletableFuture.allOf(left, right))
                  .orElse(CompletableFuture.completedFuture(null))
                  .get(15L, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(EIGHT_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents));

        // Validate that every saga received all of its updates.
        for (String sagaName : SAGA_NAMES) {
            Optional<Set<String>> associations =
                    associationsCacheListener.get(sagaAssociationCacheKey(sagaName + "-id"));
            Assertions.assertTrue(associations.isPresent());
            Optional<String> sagaIdentifier = associations.get().stream().findFirst();
            Assertions.assertTrue(sagaIdentifier.isPresent());

            Optional<SagaStore.Entry<CachedSaga>> updatedEntry = sagaCacheListener.get(sagaIdentifier.get());
            Assertions.assertTrue(updatedEntry.isPresent());
            CachedSaga updatedSaga = updatedEntry.get().saga();
            Assertions.assertEquals(sagaName, updatedSaga.getName());
            Assertions.assertEquals(NUMBER_OF_UPDATES, updatedSaga.getState().size());
        }

        // End the sagas...
        for (String sagaName : SAGA_NAMES) {
            publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true));
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents + deleteEvents));

        // ...and validate that the association entries are gone.
        for (String sagaName : SAGA_NAMES) {
            Assertions.assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id")));
        }
    }

    /**
     * Creates one saga per entry of {@code SAGA_NAMES}, lets
     * {@code NUMBER_OF_CONCURRENT_PUBLISHERS} publishers per saga each push a bulk of
     * {@code NUMBER_OF_UPDATES} events in parallel and ends all sagas, verifying after each step
     * what the cache listeners have recorded.
     *
     * @throws ExecutionException   if one of the concurrent publications fails
     * @throws InterruptedException if interrupted while waiting for the publications
     * @throws TimeoutException     if the publications do not finish within 15 seconds
     */
    @Test
    void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException {
        int publishers = SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS;
        int updatesPerSaga = NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS;
        int createEvents = SAGA_NAMES.length;
        int updateEvents = NUMBER_OF_UPDATES * publishers;
        int deleteEvents = SAGA_NAMES.length;

        // Construct the sagas...
        for (String sagaName : SAGA_NAMES) {
            publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS));
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents));

        // ...and validate that both caches picked each of them up.
        for (String sagaName : SAGA_NAMES) {
            Optional<Set<String>> associations =
                    associationsCacheListener.get(sagaAssociationCacheKey(sagaName + "-id"));
            Assertions.assertTrue(associations.isPresent());
            Optional<String> sagaIdentifier = associations.get().stream().findFirst();
            Assertions.assertTrue(sagaIdentifier.isPresent());

            Optional<SagaStore.Entry<CachedSaga>> createdEntry = sagaCacheListener.get(sagaIdentifier.get());
            Assertions.assertTrue(createdEntry.isPresent());
            CachedSaga createdSaga = createdEntry.get().saga();
            Assertions.assertEquals(sagaName, createdSaga.getName());
            Assertions.assertTrue(createdSaga.getState().isEmpty());
        }

        // Concurrent bulk updates, spread round-robin over the sagas. The pool is always shut down
        // so its threads do not outlive the test.
        ExecutorService executor = Executors.newFixedThreadPool(publishers);
        try {
            IntStream.range(0, publishers)
                     .mapToObj(publisher -> CompletableFuture.runAsync(
                             () -> publishBulkUpdatesTo(
                                     SAGA_NAMES[publisher % SAGA_NAMES.length] + "-id", NUMBER_OF_UPDATES
                             ),
                             executor
                     ))
                     .reduce((left, right) -> CompletableFuture.allOf(left, right))
                     .orElse(CompletableFuture.completedFuture(null))
                     .get(15L, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(THIRTY_TWO_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents));

        // Validate that every saga received the updates of all of its publishers.
        for (String sagaName : SAGA_NAMES) {
            Optional<Set<String>> associations =
                    associationsCacheListener.get(sagaAssociationCacheKey(sagaName + "-id"));
            Assertions.assertTrue(associations.isPresent());
            Optional<String> sagaIdentifier = associations.get().stream().findFirst();
            Assertions.assertTrue(sagaIdentifier.isPresent());

            Optional<SagaStore.Entry<CachedSaga>> updatedEntry = sagaCacheListener.get(sagaIdentifier.get());
            Assertions.assertTrue(updatedEntry.isPresent());
            CachedSaga updatedSaga = updatedEntry.get().saga();
            Assertions.assertEquals(sagaName, updatedSaga.getName());
            Assertions.assertEquals(updatesPerSaga, updatedSaga.getState().size());
        }

        // End the sagas...
        for (String sagaName : SAGA_NAMES) {
            publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true));
        }
        // The threshold must count the updates of ALL concurrent publishers. A smaller value is
        // already reached before the end events are handled, which would let the removal
        // assertions below race against the processor.
        Awaitility.await()
                  .pollDelay(DEFAULT_DELAY)
                  .atMost(FOUR_SECONDS)
                  .until(() -> handledEventsUpTo(createEvents + updateEvents + deleteEvents));

        // ...and validate that the association entries are gone.
        for (String sagaName : SAGA_NAMES) {
            Assertions.assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id")));
        }
    }

    /**
     * Publishes {@code amount} {@link CachedSaga.VeryImportantEvent}s for the given saga in a single
     * call to {@link #publish(Object...)}. The events carry the indices {@code 0} up to
     * {@code amount - 1}.
     *
     * @param sagaId the association value identifying the targeted saga
     * @param amount the number of update events to publish
     */
    private void publishBulkUpdatesTo(String sagaId, int amount) {
        Object[] updateEvents = new Object[amount];
        for (int index = 0; index < amount; index++) {
            updateEvents[index] = new CachedSaga.VeryImportantEvent(sagaId, index);
        }
        publish(updateEvents);
    }

    /**
     * Wraps every given payload through {@link GenericEventMessage#asEventMessage(Object)} and
     * publishes the resulting messages on the configured event bus in one call, preserving the
     * order of the arguments.
     *
     * @param events the event payloads to publish
     */
    private void publish(Object... events) {
        List<EventMessage<?>> eventMessages = new ArrayList<>(events.length);
        for (Object event : events) {
            eventMessages.add(GenericEventMessage.asEventMessage(event));
        }
        config.eventBus().publish(eventMessages);
    }

    /**
     * Checks whether every status reported by the saga processor has a current position of at
     * least {@code i - 1}. A status without a position counts as position {@code -1}.
     *
     * @param i the number of events expected to have been handled
     * @return {@code true} if all reported statuses reached position {@code i - 1}; {@code false}
     *         otherwise, including when no status is reported at all
     */
    private Boolean handledEventsUpTo(int i) {
        final long lastExpectedPosition = i - 1;
        // All positions reach the target exactly when the lowest one does; an empty status map
        // yields no minimum and therefore false.
        return this.sagaProcessor.processingStatus()
                                 .values()
                                 .stream()
                                 .map(status -> status.getCurrentPosition().orElse(-1L))
                                 .min(Long::compare)
                                 .map(lowestPosition -> lowestPosition >= lastExpectedPosition)
                                 .orElse(false);
    }

    /**
     * Builds the key under which the associations cache entry for the given association value is
     * looked up in this test: the fully qualified name of {@link CachedSaga}, followed by
     * {@code "/id="} and the value itself.
     *
     * @param str the association value of the saga
     * @return the cache key for that association value
     */
    private static String sagaAssociationCacheKey(String str) {
        StringBuilder cacheKey = new StringBuilder(CachedSaga.class.getName());
        cacheKey.append("/id=");
        cacheKey.append(str);
        return cacheKey.toString();
    }
}
