/*
 * Decompiled with CFR 0.152.
 */
package com.swimap.base.rpc;

import com.swimap.base.rpc.BinTranslator;
import com.swimap.base.rpc.RpcErrorVerUnMatch;
import com.swimap.base.rpc.RpcPendingRequest;
import com.swimap.base.rpc.RpcRegister;
import com.swimap.base.rpc.RpcRegisterItem;
import com.swimap.base.rpc.RpcSocketPDU;
import com.swimap.base.rpc.RpcTrace;
import com.swimap.base.rpc.RpcUtil;
import com.swimap.base.rpc.VersionUtil;
import com.swimap.base.rpc.handshake.ClientHello;
import com.swimap.base.rpc.handshake.ServerHello;
import com.swimap.base.rpc.nio.ISocketPDU;
import com.swimap.base.rpc.nio.ISocketProtocol;
import com.swimap.base.rpc.nio.NioTool;
import com.swimap.base.rpc.nio.SSLSocketInputStream;
import com.swimap.base.rpc.nio.SocketConnection;
import com.swimap.external.dsf.base.framework.AppError;
import com.swimap.external.dsf.base.rpc.RpcContext;
import com.swimap.external.dsf.base.rpc.RpcError;
import com.swimap.external.dsf.base.rpc.RpcErrorMethodNotFound;
import com.swimap.external.dsf.base.rpc.RpcErrorSecurity;
import com.swimap.external.dsf.base.rpc.RpcMessage;
import com.swimap.external.dsf.base.rpc.mdp.MDPError;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.Socket;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RpcSocketProtocolTCP
implements ISocketProtocol {
    private static Log log = LogFactory.getLog(RpcSocketProtocolTCP.class);
    public static final String TCP = "tcp";
    protected static final int MAX_PATH_LENGTH = 1024;
    private static final Map<String, Integer> clientManeger = new ConcurrentHashMap<String, Integer>();
    private static final int VERSIONX_TAG = 233433;
    private static final int VERSIONXX_TAG = 2046;
    private static final int VERSIONY_TAG = 24523;
    private static final int VERSIONYY_TAG = 24524;
    public static final String TIMEOUT = "dsf_rpc_timeout";
    private Map<Integer, RpcPendingRequest> pendingRequests = new ConcurrentHashMap<Integer, RpcPendingRequest>();
    private long restart = 0L;

    @Override
    public String getSchema() {
        return TCP;
    }

    @Override
    public ISocketPDU readPDU(SocketConnection conn) throws Exception {
        return BinTranslator.readPDU(conn, conn.getInputStream());
    }

    @Override
    public void writePDU(SocketConnection conn, ISocketPDU socketPDU) throws IOException {
        RpcSocketPDU pdu = (RpcSocketPDU)socketPDU;
        if (pdu.isTimeout()) {
            log.error((Object)String.format("write pdu, rpc pdu timeout path=%s", pdu.getPath()));
            return;
        }
        BinTranslator.writeToStream(pdu, conn.getOutputStream(), conn.isUseOldProtocal());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(SocketConnection conn) {
        for (RpcPendingRequest request : this.pendingRequests.values()) {
            request.cancel();
        }
        this.pendingRequests.clear();
        String clientIP = conn.getRemoteAddress();
        Integer count = 0;
        Map<String, Integer> map = clientManeger;
        synchronized (map) {
            if (clientManeger.get(clientIP) != null) {
                Integer n = count = clientManeger.get(clientIP);
                Integer n2 = count = Integer.valueOf(count - 1);
                if (count > 0) {
                    clientManeger.put(clientIP, count);
                } else {
                    clientManeger.remove(clientIP);
                }
            }
        }
    }

    @Override
    public String getPrefixPath() {
        return null;
    }

    @Override
    public void handleConnected(SocketConnection conn) throws IOException {
        InputStream is;
        DataInputStream input;
        long serverR3Flag;
        OutputStream os = conn.getOutputStream();
        DataOutputStream output = new DataOutputStream(os);
        if (RpcUtil.enableCompress) {
            output.writeInt(24524);
            output.flush();
        } else {
            output.writeInt(24523);
            output.flush();
        }
        output.writeLong(255L);
        output.flush();
        if (conn.isSSLConnection()) {
            boolean lab = true;
            while (lab) {
                int retVal = ((SSLSocketInputStream)conn.getInputStream()).doRead();
                if (retVal <= 0) {
                    if (retVal != -1) continue;
                    throw new IOException("input closed");
                }
                lab = false;
            }
        }
        if (!VersionUtil.isProtocalAfterR3(serverR3Flag = (input = new DataInputStream(is = conn.getInputStream())).readLong())) {
            this.handleOldServerHello(conn, serverR3Flag);
            return;
        }
        this.handleServerHello(conn);
    }

    private void handleOldServerHello(SocketConnection conn, long serverStartTime) {
        log.warn((Object)"handle old server Hello, communicate with old protocol");
        conn.setUseOldProtocal(true);
        this.updateRestartTime(conn, serverStartTime);
        conn.setSoTimeout(RpcUtil.getRPCTimeout());
    }

    private void handleServerHello(SocketConnection conn) throws IOException {
        ServerHello serverHello;
        ObjectOutputStream output = new ObjectOutputStream(conn.getOutputStream());
        ClientHello clientHello = new ClientHello();
        output.writeObject(clientHello);
        output.flush();
        ObjectInputStream input = new ObjectInputStream(conn.getInputStream());
        try {
            serverHello = (ServerHello)input.readObject();
        }
        catch (ClassNotFoundException e) {
            throw new RpcErrorVerUnMatch("RPC protocol unmatch, please update manually", null);
        }
        if (!VersionUtil.checkVersion(serverHello.getVersion())) {
            throw new RpcErrorVerUnMatch("RPC protocol unmatch client: xRPC-1.1, server: " + serverHello.getVersion(), serverHello.getUpgradeInfo());
        }
        conn.setUseOldProtocal(false);
        this.updateRestartTime(conn, serverHello.getStartTime());
        conn.setSoTimeout(RpcUtil.getRPCTimeout());
    }

    private void updateRestartTime(SocketConnection conn, long startTime) {
        if (this.restart != startTime) {
            this.restart = startTime;
            conn.setPeerRestart(true);
        } else {
            conn.setPeerRestart(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handlePDU(SocketConnection conn, ISocketPDU socketPDU) throws Exception {
        RpcSocketPDU pdu = (RpcSocketPDU)socketPDU;
        if (pdu.getType() == 2) {
            RpcMessage message = pdu.getMessage();
            int mid = message.getMessageId();
            RpcPendingRequest pendingRequest = this.pendingRequests.remove(mid);
            if (pendingRequest != null) {
                pendingRequest.setResponse(message);
            } else {
                log.warn((Object)("Response ignored: " + message));
            }
        } else {
            if (pdu.isTimeout()) {
                log.error((Object)String.format("handle pdu, rpc pdu timeout path=%s", pdu.getPath()));
                return;
            }
            RequestHandler handler = new RequestHandler(conn, pdu);
            try {
                handler.handleRequest();
            }
            finally {
                if (log.isDebugEnabled() && handler.rsp != null) {
                    long end = System.currentTimeMillis();
                    RpcSocketPDU trace = new RpcSocketPDU(2, pdu.getPath(), handler.rsp.getMessage().clone());
                    trace.getMessage().addParameter("time-consume", end - handler.begin);
                    this.sendTrace(trace, conn, false);
                }
            }
        }
    }

    @Override
    public void handleError(SocketConnection conn, ISocketPDU socketPDU, int errorCode) throws Exception {
        RpcSocketPDU pdu = (RpcSocketPDU)socketPDU;
        if (pdu.getType() == 1) {
            RequestHandler handler = new RequestHandler(conn, pdu);
            RpcError e = new RpcError("task rejected , server is busy.");
            e.setErrorCode(106);
            handler.responseException(e);
        } else {
            log.warn((Object)"task rejected , may be request queue is full.");
        }
    }

    @Override
    public boolean willHandleAccepted() {
        return true;
    }

    @Override
    public boolean willHandleConnected() {
        return true;
    }

    public Map<Integer, RpcPendingRequest> getRequsetPendings() {
        return this.pendingRequests;
    }

    public RpcPendingRequest getPendingRequest(int mid) {
        return new RpcPendingRequest(mid, this.pendingRequests);
    }

    protected void sendTrace(RpcSocketPDU trace, SocketConnection conn, boolean isRequest) {
        if (trace.getPath().startsWith("/event/") || trace.getPath().startsWith("/naming/")) {
            return;
        }
        if (RpcTrace.isRealtime) {
            RpcTrace.getInstance().push(conn, trace, isRequest);
        } else {
            RpcTrace.log(conn, trace, isRequest);
        }
    }

    public String toString() {
        return this.getSchema();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean verifyConnectionCount(String clientIp) {
        Integer clientCount = 0;
        Map<String, Integer> map = clientManeger;
        synchronized (map) {
            clientCount = clientManeger.get(clientIp);
            if (clientCount == null) {
                clientCount = 1;
                clientManeger.put(clientIp, clientCount);
                return true;
            }
            Integer n = clientCount;
            Integer n2 = clientCount = Integer.valueOf(clientCount + 1);
            if (clientCount > RpcUtil.getMaxClientCount()) {
                return false;
            }
            clientManeger.put(clientIp, clientCount);
        }
        return true;
    }

    public Map<String, Integer> getClientManeger() {
        return clientManeger;
    }

    @Override
    public void handleAccepted(SocketConnection conn) throws IOException {
        log.warn((Object)String.format("rpc handleAccepted start from address(%s:%s)", conn.getRemoteAddress(), conn.getRemotePort()));
        conn.setSoTimeout(RpcUtil.getAcceptTimeout());
        if (conn.isSSLConnection()) {
            boolean lab = true;
            while (lab) {
                int retVal = ((SSLSocketInputStream)conn.getInputStream()).doRead();
                if (retVal <= 0) {
                    if (retVal != -1) continue;
                    throw new IOException("input closed");
                }
                lab = false;
            }
        }
        DataInputStream input = new DataInputStream(conn.getInputStream());
        DataOutputStream output = new DataOutputStream(conn.getOutputStream());
        int len = input.readInt();
        if (len == 24524) {
            conn.setClientSupportCompress(true);
            this.handleAcceptYTag(conn, input, output);
            return;
        }
        if (len == 24523) {
            this.handleAcceptYTag(conn, input, output);
            return;
        }
        conn.setUseOldProtocal(true);
        if (len == 2046) {
            conn.setClientSupportCompress(true);
            output.writeLong(NioTool.getStartTime());
            this.closeIOStream(input, output);
            conn.setSoTimeout(RpcUtil.getRPCTimeout());
            log.warn((Object)String.format("rpc handleAccepted end from address(%s:%s)", conn.getRemoteAddress(), conn.getRemotePort()));
            return;
        }
        if (len == 233433) {
            output.writeLong(NioTool.getStartTime());
            this.closeIOStream(input, output);
            conn.setSoTimeout(RpcUtil.getRPCTimeout());
            log.warn((Object)String.format("rpc handleAccepted end from address(%s:%s)", conn.getRemoteAddress(), conn.getRemotePort()));
            return;
        }
        if (len > 1024) {
            throw new IOException("Invalid connection, magic word too long.");
        }
        byte[] buffer = new byte[len];
        input.readFully(buffer);
        String magic = new String(buffer);
        Socket socket = conn.getSocketChannel().socket();
        String clientIp = socket.getInetAddress().getHostAddress();
        String verification = "magic:iMAP";
        if (verification.equals(magic)) {
            if (!this.verifyConnectionCount(clientIp)) {
                log.error((Object)(clientIp + " connection count has been exceed the max limit " + RpcUtil.getMaxClientCount()));
                throw new IOException("exceed max connection limit");
            }
        } else {
            log.error((Object)("unknow magic word, closing connection from " + clientIp));
            throw new IOException("unknow magic word");
        }
        long clientID = input.readLong();
        conn.setClientID(clientID);
        conn.setSoTimeout(RpcUtil.getRPCTimeout());
        output.writeLong(clientID);
        output.writeLong(NioTool.getStartTime());
        output.flush();
        this.closeIOStream(input, output);
        log.warn((Object)String.format("rpc handleAccepted end from address(%s:%s)", conn.getRemoteAddress(), conn.getRemotePort()));
    }

    private void handleAcceptYTag(SocketConnection conn, DataInputStream input, DataOutputStream output) throws IOException {
        long clientR3Flag = input.readLong();
        if (!VersionUtil.isProtocalAfterR3(clientR3Flag)) {
            this.handleOldClientHello(conn, output, clientR3Flag);
        } else {
            output.writeLong(255L);
            output.flush();
            this.handleClientHello(conn);
        }
        this.closeIOStream(input, output);
        conn.setSoTimeout(RpcUtil.getRPCTimeout());
        log.warn((Object)String.format("rpc handleAccepted end from address(%s:%s)", conn.getRemoteAddress(), conn.getRemotePort()));
    }

    private void handleOldClientHello(SocketConnection conn, DataOutputStream output, long clientId) throws IOException {
        log.warn((Object)("handle old client hello, communicate with old protocol." + conn));
        conn.setUseOldProtocal(true);
        conn.setClientID(clientId);
        output.writeLong(NioTool.getStartTime());
        output.flush();
    }

    private void handleClientHello(SocketConnection conn) throws IOException {
        ObjectInputStream input = new ObjectInputStream(conn.getInputStream());
        ObjectOutputStream output = new ObjectOutputStream(conn.getOutputStream());
        ClientHello clientHello = null;
        try {
            clientHello = (ClientHello)input.readObject();
        }
        catch (ClassNotFoundException e) {
            log.error((Object)("read client hello fail" + conn));
            throw new IOException("read client hello fail");
        }
        ServerHello serverHello = new ServerHello();
        if (!VersionUtil.checkVersion(clientHello.getVersion())) {
            log.warn((Object)("peer version unmatch, set the upgrade info." + conn));
            serverHello.setUpgradeInfo(RpcRegister.instance().getRpcUpgradeHandler().getUpgradeInfo());
        }
        output.writeObject(serverHello);
        output.flush();
        conn.setUseOldProtocal(false);
        conn.setClientID(clientHello.getClientId());
    }

    private void closeIOStream(DataInputStream input, DataOutputStream output) throws IOException {
        if (input != null) {
            try {
                input.close();
            }
            catch (IOException e) {
                log.error((Object)"", (Throwable)e);
            }
        }
        if (output != null) {
            try {
                output.close();
            }
            catch (IOException e) {
                log.error((Object)"", (Throwable)e);
            }
        }
    }

    class RequestHandler {
        private SocketConnection conn;
        private RpcSocketPDU pdu;
        private long begin;
        private RpcSocketPDU rsp = null;

        RequestHandler(SocketConnection conn, RpcSocketPDU pdu) {
            this.conn = conn;
            this.pdu = pdu;
        }

        public void handleRequest() throws Exception {
            String path = this.pdu.getPath();
            RpcMessage message = this.pdu.getMessage();
            try {
                RpcRegisterItem call;
                if (log.isDebugEnabled()) {
                    this.begin = System.currentTimeMillis();
                    RpcSocketPDU trace = new RpcSocketPDU(this.pdu.getType(), this.pdu.getPath(), this.pdu.getMessage().clone());
                    RpcSocketProtocolTCP.this.sendTrace(trace, this.conn, true);
                }
                if ((call = RpcRegister.instance().getItem(path, message.getParameterNames())) == null) {
                    this.responseException(new RpcErrorMethodNotFound(path));
                    if (log.isInfoEnabled()) {
                        StringBuilder sb = new StringBuilder(path + '?');
                        Set<String> names = message.getParameterNames();
                        for (String name : names) {
                            sb.append(name);
                            sb.append(",");
                        }
                        log.warn((Object)("rpc method not found: " + sb.toString()));
                    }
                    return;
                }
                if (call.notExported() && this.conn.getConnector().isRestricted()) {
                    this.responseException(new RpcErrorSecurity("RPC method not exported"));
                    log.warn((Object)("Method not exported: " + path));
                    return;
                }
                this.executeCall(call, path, message);
            }
            catch (InvocationTargetException ite) {
                Throwable cause = ite.getCause();
                if (cause instanceof OutOfMemoryError) {
                    throw (OutOfMemoryError)cause;
                }
                if (cause instanceof AppError || cause instanceof RpcError) {
                    this.responseException(cause);
                } else if (cause != null) {
                    this.responseException(new RpcError(cause));
                } else {
                    this.responseException(new RpcError(ite));
                }
            }
            catch (RpcError e) {
                this.responseException(e);
            }
            catch (MDPError e) {
                this.responseException(e);
            }
            catch (Exception e) {
                this.responseException(new RpcError(e));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void executeCall(RpcRegisterItem call, String path, RpcMessage message) throws Exception {
            RpcContext rpcContext = new RpcContext(call.getMethod(), path, message);
            rpcContext.setClientID(this.conn.getClientID());
            rpcContext.setConnectionID(this.conn.getConnectionID());
            rpcContext.setClientAddress(this.conn.getRemoteAddress());
            try {
                if (this.conn.getConnector().isRestricted() && RpcRegister.instance().hasSecurityChecker()) {
                    RpcRegister.instance().getSecurityChecker().checkSecurity(rpcContext);
                }
                call.invoke(message.getParameterMap(), rpcContext);
                this.writeResponse(rpcContext.getResponseMessage());
            }
            finally {
                rpcContext.close();
            }
        }

        void responseException(Throwable e) throws Exception {
            if (e instanceof AppError) {
                log.warn((Object)"", e);
            } else if (e instanceof RpcErrorMethodNotFound && e.getMessage().startsWith("/balance/getFreeDS")) {
                log.warn((Object)"RpcErrorMethodNotFound : /balance/getFreeDS");
            } else {
                log.warn((Object)"Call RPC method failed", e);
            }
            RpcMessage exception = RpcMessage.getErrorMessage(e);
            RpcMessage message = this.pdu.getMessage();
            if (message != null && message.hasMessageId()) {
                exception.setMessageId(message.getMessageId());
            }
            this.writeResponse(exception);
        }

        void writeResponse(RpcMessage message) throws Exception {
            if (this.pdu.getType() != 3 && this.pdu.getType() != 4) {
                RpcSocketPDU responsePDU = new RpcSocketPDU(2, "", message);
                responsePDU.attach(this.pdu.getAttachment());
                if (this.pdu.isTimeout()) {
                    log.error((Object)String.format("reponse pdu, rpc pdu timeout path=%s", this.pdu.getPath()));
                    return;
                }
                responsePDU.setCancelTime(this.pdu.getCancelTime());
                responsePDU.setPath(this.pdu.getPath());
                this.conn.writePDU(responsePDU);
                this.rsp = responsePDU;
            }
        }
    }
}

