/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.QueryableStateUtils;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskexecutor.QueryableStateConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bundles the queryable-state components used by a task executor: the {@link KvStateRegistry},
 * an optional {@link KvStateServer} and an optional {@link KvStateClientProxy}.
 *
 * <p>The server and the proxy may be {@code null}: the constructor accepts {@code null} for both,
 * {@link #fromConfiguration(TaskManagerServicesConfiguration)} leaves them {@code null} when no
 * queryable state configuration is present, and {@link #start()} resets a component to
 * {@code null} when it fails to start.
 *
 * <p>{@link #start()}, {@link #shutdown()} and {@link #isShutdown()} synchronize on an internal
 * lock. The plain getters read their fields without taking that lock.
 */
public class KvStateService {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateService.class);

    /** Guards the lifecycle transitions performed by start/shutdown. */
    private final Object lock = new Object();

    /** Registry handed out to tasks; never {@code null}. */
    private final KvStateRegistry kvStateRegistry;

    /** Data server; {@code null} if not configured or if it failed to start. */
    private KvStateServer kvStateServer;

    /** Client proxy; {@code null} if not configured or if it failed to start. */
    private KvStateClientProxy kvStateClientProxy;

    /** Set once {@link #shutdown()} has completed. */
    private boolean isShutdown;

    /**
     * Creates the service from already constructed components.
     *
     * @param kvStateRegistry registry for key-value state; must not be {@code null}
     * @param kvStateServer data server, or {@code null} if none should be managed
     * @param kvStateClientProxy client proxy, or {@code null} if none should be managed
     */
    public KvStateService(KvStateRegistry kvStateRegistry, KvStateServer kvStateServer, KvStateClientProxy kvStateClientProxy) {
        this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
        this.kvStateServer = kvStateServer;
        this.kvStateClientProxy = kvStateClientProxy;
    }

    /** @return the registry this service was created with */
    public KvStateRegistry getKvStateRegistry() {
        return this.kvStateRegistry;
    }

    /** @return the data server, or {@code null} if absent or if it failed to start */
    public KvStateServer getKvStateServer() {
        return this.kvStateServer;
    }

    /** @return the client proxy, or {@code null} if absent or if it failed to start */
    public KvStateClientProxy getKvStateClientProxy() {
        return this.kvStateClientProxy;
    }

    /**
     * Creates a task-scoped registry by delegating to the underlying {@link KvStateRegistry}.
     *
     * @param jobId id of the job the task belongs to
     * @param jobVertexId id of the job vertex the task belongs to
     * @return the registry returned by {@link KvStateRegistry#createTaskRegistry}
     */
    public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
        return this.kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
    }

    /**
     * Starts the data server and the client proxy, if present.
     *
     * <p>A component that fails to start is logged, shut down on a best-effort basis and then
     * dropped (its field is set to {@code null}); the failure is not propagated. A failure while
     * cleaning up one component does not prevent the other component from being started.
     *
     * @throws IllegalStateException if the service has already been shut down
     */
    public void start() {
        synchronized (this.lock) {
            Preconditions.checkState(!this.isShutdown, "The KvStateService has already been shut down.");
            LOG.info("Starting the kvState service and its components.");

            if (this.kvStateServer != null) {
                try {
                    this.kvStateServer.start();
                } catch (Throwable startFailure) {
                    // Log first so the root cause is recorded even if the cleanup below fails.
                    LOG.error("Failed to start the Queryable State Data Server.", startFailure);
                    try {
                        this.kvStateServer.shutdown();
                    } catch (Throwable cleanupFailure) {
                        LOG.warn("Cannot shut down Queryable State Data Server.", cleanupFailure);
                    } finally {
                        // Always drop the half-started server, even when its shutdown threw.
                        this.kvStateServer = null;
                    }
                }
            }

            if (this.kvStateClientProxy != null) {
                try {
                    this.kvStateClientProxy.start();
                } catch (Throwable startFailure) {
                    // Log first so the root cause is recorded even if the cleanup below fails.
                    LOG.error("Failed to start the Queryable State Client Proxy.", startFailure);
                    try {
                        this.kvStateClientProxy.shutdown();
                    } catch (Throwable cleanupFailure) {
                        LOG.warn("Cannot shut down Queryable State Client Proxy.", cleanupFailure);
                    } finally {
                        // Always drop the half-started proxy, even when its shutdown threw.
                        this.kvStateClientProxy = null;
                    }
                }
            }
        }
    }

    /**
     * Shuts down the client proxy and then the data server, if present.
     *
     * <p>Failures of either component are logged and otherwise ignored. Calling this method
     * again after it has completed is a no-op.
     */
    public void shutdown() {
        synchronized (this.lock) {
            if (this.isShutdown) {
                return;
            }
            LOG.info("Shutting down the kvState service and its components.");

            if (this.kvStateClientProxy != null) {
                try {
                    LOG.debug("Shutting down Queryable State Client Proxy.");
                    this.kvStateClientProxy.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Client Proxy.", t);
                }
            }

            if (this.kvStateServer != null) {
                try {
                    LOG.debug("Shutting down Queryable State Data Server.");
                    this.kvStateServer.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Data Server.", t);
                }
            }

            this.isShutdown = true;
        }
    }

    /** @return {@code true} once {@link #shutdown()} has completed */
    public boolean isShutdown() {
        synchronized (this.lock) {
            return this.isShutdown;
        }
    }

    /**
     * Builds a {@code KvStateService} from the task manager services configuration.
     *
     * <p>A fresh {@link KvStateRegistry} is always created. The client proxy and the data server
     * are only created when the configuration carries a {@link QueryableStateConfiguration};
     * otherwise both are {@code null}. Any thread count configured as {@code 0} is replaced by
     * the configured number of slots. Request statistics are disabled for both components.
     *
     * @param taskManagerServicesConfiguration configuration to read the settings from
     * @return a new, not yet started service
     */
    public static KvStateService fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
        KvStateClientProxy kvClientProxy = null;
        KvStateServer kvStateServer = null;

        if (qsConfig != null) {
            int numberOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();

            int numProxyServerNetworkThreads = threadsOrDefault(qsConfig.numProxyServerThreads(), numberOfSlots);
            int numProxyServerQueryThreads = threadsOrDefault(qsConfig.numProxyQueryThreads(), numberOfSlots);
            KvStateRequestStats proxyStats = new DisabledKvStateRequestStats();
            kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
                    taskManagerServicesConfiguration.getExternalAddress(),
                    qsConfig.getProxyPortRange(),
                    numProxyServerNetworkThreads,
                    numProxyServerQueryThreads,
                    proxyStats);

            int numStateServerNetworkThreads = threadsOrDefault(qsConfig.numStateServerThreads(), numberOfSlots);
            int numStateServerQueryThreads = threadsOrDefault(qsConfig.numStateQueryThreads(), numberOfSlots);
            KvStateRequestStats serverStats = new DisabledKvStateRequestStats();
            kvStateServer = QueryableStateUtils.createKvStateServer(
                    taskManagerServicesConfiguration.getExternalAddress(),
                    qsConfig.getStateServerPortRange(),
                    numStateServerNetworkThreads,
                    numStateServerQueryThreads,
                    kvStateRegistry,
                    serverStats);
        }

        return new KvStateService(kvStateRegistry, kvStateServer, kvClientProxy);
    }

    /** Returns {@code configured}, or {@code fallback} when {@code configured} is {@code 0}. */
    private static int threadsOrDefault(int configured, int fallback) {
        return configured == 0 ? fallback : configured;
    }
}

