/*
 * Decompiled with CFR 0.152.
 */
package com.swimap.base.rpc.em;

import com.swimap.base.rpc.RpcUtil;
import com.swimap.base.rpc.em.EMPendingRequest;
import com.swimap.base.rpc.em.EMSocketPDU;
import com.swimap.base.rpc.em.IEMListener;
import com.swimap.base.rpc.nio.ISocketPDU;
import com.swimap.base.rpc.nio.ISocketProtocol;
import com.swimap.base.rpc.nio.SSLSocketInputStream;
import com.swimap.base.rpc.nio.SocketConnection;
import com.swimap.external.dsf.base.rpc.RpcEvent;
import com.swimap.external.dsf.base.rpc.cdr.CdrEvent;
import com.swimap.external.dsf.base.rpc.em.EventClassType;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.omg.CosNotification.EventType;

/**
 * {@link ISocketProtocol} implementation for the {@code "em"} / {@code "ems"} schemas.
 *
 * <p>Every frame handled here starts with two big-endian ints: a length and a
 * message type code (one of the {@code EM_*} constants). The class
 * <ul>
 *   <li>performs the connect handshake ({@link #handleConnected}),</li>
 *   <li>decodes incoming frames into {@link EMSocketPDU}s ({@link #readPDU}),</li>
 *   <li>encodes outgoing events and subscribe/unsubscribe requests ({@link #writePDU}),</li>
 *   <li>remembers accepted subscriptions and replays them after a reconnect,</li>
 *   <li>runs a periodic heartbeat / read-timeout check ({@link EMShakehand}).</li>
 * </ul>
 *
 * <p>Locking: request/ack exchanges are serialized on the connection's
 * {@link SocketChannel}; {@link #subscribeSet} is guarded by its own monitor. The
 * {@code subscribeSet} monitor is never held while acquiring the channel monitor.
 */
class SocketProtocolEM implements ISocketProtocol {
    private static final Log log = LogFactory.getLog(SocketProtocolEM.class);

    /* Message type codes (second int of every frame). */
    private static final int EM_SUBSCRIBE = 1;
    private static final int EM_SUBSCRIBE_ACK = 2;
    private static final int EM_SEND_EVENT = 3;
    private static final int EM_EVENT = 4;
    private static final int EM_UNSUBSCRIBE = 5;
    private static final int EM_HEARTBEAT = 10000;

    private static final String EM = "em";
    private static final String EMS = "ems";
    private static final String SUB_REQUEST_PATH = "/sub";
    private static final String DEFAULT_CONSTRAINT = "none";

    /** Bytes already consumed (length int + type int) before the rest of a frame is skipped. */
    private static final int FRAME_PREFIX_BYTES = 8;
    /** Delay before the first heartbeat and period between heartbeats, in milliseconds. */
    private static final long SHAKEHAND_INTERVAL_MS = 600000L;

    private final IEMListener listener;
    /** Second handshake int received on the previous connect; 0 until the first connect. */
    private int restart = 0;
    /** Event types whose subscription was accepted; guarded by its own monitor. */
    private final Set<EventClassType> subscribeSet = new HashSet<EventClassType>();
    private final Map<Integer, EMPendingRequest> pendingRequests = new ConcurrentHashMap<Integer, EMPendingRequest>();
    /** Written by readPDU/handleConnected, read by the heartbeat timer thread, hence volatile. */
    private volatile long lastReadTime = System.currentTimeMillis();
    private final EmRequest emRequest = new EmRequest();
    private final boolean isSSL;
    private EMShakehand shakeHandTask;
    /** Timer that runs {@link #shakeHandTask}; kept so its thread can be stopped. */
    private Timer shakeHandTimer;

    /**
     * @param listener receiver for decoded events; may be {@code null}, in which case
     *                 {@link #handlePDU} only logs a warning
     * @param isSSL    selects the {@code "ems"} schema instead of {@code "em"}
     */
    SocketProtocolEM(IEMListener listener, boolean isSSL) {
        this.isSSL = isSSL;
        this.listener = listener;
    }

