/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class RangeAssignor
extends AbstractPartitionAssignor {
    public static final String RANGE_ASSIGNOR_NAME = "range";
    private static final Utils.TopicPartitionComparator PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();

    @Override
    public String name() {
        return RANGE_ASSIGNOR_NAME;
    }

    private Map<String, List<AbstractPartitionAssignor.MemberInfo>> consumersPerTopic(Map<String, ConsumerPartitionAssignor.Subscription> consumerMetadata) {
        HashMap<String, List<AbstractPartitionAssignor.MemberInfo>> topicToConsumers = new HashMap<String, List<AbstractPartitionAssignor.MemberInfo>>();
        consumerMetadata.forEach((consumerId, subscription) -> {
            AbstractPartitionAssignor.MemberInfo memberInfo = new AbstractPartitionAssignor.MemberInfo((String)consumerId, subscription.groupInstanceId(), subscription.rackId());
            subscription.topics().forEach(topic -> RangeAssignor.put(topicToConsumers, topic, memberInfo));
        });
        return topicToConsumers;
    }

    @Override
    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic, Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
        Map<String, List<AbstractPartitionAssignor.MemberInfo>> consumersPerTopic = this.consumersPerTopic(subscriptions);
        Map<String, String> consumerRacks = this.consumerRacks(subscriptions);
        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream().filter(e -> !((List)e.getValue()).isEmpty()).map(e -> new TopicAssignmentState((String)e.getKey(), (List)e.getValue(), (List)consumersPerTopic.get(e.getKey()), consumerRacks)).collect(Collectors.toList());
        HashMap<String, List<TopicPartition>> assignment = new HashMap<String, List<TopicPartition>>();
        subscriptions.keySet().forEach(memberId -> assignment.put((String)memberId, new ArrayList()));
        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
        if (useRackAware) {
            this.assignWithRackMatching(topicAssignmentStates, assignment);
        }
        topicAssignmentStates.forEach(t -> this.assignRanges((TopicAssignmentState)t, (c, tp) -> true, (Map<String, List<TopicPartition>>)assignment));
        if (useRackAware) {
            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
        }
        return assignment;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
        return this.assignPartitions(RangeAssignor.partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
    }

    private void assignRanges(TopicAssignmentState assignmentState, BiFunction<String, TopicPartition, Boolean> mayAssign, Map<String, List<TopicPartition>> assignment) {
        for (String consumer : assignmentState.consumers.keySet()) {
            if (assignmentState.unassignedPartitions.isEmpty()) break;
            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream().filter(tp -> (Boolean)mayAssign.apply(consumer, (TopicPartition)tp)).limit(assignmentState.maxAssignable(consumer)).collect(Collectors.toList());
            if (assignablePartitions.isEmpty()) continue;
            this.assign(consumer, assignablePartitions, assignmentState, assignment);
        }
    }

    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates, Map<String, List<TopicPartition>> assignment) {
        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
            if (coPartitionedStates.size() > 1) {
                this.assignCoPartitionedWithRackMatching((LinkedHashMap<String, Optional<String>>)consumers, (int)numPartitions, (Collection<TopicAssignmentState>)coPartitionedStates, assignment);
            } else {
                TopicAssignmentState state = (TopicAssignmentState)coPartitionedStates.get(0);
                if (state.needsRackAwareAssignment) {
                    this.assignRanges(state, state::racksMatch, assignment);
                }
            }
        }));
    }

    private void assignCoPartitionedWithRackMatching(LinkedHashMap<String, Optional<String>> consumers, int numPartitions, Collection<TopicAssignmentState> assignmentStates, Map<String, List<TopicPartition>> assignment) {
        LinkedHashSet<String> remainingConsumers = new LinkedHashSet<String>(consumers.keySet());
        for (int i = 0; i < numPartitions; ++i) {
            int p = i;
            Optional<String> matchingConsumer = remainingConsumers.stream().filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch((String)c, new TopicPartition(t.topic, p)) && t.maxAssignable((String)c) > 0)).findFirst();
            if (!matchingConsumer.isPresent()) continue;
            String consumer = matchingConsumer.get();
            assignmentStates.forEach(t -> this.assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), (TopicAssignmentState)t, assignment));
            if (!assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) continue;
            remainingConsumers.remove(consumer);
            if (remainingConsumers.isEmpty()) break;
        }
    }

    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
        assignment.get(consumer).addAll(partitions);
        assignmentState.onAssigned(consumer, partitions);
    }

    private Map<String, String> consumerRacks(Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
        HashMap<String, String> consumerRacks = new HashMap<String, String>(subscriptions.size());
        subscriptions.forEach((memberId, subscription) -> subscription.rackId().filter(r -> !r.isEmpty()).ifPresent(rackId -> consumerRacks.put((String)memberId, (String)rackId)));
        return consumerRacks;
    }

    private class TopicAssignmentState {
        private final String topic;
        private final LinkedHashMap<String, Optional<String>> consumers;
        private final boolean needsRackAwareAssignment;
        private final Map<TopicPartition, Set<String>> partitionRacks;
        private final Set<TopicPartition> unassignedPartitions;
        private final Map<String, Integer> numAssignedByConsumer;
        private final int numPartitionsPerConsumer;
        private int remainingConsumersWithExtraPartition;

        public TopicAssignmentState(String topic, List<PartitionInfo> partitionInfos, List<AbstractPartitionAssignor.MemberInfo> membersOrNull, Map<String, String> consumerRacks) {
            this.topic = topic;
            List<Object> members = membersOrNull == null ? Collections.emptyList() : membersOrNull;
            Collections.sort(members);
            this.consumers = members.stream().map(c -> c.memberId).collect(Collectors.toMap(Function.identity(), c -> Optional.ofNullable((String)consumerRacks.get(c)), (a, b) -> a, LinkedHashMap::new));
            this.unassignedPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition())).collect(Collectors.toCollection(LinkedHashSet::new));
            this.numAssignedByConsumer = this.consumers.keySet().stream().collect(Collectors.toMap(Function.identity(), c -> 0));
            this.numPartitionsPerConsumer = this.consumers.isEmpty() ? 0 : partitionInfos.size() / this.consumers.size();
            this.remainingConsumersWithExtraPartition = this.consumers.isEmpty() ? 0 : partitionInfos.size() % this.consumers.size();
            HashSet allConsumerRacks = new HashSet();
            HashSet allPartitionRacks = new HashSet();
            members.stream().map(m -> m.memberId).filter(consumerRacks::containsKey).forEach(memberId -> allConsumerRacks.add((String)consumerRacks.get(memberId)));
            if (!allConsumerRacks.isEmpty()) {
                this.partitionRacks = new HashMap<TopicPartition, Set<String>>(partitionInfos.size());
                partitionInfos.forEach(p -> {
                    TopicPartition tp = new TopicPartition(p.topic(), p.partition());
                    Set racks = Arrays.stream(p.replicas()).map(Node::rack).filter(Objects::nonNull).collect(Collectors.toSet());
                    this.partitionRacks.put(tp, racks);
                    allPartitionRacks.addAll(racks);
                });
            } else {
                this.partitionRacks = Collections.emptyMap();
            }
            this.needsRackAwareAssignment = RangeAssignor.this.useRackAwareAssignment(allConsumerRacks, allPartitionRacks, this.partitionRacks);
        }

        boolean racksMatch(String consumer, TopicPartition tp) {
            Optional<String> consumerRack = this.consumers.get(consumer);
            Set<String> replicaRacks = this.partitionRacks.get(tp);
            return consumerRack.isEmpty() || replicaRacks != null && replicaRacks.contains(consumerRack.get());
        }

        int maxAssignable(String consumer) {
            int maxForConsumer = this.numPartitionsPerConsumer + (this.remainingConsumersWithExtraPartition > 0 ? 1 : 0) - this.numAssignedByConsumer.get(consumer);
            return Math.max(0, maxForConsumer);
        }

        void onAssigned(String consumer, List<TopicPartition> newlyAssignedPartitions) {
            int numAssigned = this.numAssignedByConsumer.compute(consumer, (c, n) -> n + newlyAssignedPartitions.size());
            if (numAssigned > this.numPartitionsPerConsumer) {
                --this.remainingConsumersWithExtraPartition;
            }
            this.unassignedPartitions.removeAll(newlyAssignedPartitions);
        }

        public String toString() {
            return "TopicAssignmentState(topic=" + this.topic + ", consumers=" + String.valueOf(this.consumers) + ", partitionRacks=" + String.valueOf(this.partitionRacks) + ", unassignedPartitions=" + String.valueOf(this.unassignedPartitions) + ")";
        }
    }
}

