package com.snowballfinance.message.io.net;

import com.snowballfinance.message.io.Fragment;
import com.snowballfinance.message.io.logger.Logger;
import com.snowballfinance.message.io.logger.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.a;
import rx.b.f;
import rx.c;
import rx.c.a.d;
import rx.f.b;

/* loaded from: classes.dex */
/**
 * Client-side connector that opens a {@link SocketChannel} to a fixed address, wraps it in a
 * {@link FragmentChannel}, and republishes every {@link Fragment} delivered by a
 * {@link ReadThread} through an Rx subject.
 *
 * <p>{@link #connect()}, {@link #start()}, {@link #shutdown()} and {@link #isActive()} are
 * {@code synchronized} on this instance.
 *
 * <p>NOTE(review): this source is decompiled; the Rx types ({@code rx.a}, {@code rx.b.f},
 * {@code rx.f.b}, ...) carry obfuscated names, so comments below describe operators only as far
 * as the visible code allows.
 */
public class FragmentConnector {
    private static final Logger logger = LoggerFactory.getLogger(FragmentConnector.class);
    /** Remote endpoint passed to {@link SocketChannel#open(java.net.SocketAddress)}. */
    private final InetSocketAddress address;
    /** Channel created by {@link #start()}; {@code null} until the first successful start. */
    private FragmentChannel channel;
    /** Reader created by {@link #start()}; {@code null} until the first successful start. */
    private ReadThread readThread;
    /** Identifier handed to {@code channel.uuid(...)} when the channel is created. */
    private final String uuid;
    /** Set to {@code true} at the end of {@link #start()}, reset to {@code false} by {@link #shutdown()}. */
    private final AtomicBoolean CONNECTED = new AtomicBoolean(false);
    /** Updated by the callback registered on {@code channel.isConnected()} in {@link #connect()}; cleared by {@link #shutdown()}. */
    private final AtomicBoolean WRITABLE = new AtomicBoolean(false);
    /** Set to {@code true} by {@link #shutdown()}; never reset inside this class. */
    private final AtomicBoolean SHUTTEDDOWN = new AtomicBoolean(false);
    /** Subject that receives every fragment (and read error) forwarded from the read thread. */
    private final b<Fragment> fragmentPublishSubject = b.b();

    /**
     * Creates a connector; no socket is opened until {@link #start()} or {@link #connect()}.
     *
     * @param str identifier later applied to the channel via {@code channel.uuid(...)}
     * @param inetSocketAddress remote address to connect to
     */
    public FragmentConnector(String str, InetSocketAddress inetSocketAddress) {
        this.uuid = str;
        this.address = inetSocketAddress;
    }

    /**
     * Returns the current channel.
     *
     * @return the channel created by {@link #start()}, or {@code null} if it has not run successfully yet
     */
    public FragmentChannel channel() {
        return this.channel;
    }