    /** @return {@code "ems"} when constructed with {@code isSSL == true}, otherwise {@code "em"} */
    @Override
    public String getSchema() {
        return this.isSSL ? EMS : EM;
    }

    @Override
    public boolean willHandleConnected() {
        return true;
    }

    /**
     * Runs the connect handshake, then starts the resubscribe thread and the heartbeat timer.
     *
     * <p>Handshake: two ints are read from the peer; the bitwise complement of the first,
     * the second unchanged, and a trailing 0 are written back. If the second int differs
     * from the one seen on the previous connect (and that one was non-zero), an
     * {@code "EventManager.Restart"} event is delivered to the listener.
     *
     * @param conn the freshly connected socket
     * @throws IOException on handshake I/O failure or if the SSL input reports closure
     */
    @Override
    public void handleConnected(SocketConnection conn) throws IOException {
        InputStream is = conn.getInputStream();
        OutputStream os = conn.getOutputStream();
        SocketChannel socketChannel = conn.getSocketChannel();
        int ack;
        synchronized (socketChannel) {
            DataOutputStream output = new DataOutputStream(os);
            // Zero-length write followed by a flush, kept from the original implementation.
            // NOTE(review): presumably this kicks off the SSL handshake - confirm.
            output.write(new byte[0]);
            os.flush();
            if (is instanceof SSLSocketInputStream) {
                awaitSslInput((SSLSocketInputStream) is);
            }
            DataInputStream input = new DataInputStream(is);
            int randomNum = input.readInt();
            ack = input.readInt();
            output.writeInt(~randomNum);
            output.writeInt(ack);
            output.writeInt(0);
            os.flush();
            closeQuietly(input);
            closeQuietly(output);
        }
        this.startResubscribeThread(socketChannel, os);
        if (this.restart != 0 && this.restart != ack) {
            log.warn((Object)"EM restart, send out EventManager.Restart notice");
            try {
                EMSocketPDU pdu = new EMSocketPDU();
                pdu.setEvent(new CdrEvent("EventManager.Restart"));
                this.handlePDU(null, pdu);
            }
            catch (Exception e) {
                log.error((Object)"", (Throwable)e);
            }
        }
        this.restart = ack;
        this.lastReadTime = System.currentTimeMillis();
        this.startEMShakehand(conn);
    }

    /**
     * Polls {@code doRead()} until it returns a positive value.
     *
     * @throws IOException with message {@code "input closed"} when {@code doRead()} returns -1
     */
    private static void awaitSslInput(SSLSocketInputStream sslIn) throws IOException {
        int read;
        do {
            read = sslIn.doRead();
            if (read == -1) {
                throw new IOException("input closed");
            }
        } while (read <= 0);
    }

    /** Closes {@code stream}, logging (not propagating) any {@link IOException}. */
    private static void closeQuietly(java.io.Closeable stream) {
        try {
            stream.close();
        }
        catch (IOException ex) {
            log.warn((Object)"", (Throwable)ex);
        }
    }

    /** Replaces any running heartbeat with a new one bound to {@code connection}. */
    private void startEMShakehand(SocketConnection connection) {
        // Stop the previous task AND its timer thread; cancelling only the task would
        // leave one idle non-daemon Timer thread behind per reconnect.
        this.stopEMShakehand();
        this.shakeHandTask = new EMShakehand(connection);
        this.shakeHandTimer = new Timer();
        this.shakeHandTimer.schedule((TimerTask)this.shakeHandTask, SHAKEHAND_INTERVAL_MS, SHAKEHAND_INTERVAL_MS);
    }

    /** Cancels the heartbeat task and terminates its timer thread, if any. */
    private void stopEMShakehand() {
        if (this.shakeHandTask != null) {
            this.shakeHandTask.cancel();
            this.shakeHandTask = null;
        }
        if (this.shakeHandTimer != null) {
            // Timer.cancel() may be called from within the running task itself.
            this.shakeHandTimer.cancel();
            this.shakeHandTimer = null;
        }
    }

