package io.lettuce.core;

import com.aliyun.openservices.log.common.Consts;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.output.StreamingOutput;
import io.lettuce.core.protocol.CommandWrapper;
import io.lettuce.core.protocol.DemandAware;
import io.lettuce.core.protocol.RedisCommand;
import io.netty.util.Recycler;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher.class */
class RedisPublisher<K, V, T> implements Publisher<T> {
    /** Logger for this publisher; queried once per instance to cache the trace flag. */
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance((Class<?>) RedisPublisher.class);
    /** Snapshot of {@code LOG.isTraceEnabled()} taken in the constructor. */
    private final boolean traceEnabled;
    /** Source of commands; consulted by {@code subscribe} whenever the pre-created command in {@link #ref} is no longer available. */
    private final Supplier<? extends RedisCommand<K, V, T>> commandSupplier;
    /** Holds the command obtained from the supplier at construction time; {@code subscribe} clears it via compare-and-set. */
    private final AtomicReference<RedisCommand<K, V, T>> ref;
    /** Connection handed to every {@code RedisSubscription} created by {@code subscribe}. */
    private final StatefulConnection<K, V> connection;
    /** Passed through to the subscription; when set, a {@link Collection} result is emitted element by element (see {@code SubscriptionCommand.doOnComplete}). */
    private final boolean dissolve;
    /** Executor handed to every subscription; {@code RedisSubscriber.create} uses it to pick the subscriber wrapper. */
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$CommandDispatch.class */
    /**
     * Tracks whether the command of a {@link RedisSubscription} has already been handed to the connection.
     */
    public enum CommandDispatch {

        /**
         * Initial state: the first caller that wins the compare-and-set to {@link #DISPATCHED} dispatches the command.
         */
        UNDISPATCHED {
            @Override
            void dispatch(RedisSubscription<?> redisSubscription) {
                boolean wonTransition = RedisSubscription.COMMAND_DISPATCH.compareAndSet(redisSubscription, this, DISPATCHED);
                if (!wonTransition) {
                    // Somebody else moved the subscription to DISPATCHED first.
                    return;
                }
                redisSubscription.dispatchCommand();
            }
        },

        /** The dispatch transition has already happened; {@code dispatch} is a no-op. */
        DISPATCHED;

