/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.commit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

public class CommitterEventHandler
extends AbstractService
implements EventHandler<CommitterEvent> {
    private static final Log LOG = LogFactory.getLog(CommitterEventHandler.class);
    private final AppContext context;
    private final OutputCommitter committer;
    private final RMHeartbeatHandler rmHeartbeatHandler;
    private ThreadPoolExecutor launcherPool;
    private Thread eventHandlingThread;
    private BlockingQueue<CommitterEvent> eventQueue = new LinkedBlockingQueue();
    private final AtomicBoolean stopped;
    private final ClassLoader jobClassLoader;
    private Thread jobCommitThread = null;
    private int commitThreadCancelTimeoutMs;
    private long commitWindowMs;
    private FileSystem fs;
    private Path startCommitFile;
    private Path endCommitSuccessFile;
    private Path endCommitFailureFile;

    public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) {
        this(context, committer, rmHeartbeatHandler, null);
    }

    public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
        super("CommitterEventHandler");
        this.context = context;
        this.committer = committer;
        this.rmHeartbeatHandler = rmHeartbeatHandler;
        this.stopped = new AtomicBoolean(false);
        this.jobClassLoader = jobClassLoader;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.commitThreadCancelTimeoutMs = conf.getInt("yarn.app.mapreduce.am.job.committer.cancel-timeout", 60000);
        this.commitWindowMs = conf.getLong("yarn.app.mapreduce.am.job.committer.commit-window", 10000L);
        try {
            this.fs = FileSystem.get((Configuration)conf);
            JobID id = TypeConverter.fromYarn((ApplicationId)this.context.getApplicationID());
            JobId jobId = TypeConverter.toYarn((JobID)id);
            String user = UserGroupInformation.getCurrentUser().getShortUserName();
            this.startCommitFile = MRApps.getStartJobCommitFile((Configuration)conf, (String)user, (JobId)jobId);
            this.endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile((Configuration)conf, (String)user, (JobId)jobId);
            this.endCommitFailureFile = MRApps.getEndJobCommitFailureFile((Configuration)conf, (String)user, (JobId)jobId);
        }
        catch (IOException e) {
            throw new YarnRuntimeException((Throwable)e);
        }
    }

    protected void serviceStart() throws Exception {
        ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder().setNameFormat("CommitterEvent Processor #%d");
        if (this.jobClassLoader != null) {
            1 backingTf = new /* Unavailable Anonymous Inner Class!! */;
            tfBuilder.setThreadFactory((ThreadFactory)backingTf);
        }
        ThreadFactory tf = tfBuilder.build();
        this.launcherPool = new ThreadPoolExecutor(5, 5, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
        this.eventHandlingThread = new Thread((Runnable)new /* Unavailable Anonymous Inner Class!! */);
        this.eventHandlingThread.setName("CommitterEvent Handler");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    public void handle(CommitterEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException((Throwable)e);
        }
    }

    protected void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.launcherPool != null) {
            this.launcherPool.shutdown();
        }
        super.serviceStop();
    }

    private synchronized void jobCommitStarted() throws IOException {
        if (this.jobCommitThread != null) {
            throw new IOException("Commit while another commit thread active: " + this.jobCommitThread.toString());
        }
        this.jobCommitThread = Thread.currentThread();
    }

    private synchronized void jobCommitEnded() {
        if (this.jobCommitThread == Thread.currentThread()) {
            this.jobCommitThread = null;
            this.notifyAll();
        }
    }

    private synchronized void cancelJobCommit() {
        Thread threadCommitting = this.jobCommitThread;
        if (threadCommitting != null && threadCommitting.isAlive()) {
            LOG.info((Object)"Cancelling commit");
            threadCommitting.interrupt();
            long now = this.context.getClock().getTime();
            long timeoutTimestamp = now + (long)this.commitThreadCancelTimeoutMs;
            try {
                while (this.jobCommitThread == threadCommitting && now > timeoutTimestamp) {
                    this.wait(now - timeoutTimestamp);
                    now = this.context.getClock().getTime();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    static /* synthetic */ ClassLoader access$000(CommitterEventHandler x0) {
        return x0.jobClassLoader;
    }

    static /* synthetic */ AtomicBoolean access$100(CommitterEventHandler x0) {
        return x0.stopped;
    }

    static /* synthetic */ BlockingQueue access$200(CommitterEventHandler x0) {
        return x0.eventQueue;
    }

    static /* synthetic */ Log access$300() {
        return LOG;
    }

    static /* synthetic */ ThreadPoolExecutor access$400(CommitterEventHandler x0) {
        return x0.launcherPool;
    }

    static /* synthetic */ OutputCommitter access$500(CommitterEventHandler x0) {
        return x0.committer;
    }

    static /* synthetic */ AppContext access$600(CommitterEventHandler x0) {
        return x0.context;
    }

    static /* synthetic */ FileSystem access$700(CommitterEventHandler x0) {
        return x0.fs;
    }

    static /* synthetic */ Path access$800(CommitterEventHandler x0) {
        return x0.startCommitFile;
    }

    static /* synthetic */ void access$900(CommitterEventHandler x0) throws IOException {
        x0.jobCommitStarted();
    }

    static /* synthetic */ Path access$1000(CommitterEventHandler x0) {
        return x0.endCommitSuccessFile;
    }

    static /* synthetic */ Path access$1100(CommitterEventHandler x0) {
        return x0.endCommitFailureFile;
    }

    static /* synthetic */ void access$1200(CommitterEventHandler x0) {
        x0.jobCommitEnded();
    }

    static /* synthetic */ void access$1300(CommitterEventHandler x0) {
        x0.cancelJobCommit();
    }

    static /* synthetic */ RMHeartbeatHandler access$1400(CommitterEventHandler x0) {
        return x0.rmHeartbeatHandler;
    }

    static /* synthetic */ long access$1500(CommitterEventHandler x0) {
        return x0.commitWindowMs;
    }
}