    /**
     * Replays the known subscriptions on a separate thread, so the caller of
     * {@link #handleConnected} is not blocked while acks are awaited.
     */
    private void startResubscribeThread(final SocketChannel socketChannel, final OutputStream os) {
        new Thread(){

            @Override
            public void run() {
                SocketProtocolEM.this.doResubscribe(socketChannel, os);
            }
        }.start();
    }

    /**
     * Re-sends a subscribe request for every remembered event type, then emits the
     * {@code "local.em.reconnect"} event. Does nothing when no subscription is remembered.
     * Failures are logged and abort the remaining requests.
     */
    private void doResubscribe(SocketChannel socketChannel, OutputStream os) {
        // Copy under the set's monitor and release it BEFORE taking the channel monitor:
        // writePDU takes channel -> set, so holding set -> channel here could deadlock.
        Set<EventClassType> snapshot;
        synchronized (this.subscribeSet) {
            snapshot = new HashSet<EventClassType>(this.subscribeSet);
        }
        if (snapshot.isEmpty()) {
            return;
        }
        synchronized (socketChannel) {
            try {
                for (EventClassType eventType : snapshot) {
                    // Null event: finish() must not touch subscribeSet for a replayed subscription.
                    this.emRequest.start(null, true, 0, null);
                    this.sendEmRequest(os, true, eventType);
                    this.emRequest.finish();
                }
                RpcEvent.create("local.em.reconnect").send();
            }
            catch (Exception e) {
                log.error((Object)"", (Throwable)e);
            }
        }
    }

    /**
     * Reads one frame from the connection.
     *
     * <ul>
     *   <li>{@code EM_EVENT}: decodes the payload and returns it as a PDU; a payload that
     *       fails to decode is logged and yields {@code null}.</li>
     *   <li>{@code EM_SUBSCRIBE_ACK}: hands the result code to the pending
     *       {@link EmRequest} and returns {@code null}.</li>
     *   <li>anything else: the rest of the frame is skipped and {@code null} is returned;
     *       types other than {@code EM_HEARTBEAT} are logged as a warning.</li>
     * </ul>
     *
     * @return the decoded event PDU, or {@code null} when the frame carries no event
     * @throws Exception on I/O failure, truncated frame or negative payload length
     */
    @Override
    public ISocketPDU readPDU(SocketConnection conn) throws Exception {
        this.lastReadTime = System.currentTimeMillis();
        DataInputStream input = new DataInputStream(conn.getInputStream());
        int length = input.readInt();
        int type = input.readInt();
        if (type == EM_EVENT) {
            return this.readEvent(input);
        }
        if (type == EM_SUBSCRIBE_ACK) {
            int result = input.readInt();
            this.emRequest.setResult(result);
            input.close();
            return null;
        }
        if (type != EM_HEARTBEAT) {
            log.warn((Object)("Wrong EM type code:" + type));
        }
        // InputStream.skip() may skip fewer bytes than asked; a short skip would
        // leave the stream positioned mid-frame and corrupt every following read.
        skipFully(input, (long)length - FRAME_PREFIX_BYTES);
        input.close();
        return null;
    }

    /** Reads the length-prefixed payload of an {@code EM_EVENT} frame and decodes it. */
    private ISocketPDU readEvent(DataInputStream input) throws IOException {
        int dataLength = input.readInt();
        if (dataLength < 0) {
            throw new IOException("Invalid EM event length: " + dataLength);
        }
        byte[] data = new byte[dataLength];
        input.readFully(data);
        try {
            CdrEvent event = CdrEvent.decode(data);
            return new EMSocketPDU(EM_EVENT, null, event);
        }
        catch (Exception e) {
            log.error((Object)"Event parse failed", (Throwable)e);
            return null;
        }
        finally {
            input.close();
        }
    }

