/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.memory.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.store.file.mergetree.MemTable;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionHelper;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;

public class SortBufferMemTable
implements MemTable {
    private final RowType keyType;
    private final RowType valueType;
    private final KeyValueSerializer serializer;
    private final BinaryInMemorySortBuffer buffer;

    public SortBufferMemTable(RowType keyType, RowType valueType, MemorySegmentPool memoryPool) {
        this.keyType = keyType;
        this.valueType = valueType;
        this.serializer = new KeyValueSerializer(keyType, valueType);
        ArrayList<LogicalType> sortKeyTypes = new ArrayList<LogicalType>(keyType.getChildren());
        sortKeyTypes.add(new BigIntType(false));
        NormalizedKeyComputer normalizedKeyComputer = CodeGenUtils.newNormalizedKeyComputer(sortKeyTypes, "MemTableKeyComputer");
        RecordComparator keyComparator = CodeGenUtils.newRecordComparator(sortKeyTypes, "MemTableComparator");
        if (memoryPool.freePages() < 3) {
            throw new IllegalArgumentException("Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
        }
        this.buffer = BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, InternalSerializers.create(KeyValue.schema(keyType, valueType)), new BinaryRowDataSerializer(sortKeyTypes.size()), keyComparator, memoryPool);
    }

    @Override
    public boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value) throws IOException {
        return this.buffer.write(this.serializer.toRow(key, sequenceNumber, valueKind, value));
    }

    @Override
    public int size() {
        return this.buffer.size();
    }

    @Override
    public long memoryOccupancy() {
        return this.buffer.getOccupancy();
    }

    @Override
    public Iterator<KeyValue> rawIterator() {
        return new RawIterator(this.buffer.getIterator());
    }

    @Override
    public Iterator<KeyValue> mergeIterator(Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
        new QuickSort().sort(this.buffer);
        return new MergeIterator(this.buffer.getIterator(), keyComparator, mergeFunction);
    }

    @Override
    public void clear() {
        this.buffer.clear();
    }

    @VisibleForTesting
    void assertBufferEmpty() {
        Preconditions.checkState(this.buffer.getBufferSegmentCount() == 0, "The sort buffer is not empty");
    }

    private class RawIterator
    implements Iterator<KeyValue> {
        private final MutableObjectIterator<BinaryRowData> kvIter;
        private final KeyValueSerializer current;
        private BinaryRowData currentRow;
        private boolean advanced;

        private RawIterator(MutableObjectIterator<BinaryRowData> kvIter) {
            this.kvIter = kvIter;
            this.current = new KeyValueSerializer(SortBufferMemTable.this.keyType, SortBufferMemTable.this.valueType);
            this.currentRow = new BinaryRowData(SortBufferMemTable.this.keyType.getFieldCount() + 2 + SortBufferMemTable.this.valueType.getFieldCount());
            this.advanced = false;
        }

        @Override
        public boolean hasNext() {
            if (!this.advanced) {
                this.advanceNext();
            }
            return this.currentRow != null;
        }

        @Override
        public KeyValue next() {
            if (!this.hasNext()) {
                return null;
            }
            this.advanced = false;
            return this.current.getReusedKv();
        }

        private void advanceNext() {
            try {
                this.currentRow = this.kvIter.next(this.currentRow);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (this.currentRow != null) {
                this.current.fromRow(this.currentRow);
            }
            this.advanced = true;
        }
    }

    private class MergeIterator
    implements Iterator<KeyValue> {
        private final MutableObjectIterator<BinaryRowData> kvIter;
        private final Comparator<RowData> keyComparator;
        private final MergeFunctionHelper mergeFunctionHelper;
        private KeyValueSerializer previous;
        private BinaryRowData previousRow;
        private KeyValueSerializer current;
        private BinaryRowData currentRow;
        private boolean advanced;

        private MergeIterator(MutableObjectIterator<BinaryRowData> kvIter, Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
            this.kvIter = kvIter;
            this.keyComparator = keyComparator;
            this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
            int totalFieldCount = SortBufferMemTable.this.keyType.getFieldCount() + 2 + SortBufferMemTable.this.valueType.getFieldCount();
            this.previous = new KeyValueSerializer(SortBufferMemTable.this.keyType, SortBufferMemTable.this.valueType);
            this.previousRow = new BinaryRowData(totalFieldCount);
            this.current = new KeyValueSerializer(SortBufferMemTable.this.keyType, SortBufferMemTable.this.valueType);
            this.currentRow = new BinaryRowData(totalFieldCount);
            this.readOnce();
            this.advanced = false;
        }

        @Override
        public boolean hasNext() {
            this.advanceIfNeeded();
            return this.previousRow != null;
        }

        @Override
        public KeyValue next() {
            this.advanceIfNeeded();
            if (this.previousRow == null) {
                return null;
            }
            this.advanced = false;
            return this.previous.getReusedKv();
        }

        private void advanceIfNeeded() {
            RowData result;
            if (this.advanced) {
                return;
            }
            this.advanced = true;
            do {
                this.swapSerializers();
                if (this.previousRow == null) {
                    return;
                }
                this.mergeFunctionHelper.reset();
                this.mergeFunctionHelper.add(this.previous.getReusedKv().value());
                while (this.readOnce() && this.keyComparator.compare(this.previous.getReusedKv().key(), this.current.getReusedKv().key()) == 0) {
                    this.mergeFunctionHelper.add(this.current.getReusedKv().value());
                    this.swapSerializers();
                }
            } while ((result = this.mergeFunctionHelper.getValue()) == null);
            this.previous.getReusedKv().setValue(result);
        }

        private boolean readOnce() {
            try {
                this.currentRow = this.kvIter.next(this.currentRow);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (this.currentRow != null) {
                this.current.fromRow(this.currentRow);
            }
            return this.currentRow != null;
        }

        private void swapSerializers() {
            KeyValueSerializer tmp = this.previous;
            BinaryRowData tmpRow = this.previousRow;
            this.previous = this.current;
            this.previousRow = this.currentRow;
            this.current = tmp;
            this.currentRow = tmpRow;
        }
    }
}

