/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.processor.server;

import io.netty.channel.ChannelHandlerContext;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.processor.RemotingProcessor;
import io.seata.core.rpc.processor.server.BatchLogHandler;
import java.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerOnRequestProcessor
implements RemotingProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);
    private RemotingServer remotingServer;
    private TransactionMessageHandler transactionMessageHandler;

    public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {
        this.remotingServer = remotingServer;
        this.transactionMessageHandler = transactionMessageHandler;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            this.onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            }
            catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }

    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", new Object[]{message, NetUtil.toIpAddress((SocketAddress)ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup()});
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress((SocketAddress)ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            }
            catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", (Object)e.getMessage(), (Object)e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        if (message instanceof MergedWarpMessage) {
            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()];
            for (int i = 0; i < results.length; ++i) {
                AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i);
                results[i] = this.transactionMessageHandler.onRequest(subMessage, rpcContext);
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results);
            this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        } else {
            AbstractMessage msg = (AbstractMessage)message;
            AbstractResultMessage result = this.transactionMessageHandler.onRequest(msg, rpcContext);
            this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
}

