package com.snowballfinance.message.io.net;

import com.snowballfinance.message.io.Fragment;
import com.snowballfinance.message.io.buffer.ChunkBufferAdapter;
import com.snowballfinance.message.io.buffer.CycleChunkBuffer;
import com.snowballfinance.message.io.codec.ByteToFragmentDecoder;
import com.snowballfinance.message.io.codec.CorruptedFragmentException;
import com.snowballfinance.message.io.logger.Logger;
import com.snowballfinance.message.io.logger.LoggerFactory;
import com.snowballfinance.message.io.util.Buffers;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import rx.a;
import rx.c;
import rx.c.a.d;
import rx.f.b;

/* loaded from: classes.dex */
public class ReadThread extends Thread {
    private static final int DEFAULT_BUFFER_SIZE = 524288;
    private CycleChunkBuffer buffer;
    private SocketChannel channel;
    private b<Fragment> fragmentPublishSubject;
    protected final Logger logger;
    private boolean running;
    private Selector selector;

    public ReadThread(SocketChannel socketChannel) {
        super("ReadThread");
        this.logger = LoggerFactory.getLogger(getClass());
        this.running = false;
        this.fragmentPublishSubject = b.b();
        this.channel = socketChannel;
        this.buffer = new CycleChunkBuffer(ByteBuffer.allocate(524288));
        this.selector = Selector.open();
        socketChannel.register(this.selector, 1);
    }

    @Override // java.lang.Thread
    public void interrupt() {
        super.interrupt();
        this.running = false;
    }

    public a<Fragment> observable() {
        return this.fragmentPublishSubject.a((c<? extends R, ? super Fragment>) d.f3560a);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                if (this.selector.select() > 0) {
                    Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        it.remove();
                        if (next.isValid() && next.isReadable()) {
                            int i = Integer.MAX_VALUE;
                            while (this.buffer.remaining() > 0 && i > 0) {
                                int remaining = this.buffer.remaining();
                                int capacity = this.buffer.capacity();
                                i = this.channel.read(this.buffer.toByteBuffer());
                                if (i > 0) {
                                    ChunkBufferAdapter.adjust(this.buffer, i);
                                }
                                if (i >= remaining) {
                                    if (this.logger.isDebugEnabled()) {
                                        this.logger.debug(String.format("increase buffer to %d.", Integer.valueOf(capacity * 2)));
                                    }
                                    this.buffer = Buffers.realloc(this.buffer, capacity * 2);
                                }
                            }
                            while (true) {
                                Fragment decode = ByteToFragmentDecoder.decode(this.buffer);
                                if (decode == null) {
                                    break;
                                } else {
                                    this.fragmentPublishSubject.a_((b<Fragment>) decode);
                                }
                            }
                            if (this.buffer.capacity() - this.buffer.remaining() < 524288 && this.buffer.capacity() > 524288) {
                                if (this.logger.isDebugEnabled()) {
                                    this.logger.debug(String.format("decrease buffer to %d.", 524288));
                                }
                                this.buffer = Buffers.realloc(this.buffer, 524288);
                            }
                            if (this.buffer.remaining() <= 0) {
                                throw new CorruptedFragmentException("too large fragment!");
                            }
                        }
                    }
                }
            } catch (Throwable th) {
                this.fragmentPublishSubject.a_(th);
                return;
            }
        }
    }

    @Override // java.lang.Thread
    public synchronized void start() {
        super.start();
        this.running = true;
    }
}
