/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.executor.parallel.processor;

import com.alibaba.dts.client.context.ClientContext;
import com.alibaba.dts.client.executor.job.processor.FailureJobProcessor;
import com.alibaba.dts.client.executor.job.processor.ParallelJobProcessor;
import com.alibaba.dts.client.executor.parallel.ParallelPool;
import com.alibaba.dts.client.executor.parallel.processor.DefaultFailureJobProcessor;
import com.alibaba.dts.client.executor.parallel.processor.FailureJobContext;
import com.alibaba.dts.client.executor.parallel.processor.ParallelJobContext;
import com.alibaba.dts.client.executor.parallel.unit.ExecutorUnit;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.result.ProcessResult;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ParallelTaskProcessor
extends Thread
implements Constants {
    private static final Log logger = LogFactory.getLog(ParallelTaskProcessor.class);
    private ExecutorUnit executorUnit;
    private volatile boolean stop = false;
    private int status = 0;
    private AtomicInteger threadCounter;
    private ParallelJobContext context;
    private FailureJobContext failureJobContext;
    private FailureJobProcessor failureJobProcessor = new DefaultFailureJobProcessor();
    private final ClientContext clientContext;

    public ParallelTaskProcessor(ClientContext clientContext, ExecutorUnit executorUnit, int index, AtomicInteger threadCounter) {
        this.clientContext = clientContext;
        this.executorUnit = executorUnit;
        super.setName("DtsTaskProcessor-" + executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount() + "-" + index);
        this.threadCounter = threadCounter;
        this.context = new ParallelJobContext(clientContext, this.executorUnit.getExecutableTask().getJob(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot(), executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
        this.failureJobContext = new FailureJobContext(this.executorUnit.getExecutableTask().getJob(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot(), executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
        String[] jobProcessorProperties = executorUnit.getExecutableTask().getJob().getJobProcessor().split(":");
        String jobProcessor = jobProcessorProperties[0].trim();
        if (this.clientContext.getClientConfig().getFailureJobProcessorMap() != null && this.clientContext.getClientConfig().getFailureJobProcessorMap().get(jobProcessor) != null) {
            this.failureJobProcessor = this.clientContext.getClientConfig().getFailureJobProcessorMap().get(jobProcessor);
        }
        this.context.setAvailableMachineAmount(this.executorUnit.getExecutableTask().getAvailableMachineAmount());
        this.context.setCurrentMachineNumber(this.executorUnit.getExecutableTask().getCurrentMachineNumber());
    }

    public void refresh(ExecutorUnit executorUnit, int index) {
        this.executorUnit = executorUnit;
        super.setName("DtsTaskProcessor-" + executorUnit.getExecutableTask().getJob().getId() + "-" + executorUnit.getExecutableTask().getJob().getJobProcessor() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getId() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getFireTime() + "-" + executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount() + "-" + index);
        this.context = new ParallelJobContext(this.clientContext, this.executorUnit.getExecutableTask().getJob(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot(), executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
        this.failureJobContext = new FailureJobContext(this.executorUnit.getExecutableTask().getJob(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot(), executorUnit.getExecutableTask().getJobInstanceSnapshot().getRetryCount());
        String[] jobProcessorProperties = executorUnit.getExecutableTask().getJob().getJobProcessor().split(":");
        String jobProcessor = jobProcessorProperties[0].trim();
        if (this.clientContext.getClientConfig().getFailureJobProcessorMap() != null && this.clientContext.getClientConfig().getFailureJobProcessorMap().get(jobProcessor) != null) {
            this.failureJobProcessor = this.clientContext.getClientConfig().getFailureJobProcessorMap().get(jobProcessor);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        try {
            ParallelJobProcessor parallelJobProcessor = null;
            try {
                parallelJobProcessor = this.clientContext.getJobProcessorFactory().createAndGetParallelJobProcessor(this.executorUnit.getExecutableTask().getJob(), false);
            }
            catch (Throwable e) {
                logger.error((Object)("[ParallelTaskProcessor]: createAndGetParallelJobProcessor error, jobProcessor:" + this.executorUnit.getExecutableTask().getJob().getJobProcessor()), e);
            }
            BlockingQueue<TaskSnapshot> queue = this.executorUnit.getQueue();
            while (true) {
                if (this.stop) {
                    if (queue.isEmpty()) return;
                }
                TaskSnapshot taskSnapshot = null;
                try {
                    taskSnapshot = queue.poll(10000L, TimeUnit.MILLISECONDS);
                }
                catch (Throwable e) {
                    logger.error((Object)("[ParallelTaskProcessor]: take executableTask error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId()), e);
                }
                if (null == taskSnapshot) continue;
                this.executeTask(taskSnapshot, parallelJobProcessor);
            }
        }
        catch (Throwable e) {
            logger.error((Object)("[ParallelTaskProcessor]: run error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId()), e);
            return;
        }
        finally {
            try {
                ParallelPool parallelPool = this.executorUnit.getParallelPool();
                parallelPool.stopTask(this.executorUnit.getExecutableTask().getJob().getId(), this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId());
            }
            catch (Throwable e) {
                logger.error((Object)("[ParallelTaskProcessor]: finally stopTask error, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId()), e);
            }
            finally {
                if (this.clientContext.getClientConfig().isFinishLog()) {
                    logger.warn((Object)("[ParallelTaskProcessor]: finally stopTask, instanceId:" + this.executorUnit.getExecutableTask().getJobInstanceSnapshot().getId()));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeTask(TaskSnapshot taskSnapshot, ParallelJobProcessor parallelJobProcessor) {
        if (null == parallelJobProcessor) {
            logger.error((Object)("[ParallelTaskProcessor]: jobProcessor is null, please check " + this.executorUnit.getExecutableTask().getJob().getJobProcessor()));
            this.clientContext.getExecutor().acknowledge(taskSnapshot, 4, 0);
            return;
        }
        this.status = 1;
        this.threadCounter.incrementAndGet();
        try {
            this.context.setTask(taskSnapshot);
            this.context.initRetryCount(taskSnapshot.getRetryCount());
            this.failureJobContext.setTask(this.context.getTask());
            this.failureJobContext.initRetryCount(taskSnapshot.getRetryCount());
            ProcessResult processResult = null;
            try {
                processResult = parallelJobProcessor.process(this.context);
            }
            catch (Throwable e) {
                logger.error((Object)("[ParallelTaskProcessor]: process error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId()), e);
                this.failureJobContext.setE(e);
            }
            if (null == processResult) {
                logger.error((Object)("[ParallelTaskProcessor]: process error, processResult is null, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId()));
                processResult = new ProcessResult(false);
                if (null == this.failureJobContext.getE()) {
                    this.failureJobContext.setE(new RuntimeException("processResult is null"));
                }
                try {
                    this.failureJobProcessor.process(this.failureJobContext);
                }
                catch (Throwable e) {
                    logger.error((Object)("[ParallelTaskProcessor]: process failure job error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId()), e);
                }
            }
            if (!this.stop) {
                this.handleRetryCount(taskSnapshot, processResult);
                this.clientContext.getExecutor().acknowledge(taskSnapshot, processResult.isSuccess() ? 3 : 4, processResult.getRetryCount());
            }
        }
        catch (Throwable e) {
            logger.error((Object)("[ParallelTaskProcessor]: executeTask error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId()), e);
        }
        finally {
            this.threadCounter.decrementAndGet();
            this.status = 0;
        }
    }

    private void handleRetryCount(TaskSnapshot taskSnapshot, ProcessResult processResult) {
        if (processResult.isSuccess()) {
            processResult.setRetryCount(0);
            return;
        }
        if (this.executorUnit.getExecutableTask().isCompensation()) {
            if (taskSnapshot.getRetryCount() > 0) {
                processResult.setRetryCount(taskSnapshot.getRetryCount() - 1);
            } else {
                processResult.setRetryCount(0);
            }
            return;
        }
        if (processResult.getRetryCount() > 100) {
            processResult.setRetryCount(100);
            return;
        }
    }

    public boolean isStop() {
        return this.stop;
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    public int getStatus() {
        return this.status;
    }
}