    /**
     * Starts the connector if necessary and performs the opening handshake write.
     *
     * <p>If the connector is not yet marked connected, {@link #start()} is invoked first. When the
     * connector has already been shut down, the channel is closed, the read thread interrupted and
     * an error observable is returned. Otherwise a fragment allocated with type {@code 7} (logged
     * as "SYN") is written, callbacks are registered on {@code channel.isClosed()} and
     * {@code channel.isConnected()}, and a transformed {@code channel.isConnected()} observable is
     * returned.
     *
     * @return an observable of the connection state, or an error observable if starting failed or
     *     the connector was already shut down
     */
    public synchronized a<Boolean> connect() {
        if (!this.CONNECTED.get()) {
            try {
                start();
            } catch (Throwable th) {
                // Return immediately: after a failed start the channel is either null or stale,
                // so falling through would throw NullPointerException or discard this error.
                return a.a(th);
            }
        }
        if (this.SHUTTEDDOWN.get()) {
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("channel %s already marked as shutdown.", this.channel));
            }
            this.channel.close();
            this.readThread.interrupt();
            return a.a((Throwable) new Exception("channel already marked as shutdown."));
        }
        // Fragment type 7 is the one the log message below labels "SYN".
        Fragment allocate = this.channel.allocate(7);
        this.channel.write(allocate);
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("sent [SYN:%s] to %s.", Long.valueOf(allocate.getSequence()), this.channel));
        }
        // When isClosed() yields true, write a type-0 fragment (logged as "FIN"); otherwise map to
        // an observable of false. NOTE(review): the two a(...) calls are presumably
        // flatMap + subscribe - confirm against the de-obfuscated Rx API.
        this.channel.isClosed().a(new f<Boolean, a<Boolean>>() {
            @Override // rx.b.f
            public a<Boolean> call(Boolean bool) {
                if (!bool.booleanValue()) {
                    return a.a(false);
                }
                return FragmentConnector.this.channel.write(FragmentConnector.this.channel.allocate(0));
            }
        }).a(new rx.b.b<Boolean>() {
            @Override // rx.b.b
            public void call(Boolean bool) {
                if (bool.booleanValue() && FragmentConnector.logger.isDebugEnabled()) {
                    FragmentConnector.logger.debug(String.format("sent [FIN] to %s.", FragmentConnector.this.channel));
                }
            }
        });
        // Mirror every value delivered by isConnected() into WRITABLE, which backs isActive().
        this.channel.isConnected().a(new rx.b.b<Boolean>() {
            @Override // rx.b.b
            public void call(Boolean bool) {
                FragmentConnector.this.WRITABLE.set(bool.booleanValue());
            }
        });
        // NOTE(review): d.f3560a is an obfuscated operator instance and R is an unresolved
        // decompiler type variable; left untouched.
        return this.channel.isConnected().a((c<? extends R, ? super Boolean>) d.f3560a);
    }

    /**
     * Reports whether the connector is currently flagged writable.
     *
     * @return the current value of the {@code WRITABLE} flag
     */
    public synchronized boolean isActive() {
        return this.WRITABLE.get();
    }

    /**
     * Exposes received fragments as {@link FragmentEvent}s.
     *
     * <p>Each fragment from the internal subject is paired with the {@link ChannelContext} stored
     * on the current channel under {@code ChannelContext.CONTEXT}.
     *
     * @return an observable derived from the internal fragment subject
     */
    public a<FragmentEvent> observable() {
        return this.fragmentPublishSubject.b(new f<Fragment, FragmentEvent>() {
            @Override // rx.b.f
            public FragmentEvent call(Fragment fragment) {
                return new FragmentEvent((ChannelContext) FragmentConnector.this.channel.getAttribute(ChannelContext.CONTEXT), fragment);
            }
        });
    }

    /**
     * Interrupts the read thread (if any), closes the channel when the connector was marked
     * connected, and marks the connector as shut down.
     *
     * <p>If the connector was not connected, only a warning is logged; the shutdown flag is set in
     * either case.
     */
    public synchronized void shutdown() {
        if (this.readThread != null) {
            this.readThread.interrupt();
        }
        // compareAndSet ensures the close path runs only for a connector currently marked connected.
        if (this.CONNECTED.compareAndSet(true, false)) {
            this.WRITABLE.set(false);
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("connector[%s] is shutting down.", this.address));
            }
            this.channel.close();
        } else {
            logger.warn(String.format("connector[%s] is not available or has been shutdown.", this.address));
        }
        this.SHUTTEDDOWN.set(true);
    }

    /**
     * Opens the socket, builds the {@link FragmentChannel} and starts the {@link ReadThread},
     * unless the connector is already marked connected (in which case only a warning is logged).
     *
     * <p>NOTE(review): {@code SocketChannel.open} and {@code configureBlocking} throw
     * {@code IOException}; the decompiled signature shows no {@code throws} clause - confirm
     * against the original source before changing it.
     *
     * @return this connector
     */
    public synchronized FragmentConnector start() {
        if (this.CONNECTED.get()) {
            logger.warn(String.format("connector already connect to %s", this.address.toString()));
        } else {
            SocketChannel open = SocketChannel.open(this.address);
            open.configureBlocking(false);
            this.channel = new FragmentChannel(open);
            this.channel.addAttribute(ChannelContext.CONTEXT, new ChannelContext(this.channel));
            this.channel.uuid(this.uuid);
            this.readThread = new ReadThread(open);
            // Forward everything the read thread delivers into the connector's own subject.
            this.readThread.observable().a(new rx.b.b<Fragment>() {
                @Override // rx.b.b
                public void call(Fragment fragment) {
                    FragmentConnector.this.fragmentPublishSubject.a_((b) fragment);
                }
            }, new rx.b.b<Throwable>() {
                @Override // rx.b.b
                public void call(Throwable th) {
                    FragmentConnector.logger.warn("read fragment error", th);
                    FragmentConnector.this.fragmentPublishSubject.a_(th);
                }
            });
            this.readThread.start();
        }
        this.CONNECTED.set(true);
        return this;
    }

    /**
     * Returns the string form of the remote address.
     *
     * @return {@code String.valueOf(address)}
     */
    @Override
    public String toString() {
        return String.valueOf(this.address);
    }
}