    /**
     * Skips exactly {@code count} bytes (no-op for {@code count <= 0}).
     *
     * @throws IOException if the stream ends before {@code count} bytes were skipped
     */
    private static void skipFully(DataInputStream input, long count) throws IOException {
        long remaining = count;
        while (remaining > 0) {
            long skipped = input.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
                continue;
            }
            // skip() made no progress; a single read tells EOF apart from "nothing skipped".
            if (input.read() < 0) {
                throw new IOException("EM frame truncated, " + remaining + " bytes missing");
            }
            --remaining;
        }
    }

    /**
     * Writes an outgoing PDU. Type {@code EM_EVENT} PDUs are sent as events; type
     * {@code EM_SUBSCRIBE} PDUs are sent as subscribe (path {@code "/sub"}) or unsubscribe
     * (any other path) requests, serialized on the connection's channel. Other types are
     * ignored.
     *
     * @param socketPDU must be an {@link EMSocketPDU}
     * @throws IOException on write failure or when the subscribe ack times out
     */
    @Override
    public final void writePDU(SocketConnection conn, ISocketPDU socketPDU) throws IOException {
        OutputStream os = conn.getOutputStream();
        EMSocketPDU pdu = (EMSocketPDU)socketPDU;
        CdrEvent event = pdu.getEvent();
        int type = pdu.getType();
        if (type == EM_EVENT) {
            // NOTE(review): events are written without the channel lock and without a
            // flush, unlike subscribe requests - confirm the stream tolerates that.
            this.sendEvent(os, event);
        } else if (type == EM_SUBSCRIBE) {
            String path = pdu.getPath();
            boolean sub = path.equals(SUB_REQUEST_PATH);
            EventClassType eventType = pdu.getEvent().getEventClassType();
            EMPendingRequest pending = pdu.getPendingRequest();
            int mid = pdu.getMessageId();
            synchronized (conn.getSocketChannel()) {
                this.syncEmRequest(os, sub, eventType, pending, mid);
            }
        }
    }

    /**
     * Writes one {@code EM_SEND_EVENT} frame: total length, header, body length, body.
     * The length counts header and body plus 8 bytes for the two length ints.
     */
    private void sendEvent(OutputStream os, CdrEvent event) throws IOException {
        ByteArrayOutputStream header = this.getEventHeader(event);
        ByteArrayOutputStream body = this.getEventBody(event);
        DataOutputStream out = new DataOutputStream(os);
        out.writeInt(header.size() + body.size() + 8);
        header.writeTo(out);
        out.writeInt(body.size());
        body.writeTo(out);
    }

    /**
     * Builds the event header: the {@code EM_SEND_EVENT} type code followed by the
     * event's type name and domain name as length-prefixed ASCII strings.
     *
     * @param event must not be {@code null}
     */
    ByteArrayOutputStream getEventHeader(CdrEvent event) throws IOException {
        ByteArrayOutputStream header = new ByteArrayOutputStream();
        DataOutputStream output = new DataOutputStream(header);
        output.writeInt(EM_SEND_EVENT);
        EventType eventType = EventClassType.getCdrEventType(event.getEventClassType());
        this.writeString(output, eventType.type_name);
        this.writeString(output, eventType.domain_name);
        return header;
    }

    /** Returns the encoded event bytes, or an empty buffer when {@code event} is {@code null}. */
    ByteArrayOutputStream getEventBody(CdrEvent event) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        if (event != null) {
            new DataOutputStream(buffer).write(event.encode());
        }
        return buffer;
    }

    /**
     * Runs one subscribe/unsubscribe exchange and completes {@code pending} with the outcome.
     *
     * <p>A request is only put on the wire when it would change the remembered state
     * (subscribing to something not yet subscribed, or unsubscribing from something
     * subscribed). Only a subscribe that was actually sent waits for the peer's ack;
     * every other case is completed locally with result 0.
     */
    private void syncEmRequest(OutputStream os, boolean sub, EventClassType eventType, EMPendingRequest pending, int mid) throws IOException {
        this.emRequest.start(pending, sub, mid, eventType);
        boolean sent = this.needRequestEM(sub, eventType);
        if (sent) {
            this.sendEmRequest(os, sub, eventType);
        }
        if (!sent || !sub) {
            this.emRequest.setResult(0);
        }
        this.emRequest.finish();
    }

    /**
     * Writes and flushes one subscribe or unsubscribe frame: length, request type,
     * domain name, the constraint {@code "none"}, the int 1, and the type name.
     * The length counts the payload plus the 4-byte length field itself.
     */
    private void sendEmRequest(OutputStream os, boolean sub, EventClassType eventType) throws IOException {
        EventType cosEventType = EventClassType.getCdrEventType(eventType);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream payload = new DataOutputStream(buffer);
        payload.writeInt(sub ? EM_SUBSCRIBE : EM_UNSUBSCRIBE);
        this.writeString(payload, cosEventType.domain_name);
        this.writeString(payload, DEFAULT_CONSTRAINT);
        payload.writeInt(1);
        this.writeString(payload, cosEventType.type_name);
        DataOutputStream out = new DataOutputStream(os);
        out.writeInt(buffer.size() + 4);
        buffer.writeTo(out);
        os.flush();
        log.warn((Object)String.format("SocketProptocol sendEmRequest eventType=%s", eventType));
    }

    /** Writes {@code str} as an int byte count followed by its ASCII bytes. */
    private void writeString(DataOutputStream output, String str) throws IOException {
        byte[] buf = str.getBytes("ASCII");
        output.writeInt(buf.length);
        output.write(buf);
    }

    /** Remembers {@code eventType} as subscribed; {@code null} is ignored. */
    private void addSubscribe(EventClassType eventType) {
        if (eventType == null) {
            return;
        }
        synchronized (this.subscribeSet) {
            this.subscribeSet.add(eventType);
        }
    }

    /** Forgets the subscription for {@code eventType}; {@code null} is ignored. */
    private void removeSubscribe(EventClassType eventType) {
        if (eventType == null) {
            return;
        }
        synchronized (this.subscribeSet) {
            this.subscribeSet.remove(eventType);
        }
    }

    /** Cancels and drops all pending requests and stops the heartbeat. */
    @Override
    public void close(SocketConnection conn) {
        for (EMPendingRequest request : this.pendingRequests.values()) {
            request.cancel();
        }
        this.pendingRequests.clear();
        this.stopEMShakehand();
    }

    @Override
    public String getPrefixPath() {
        return null;
    }

    /**
     * Forwards the PDU's event to the listener, or logs a warning when no listener is set.
     *
     * @param conn may be {@code null} (it is for the internally generated restart notice)
     * @param pdu  must be an {@link EMSocketPDU}
     */
    @Override
    public void handlePDU(SocketConnection conn, ISocketPDU pdu) throws Exception {
        if (this.listener == null) {
            log.warn((Object)"listener is null.");
            return;
        }
        EMSocketPDU socketPDU = (EMSocketPDU)pdu;
        this.listener.handlePDU(conn, socketPDU.getEvent());
    }

    @Override
    public void handleError(SocketConnection conn, ISocketPDU socketPDU, int errorCode) throws Exception {
        log.error((Object)"em error, may be queue is full");
    }

    /** @return the live (not copied) map of pending requests keyed by an Integer id */
    Map<Integer, EMPendingRequest> getRequsetPendings() {
        return this.pendingRequests;
    }

    @Override
    public void handleAccepted(SocketConnection conn) throws IOException {
    }

    @Override
    public boolean willHandleAccepted() {
        return false;
    }

    /**
     * @return {@code true} when the request would change the remembered state: a
     *         subscribe for a type not yet in the set, or an unsubscribe for one that is
     */
    private boolean needRequestEM(boolean sub, EventClassType eventType) {
        synchronized (this.subscribeSet) {
            return this.subscribeSet.contains(eventType) != sub;
        }
    }

    EmRequest getEmRequest() {
        return this.emRequest;
    }

    @Override
    public String toString() {
        return this.getSchema();
    }

    /**
     * Periodic task: closes the connection when nothing has been read for longer than
     * {@code RpcUtil.getEMReadTimeOut()}, otherwise sends a {@code "heart.shake"} event.
     * Any failure while sending closes the connection.
     */
    class EMShakehand
    extends TimerTask {
        private final SocketConnection connection;

        public EMShakehand(SocketConnection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            try {
                long idleMillis = System.currentTimeMillis() - SocketProtocolEM.this.lastReadTime;
                if (this.connection.isConnected() && idleMillis > RpcUtil.getEMReadTimeOut()) {
                    this.connection.close(new Exception("em read timeout"));
                    return;
                }
                SocketProtocolEM.this.writePDU(this.connection, this.createSocketPDU());
            }
            catch (Exception e) {
                log.error((Object)"", (Throwable)e);
                try {
                    this.connection.close(new Exception("em heart failed"));
                }
                catch (Exception ee) {
                    log.error((Object)"", (Throwable)ee);
                }
            }
        }

        /** Builds the heartbeat PDU: an {@code EM_EVENT} PDU carrying a {@code "heart.shake"} event. */
        private ISocketPDU createSocketPDU() {
            return new EMSocketPDU(EM_EVENT, "", new CdrEvent("heart.shake"));
        }
    }

    /**
     * State of the single in-flight subscribe/unsubscribe exchange.
     *
     * <p>Usage: {@link #start} on the sending thread, then send the request, then
     * {@link #finish}. {@link #setResult} delivers the outcome, either from the thread
     * that reads the ack or locally when no ack is expected.
     */
    class EmRequest {
        /** Maximum time {@link #finish} waits for a result, in milliseconds. */
        private static final int ACK_TIMEOUT = 10000;
        private boolean sub;
        private int mid;
        private EventClassType event;
        private EMPendingRequest pending;
        /** {@code null} while no result has arrived; 0 means accepted. */
        private Integer result;

        EmRequest() {
        }

        /**
         * Begins a new exchange and clears any previous result.
         *
         * @param pending request to complete in {@link #finish}; may be {@code null}
         * @param sub     {@code true} for subscribe, {@code false} for unsubscribe
         * @param mid     message id copied onto the response
         * @param event   event type to add/remove on success; may be {@code null}
         */
        void start(EMPendingRequest pending, boolean sub, int mid, EventClassType event) {
            this.pending = pending;
            this.sub = sub;
            this.mid = mid;
            this.event = event;
            this.result = null;
        }

        /** Records the result code and wakes a thread waiting in {@link #finish}. */
        void setResult(int result) {
            synchronized (this) {
                this.result = result;
                this.notify();
            }
        }

        /**
         * Waits up to {@code ACK_TIMEOUT} ms for a result, then applies it: on result 0
         * the event type is added to (subscribe) or removed from (unsubscribe) the
         * remembered subscriptions, and the pending request, if any, is completed with
         * either an empty response or an {@code "EM rejected subscribe"} error.
         *
         * @throws IOException a {@link SocketTimeoutException} when no result arrived in
         *                     time or the wait was interrupted; the pending request is
         *                     not completed in that case
         */
        void finish() throws IOException {
            synchronized (this) {
                if (this.result == null) {
                    this.awaitResult();
                    if (this.result == null) {
                        throw new SocketTimeoutException("EM ack timeout.");
                    }
                }
                boolean accepted = this.result.intValue() == 0;
                if (accepted) {
                    if (this.sub) {
                        SocketProtocolEM.this.addSubscribe(this.event);
                    } else {
                        SocketProtocolEM.this.removeSubscribe(this.event);
                    }
                }
                if (this.pending != null) {
                    EMSocketPDU rsp = accepted ? new EMSocketPDU() : EMSocketPDU.getErrorObj("EM rejected subscribe");
                    rsp.setMessageId(this.mid);
                    this.pending.setResponse(rsp);
                }
            }
        }

        /**
         * Blocks until a result is set, the timeout elapses, or the thread is interrupted.
         * Must be called with this object's monitor held.
         */
        private void awaitResult() {
            long deadline = System.currentTimeMillis() + ACK_TIMEOUT;
            long remaining = ACK_TIMEOUT;
            // Loop on the condition: wait() may return spuriously before the ack arrives.
            while (this.result == null && remaining > 0) {
                try {
                    this.wait(remaining);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    return;
                }
                remaining = deadline - System.currentTimeMillis();
            }
        }
    }
}

