/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.source;

import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.source.FileStoreEmptySource;
import org.apache.flink.table.store.connector.source.FileStoreSource;
import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class FlinkSourceBuilder {
    private final ObjectIdentifier tableIdentifier;
    private final FileStoreTable table;
    private final Configuration conf;
    private boolean isContinuous = false;
    private StreamExecutionEnvironment env;
    @Nullable
    private int[][] projectedFields;
    @Nullable
    private Predicate predicate;
    @Nullable
    private LogSourceProvider logSourceProvider;
    @Nullable
    private Integer parallelism;
    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.conf = Configuration.fromMap(table.schema().options());
    }

    public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
        this.isContinuous = isContinuous;
        return this;
    }

    public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) {
        this.env = env;
        return this;
    }

    public FlinkSourceBuilder withProjection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
        return this;
    }

    public FlinkSourceBuilder withPredicate(Predicate predicate) {
        this.predicate = predicate;
        return this;
    }

    public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
        this.logSourceProvider = logSourceProvider;
        return this;
    }

    public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public FlinkSourceBuilder withWatermarkStrategy(@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        return this;
    }

    private long discoveryIntervalMills() {
        return ((Duration)this.conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)).toMillis();
    }

    private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
        return new FileStoreSource(this.table, isContinuous, this.discoveryIntervalMills(), continuousScanLatest, this.projectedFields, this.predicate);
    }

    private Source<RowData, ?, ?> buildSource() {
        if (this.isContinuous) {
            if (this.table.schema().primaryKeys().size() > 0 && this.conf.get(CoreOptions.MERGE_ENGINE) == CoreOptions.MergeEngine.PARTIAL_UPDATE) {
                throw new ValidationException("Partial update continuous reading is not supported.");
            }
            CoreOptions.LogStartupMode startupMode = (CoreOptions.LogStartupMode)((Object)this.conf.get(CoreOptions.LOG_SCAN));
            if (this.logSourceProvider == null) {
                return this.buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST);
            }
            if (startupMode != CoreOptions.LogStartupMode.FULL) {
                return this.logSourceProvider.createSource(null);
            }
            return HybridSource.builder((Source)this.buildFileSource(false, false)).addSource((HybridSource.SourceFactory)new LogHybridSourceFactory(this.logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED).build();
        }
        return this.buildFileSource(false, false);
    }

    public DataStreamSource<RowData> build() {
        if (this.env == null) {
            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
        }
        RowType rowType = this.table.schema().logicalRowType();
        LogicalType produceType = Optional.ofNullable(this.projectedFields).map(Projection::of).map(p -> p.project((LogicalType)rowType)).orElse((LogicalType)rowType);
        DataStreamSource dataStream = this.env.fromSource((Source)((Boolean)this.conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED) != false ? new FileStoreEmptySource() : this.buildSource()), this.watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : this.watermarkStrategy, this.tableIdentifier.asSummaryString(), (TypeInformation)InternalTypeInfo.of((LogicalType)produceType));
        if (this.parallelism != null) {
            dataStream.setParallelism(this.parallelism.intValue());
        }
        return dataStream;
    }
}

