/*
 * Decompiled with CFR 0.152.
 */
package xyz.tcheeric.nostr.cashu.services.stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Generated;
import nostr.event.BaseMessage;
import nostr.event.filter.Filterable;
import nostr.event.filter.Filters;
import nostr.event.filter.SinceFilter;
import nostr.event.impl.GenericEvent;
import nostr.event.json.codec.BaseMessageDecoder;
import nostr.event.json.codec.EventEncodingException;
import nostr.event.message.EoseMessage;
import nostr.event.message.EventMessage;
import nostr.event.message.NoticeMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.tcheeric.nostr.cashu.services.stream.NostrStreamingClient;
import xyz.tcheeric.nostr.cashu.util.AsyncExecutor;
import xyz.tcheeric.nostr.cashu.util.RetryPolicy;

public final class NostrSubscription
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(NostrSubscription.class);
    private static final int DEDUPE_WINDOW = 2048;
    private static final ObjectMapper JSON = new ObjectMapper();
    private final String id;
    private final List<Filterable> baseFilterables;
    private final Integer limit;
    private final NostrStreamingClient streamingClient;
    private final AsyncExecutor executor;
    private final RetryPolicy retryPolicy;
    private final Consumer<GenericEvent> eventListener;
    private final Consumer<Throwable> errorListener;
    private final Supplier<Map<String, String>> relaySupplier;
    private final boolean closeStreamingClientOnShutdown;
    private final ExecutorService dispatcher;
    private final BaseMessageDecoder<BaseMessage> decoder;
    private final AtomicBoolean closed;
    private final AtomicBoolean reconnecting;
    private final AtomicReference<AutoCloseable> activeHandle;
    private final Object cursorLock;
    private final Object dedupeLock;
    private final Deque<String> recentEventIds;
    private final Set<String> recentEventSet;
    private volatile long cursor;

    public NostrSubscription(Filters filters, NostrStreamingClient streamingClient, AsyncExecutor executor, RetryPolicy retryPolicy, Consumer<GenericEvent> eventListener, Consumer<Throwable> errorListener, Supplier<Map<String, String>> relaySupplier) {
        this(UUID.randomUUID().toString(), filters, streamingClient, executor, retryPolicy, eventListener, errorListener, relaySupplier, false);
    }

    public NostrSubscription(String id, Filters filters, NostrStreamingClient streamingClient, AsyncExecutor executor, RetryPolicy retryPolicy, Consumer<GenericEvent> eventListener, Consumer<Throwable> errorListener, Supplier<Map<String, String>> relaySupplier) {
        this(id, filters, streamingClient, executor, retryPolicy, eventListener, errorListener, relaySupplier, false);
    }

    public NostrSubscription(String id, Filters filters, NostrStreamingClient streamingClient, AsyncExecutor executor, RetryPolicy retryPolicy, Consumer<GenericEvent> eventListener, Consumer<Throwable> errorListener, Supplier<Map<String, String>> relaySupplier, boolean closeStreamingClientOnShutdown) {
        this.id = Objects.requireNonNull(id, "id");
        Objects.requireNonNull(filters, "filters");
        this.streamingClient = Objects.requireNonNull(streamingClient, "streamingClient");
        this.executor = Objects.requireNonNull(executor, "executor");
        this.retryPolicy = retryPolicy != null ? retryPolicy : RetryPolicy.noRetries();
        this.eventListener = Objects.requireNonNull(eventListener, "eventListener");
        this.errorListener = errorListener != null ? errorListener : throwable -> log.warn("subscription error", throwable);
        this.relaySupplier = Objects.requireNonNull(relaySupplier, "relaySupplier");
        this.closeStreamingClientOnShutdown = closeStreamingClientOnShutdown;
        this.baseFilterables = NostrSubscription.extractBaseFilterables(filters);
        this.limit = filters.getLimit();
        this.cursor = NostrSubscription.extractInitialCursor(filters);
        this.decoder = new BaseMessageDecoder();
        this.closed = new AtomicBoolean(false);
        this.reconnecting = new AtomicBoolean(false);
        this.activeHandle = new AtomicReference();
        this.cursorLock = new Object();
        this.dedupeLock = new Object();
        this.recentEventIds = new ArrayDeque<String>(2048);
        this.recentEventSet = new HashSet<String>(2048);
        this.dispatcher = Executors.newSingleThreadExecutor(NostrSubscription.newSubscriptionThreadFactory(id));
        this.open();
    }

    public String getId() {
        return this.id;
    }

    public void replay() {
        this.replayFromCursor(this.getCursor());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void replayFromCursor(long since) {
        Object object = this.cursorLock;
        synchronized (object) {
            if (since > this.cursor) {
                this.cursor = since;
            }
        }
        this.scheduleReconnect();
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        AutoCloseable handle = this.activeHandle.getAndSet(null);
        if (handle != null) {
            try {
                handle.close();
            }
            catch (Exception ex) {
                log.warn("subscription close_failed", (Throwable)ex);
            }
        }
        this.dispatcher.shutdownNow();
        if (this.closeStreamingClientOnShutdown) {
            try {
                this.streamingClient.close();
            }
            catch (Exception ex) {
                log.warn("subscription streaming_client_close_failed", (Throwable)ex);
            }
        }
    }

    private void open() {
        if (this.closed.get()) {
            return;
        }
        Map<String, String> relays = this.relaySupplier.get();
        Filters effectiveFilters = this.buildFilters();
        try {
            AutoCloseable closable = this.streamingClient.subscribe(effectiveFilters, this.id, this::handleMessage, this::handleError);
            AutoCloseable previous = this.activeHandle.getAndSet(closable);
            if (previous != null) {
                try {
                    previous.close();
                }
                catch (Exception ex) {
                    log.warn("subscription previous_close_failed", (Throwable)ex);
                }
            }
            log.debug("subscription established id={} relays={}", (Object)this.id, relays);
        }
        catch (RuntimeException ex) {
            log.warn("subscription establish_failed id={} cause={}", (Object)this.id, (Object)ex.getMessage());
            this.handleError(ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Filters buildFilters() {
        long since;
        ArrayList<Filterable> filterables = new ArrayList<Filterable>(this.baseFilterables);
        Object object = this.cursorLock;
        synchronized (object) {
            since = this.cursor;
        }
        filterables.removeIf(filterable -> filterable instanceof SinceFilter);
        if (since > 0L) {
            filterables.add((Filterable)new SinceFilter(Long.valueOf(since)));
        }
        Filters filters = new Filters(filterables);
        if (this.limit != null) {
            filters.setLimit(this.limit);
        }
        return filters;
    }

    private void handleMessage(String message) {
        block9: {
            if (this.closed.get()) {
                return;
            }
            if (this.handleClosedFrame(message)) {
                return;
            }
            try {
                BaseMessage decoded = this.decoder.decode(message);
                if (decoded instanceof EventMessage) {
                    EventMessage eventMessage = (EventMessage)decoded;
                    this.processEventMessage(eventMessage);
                } else if (decoded instanceof NoticeMessage) {
                    NoticeMessage notice = (NoticeMessage)decoded;
                    log.info("subscription notice id={} message={}", (Object)this.id, (Object)notice.getMessage());
                } else if (decoded instanceof EoseMessage) {
                    log.debug("subscription eose id={}", (Object)this.id);
                }
            }
            catch (IllegalArgumentException | EventEncodingException ex) {
                if (this.handleClosedFrame(message)) break block9;
                log.warn("subscription decode_failed id={} error={}", (Object)this.id, (Object)ex.getMessage());
            }
        }
    }

    private void processEventMessage(EventMessage message) {
        if (!Objects.equals(this.id, message.getSubscriptionId())) {
            return;
        }
        GenericEvent event = (GenericEvent)message.getEvent();
        if (event == null) {
            return;
        }
        if (!this.registerEvent(event)) {
            return;
        }
        this.dispatcher.submit(() -> this.deliverEvent(event));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean registerEvent(GenericEvent event) {
        Long createdAt;
        String eventId = event.getId();
        if (eventId != null) {
            Object object = this.dedupeLock;
            synchronized (object) {
                if (this.recentEventSet.contains(eventId)) {
                    return false;
                }
                this.recentEventIds.addLast(eventId);
                this.recentEventSet.add(eventId);
                if (this.recentEventIds.size() > 2048) {
                    String oldest = this.recentEventIds.removeFirst();
                    this.recentEventSet.remove(oldest);
                }
            }
        }
        if ((createdAt = event.getCreatedAt()) != null) {
            Object object = this.cursorLock;
            synchronized (object) {
                if (createdAt > this.cursor) {
                    this.cursor = createdAt;
                }
            }
        }
        return true;
    }

    private void deliverEvent(GenericEvent event) {
        if (this.closed.get()) {
            return;
        }
        try {
            this.eventListener.accept(event);
        }
        catch (Exception ex) {
            log.warn("subscription listener_failed id={}", (Object)this.id, (Object)ex);
        }
    }

    private void handleError(Throwable throwable) {
        if (this.closed.get()) {
            return;
        }
        this.errorListener.accept(throwable);
        this.scheduleReconnect();
    }

    private void scheduleReconnect() {
        if (this.closed.get()) {
            return;
        }
        if (!this.reconnecting.compareAndSet(false, true)) {
            return;
        }
        this.executor.execute(() -> {
            try {
                AutoCloseable handle = this.activeHandle.getAndSet(null);
                if (handle != null) {
                    try {
                        handle.close();
                    }
                    catch (Exception ex) {
                        log.warn("subscription close_on_reconnect_failed id={}", (Object)this.id, (Object)ex);
                    }
                }
                this.open();
            }
            finally {
                this.reconnecting.set(false);
            }
            return null;
        }, this.retryPolicy).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                log.warn("subscription reconnect_failed id={} cause={}", (Object)this.id, (Object)throwable.getMessage());
            }
        });
    }

    private static List<Filterable> extractBaseFilterables(Filters filters) {
        Collection values = filters.getFiltersMap().values();
        ArrayList<Filterable> flattened = new ArrayList<Filterable>();
        for (List entry : values) {
            flattened.addAll(entry);
        }
        return flattened;
    }

    private static long extractInitialCursor(Filters filters) {
        return filters.getFiltersMap().values().stream().flatMap(Collection::stream).filter(filterable -> filterable instanceof SinceFilter).map(filterable -> ((SinceFilter)filterable).getFilterableValue()).map(NostrSubscription::coerceToLong).filter(Objects::nonNull).mapToLong(Long::longValue).max().orElse(0L);
    }

    private static ThreadFactory newSubscriptionThreadFactory(String id) {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("nostr-sub-" + id);
            thread.setDaemon(true);
            return thread;
        };
    }

    private static Long coerceToLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            Number number = (Number)value;
            return number.longValue();
        }
        try {
            return Long.parseLong(value.toString());
        }
        catch (NumberFormatException ex) {
            log.warn("subscription invalid_cursor_value value={} reason={}", value, (Object)ex.getMessage());
            return null;
        }
    }

    private boolean handleClosedFrame(String rawMessage) {
        if (rawMessage == null || !rawMessage.toUpperCase().contains("\"CLOSED\"")) {
            return false;
        }
        try {
            String subscriptionId;
            JsonNode node = JSON.readTree(rawMessage);
            if (!node.isArray() || node.size() < 2) {
                return true;
            }
            String command = node.get(0).asText();
            if (!"CLOSED".equalsIgnoreCase(command)) {
                return false;
            }
            String string = subscriptionId = node.get(1).isNull() ? null : node.get(1).asText();
            if (!Objects.equals(this.id, subscriptionId)) {
                return true;
            }
            String reason = node.size() > 2 && !node.get(2).isNull() ? node.get(2).asText() : null;
            log.warn("subscription closed_by_relay id={} reason={}", (Object)this.id, reason);
            this.handleError(new RelayClosedException(reason));
            return true;
        }
        catch (JsonProcessingException parseError) {
            log.warn("subscription closed_decode_failed id={} error={}", (Object)this.id, (Object)parseError.getOriginalMessage());
            return true;
        }
    }

    private static String toString(Object value) {
        return value != null ? value.toString() : null;
    }

    @Generated
    public long getCursor() {
        return this.cursor;
    }

    private static final class RelayClosedException
    extends RuntimeException {
        private RelayClosedException(String reason) {
            super((String)(reason != null && !reason.isBlank() ? "relay closed subscription: " + reason : "relay closed subscription"));
        }
    }
}

