/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.AbstractRedisReactiveCommands;
import io.lettuce.core.FlushMode;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.MSetExArgs;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.StreamScanCursor;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RediSearchReactiveCommands;
import io.lettuce.core.api.reactive.RedisKeyReactiveCommands;
import io.lettuce.core.api.reactive.RedisScriptingReactiveCommands;
import io.lettuce.core.api.reactive.RedisServerReactiveCommands;
import io.lettuce.core.cluster.AsyncClusterConnectionProvider;
import io.lettuce.core.cluster.ClusterDistributionChannelWriter;
import io.lettuce.core.cluster.ClusterScanSupport;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ConnectionIntent;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.search.AggregationReply;
import io.lettuce.core.search.HybridReply;
import io.lettuce.core.search.SearchReply;
import io.lettuce.core.search.SpellCheckResult;
import io.lettuce.core.search.arguments.AggregateArgs;
import io.lettuce.core.search.arguments.CreateArgs;
import io.lettuce.core.search.arguments.ExplainArgs;
import io.lettuce.core.search.arguments.FieldArgs;
import io.lettuce.core.search.arguments.HybridArgs;
import io.lettuce.core.search.arguments.SearchArgs;
import io.lettuce.core.search.arguments.SpellCheckArgs;
import io.lettuce.core.search.arguments.SynUpdateArgs;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedisAdvancedClusterReactiveCommandsImpl<K, V>
extends AbstractRedisReactiveCommands<K, V>
implements RedisAdvancedClusterReactiveCommands<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisAdvancedClusterReactiveCommandsImpl.class);
    private static final Predicate<RedisClusterNode> ALL_NODES = node -> true;
    private final RedisCodec<K, V> codec;

    /**
     * Creates the reactive cluster command facade for a concrete cluster connection implementation.
     *
     * @param connection the cluster connection, handed to the superclass constructor
     * @param codec the codec, handed to the superclass and kept in this class for slot partitioning of keys
     * @param parser supplier of the JSON parser, handed to the superclass constructor
     * @deprecated this overload takes {@link StatefulRedisClusterConnectionImpl}; this class also declares an
     *             overload that takes the {@link StatefulRedisClusterConnection} interface.
     */
    @Deprecated
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec, Supplier<JsonParser> parser) {
        super(connection, codec, parser);
        this.codec = codec;
    }

    /**
     * Creates the reactive cluster command facade for a concrete cluster connection implementation without an
     * explicit JSON parser supplier.
     *
     * @param connection the cluster connection, handed to the superclass constructor
     * @param codec the codec, handed to the superclass and kept in this class for slot partitioning of keys
     * @deprecated this overload takes {@link StatefulRedisClusterConnectionImpl}; this class also declares an
     *             overload that takes the {@link StatefulRedisClusterConnection} interface.
     */
    @Deprecated
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.codec = codec;
    }

    /**
     * Creates the reactive cluster command facade.
     *
     * @param connection the cluster connection, handed to the superclass constructor
     * @param codec the codec, handed to the superclass and kept in this class for slot partitioning of keys
     * @param parser supplier of the JSON parser, handed to the superclass constructor
     */
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec, Supplier<JsonParser> parser) {
        super(connection, codec, parser);
        this.codec = codec;
    }

    /**
     * Creates the reactive cluster command facade without an explicit JSON parser supplier.
     *
     * @param connection the cluster connection, handed to the superclass constructor
     * @param codec the codec, handed to the superclass and kept in this class for slot partitioning of keys
     */
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.codec = codec;
    }

    /**
     * Sets the client name through the superclass implementation and, for every node in the current partitions,
     * on the connection looked up by node id and on the connection looked up by host and port.
     * Node connections that report {@code isOpen() == false} are skipped.
     *
     * @param name the client name to set
     * @return a {@link Mono} emitting the last reply of the merged per-connection results
     */
    @Override
    public Mono<String> clientSetname(K name) {
        // Shared by both lookups below: only issue the command on connections that are still open.
        Function<StatefulRedisConnection<K, V>, Mono<String>> setNameIfOpen = conn -> conn.isOpen() ? conn.reactive().clientSetname(name) : Mono.<String>empty();
        // Typed element list so Flux.merge(...).last() yields Mono<String> without raw/Object typing.
        List<Mono<String>> publishers = new ArrayList<>();
        publishers.add(super.clientSetname(name));
        for (RedisClusterNode redisClusterNode : this.getStatefulConnection().getPartitions()) {
            publishers.add(this.getStatefulConnection(redisClusterNode.getNodeId()).flatMap(setNameIfOpen));
            publishers.add(this.getStatefulConnection(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()).flatMap(setNameIfOpen));
        }
        return Flux.merge(publishers).last();
    }

    /**
     * Counts the keys in a slot by asking the node that the current partitions map to that slot.
     *
     * @param slot the hash slot
     * @return a {@link Mono} emitting the count reported by that node
     */
    @Override
    public Mono<Long> clusterCountKeysInSlot(int slot) {
        return this.findConnectionBySlotReactive(slot).flatMap(commands -> commands.clusterCountKeysInSlot(slot));
    }

    /**
     * Retrieves keys of a slot from the node that the current partitions map to that slot.
     *
     * @param slot the hash slot
     * @param count forwarded unchanged to the node's {@code clusterGetKeysInSlot}
     * @return a {@link Flux} of the keys returned by that node
     */
    @Override
    public Flux<K> clusterGetKeysInSlot(int slot, int count) {
        return this.findConnectionBySlotReactive(slot).flatMapMany(commands -> commands.clusterGetKeysInSlot(slot, count));
    }

    /**
     * Runs {@code dbsize} on every node flagged as upstream and sums the per-node results.
     *
     * @return a {@link Mono} emitting the sum of all per-node sizes
     */
    @Override
    public Mono<Long> dbsize() {
        // The explicit <Long> witness keeps the merged Flux typed so the reduction operates on Long, not Object.
        return Flux.merge(this.<Long>executeOnUpstream(RedisServerReactiveCommands::dbsize).values()).reduce(Long::sum);
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #del(Iterable)}.
     *
     * @param keys the keys to delete
     * @return the result of {@link #del(Iterable)}
     */
    @Override
    public Mono<Long> del(K ... keys) {
        return this.del((Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Deletes keys, splitting the request per hash slot when the keys span more than one slot.
     *
     * @param keys the keys to delete
     * @return a {@link Mono} emitting the sum of the per-slot {@code del} results, or the single result of the
     *         superclass call when all keys fall into fewer than two slots
     */
    @Override
    public Mono<Long> del(Iterable<K> keys) {
        // Materialize once: the keys are walked by the partitioning step and again by the command itself,
        // which would silently lose keys for an Iterable that can only be traversed a single time.
        List<K> keyList = new ArrayList<>();
        keys.forEach(keyList::add);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return super.del(keyList);
        }
        List<Mono<Long>> publishers = new ArrayList<>(partitioned.size());
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(super.del(slotKeys));
        }
        return Flux.merge(publishers).reduce(Long::sum);
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #exists(Iterable)}.
     *
     * @param keys the keys to check
     * @return the result of {@link #exists(Iterable)}
     */
    @Override
    public Mono<Long> exists(K ... keys) {
        return this.exists((Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Counts existing keys, issuing one {@code exists} call per hash slot when the keys span several slots.
     *
     * @param keys the keys to check; copied into a list before partitioning
     * @return a {@link Mono} emitting the summed per-slot results, or the superclass result when the keys
     *         fall into fewer than two slots
     */
    @Override
    public Mono<Long> exists(Iterable<K> keys) {
        List<K> allKeys = LettuceLists.newList(keys);
        Map<Integer, List<K>> bySlot = SlotHash.partition(this.codec, allKeys);
        if (bySlot.size() >= 2) {
            List<Mono<Long>> perSlot = new ArrayList<>();
            for (List<K> slotKeys : bySlot.values()) {
                perSlot.add(super.exists(slotKeys));
            }
            return Flux.merge(perSlot).reduce(Long::sum);
        }
        return super.exists(allKeys);
    }

    /**
     * Runs {@code flushall} on every node flagged as upstream.
     *
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> flushall() {
        return Flux.merge(this.<String>executeOnUpstream(RedisServerReactiveCommands::flushall).values()).last();
    }

    /**
     * Runs {@code flushall} with the given mode on every node flagged as upstream.
     *
     * @param flushMode forwarded unchanged to each node's {@code flushall}
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> flushall(FlushMode flushMode) {
        return Flux.merge(this.<String>executeOnUpstream(commands -> commands.flushall(flushMode)).values()).last();
    }

    /**
     * Runs {@code flushallAsync} on every node flagged as upstream.
     *
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> flushallAsync() {
        return Flux.merge(this.<String>executeOnUpstream(RedisServerReactiveCommands::flushallAsync).values()).last();
    }

    /**
     * Runs {@code flushdb} on every node flagged as upstream.
     *
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> flushdb() {
        return Flux.merge(this.<String>executeOnUpstream(RedisServerReactiveCommands::flushdb).values()).last();
    }

    /**
     * Runs {@code flushdb} with the given mode on every node flagged as upstream.
     *
     * @param flushMode forwarded unchanged to each node's {@code flushdb}
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> flushdb(FlushMode flushMode) {
        return Flux.merge(this.<String>executeOnUpstream(commands -> commands.flushdb(flushMode)).values()).last();
    }

    /**
     * Runs {@code keys} with the given pattern on every node flagged as upstream and merges the results.
     *
     * @param pattern forwarded unchanged to each node's {@code keys}
     * @return a {@link Flux} of the keys emitted by all queried nodes, merged
     */
    @Override
    public Flux<K> keys(String pattern) {
        return Flux.merge(this.<K>executeOnUpstream(commands -> commands.keys(pattern)).values());
    }

    /**
     * Runs {@code keysLegacy} with the given pattern on every node flagged as upstream and merges the results.
     *
     * @param pattern forwarded unchanged to each node's {@code keysLegacy}
     * @return a {@link Flux} of the keys emitted by all queried nodes, merged
     */
    @Override
    @Deprecated
    public Flux<K> keysLegacy(K pattern) {
        return Flux.merge(this.<K>executeOnUpstream(commands -> commands.keysLegacy(pattern)).values());
    }

    /**
     * Runs the streaming {@code keys} variant on every node flagged as upstream and sums the per-node results.
     *
     * @param channel the streaming channel handed to every node's {@code keys} call
     * @param pattern forwarded unchanged to each node's {@code keys}
     * @return a {@link Mono} emitting the sum of the per-node results
     */
    @Override
    public Mono<Long> keys(KeyStreamingChannel<K> channel, String pattern) {
        // The explicit <Long> witness keeps the merged Flux typed so the reduction operates on Long, not Object.
        return Flux.merge(this.<Long>executeOnUpstream(commands -> commands.keys(channel, pattern)).values()).reduce(Long::sum);
    }

    /**
     * Runs the streaming {@code keysLegacy} variant on every node flagged as upstream and sums the per-node
     * results.
     *
     * @param channel the streaming channel handed to every node's {@code keysLegacy} call
     * @param pattern forwarded unchanged to each node's {@code keysLegacy}
     * @return a {@link Mono} emitting the sum of the per-node results
     */
    @Override
    @Deprecated
    public Mono<Long> keysLegacy(KeyStreamingChannel<K> channel, K pattern) {
        // The explicit <Long> witness keeps the merged Flux typed so the reduction operates on Long, not Object.
        return Flux.merge(this.<Long>executeOnUpstream(commands -> commands.keysLegacy(channel, pattern)).values()).reduce(Long::sum);
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #mget(Iterable)}.
     *
     * @param keys the keys to fetch
     * @return the result of {@link #mget(Iterable)}
     */
    @Override
    public Flux<KeyValue<K, V>> mget(K ... keys) {
        return this.mget((Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Fetches values for the given keys, issuing one {@code mget} per hash slot when the keys span several
     * slots, and emits the results in the order of the requested keys.
     *
     * @param keys the keys to fetch; copied into a list before partitioning
     * @return a {@link Flux} with one element per requested key position, or the superclass result when the
     *         keys fall into fewer than two slots
     */
    @Override
    public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
        List<K> keyList = LettuceLists.newList(keys);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return super.mget(keyList);
        }
        // Typed as List<K> so the call binds to mget(Iterable) rather than to the varargs overload.
        List<Flux<KeyValue<K, V>>> publishers = partitioned.values().stream().map(slotKeys -> super.mget(slotKeys)).collect(Collectors.toList());
        return Flux.mergeSequential(publishers).collectList().map(results -> {
            @SuppressWarnings("unchecked") // generic array creation; only KeyValue<K, V> elements are stored
            KeyValue<K, V>[] values = new KeyValue[keyList.size()];
            // 'results' is the concatenation of the per-slot replies in partitioned.values() order;
            // 'offset' is the start of the current slot's replies within that concatenation.
            int offset = 0;
            for (List<K> partitionKeys : partitioned.values()) {
                for (int i = 0; i < keyList.size(); ++i) {
                    int index = partitionKeys.indexOf(keyList.get(i));
                    if (index == -1) {
                        continue; // key i belongs to a different slot
                    }
                    values[i] = results.get(offset + index);
                }
                offset += partitionKeys.size();
            }
            return Arrays.asList(values);
        }).flatMapMany(Flux::fromIterable);
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #mget(KeyValueStreamingChannel, Iterable)}.
     *
     * @param channel the streaming channel
     * @param keys the keys to fetch
     * @return the result of {@link #mget(KeyValueStreamingChannel, Iterable)}
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> channel, K ... keys) {
        return this.mget(channel, (Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Streams values for the given keys to {@code channel}, issuing one {@code mget} per hash slot when the
     * keys span several slots.
     *
     * @param channel the streaming channel handed to every per-slot call
     * @param keys the keys to fetch; copied into a list before partitioning
     * @return a {@link Mono} emitting the summed per-slot results, or the superclass result when the keys
     *         fall into fewer than two slots
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> channel, Iterable<K> keys) {
        List<K> allKeys = LettuceLists.newList(keys);
        Map<Integer, List<K>> bySlot = SlotHash.partition(this.codec, allKeys);
        if (bySlot.size() >= 2) {
            List<Mono<Long>> perSlot = new ArrayList<>();
            for (List<K> slotKeys : bySlot.values()) {
                perSlot.add(super.mget(channel, slotKeys));
            }
            return Flux.merge(perSlot).reduce(Long::sum);
        }
        return super.mget(channel, allKeys);
    }

    /**
     * Runs {@code msetnx} per hash slot via {@code pipeliningWithMap} and AND-combines the per-slot results.
     *
     * @param map the key/value pairs to set
     * @return a {@link Mono} emitting {@code true} only if every per-slot call emitted {@code true}
     */
    @Override
    public Mono<Boolean> msetnx(Map<K, V> map) {
        Flux<Boolean> perSlotResults = this.pipeliningWithMap(map, slotMap -> RedisAdvancedClusterReactiveCommandsImpl.super.msetnx(slotMap).flux(), results -> results);
        return perSlotResults.reduce((all, current) -> all && current);
    }

    /**
     * Runs {@code mset} per hash slot via {@code pipeliningWithMap}.
     *
     * @param map the key/value pairs to set
     * @return a {@link Mono} emitting the last reply of the merged per-slot results
     */
    @Override
    public Mono<String> mset(Map<K, V> map) {
        Flux<String> perSlotResults = this.pipeliningWithMap(map, slotMap -> RedisAdvancedClusterReactiveCommandsImpl.super.mset(slotMap).flux(), results -> results);
        return perSlotResults.last();
    }

    /**
     * Runs {@code msetex} per hash slot via {@code pipeliningWithMap} and AND-combines the per-slot results.
     *
     * @param map the key/value pairs to set
     * @param args forwarded unchanged to every per-slot {@code msetex} call
     * @return a {@link Mono} emitting {@code true} only if every per-slot call emitted {@code true}
     */
    @Override
    public Mono<Boolean> msetex(Map<K, V> map, MSetExArgs args) {
        Flux<Boolean> perSlotResults = this.pipeliningWithMap(map, slotMap -> RedisAdvancedClusterReactiveCommandsImpl.super.msetex(slotMap, args).flux(), results -> results);
        return perSlotResults.reduce((all, current) -> all && current);
    }

    /**
     * Asks a randomly picked node from the current partitions for a random key. When the partitions are
     * empty, the superclass implementation is used instead.
     *
     * @return a {@link Mono} emitting the key returned by the chosen node
     */
    @Override
    public Mono<K> randomkey() {
        Partitions knownNodes = this.getStatefulConnection().getPartitions();
        if (!knownNodes.isEmpty()) {
            int pick = ThreadLocalRandom.current().nextInt(knownNodes.size());
            String nodeId = knownNodes.getPartition(pick).getNodeId();
            return this.getConnectionReactive(nodeId).flatMap(commands -> commands.randomkey());
        }
        return super.randomkey();
    }

    /**
     * Runs {@code scriptFlush} on every node of the current partitions.
     *
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> scriptFlush() {
        return Flux.merge(this.<String>executeOnNodes(RedisScriptingReactiveCommands::scriptFlush, ALL_NODES).values()).last();
    }

    /**
     * Runs {@code scriptKill} on every node of the current partitions. An error in the merged stream is
     * replaced by the value {@code "OK"}.
     *
     * @return a {@link Mono} emitting the last element of the merged (error-substituted) per-node results
     */
    @Override
    public Mono<String> scriptKill() {
        return Flux.merge(this.<String>executeOnNodes(RedisScriptingReactiveCommands::scriptKill, ALL_NODES).values()).onErrorReturn("OK").last();
    }

    /**
     * Runs {@code scriptLoad} with the given script on every node of the current partitions.
     *
     * @param script the script bytes, forwarded unchanged to every node
     * @return a {@link Mono} emitting the last reply of the merged per-node results
     */
    @Override
    public Mono<String> scriptLoad(byte[] script) {
        return Flux.merge(this.<String>executeOnNodes(commands -> commands.scriptLoad(script), ALL_NODES).values()).last();
    }

    /**
     * Runs {@code shutdown} on every node of the current partitions.
     *
     * @param save forwarded unchanged to every node's {@code shutdown}
     * @return a {@link Mono} that completes when the merged per-node results complete
     */
    @Override
    public Mono<Void> shutdown(boolean save) {
        return Flux.merge(this.<Void>executeOnNodes(commands -> commands.shutdown(save), ALL_NODES).values()).then();
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #touch(Iterable)}.
     *
     * @param keys the keys to touch
     * @return the result of {@link #touch(Iterable)}
     */
    @Override
    public Mono<Long> touch(K ... keys) {
        return this.touch((Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Touches keys, issuing one {@code touch} call per hash slot when the keys span several slots.
     *
     * @param keys the keys to touch; copied into a list before partitioning
     * @return a {@link Mono} emitting the summed per-slot results, or the superclass result when the keys
     *         fall into fewer than two slots
     */
    @Override
    public Mono<Long> touch(Iterable<K> keys) {
        List<K> allKeys = LettuceLists.newList(keys);
        Map<Integer, List<K>> bySlot = SlotHash.partition(this.codec, allKeys);
        if (bySlot.size() >= 2) {
            List<Mono<Long>> perSlot = new ArrayList<>();
            for (List<K> slotKeys : bySlot.values()) {
                perSlot.add(super.touch(slotKeys));
            }
            return Flux.merge(perSlot).reduce(Long::sum);
        }
        return super.touch(allKeys);
    }

    /**
     * Varargs convenience overload: wraps {@code keys} with {@link Arrays#asList} and delegates to
     * {@link #unlink(Iterable)}.
     *
     * @param keys the keys to unlink
     * @return the result of {@link #unlink(Iterable)}
     */
    @Override
    public Mono<Long> unlink(K ... keys) {
        return this.unlink((Iterable<K>)Arrays.asList(keys));
    }

    /**
     * Unlinks keys, splitting the request per hash slot when the keys span more than one slot.
     *
     * @param keys the keys to unlink
     * @return a {@link Mono} emitting the sum of the per-slot {@code unlink} results, or the single result of
     *         the superclass call when all keys fall into fewer than two slots
     */
    @Override
    public Mono<Long> unlink(Iterable<K> keys) {
        // Materialize once: the keys are walked by the partitioning step and again by the command itself,
        // which would silently lose keys for an Iterable that can only be traversed a single time.
        List<K> keyList = new ArrayList<>();
        keys.forEach(keyList::add);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return super.unlink(keyList);
        }
        List<Mono<Long>> publishers = new ArrayList<>(partitioned.size());
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(super.unlink(slotKeys));
        }
        return Flux.merge(publishers).reduce(Long::sum);
    }

    /**
     * Returns the reactive command API of the node connection that the underlying cluster connection
     * resolves for {@code nodeId}.
     *
     * @param nodeId the node id
     * @return the reactive commands of that node connection
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String nodeId) {
        return this.getStatefulConnection().getConnection(nodeId).reactive();
    }

    /**
     * Requests a {@link ConnectionIntent#WRITE} connection for {@code nodeId} from the connection provider
     * and exposes the returned future as a {@link Mono}.
     *
     * @param nodeId the node id
     * @return a {@link Mono} of the node connection
     */
    private Mono<StatefulRedisConnection<K, V>> getStatefulConnection(String nodeId) {
        return RedisAdvancedClusterReactiveCommandsImpl.getMono(this.getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId));
    }

    /**
     * Requests a {@link ConnectionIntent#WRITE} connection for {@code nodeId} from the connection provider
     * and maps it to its reactive command API.
     *
     * @param nodeId the node id
     * @return a {@link Mono} of the node's reactive commands
     */
    private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String nodeId) {
        return RedisAdvancedClusterReactiveCommandsImpl.getMono(this.getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, nodeId)).map(StatefulRedisConnection::reactive);
    }

    /**
     * Returns the reactive command API of the node connection that the underlying cluster connection
     * resolves for {@code host} and {@code port}.
     *
     * @param host the node host
     * @param port the node port
     * @return the reactive commands of that node connection
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String host, int port) {
        return this.getStatefulConnection().getConnection(host, port).reactive();
    }

    /**
     * Requests a {@link ConnectionIntent#WRITE} connection for {@code host}/{@code port} from the connection
     * provider and maps it to its reactive command API.
     *
     * @param host the node host
     * @param port the node port
     * @return a {@link Mono} of the node's reactive commands
     */
    private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String host, int port) {
        return RedisAdvancedClusterReactiveCommandsImpl.getMono(this.getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, host, port)).map(StatefulRedisConnection::reactive);
    }

    /**
     * Requests a {@link ConnectionIntent#WRITE} connection for {@code host}/{@code port} from the connection
     * provider and exposes the returned future as a {@link Mono}.
     *
     * @param host the node host
     * @param port the node port
     * @return a {@link Mono} of the node connection
     */
    private Mono<StatefulRedisConnection<K, V>> getStatefulConnection(String host, int port) {
        return RedisAdvancedClusterReactiveCommandsImpl.getMono(this.getConnectionProvider().getConnectionAsync(ConnectionIntent.WRITE, host, port));
    }

    /**
     * Returns the connection held by the superclass, cast to {@link StatefulRedisClusterConnection}.
     *
     * @return the underlying cluster connection
     */
    @Override
    public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnection)super.getConnection();
    }

    /**
     * Requests a connection for the given intent via the provider's {@code getRandomConnectionAsync} and
     * exposes the returned future as a {@link Mono}.
     *
     * @param intent the connection intent passed to the provider
     * @return a {@link Mono} of the connection chosen by the provider
     */
    private Mono<StatefulRedisConnection<K, V>> getStatefulConnection(ConnectionIntent intent) {
        return RedisAdvancedClusterReactiveCommandsImpl.getMono(this.getConnectionProvider().getRandomConnectionAsync(intent));
    }

    /**
     * Starts a cluster scan from {@link ScanCursor#INITIAL}; each step calls {@code scan()} on the selected
     * node connection and the result is mapped by the key-scan cursor mapper of {@link ClusterScanSupport}.
     *
     * @return a {@link Mono} of the mapped key scan cursor
     */
    @Override
    public Mono<KeyScanCursor<K>> scan() {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster scan from {@link ScanCursor#INITIAL} with scan arguments; the node call is
     * {@code scan(scanArgs)} and the result is mapped by the key-scan cursor mapper of
     * {@link ClusterScanSupport}.
     *
     * @param scanArgs forwarded unchanged to the node's {@code scan}
     * @return a {@link Mono} of the mapped key scan cursor
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(scanArgs), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from {@code scanCursor} with scan arguments; the node call receives the
     * cursor supplied by {@code clusterScan} together with {@code scanArgs}.
     *
     * @param scanCursor the cursor to resume from
     * @param scanArgs forwarded unchanged to the node's {@code scan}
     * @return a {@link Mono} of the mapped key scan cursor
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan((ScanCursor)cursor, scanArgs), ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from {@code scanCursor}; the node call is the cursor-taking {@code scan}
     * referenced via {@code RedisKeyReactiveCommands::scan}.
     *
     * @param scanCursor the cursor to resume from
     * @return a {@link Mono} of the mapped key scan cursor
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, RedisKeyReactiveCommands::scan, ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@link ScanCursor#INITIAL}; the node call is {@code scan(channel)}
     * and the result is mapped by the stream-scan cursor mapper of {@link ClusterScanSupport}.
     *
     * @param channel the streaming channel handed to the node's {@code scan}
     * @return a {@link Mono} of the mapped stream scan cursor
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@link ScanCursor#INITIAL} with scan arguments; the node call is
     * {@code scan(channel, scanArgs)}.
     *
     * @param channel the streaming channel handed to the node's {@code scan}
     * @param scanArgs forwarded unchanged to the node's {@code scan}
     * @return a {@link Mono} of the mapped stream scan cursor
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel, scanArgs), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from {@code scanCursor} with scan arguments; the node call receives
     * the channel, the cursor supplied by {@code clusterScan}, and {@code scanArgs}.
     *
     * @param channel the streaming channel handed to the node's {@code scan}
     * @param scanCursor the cursor to resume from
     * @param scanArgs forwarded unchanged to the node's {@code scan}
     * @return a {@link Mono} of the mapped stream scan cursor
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, (ScanCursor)cursor, scanArgs), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from {@code scanCursor}; the node call receives the channel and the
     * cursor supplied by {@code clusterScan}.
     *
     * @param channel the streaming channel handed to the node's {@code scan}
     * @param scanCursor the cursor to resume from
     * @return a {@link Mono} of the mapped stream scan cursor
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, (ScanCursor)cursor), ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Routes {@code ftAggregate} through the node-id-aware {@code routeKeyless} overload as
     * {@link CommandType#FT_AGGREGATE}, with the superclass implementation supplied as the fallback call.
     * When the reply carries a cursor whose id is greater than zero, the id of the node that served the
     * request is stored on that cursor; {@code ftCursorread} and {@code ftCursordel} in this class read the
     * node id from the cursor to pick the connection.
     *
     * @param index the index name
     * @param query the query
     * @param args the aggregate arguments, forwarded unchanged
     * @return a {@link Mono} of the aggregation reply
     */
    @Override
    public Mono<AggregationReply<K, V>> ftAggregate(String index, V query, AggregateArgs<K, V> args) {
        return this.routeKeyless(() -> super.ftAggregate(index, query, args), (String nodeId, RedisClusterReactiveCommands<K, V> conn) -> conn.ftAggregate(index, query, args).mapNotNull(reply -> {
            if (reply != null) {
                // Only cursors with a positive id are tagged with the serving node.
                reply.getCursor().filter(c -> c.getCursorId() > 0L).ifPresent(c -> c.setNodeId((String)nodeId));
            }
            return reply;
        }), (ProtocolKeyword)CommandType.FT_AGGREGATE);
    }

    /**
     * Delegates to {@link #ftAggregate(String, Object, AggregateArgs)} with {@code null} arguments.
     *
     * @param index the index name
     * @param query the query
     * @return the result of the three-argument overload
     */
    @Override
    public Mono<AggregationReply<K, V>> ftAggregate(String index, V query) {
        return this.ftAggregate(index, query, null);
    }

    /**
     * Routes {@code ftSearch} through {@code routeKeyless} as {@link CommandType#FT_SEARCH}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param query the query
     * @param args the search arguments, forwarded unchanged
     * @return a {@link Mono} of the search reply
     */
    @Override
    public Mono<SearchReply<K, V>> ftSearch(String index, V query, SearchArgs<K, V> args) {
        return this.routeKeyless(() -> super.ftSearch(index, query, args), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSearch(index, query, args), (ProtocolKeyword)CommandType.FT_SEARCH);
    }

    /**
     * Delegates to {@link #ftSearch(String, Object, SearchArgs)} with arguments built by
     * {@code SearchArgs.builder().build()}.
     *
     * @param index the index name
     * @param query the query
     * @return the result of the three-argument overload
     */
    @Override
    public Mono<SearchReply<K, V>> ftSearch(String index, V query) {
        return this.ftSearch(index, query, SearchArgs.builder().build());
    }

    /**
     * Routes {@code ftHybrid} through {@code routeKeyless} as {@link CommandType#FT_HYBRID}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param args the hybrid arguments, forwarded unchanged
     * @return a {@link Mono} of the hybrid reply
     */
    @Override
    public Mono<HybridReply<K, V>> ftHybrid(String index, HybridArgs<K, V> args) {
        return this.routeKeyless(() -> super.ftHybrid(index, args), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftHybrid(index, args), (ProtocolKeyword)CommandType.FT_HYBRID);
    }

    /**
     * Routes {@code ftExplain} through {@code routeKeyless} as {@link CommandType#FT_EXPLAIN}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param query the query
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftExplain(String index, V query) {
        return this.routeKeyless(() -> super.ftExplain(index, query), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftExplain(index, query), (ProtocolKeyword)CommandType.FT_EXPLAIN);
    }

    /**
     * Routes {@code ftExplain} with arguments through {@code routeKeyless} as
     * {@link CommandType#FT_EXPLAIN}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param query the query
     * @param args the explain arguments, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftExplain(String index, V query, ExplainArgs<K, V> args) {
        return this.routeKeyless(() -> super.ftExplain(index, query, args), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftExplain(index, query, args), (ProtocolKeyword)CommandType.FT_EXPLAIN);
    }

    /**
     * Routes {@code ftTagvals} through {@code routeKeylessMany} as {@link CommandType#FT_TAGVALS}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param fieldName the field name
     * @return a {@link Flux} of the values emitted by the routed call
     */
    @Override
    public Flux<V> ftTagvals(String index, String fieldName) {
        return this.routeKeylessMany(() -> super.ftTagvals(index, fieldName), conn -> conn.ftTagvals(index, fieldName), CommandType.FT_TAGVALS);
    }

    /**
     * Routes {@code ftSpellcheck} through {@code routeKeyless} as {@link CommandType#FT_SPELLCHECK}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param query the query
     * @return a {@link Mono} of the spell check result
     */
    @Override
    public Mono<SpellCheckResult<V>> ftSpellcheck(String index, V query) {
        return this.routeKeyless(() -> super.ftSpellcheck(index, query), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSpellcheck(index, query), (ProtocolKeyword)CommandType.FT_SPELLCHECK);
    }

    /**
     * Routes {@code ftSpellcheck} with arguments through {@code routeKeyless} as
     * {@link CommandType#FT_SPELLCHECK}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param query the query
     * @param args the spell check arguments, forwarded unchanged
     * @return a {@link Mono} of the spell check result
     */
    @Override
    public Mono<SpellCheckResult<V>> ftSpellcheck(String index, V query, SpellCheckArgs<K, V> args) {
        return this.routeKeyless(() -> super.ftSpellcheck(index, query, args), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSpellcheck(index, query, args), (ProtocolKeyword)CommandType.FT_SPELLCHECK);
    }

    /**
     * Routes {@code ftDictadd} through {@code routeKeyless} as {@link CommandType#FT_DICTADD}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param dict the dictionary name
     * @param terms the terms, forwarded unchanged
     * @return a {@link Mono} of the numeric reply
     */
    @Override
    public Mono<Long> ftDictadd(String dict, V ... terms) {
        return this.routeKeyless(() -> super.ftDictadd(dict, terms), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftDictadd(dict, terms), (ProtocolKeyword)CommandType.FT_DICTADD);
    }

    /**
     * Routes {@code ftDictdel} through {@code routeKeyless} as {@link CommandType#FT_DICTDEL}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param dict the dictionary name
     * @param terms the terms, forwarded unchanged
     * @return a {@link Mono} of the numeric reply
     */
    @Override
    public Mono<Long> ftDictdel(String dict, V ... terms) {
        return this.routeKeyless(() -> super.ftDictdel(dict, terms), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftDictdel(dict, terms), (ProtocolKeyword)CommandType.FT_DICTDEL);
    }

    /**
     * Routes {@code ftDictdump} through {@code routeKeylessMany} as {@link CommandType#FT_DICTDUMP}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param dict the dictionary name
     * @return a {@link Flux} of the values emitted by the routed call
     */
    @Override
    public Flux<V> ftDictdump(String dict) {
        return this.routeKeylessMany(() -> super.ftDictdump(dict), conn -> conn.ftDictdump(dict), CommandType.FT_DICTDUMP);
    }

    /**
     * Routes {@code ftAliasadd} through {@code routeKeyless} as {@link CommandType#FT_ALIASADD}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param alias the alias name
     * @param index the index name
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftAliasadd(String alias, String index) {
        return this.routeKeyless(() -> super.ftAliasadd(alias, index), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftAliasadd(alias, index), (ProtocolKeyword)CommandType.FT_ALIASADD);
    }

    /**
     * Routes {@code ftAliasupdate} through {@code routeKeyless} as {@link CommandType#FT_ALIASUPDATE}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param alias the alias name
     * @param index the index name
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftAliasupdate(String alias, String index) {
        return this.routeKeyless(() -> super.ftAliasupdate(alias, index), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftAliasupdate(alias, index), (ProtocolKeyword)CommandType.FT_ALIASUPDATE);
    }

    /**
     * Routes {@code ftAliasdel} through {@code routeKeyless} as {@link CommandType#FT_ALIASDEL}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param alias the alias name
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftAliasdel(String alias) {
        return this.routeKeyless(() -> super.ftAliasdel(alias), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftAliasdel(alias), (ProtocolKeyword)CommandType.FT_ALIASDEL);
    }

    /**
     * Routes {@code ftCreate} through {@code routeKeyless} as {@link CommandType#FT_CREATE}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param fieldArgs the field definitions, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftCreate(String index, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(() -> super.ftCreate(index, fieldArgs), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftCreate(index, fieldArgs), (ProtocolKeyword)CommandType.FT_CREATE);
    }

    /**
     * Routes {@code ftCreate} with create arguments through {@code routeKeyless} as
     * {@link CommandType#FT_CREATE}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param arguments the create arguments, forwarded unchanged
     * @param fieldArgs the field definitions, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftCreate(String index, CreateArgs<K, V> arguments, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(() -> super.ftCreate(index, arguments, fieldArgs), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftCreate(index, arguments, fieldArgs), (ProtocolKeyword)CommandType.FT_CREATE);
    }

    /**
     * Routes {@code ftAlter} with the skip-initial-scan flag through {@code routeKeyless} as
     * {@link CommandType#FT_ALTER}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param skipInitialScan forwarded unchanged to {@code ftAlter}
     * @param fieldArgs the field definitions, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftAlter(String index, boolean skipInitialScan, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(() -> super.ftAlter(index, skipInitialScan, fieldArgs), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftAlter(index, skipInitialScan, fieldArgs), (ProtocolKeyword)CommandType.FT_ALTER);
    }

    /**
     * Routes {@code ftAlter} through {@code routeKeyless} as {@link CommandType#FT_ALTER}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param fieldArgs the field definitions, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftAlter(String index, List<FieldArgs<K>> fieldArgs) {
        return this.routeKeyless(() -> super.ftAlter(index, fieldArgs), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftAlter(index, fieldArgs), (ProtocolKeyword)CommandType.FT_ALTER);
    }

    /**
     * Routes {@code ftDropindex} with the delete-documents flag through {@code routeKeyless} as
     * {@link CommandType#FT_DROPINDEX}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param deleteDocumentKeys forwarded unchanged to {@code ftDropindex}
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftDropindex(String index, boolean deleteDocumentKeys) {
        return this.routeKeyless(() -> super.ftDropindex(index, deleteDocumentKeys), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftDropindex(index, deleteDocumentKeys), (ProtocolKeyword)CommandType.FT_DROPINDEX);
    }

    /**
     * Routes {@code ftDropindex} through {@code routeKeyless} as {@link CommandType#FT_DROPINDEX}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftDropindex(String index) {
        return this.routeKeyless(() -> super.ftDropindex(index), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftDropindex(index), (ProtocolKeyword)CommandType.FT_DROPINDEX);
    }

    /**
     * Routes {@code ftSyndump} through {@code routeKeyless} as {@link CommandType#FT_SYNDUMP}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @return a {@link Mono} of the map returned by the routed call
     */
    @Override
    public Mono<Map<V, List<V>>> ftSyndump(String index) {
        return this.routeKeyless(() -> super.ftSyndump(index), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSyndump(index), (ProtocolKeyword)CommandType.FT_SYNDUMP);
    }

    /**
     * Routes {@code ftSynupdate} through {@code routeKeyless} as {@link CommandType#FT_SYNUPDATE}; the
     * superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param synonymGroupId the synonym group id
     * @param terms the terms, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftSynupdate(String index, V synonymGroupId, V ... terms) {
        return this.routeKeyless(() -> super.ftSynupdate(index, synonymGroupId, terms), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSynupdate(index, synonymGroupId, terms), (ProtocolKeyword)CommandType.FT_SYNUPDATE);
    }

    /**
     * Routes {@code ftSynupdate} with update arguments through {@code routeKeyless} as
     * {@link CommandType#FT_SYNUPDATE}; the superclass implementation is supplied as the fallback call.
     *
     * @param index the index name
     * @param synonymGroupId the synonym group id
     * @param args the update arguments, forwarded unchanged
     * @param terms the terms, forwarded unchanged
     * @return a {@link Mono} of the reply string
     */
    @Override
    public Mono<String> ftSynupdate(String index, V synonymGroupId, SynUpdateArgs<K, V> args, V ... terms) {
        return this.routeKeyless(() -> super.ftSynupdate(index, synonymGroupId, args, terms), (RedisClusterReactiveCommands<K, V> conn) -> conn.ftSynupdate(index, synonymGroupId, args, terms), (ProtocolKeyword)CommandType.FT_SYNUPDATE);
    }

    /**
     * Routes {@code ftList} through {@code routeKeylessMany} as {@link CommandType#FT_LIST}; the superclass
     * implementation is supplied as the fallback call.
     *
     * @return a {@link Flux} of the values emitted by the routed call
     */
    @Override
    public Flux<V> ftList() {
        return this.routeKeylessMany(() -> super.ftList(), RediSearchReactiveCommands::ftList, CommandType.FT_LIST);
    }

    /**
     * Reads the next batch of an aggregation cursor from the node recorded on the cursor.
     *
     * <p>The node connection is resolved reactively when the returned {@link Mono} is subscribed, so no
     * connection lookup blocks the calling thread and lookup failures are delivered as an error signal.
     *
     * @param index the index name
     * @param cursor the cursor to read from; must carry a node id when its cursor id is positive
     * @param count forwarded unchanged to the node's {@code ftCursorread}
     * @return a {@link Mono} of the reply, with the node id copied onto the reply's cursor when one is
     *         present; an error {@link Mono} with {@link IllegalArgumentException} when {@code cursor} is
     *         {@code null} or has no node id; an empty {@link AggregationReply} when the cursor id is not
     *         positive
     */
    @Override
    public Mono<AggregationReply<K, V>> ftCursorread(String index, AggregationReply.Cursor cursor, int count) {
        if (cursor == null) {
            return Mono.error(new IllegalArgumentException("cursor must not be null"));
        }
        if (cursor.getCursorId() <= 0L) {
            return Mono.just(new AggregationReply<K, V>());
        }
        Optional<String> nodeIdOpt = cursor.getNodeId();
        if (!nodeIdOpt.isPresent()) {
            return Mono.error(new IllegalArgumentException("Cursor missing nodeId; cannot route cursor READ in cluster mode"));
        }
        String nodeId = nodeIdOpt.get();
        return this.getConnectionReactive(nodeId).flatMap(commands -> commands.ftCursorread(index, cursor, count)).map(reply -> {
            // Carry the node id forward so the next read/delete can be routed to the same node.
            reply.getCursor().ifPresent(c -> c.setNodeId(nodeId));
            return reply;
        });
    }

    /**
     * Delegates to {@link #ftCursorread(String, AggregationReply.Cursor, int)} with a count of {@code -1}.
     *
     * @param index the index name
     * @param cursor the cursor to read from
     * @return the result of the three-argument overload
     */
    @Override
    public Mono<AggregationReply<K, V>> ftCursorread(String index, AggregationReply.Cursor cursor) {
        return this.ftCursorread(index, cursor, -1);
    }

    /**
     * Deletes an aggregation cursor on the node recorded on the cursor.
     *
     * <p>The node connection is resolved reactively when the returned {@link Mono} is subscribed, so no
     * connection lookup blocks the calling thread and lookup failures are delivered as an error signal.
     *
     * @param index the index name
     * @param cursor the cursor to delete; must carry a node id when its cursor id is positive
     * @return a {@link Mono} of the node's reply; {@code "OK"} without contacting a node when the cursor id
     *         is not positive; an error {@link Mono} with {@link IllegalArgumentException} when
     *         {@code cursor} is {@code null} or has no node id
     */
    @Override
    public Mono<String> ftCursordel(String index, AggregationReply.Cursor cursor) {
        if (cursor == null) {
            return Mono.error(new IllegalArgumentException("cursor must not be null"));
        }
        if (cursor.getCursorId() <= 0L) {
            return Mono.just("OK");
        }
        Optional<String> nodeIdOpt = cursor.getNodeId();
        if (!nodeIdOpt.isPresent()) {
            return Mono.error(new IllegalArgumentException("Cursor missing nodeId; cannot route cursor DEL in cluster mode"));
        }
        String nodeId = nodeIdOpt.get();
        return this.getConnectionReactive(nodeId).flatMap(commands -> commands.ftCursordel(index, cursor));
    }

    /**
     * Runs a keyless single-result command on a connection obtained from the provider's random-connection
     * lookup for the command's intent, and switches to {@code superCall} when the pipeline signals an error.
     *
     * <p>NOTE(review): {@code onErrorResume} is applied after {@code flatMap(routedCall)}, so an error
     * signalled by the routed command itself (not only by the connection lookup) also triggers
     * {@code superCall} — confirm that re-executing the command in that case is intended.
     *
     * @param superCall supplier of the fallback call, invoked only on error
     * @param routedCall the call to run on the selected connection's reactive API
     * @param commandType the command keyword, used to determine the intent and in the log message
     * @param <R> the result type
     * @return a {@link Mono} of the routed result or, on error, of the fallback result
     */
    <R> Mono<R> routeKeyless(Supplier<Mono<R>> superCall, Function<RedisClusterReactiveCommands<K, V>, Mono<R>> routedCall, ProtocolKeyword commandType) {
        ConnectionIntent intent = this.getConnectionIntent(commandType);
        return this.getStatefulConnection(intent).map(StatefulRedisConnection::reactive).flatMap(routedCall).onErrorResume(err -> {
            logger.error("Cluster routing failed for {} - falling back to superCall", (Object)commandType, err);
            return (Mono)superCall.get();
        });
    }

    /**
     * Multi-result counterpart of {@code routeKeyless}: runs a keyless command on a connection obtained from
     * the provider's random-connection lookup for the command's intent, and switches to {@code superCall}
     * when the pipeline signals an error.
     *
     * <p>NOTE(review): {@code onErrorResume} is applied after {@code flatMapMany(routedCall)}, so an error
     * signalled by the routed command itself also triggers {@code superCall}, possibly after elements were
     * already emitted — confirm this is intended.
     *
     * @param superCall supplier of the fallback call, invoked only on error
     * @param routedCall the call to run on the selected connection's reactive API
     * @param commandType the command keyword, used to determine the intent and in the log message
     * @param <R> the element type
     * @return a {@link Flux} of the routed results or, on error, of the fallback results
     */
    <R> Flux<R> routeKeylessMany(Supplier<Flux<R>> superCall, Function<RedisClusterReactiveCommands<K, V>, Flux<R>> routedCall, ProtocolKeyword commandType) {
        ConnectionIntent intent = this.getConnectionIntent(commandType);
        return this.getStatefulConnection(intent).map(StatefulRedisConnection::reactive).flatMapMany(routedCall).onErrorResume(err -> {
            logger.error("Cluster routing failed for {} - falling back to superCall", (Object)commandType, err);
            return (Publisher)superCall.get();
        });
    }

    /**
     * Node-id-aware variant of {@code routeKeyless}: after selecting a connection for the command's intent,
     * it first calls {@code clusterMyId()} on that connection and passes the resulting node id together with
     * the connection's reactive API to {@code routedCall}. On error the pipeline switches to
     * {@code superCall}.
     *
     * <p>NOTE(review): {@code onErrorResume} covers the connection lookup, the {@code clusterMyId()} call and
     * the routed command alike — confirm that falling back after a routed-command error is intended.
     *
     * @param superCall supplier of the fallback call, invoked only on error
     * @param routedCall the call to run, receiving the node id and the selected connection's reactive API
     * @param commandType the command keyword, used to determine the intent and in the log message
     * @param <R> the result type
     * @return a {@link Mono} of the routed result or, on error, of the fallback result
     */
    <R> Mono<R> routeKeyless(Supplier<Mono<R>> superCall, BiFunction<String, RedisClusterReactiveCommands<K, V>, Mono<R>> routedCall, ProtocolKeyword commandType) {
        ConnectionIntent intent = this.getConnectionIntent(commandType);
        return this.getStatefulConnection(intent).map(StatefulRedisConnection::reactive).flatMap(conn -> conn.clusterMyId().flatMap(nodeId -> (Mono)routedCall.apply((String)nodeId, (RedisClusterReactiveCommands)conn))).onErrorResume(err -> {
            logger.error("Cluster routing failed for {} - falling back to superCall", (Object)commandType, err);
            return (Mono)superCall.get();
        });
    }

    /**
     * Determines the connection intent for a command keyword by asking the client options' read-only
     * command predicate about a probe command built from the keyword.
     *
     * @param commandType the command keyword
     * @return {@link ConnectionIntent#READ} when the probe is classified read-only, otherwise
     *         {@link ConnectionIntent#WRITE}; also {@code WRITE} when the classification throws
     */
    private ConnectionIntent getConnectionIntent(ProtocolKeyword commandType) {
        try {
            Command probe = new Command(commandType, null);
            if (this.getStatefulConnection().getOptions().getReadOnlyCommands().isReadOnly(probe)) {
                return ConnectionIntent.READ;
            }
            return ConnectionIntent.WRITE;
        }
        catch (Exception e) {
            // Classification failed: log it and use WRITE as the fallback intent.
            logger.error("Error while determining connection intent for " + commandType, (Throwable)e);
            return ConnectionIntent.WRITE;
        }
    }

    /**
     * Instance convenience wrapper that calls the static {@code clusterScan} with this object's cluster
     * connection and connection provider.
     *
     * @param cursor the cursor to start or resume from
     * @param scanFunction the scan call to run against the selected node
     * @param resultMapper maps the node-level scan result to the cluster-level cursor
     * @param <T> the scan cursor type
     * @return the mapped scan result
     */
    private <T extends ScanCursor> Mono<T> clusterScan(ScanCursor cursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> resultMapper) {
        return RedisAdvancedClusterReactiveCommandsImpl.clusterScan(this.getStatefulConnection(), this.getConnectionProvider(), cursor, scanFunction, resultMapper);
    }

    /**
     * Applies a map-based command either once (when all keys fall into fewer than two hash slots) or once
     * per hash slot with a sub-map of the entries belonging to that slot, merging the per-slot results.
     *
     * @param map the full key/value map
     * @param function the command to run against the full map or a per-slot sub-map
     * @param resultFunction applied to the merged per-slot results in the multi-slot case only
     * @param <T> the element type of the command result
     * @return the command result for the single-slot case, otherwise the transformed merged results
     */
    private <T> Flux<T> pipeliningWithMap(Map<K, V> map, Function<Map<K, V>, Flux<T>> function, Function<Flux<T>, Flux<T>> resultFunction) {
        Map<Integer, List<K>> bySlot = SlotHash.partition(this.codec, map.keySet());
        if (bySlot.size() < 2) {
            return function.apply(map);
        }
        List<Flux<T>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : bySlot.values()) {
            // Build the sub-map holding only the entries whose keys hash to this slot.
            Map<K, V> slotEntries = new HashMap<>();
            for (K key : slotKeys) {
                slotEntries.put(key, map.get(key));
            }
            perSlot.add(function.apply(slotEntries));
        }
        return resultFunction.apply(Flux.merge(perSlot));
    }

    /**
     * Runs {@code function} on every node of the current partitions that carries the
     * {@link RedisClusterNode.NodeFlag#UPSTREAM} flag, by delegating to {@code executeOnNodes}.
     *
     * @param function the call to run against each selected node's reactive API
     * @param <T> the element type of the per-node result
     * @return a map from node id to the per-node result publisher
     */
    protected <T> Map<String, Publisher<T>> executeOnUpstream(Function<RedisClusterReactiveCommands<K, V>, ? extends Publisher<T>> function) {
        return this.executeOnNodes(function, redisClusterNode -> redisClusterNode.is(RedisClusterNode.NodeFlag.UPSTREAM));
    }

    /**
     * Builds, for every node of the current partitions accepted by {@code filter}, a publisher that resolves
     * the node connection by host and port and then runs {@code function} on its reactive API.
     *
     * @param function the call to run against each selected node's reactive API
     * @param filter selects the nodes to include
     * @param <T> the element type of the per-node result
     * @return a map from node id to the per-node result publisher
     */
    protected <T> Map<String, Publisher<T>> executeOnNodes(Function<RedisClusterReactiveCommands<K, V>, ? extends Publisher<T>> function, Predicate<RedisClusterNode> filter) {
        Map<String, Publisher<T>> perNode = new HashMap<>();
        for (RedisClusterNode node : this.getStatefulConnection().getPartitions()) {
            if (filter.test(node)) {
                RedisURI uri = node.getUri();
                Mono<RedisClusterReactiveCommands<K, V>> commands = this.getConnectionReactive(uri.getHost(), uri.getPort());
                perNode.put(node.getNodeId(), commands.flatMapMany(function::apply));
            }
        }
        return perNode;
    }

    /**
     * Resolves the reactive command API of the node that the current partitions map to {@code slot}.
     *
     * @param slot the hash slot
     * @return a {@link Mono} of the node's reactive commands, or an error {@link Mono} carrying a
     *         {@link RedisException} when no node is mapped to the slot
     */
    private Mono<RedisClusterReactiveCommands<K, V>> findConnectionBySlotReactive(int slot) {
        RedisClusterNode owner = this.getStatefulConnection().getPartitions().getPartitionBySlot(slot);
        if (owner == null) {
            return Mono.error(new RedisException("No partition for slot " + slot));
        }
        return this.getConnectionReactive(owner.getUri().getHost(), owner.getUri().getPort());
    }

    /**
     * Obtains the asynchronous connection provider from the cluster connection's channel writer.
     * Both casts are unchecked at compile time: the writer is assumed to be a
     * {@link ClusterDistributionChannelWriter} and its provider an {@link AsyncClusterConnectionProvider}.
     *
     * @return the connection provider
     */
    private AsyncClusterConnectionProvider getConnectionProvider() {
        ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter)this.getStatefulConnection().getChannelWriter();
        return (AsyncClusterConnectionProvider)((Object)writer.getClusterConnectionProvider());
    }

    /**
     * Performs one step of a cluster-wide scan: determines the node ids and the current node from the cursor
     * via {@link ClusterScanSupport}, runs {@code scanFunction} with the continuation cursor on a
     * {@link ConnectionIntent#WRITE} connection to the current node, and lets {@code mapper} turn the result
     * into the cluster-level cursor.
     *
     * @param connection the cluster connection, used to determine the node ids
     * @param connectionProvider the provider used to obtain the node connection
     * @param cursor the cursor to start or resume from
     * @param scanFunction the scan call to run against the current node
     * @param mapper maps the node ids, the current node id and the node-level result to the returned value
     * @param <T> the scan cursor type
     * @param <K> the key type
     * @param <V> the value type
     * @return the mapped scan result
     */
    static <T extends ScanCursor, K, V> Mono<T> clusterScan(StatefulRedisClusterConnection<K, V> connection, AsyncClusterConnectionProvider connectionProvider, ScanCursor cursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> mapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(connection, cursor);
        String currentNodeId = ClusterScanSupport.getCurrentNodeId(cursor, nodeIds);
        ScanCursor continuationCursor = ClusterScanSupport.getContinuationCursor(cursor);
        Mono scanCursor = RedisAdvancedClusterReactiveCommandsImpl.getMono(connectionProvider.getConnectionAsync(ConnectionIntent.WRITE, currentNodeId)).flatMap(conn -> (Mono)scanFunction.apply(conn.reactive(), continuationCursor));
        return mapper.map(nodeIds, currentNodeId, scanCursor);
    }

    /**
     * Adapts a {@link CompletableFuture} to a {@link Mono} via {@code Mono.fromCompletionStage}.
     *
     * @param future the future to adapt
     * @param <T> the value type
     * @return a {@link Mono} backed by the given future
     */
    private static <T> Mono<T> getMono(CompletableFuture<T> future) {
        return Mono.fromCompletionStage(future);
    }
}