        /**
         * Dispatches the subscription's command if this state requires it. The default does nothing.
         *
         * @param redisSubscription the subscription whose command may need dispatching
         */
        void dispatch(RedisSubscription<?> redisSubscription) {
            // intentionally empty
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$CompositeSubscriber.class */
    /**
     * Streaming subscriber that forwards every collection-style {@code onNext} to two subscribers, in order.
     *
     * @param <T> element type
     */
    private static class CompositeSubscriber<T> extends StreamingOutput.Subscriber<T> {
        private final StreamingOutput.Subscriber<T> first;
        private final StreamingOutput.Subscriber<T> second;

        /**
         * @param subscriber  notified first
         * @param subscriber2 notified second
         */
        public CompositeSubscriber(StreamingOutput.Subscriber<T> subscriber, StreamingOutput.Subscriber<T> subscriber2) {
            first = subscriber;
            second = subscriber2;
        }

        /** Single-element delivery is not supported by the composite. */
        @Override
        public void onNext(T t) {
            throw new UnsupportedOperationException();
        }

        /** Forwards the element and its target collection to {@code first}, then to {@code second}. */
        @Override
        public void onNext(Collection<T> collection, T t) {
            first.onNext(collection, t);
            second.onNext(collection, t);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$ImmediateSubscriber.class */
    /**
     * {@link RedisSubscriber} that relays every signal to the wrapped subscriber on the calling thread.
     *
     * @param <T> element type
     */
    public static class ImmediateSubscriber<T> implements RedisSubscriber<T> {
        private final CoreSubscriber<T> delegate;

        /**
         * @param subscriber the downstream subscriber; adapted through {@code Operators.toCoreSubscriber}
         */
        public ImmediateSubscriber(Subscriber<T> subscriber) {
            delegate = reactor.core.publisher.Operators.toCoreSubscriber(subscriber);
        }

        /** @return the context reported by the wrapped subscriber */
        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            delegate.onSubscribe(subscription);
        }

        @Override
        public void onNext(T t) {
            delegate.onNext(t);
        }

        @Override
        public void onError(Throwable th) {
            delegate.onError(th);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$OnComplete.class */
    /**
     * Recyclable task that delivers a terminal signal to a subscriber: {@code onError} when a throwable was
     * supplied, {@code onComplete} otherwise. The instance returns itself to the recycler after running.
     */
    public static class OnComplete implements Runnable {
        private static final Recycler<OnComplete> RECYCLER = new Recycler<OnComplete>() {
            @Override
            public OnComplete newObject(Recycler.Handle<OnComplete> handle) {
                return new OnComplete(handle);
            }
        };
        private final Recycler.Handle<OnComplete> handle;
        private Throwable signal;
        private Subscriber<?> subscriber;

        OnComplete(Recycler.Handle<OnComplete> handle) {
            this.handle = handle;
        }

        /**
         * Obtains a task from the recycler that will signal the given error (or completion when {@code th} is null).
         *
         * @param th         error to deliver, or {@code null} for a completion signal
         * @param subscriber receiver of the terminal signal
         * @return a configured task
         */
        static OnComplete newInstance(Throwable th, Subscriber<?> subscriber) {
            OnComplete task = RECYCLER.get();
            task.signal = th;
            task.subscriber = subscriber;
            return task;
        }

        /**
         * Obtains a task from the recycler that will signal completion.
         *
         * @param subscriber receiver of the completion signal
         * @return a configured task
         */
        static OnComplete newInstance(Subscriber<?> subscriber) {
            return newInstance(null, subscriber);
        }

        @Override
        public void run() {
            try {
                if (signal == null) {
                    subscriber.onComplete();
                } else {
                    subscriber.onError(signal);
                }
            } finally {
                // Always hand the instance back, even if the subscriber throws.
                recycle();
            }
        }

        /** Clears the captured references and returns this instance to the recycler. */
        private void recycle() {
            signal = null;
            subscriber = null;
            handle.recycle(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$OnNext.class */
    /**
     * Recyclable task that delivers a single {@code onNext} signal to a subscriber and then returns itself
     * to the recycler.
     */
    public static class OnNext implements Runnable {
        private static final Recycler<OnNext> RECYCLER = new Recycler<OnNext>() {
            @Override
            public OnNext newObject(Recycler.Handle<OnNext> handle) {
                return new OnNext(handle);
            }
        };
        private final Recycler.Handle<OnNext> handle;
        private Object signal;
        private Subscriber<Object> subscriber;

        OnNext(Recycler.Handle<OnNext> handle) {
            this.handle = handle;
        }

        /**
         * Obtains a task from the recycler that will emit {@code obj} to {@code subscriber}.
         *
         * @param obj        the element to emit
         * @param subscriber the receiver of the element
         * @return a configured task
         */
        @SuppressWarnings("unchecked")
        static OnNext newInstance(Object obj, Subscriber<?> subscriber) {
            OnNext onNext = RECYCLER.get();
            onNext.signal = obj;
            // The task is element-type agnostic; the caller pairs a subscriber with an element of its own type,
            // so viewing the subscriber as Subscriber<Object> is safe. Without the cast the assignment from
            // Subscriber<?> does not type-check.
            onNext.subscriber = (Subscriber<Object>) subscriber;
            return onNext;
        }

        @Override
        public void run() {
            try {
                this.subscriber.onNext(this.signal);
            } finally {
                // Always hand the instance back, even if the subscriber throws.
                recycle();
            }
        }

        /** Clears the captured references and returns this instance to the recycler. */
        private void recycle() {
            this.signal = null;
            this.subscriber = null;
            this.handle.recycle(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$PublishOnSubscriber.class */
    /**
     * {@link RedisSubscriber} that hands data and terminal signals to an {@link Executor} as recyclable tasks.
     * {@code onSubscribe} and {@code currentContext} are relayed directly.
     *
     * @param <T> element type
     */
    public static class PublishOnSubscriber<T> implements RedisSubscriber<T> {
        private final CoreSubscriber<T> delegate;
        private final Executor executor;

        /**
         * @param subscriber the downstream subscriber; adapted through {@code Operators.toCoreSubscriber}
         * @param executor   executor that runs the {@code onNext}/{@code onError}/{@code onComplete} tasks
         */
        public PublishOnSubscriber(Subscriber<T> subscriber, Executor executor) {
            delegate = reactor.core.publisher.Operators.toCoreSubscriber(subscriber);
            this.executor = executor;
        }

        /** @return the context reported by the wrapped subscriber */
        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            delegate.onSubscribe(subscription);
        }

        @Override
        public void onNext(T t) {
            Runnable emitTask = OnNext.newInstance(t, delegate);
            executor.execute(emitTask);
        }

        @Override
        public void onError(Throwable th) {
            Runnable errorTask = OnComplete.newInstance(th, delegate);
            executor.execute(errorTask);
        }

        @Override
        public void onComplete() {
            Runnable completeTask = OnComplete.newInstance(delegate);
            executor.execute(completeTask);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$RedisSubscriber.class */
    /**
     * Subscriber flavour used by {@link RedisSubscription} to talk to the downstream subscriber.
     *
     * @param <T> element type
     */
    public interface RedisSubscriber<T> extends CoreSubscriber<T> {

        /**
         * Wraps {@code subscriber} in an {@link ImmediateSubscriber} when {@code executor} is
         * {@link ImmediateEventExecutor#INSTANCE}, otherwise in a {@link PublishOnSubscriber} bound to
         * {@code executor}.
         *
         * @param subscriber the downstream subscriber
         * @param executor   executor selecting (and, for the publish-on variant, running) signal delivery
         * @param <T>        element type expected by the caller
         * @return the wrapping subscriber
         */
        @SuppressWarnings("unchecked")
        static <T> RedisSubscriber<T> create(Subscriber<?> subscriber, Executor executor) {
            // Single, explicit unchecked cast instead of raw-type construction.
            Subscriber<T> typed = (Subscriber<T>) subscriber;
            if (executor == ImmediateEventExecutor.INSTANCE) {
                return new ImmediateSubscriber<>(typed);
            }
            return new PublishOnSubscriber<>(typed, executor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$RedisSubscription.class */
    /**
     * {@link Subscription} that bridges a Redis command to a downstream subscriber. Received elements are
     * either emitted directly or buffered in {@link #data}; the {@link State} machine referenced by
     * {@link #state} decides how {@code request}, {@code cancel}, data availability and errors are handled.
     *
     * @param <T> element type
     */
    public static class RedisSubscription<T> extends StreamingOutput.Subscriber<T> implements Subscription {
        static final int ST_PROGRESS = 0;
        static final int ST_COMPLETED = 1;
        private final SubscriptionCommand<?, ?, T> subscriptionCommand;
        final StatefulConnection<?, ?> connection;
        final RedisCommand<?, ?, T> command;
        final boolean dissolve;
        private final Executor executor;
        /** Outstanding demand; updated only through {@link #DEMAND}. */
        volatile long demand;
        volatile RedisSubscriber<? super T> subscriber;
        static final InternalLogger LOG = InternalLoggerFactory.getInstance((Class<?>) RedisPublisher.class);
        static final AtomicLongFieldUpdater<RedisSubscription> DEMAND = AtomicLongFieldUpdater.newUpdater(RedisSubscription.class, "demand");
        // The updater is bound by field name; it must name the volatile "state" field declared below.
        static final AtomicReferenceFieldUpdater<RedisSubscription, State> STATE = AtomicReferenceFieldUpdater.newUpdater(RedisSubscription.class, State.class, "state");
        static final AtomicReferenceFieldUpdater<RedisSubscription, CommandDispatch> COMMAND_DISPATCH = AtomicReferenceFieldUpdater.newUpdater(RedisSubscription.class, CommandDispatch.class, "commandDispatch");
        private final boolean traceEnabled = LOG.isTraceEnabled();
        /** Buffer for elements that could not be emitted immediately. */
        final Queue<T> data = Operators.newQueue();
        volatile State state = State.UNSUBSCRIBED;
        volatile CommandDispatch commandDispatch = CommandDispatch.UNDISPATCHED;
        /** Set once the command reported that no further data will arrive. */
        volatile boolean allDataRead = false;

        /**
         * Creates the subscription and, for streaming outputs, registers it as the output's subscriber. On a
         * {@link StatefulRedisConnection} that reports {@code isMulti()}, the previously registered subscriber
         * is kept and notified after this one.
         *
         * @param statefulConnection connection used to dispatch the command; must not be null
         * @param redisCommand       command to execute; must not be null
         * @param z                  the {@code dissolve} flag forwarded to the {@link SubscriptionCommand}
         * @param executor           executor used when wrapping the downstream subscriber; must not be null
         */
        @SuppressWarnings("unchecked")
        RedisSubscription(StatefulConnection<?, ?> statefulConnection, RedisCommand<?, ?, T> redisCommand, boolean z, Executor executor) {
            LettuceAssert.notNull(statefulConnection, "Connection must not be null");
            LettuceAssert.notNull(redisCommand, "RedisCommand must not be null");
            LettuceAssert.notNull(executor, "Executor must not be null");
            this.connection = statefulConnection;
            this.command = redisCommand;
            this.dissolve = z;
            this.executor = executor;
            if (redisCommand.getOutput() instanceof StreamingOutput) {
                StreamingOutput<T> streamingOutput = (StreamingOutput<T>) redisCommand.getOutput();
                if ((statefulConnection instanceof StatefulRedisConnection) && ((StatefulRedisConnection<?, ?>) statefulConnection).isMulti()) {
                    streamingOutput.setSubscriber(new CompositeSubscriber<>(this, streamingOutput.getSubscriber()));
                } else {
                    streamingOutput.setSubscriber(this);
                }
            }
            this.subscriptionCommand = new SubscriptionCommand<>(redisCommand, this, z);
        }

        /**
         * Attaches the downstream subscriber by delegating to the current state.
         *
         * @param subscriber the downstream subscriber
         * @throws NullPointerException if {@code subscriber} is null
         */
        void subscribe(Subscriber<? super T> subscriber) {
            if (subscriber == null) {
                throw new NullPointerException("Subscriber must not be null");
            }
            State state = state();
            if (this.traceEnabled) {
                LOG.trace("{} subscribe: {}@{}", state, subscriber.getClass().getName(), Integer.valueOf(subscriber.hashCode()));
            }
            state.subscribe(this, subscriber);
        }

        /** Delegates the demand signal to the current state. */
        @Override
        public final void request(long j) {
            State state = state();
            if (this.traceEnabled) {
                LOG.trace("{} request: {}", state, Long.valueOf(j));
            }
            state.request(this, j);
        }

        /** Delegates cancellation to the current state. */
        @Override
        public final void cancel() {
            State state = state();
            if (this.traceEnabled) {
                LOG.trace("{} cancel", state);
            }
            state.cancel(this);
        }

        /**
         * Receives an element from the command output. Elements arriving after completion are dropped. When
         * nothing is buffered and there is demand in the {@code DEMAND} state the element is emitted directly;
         * otherwise it is queued. A rejected {@code offer} is reported as an overflow error.
         *
         * @param t the element; must not be null
         */
        @Override
        public void onNext(T t) {
            LettuceAssert.notNull(t, "Data must not be null");
            if (state() == State.COMPLETED) {
                return;
            }
            if (this.data.isEmpty() && state() == State.DEMAND && getDemand() > 0) {
                // Fast path: bypass the queue.
                try {
                    DEMAND.decrementAndGet(this);
                    this.subscriber.onNext(t);
                    return;
                } catch (Exception e) {
                    onError(e);
                    return;
                }
            }
            if (this.data.offer(t)) {
                onDataAvailable();
                return;
            }
            RedisSubscriber<? super T> redisSubscriber = this.subscriber;
            Context context = Context.empty();
            // Every RedisSubscriber is a CoreSubscriber, so only the null case needs guarding.
            if (redisSubscriber != null) {
                context = redisSubscriber.currentContext();
            }
            onError(Operators.onOperatorError(this, Exceptions.failWithOverflow(), t, context));
        }

        /** Notifies the current state that buffered data may be available. */
        final void onDataAvailable() {
            State state = state();
            if (this.traceEnabled) {
                LOG.trace("{} onDataAvailable()", state);
            }
            state.onDataAvailable(this);
        }

        /** Records that the command produced all of its data and triggers a drain attempt. */
        final void onAllDataRead() {
            State state = state();
            if (this.traceEnabled) {
                LOG.trace("{} onAllDataRead()", state);
            }
            this.allDataRead = true;
            onDataAvailable();
        }

        /**
         * Routes an error to the current state.
         *
         * @param th the failure
         */
        final void onError(Throwable th) {
            State state = state();
            // Guard with the trace flag: the message is logged at trace level, like every other method here.
            if (this.traceEnabled) {
                LOG.trace("{} onError(): {}", state, th.toString(), th);
            }
            state.onError(this, th);
        }

        /** @return the next buffered element, or {@code null} if the buffer is empty */
        protected T read() {
            return this.data.poll();
        }

        boolean hasDemand() {
            return getDemand() > 0;
        }

        private long getDemand() {
            return DEMAND.get(this);
        }

        /**
         * Atomically moves from {@code state} to {@code state2}.
         *
         * @return {@code true} if the transition was applied
         */
        boolean changeState(State state, State state2) {
            return STATE.compareAndSet(this, state, state2);
        }

        /** Leaves {@code READING} for {@code DEMAND} or {@code NO_DEMAND}, depending on outstanding demand. */
        boolean afterRead() {
            return changeState(State.READING, getDemand() > 0 ? State.DEMAND : State.NO_DEMAND);
        }

        /** Moves from {@code READING} to {@code COMPLETED}. */
        public boolean complete() {
            return changeState(State.READING, State.COMPLETED);
        }

        /** Dispatches the command if that has not happened yet. */
        void checkCommandDispatch() {
            COMMAND_DISPATCH.get(this).dispatch(this);
        }

        void dispatchCommand() {
            this.connection.dispatch(this.subscriptionCommand);
        }

        /** Asks for more data when the buffer is empty, then drains if something is buffered. */
        void checkOnDataAvailable() {
            if (this.data.isEmpty()) {
                potentiallyReadMore();
            }
            if (this.data.isEmpty()) {
                return;
            }
            onDataAvailable();
        }

        /** Requests more data from the source when demand is at least the number of buffered elements. */
        void potentiallyReadMore() {
            // Written as "size - 1" rather than "demand + 1" so a demand of Long.MAX_VALUE cannot overflow.
            if (getDemand() > this.data.size() - 1) {
                state().readData(this);
            }
        }

        /** Emits buffered elements while there is demand and the buffer is non-empty. */
        void readAndPublish() {
            T read;
            while (hasDemand() && (read = read()) != null) {
                DEMAND.decrementAndGet(this);
                this.subscriber.onNext(read);
            }
        }

        State state() {
            return STATE.get(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$State.class */
    /**
     * State machine of a {@link RedisSubscription}. Each constant overrides the signals it handles; the
     * defaults below reject {@code subscribe}/{@code request} and implement shared cancel, completion and
     * error handling.
     */
    public enum State {

        /** No subscriber attached yet; only {@code subscribe} is accepted. */
        UNSUBSCRIBED {
            @Override
            void subscribe(RedisSubscription<?> redisSubscription, Subscriber<?> subscriber) {
                LettuceAssert.notNull(subscriber, "Subscriber must not be null");
                if (!redisSubscription.changeState(this, NO_DEMAND)) {
                    throw new IllegalStateException(toString());
                }
                redisSubscription.subscriber = RedisSubscriber.create(subscriber, redisSubscription.executor);
                subscriber.onSubscribe(redisSubscription);
            }
        },

        /** Subscribed without outstanding demand; the first valid request dispatches the command. */
        NO_DEMAND {
            @Override
            void request(RedisSubscription<?> redisSubscription, long j) {
                if (!Operators.request(RedisSubscription.DEMAND, redisSubscription, j)) {
                    onError(redisSubscription, Exceptions.nullOrNegativeRequestException(j));
                    return;
                }
                if (redisSubscription.changeState(this, DEMAND)) {
                    try {
                        redisSubscription.checkCommandDispatch();
                    } catch (Exception e) {
                        redisSubscription.onError(e);
                    }
                    redisSubscription.checkOnDataAvailable();
                }
                redisSubscription.potentiallyReadMore();
                redisSubscription.state().onDataAvailable(redisSubscription);
            }
        },

        /** Subscribed with outstanding demand; buffered data is drained to the subscriber. */
        DEMAND {
            @Override
            void onDataAvailable(RedisSubscription<?> redisSubscription) {
                // The whole drain loop sits inside the try so that failures raised while reading/publishing
                // are routed to onError instead of escaping to the caller.
                try {
                    do {
                        if (!read(redisSubscription)) {
                            return;
                        }
                    } while (redisSubscription.hasDemand());
                } catch (Exception e) {
                    redisSubscription.onError(e);
                }
            }

            @Override
            void request(RedisSubscription<?> redisSubscription, long j) {
                if (!Operators.request(RedisSubscription.DEMAND, redisSubscription, j)) {
                    onError(redisSubscription, Exceptions.nullOrNegativeRequestException(j));
                } else {
                    onDataAvailable(redisSubscription);
                    redisSubscription.potentiallyReadMore();
                }
            }

            /**
             * Performs one drain pass guarded by the {@code READING} state.
             *
             * @return {@code true} if another pass may be worthwhile
             */
            private boolean read(RedisSubscription<?> redisSubscription) {
                State state = redisSubscription.state();
                // Entry guard: only one caller may move NO_DEMAND/DEMAND -> READING.
                if ((state != NO_DEMAND && state != DEMAND) || !redisSubscription.changeState(state, READING)) {
                    return false;
                }
                redisSubscription.readAndPublish();
                if (redisSubscription.allDataRead && redisSubscription.data.isEmpty()) {
                    state.onAllDataRead(redisSubscription);
                    return false;
                }
                // Leave guard: READING -> DEMAND/NO_DEMAND.
                redisSubscription.afterRead();
                return redisSubscription.allDataRead || !redisSubscription.data.isEmpty();
            }
        },

        /** A drain pass is in progress; requests are forwarded to the {@code DEMAND} handling. */
        READING {
            @Override
            void request(RedisSubscription<?> redisSubscription, long j) {
                DEMAND.request(redisSubscription, j);
            }
        },

        /** Terminal state; every signal is ignored. */
        COMPLETED {
            @Override
            void request(RedisSubscription<?> redisSubscription, long j) {
            }

            @Override
            void cancel(RedisSubscription<?> redisSubscription) {
            }

            @Override
            void onAllDataRead(RedisSubscription<?> redisSubscription) {
            }

            @Override
            void onError(RedisSubscription<?> redisSubscription, Throwable th) {
            }
        };

        /** Default: subscribing is not allowed in this state. */
        void subscribe(RedisSubscription<?> redisSubscription, Subscriber<?> subscriber) {
            throw new IllegalStateException(toString());
        }

        /** Default: requesting is not allowed in this state. */
        void request(RedisSubscription<?> redisSubscription, long j) {
            throw new IllegalStateException(toString());
        }

        /** Cancels the command and, if the move to {@code COMPLETED} succeeds, asks the source for more data. */
        void cancel(RedisSubscription<?> redisSubscription) {
            redisSubscription.command.cancel();
            if (redisSubscription.changeState(this, COMPLETED)) {
                readData(redisSubscription);
            }
        }

        /** Calls {@code requestMore()} on the demand-aware source, if one is currently registered. */
        void readData(RedisSubscription<?> redisSubscription) {
            DemandAware.Source source = redisSubscription.subscriptionCommand.source;
            if (source != null) {
                source.requestMore();
            }
        }

        /** Default: nothing to do when data becomes available. */
        void onDataAvailable(RedisSubscription<?> redisSubscription) {
        }

        /** Completes the subscriber once the buffer is empty and the {@code READING -> COMPLETED} move succeeds. */
        void onAllDataRead(RedisSubscription<?> redisSubscription) {
            if (redisSubscription.data.isEmpty() && redisSubscription.complete()) {
                readData(redisSubscription);
                RedisSubscriber<?> redisSubscriber = redisSubscription.subscriber;
                if (redisSubscriber != null) {
                    redisSubscriber.onComplete();
                }
            }
        }

        /**
         * Moves the subscription to {@code COMPLETED} and forwards the error to the subscriber, if one is
         * attached. Does nothing when the subscription is already completed or the transition is lost.
         */
        void onError(RedisSubscription<?> redisSubscription, Throwable th) {
            State current;
            while ((current = redisSubscription.state()) != COMPLETED && redisSubscription.changeState(current, COMPLETED)) {
                readData(redisSubscription);
                RedisSubscriber<?> redisSubscriber = redisSubscription.subscriber;
                if (redisSubscriber != null) {
                    redisSubscriber.onError(th);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/lettuce-core-6.4.1.RELEASE.jar:io/lettuce/core/RedisPublisher$SubscriptionCommand.class */
    /**
     * Command wrapper that feeds completion, results and errors of the wrapped command into a
     * {@link RedisSubscription} and exposes the subscription's buffer state as demand.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <T> result type
     */
    public static class SubscriptionCommand<K, V, T> extends CommandWrapper<K, V, T> implements DemandAware.Sink {
        private final boolean dissolve;
        private final RedisSubscription<T> subscription;
        private volatile DemandAware.Source source;

        /**
         * @param redisCommand      the command to wrap
         * @param redisSubscription the subscription that receives results and errors
         * @param z                 when {@code true}, a {@link Collection} result is emitted element by element
         */
        public SubscriptionCommand(RedisCommand<K, V, T> redisCommand, RedisSubscription<T> redisSubscription, boolean z) {
            super(redisCommand);
            this.subscription = redisSubscription;
            this.dissolve = z;
        }

        /** @return {@code true} when the command is done, the subscription is completed, or nothing is buffered */
        @Override
        public boolean hasDemand() {
            return isDone() || this.subscription.state() == State.COMPLETED || this.subscription.data.isEmpty();
        }

        /**
         * Publishes the command result. An output error is reported through {@code onError}. For
         * non-streaming outputs a non-null result is emitted (element-wise when dissolving a collection,
         * skipping null elements). Finally the subscription is told that all data has been read.
         */
        @Override
        @SuppressWarnings("unchecked")
        protected void doOnComplete() {
            if (getOutput() != null) {
                T result = getOutput().get();
                if (getOutput().hasError()) {
                    onError(ExceptionFactory.createExecutionException(getOutput().getError()));
                    return;
                }
                if (!(getOutput() instanceof StreamingOutput) && result != null) {
                    if (this.dissolve && (result instanceof Collection)) {
                        // When dissolving, the collection's elements are what the subscriber consumes as T.
                        for (T element : (Collection<T>) result) {
                            if (element != null) {
                                this.subscription.onNext(element);
                            }
                        }
                    } else {
                        this.subscription.onNext(result);
                    }
                }
            }
            this.subscription.onAllDataRead();
        }

        @Override
        public void setSource(DemandAware.Source source) {
            this.source = source;
        }

        @Override
        public void removeSource() {
            this.source = null;
        }

        @Override
        protected void doOnError(Throwable th) {
            onError(th);
        }

        /** Forwards the failure to the subscription. */
        private void onError(Throwable th) {
            this.subscription.onError(th);
        }
    }

    /**
     * Creates a publisher around a single, already-built command. The command is wrapped in a supplier that
     * always returns that same instance.
     *
     * @param redisCommand       the command to execute
     * @param statefulConnection the connection to dispatch on
     * @param z                  the {@code dissolve} flag
     * @param executor           executor passed on to subscriptions
     */
    public RedisPublisher(RedisCommand<K, V, T> redisCommand, StatefulConnection<K, V> statefulConnection, boolean z, Executor executor) {
        this(() -> redisCommand, statefulConnection, z, executor);
    }

    public RedisPublisher(Supplier<RedisCommand<K, V, T>> supplier, StatefulConnection<K, V> statefulConnection, boolean z, Executor executor) {
        this.traceEnabled = LOG.isTraceEnabled();
        LettuceAssert.notNull(supplier, "CommandSupplier must not be null");
        LettuceAssert.notNull(statefulConnection, "StatefulConnection must not be null");
        LettuceAssert.notNull(executor, "Executor must not be null");
        this.commandSupplier = supplier;
        this.connection = statefulConnection;
        this.dissolve = z;
        this.executor = executor;
        this.ref = new AtomicReference<>(supplier.get());
    }

    /**
     * Creates a new {@link RedisSubscription} for {@code subscriber}. The command created at construction
     * time is used by whichever caller clears {@link #ref} first; every other subscription obtains a fresh
     * command from the supplier.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        if (this.traceEnabled) {
            LOG.trace("subscribe: {}@{}", subscriber.getClass().getName(), Integer.valueOf(Objects.hashCode(subscriber)));
        }
        RedisCommand<K, V, T> redisCommand = this.ref.get();
        // Fall back to the supplier when the initial command is gone or another subscriber claimed it first.
        if (redisCommand == null || !this.ref.compareAndSet(redisCommand, null)) {
            redisCommand = this.commandSupplier.get();
        }
        // Parameterized (not raw) so the subscribe call below is type-checked.
        RedisSubscription<T> subscription = new RedisSubscription<>(this.connection, redisCommand, this.dissolve, this.executor);
        subscription.subscribe(subscriber);
    }
}
