/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.PrepareCommitOperator;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCompact;

/**
 * A {@link PrepareCommitOperator} that compacts a {@link FileStoreTable} and reports the
 * compaction results as {@link Committable}s.
 *
 * <p>Each parallel subtask only compacts the (partition, bucket) pairs assigned to it by the
 * filter installed in {@link #open()}; the assignment is derived from the hash of the pair and
 * the operator parallelism.
 */
public class StoreCompactOperator extends PrepareCommitOperator {
    /** Table whose files are compacted. */
    private final FileStoreTable table;

    /** Optional partition spec handed to the compactor; {@code null} is treated as an empty spec. */
    @Nullable
    private final Map<String, String> compactPartitionSpec;

    /** Compactor created from {@link #table} in {@link #open()}; unset before that call. */
    private TableCompact compact;

    /**
     * Creates the operator.
     *
     * @param table table to create the compactor from
     * @param compactPartitionSpec partition spec passed to the compactor, or {@code null} to pass
     *     an empty map instead
     */
    public StoreCompactOperator(FileStoreTable table, @Nullable Map<String, String> compactPartitionSpec) {
        this.table = table;
        this.compactPartitionSpec = compactPartitionSpec;
    }

    /**
     * Creates and configures the compactor for this subtask.
     *
     * <p>The compactor is given the partition spec (an empty map when none was supplied) and a
     * filter that accepts a (partition, bucket) pair only when the pair hashes to this subtask's
     * index.
     *
     * @throws Exception if {@code super.open()} fails
     */
    @Override
    public void open() throws Exception {
        super.open();

        int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = this.getRuntimeContext().getNumberOfParallelSubtasks();

        this.compact = this.table.newCompact();
        this.compact.withPartitions(
                this.compactPartitionSpec == null ? Collections.emptyMap() : this.compactPartitionSpec);
        this.compact.withFilter((partition, bucket) -> {
            // The remainder is taken before abs(), so its magnitude is always below the
            // parallelism and abs() cannot hit the Integer.MIN_VALUE overflow case.
            int owner = Math.abs(Objects.hash(partition, bucket) % parallelism);
            return subtaskIndex == owner;
        });
    }

    /**
     * Runs the compactor and wraps every result in a {@link Committable} of kind
     * {@link Committable.Kind#FILE} tagged with the given checkpoint id.
     *
     * @param endOfInput not used by this implementation
     * @param checkpointId checkpoint id attached to every produced committable
     * @return one committable per element returned by the compactor
     * @throws IOException declared by the overridden method signature
     */
    @Override
    protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId) throws IOException {
        return this.compact.compact().stream()
                .map(compacted -> new Committable(checkpointId, Committable.Kind.FILE, compacted))
                .collect(Collectors.toList());
    }
}

