jdk 源码分析(20)java NIO包简单分析
优点,采用多路复用,而且因为采用buffer机制,当读写buffer时不需要阻塞。nio 也有缺点,因为nio需要很多代码去出去半包问题,而底层采用epoll也是有问题,这些问题在多并发是可能出现,因为这些问题,所以出现netty,netty能快速开发出稳定的通信框架,所以spark/kafka都有netty。
·
BIO 是一种阻塞的io,主要体现在:
1)accept 时候或者客户端尝试连接时是阻塞的,
2)数据读写是阻塞的,即使是没有读到数据,而且每次都是读写一个字节。
对于服务端一般系统中常用的方式是没接收一个请求new 一个thread,然后由这个handler去处理,如果请求数目过多new 的thread 将会很多,有的时候会建立一个线程池,将由线程池管理。
如果线程过多,在线程中相互切换会消耗大量的性能,而实际上这些线程并没有做什么事情。同时因为重连接到数据读取结束是一个阻塞过程,如果网络,或者服务器性能问题,会极大消耗客户端的性能去等待服务端的返回结果。
基于以上问题,nio出来了,
Buffer
nio定义了很多的Buffer,buffer是一个对象,存储需要写入或者读出的数据。底层就是一个数组。
下面是一个位的buffer。
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{
//数组,数据直接放在堆区。
- final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
下面是个floatBuffer的定义的数组。
final float[] hb;
其分配方法:
ByteBuffer buffer = ByteBuffer.allocate(
10
);
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
直接调用,在这里new 的数组,
super(-1, 0, lim, cap, new byte[cap], 0);
channel
channel定义了一个管道(pile),用于数据读写,双向的,可以同时读写,其 实现可以分为两大类,FileChannel
(文件)
和SelectorChannel
(网络)
。nio 会用到ServiceSocketChannel 和SocketChannel。
1)ServerSocketChannel serverChannel = ServerSocketChannel.open();
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
这个SelectorProvider.provider() 会调用
provider = sun.nio.ch.DefaultSelectorProvider.create();
而这个就是
public static SelectorProvider create() {
return new WindowsSelectorProvider();
}
最后
openServerSocketChannel 会new 一个
ServerSocketChannelImpl
new ServerSocketChannelImpl(this);
这个会删除一个FileDescriptor
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
selector
选择器(多路复用器),通过channel注册个selector ,selector去查询是是否有channel准备好数据了,如果有数据准备好了,将启动新的线程处理。一般selector会轮询方式查询。
selector底层代码参考的openjdk。selector 同步open生存,
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
底层是一个
WindowsSelectorImpl
public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); }
底层会定义一个
pollWrapper 同时会启动一个管道pipe
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
selector.select();
底层主要操作
这个方法主要是查询channel是否准备好,如果准备就将准备好的channel写到publicSelectedKeys。
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
updateSelectedKeys最后会调用
processFDSet,将
SelectionKeyImpl 放入
publicSelectedKeys集合中,然后selector 通过迭代器遍历
SelectionKeyImpl,因为
SelectionKeyImpl包含channel信息,可以重channel读取数据。
private int processFDSet(long updateCount, int[] fds, int rOps,
boolean isExceptFds)
{
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
if (selectedKeys.contains(sk)) { // Key in selected set
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
//
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
}
分析到此结束。
优点,采用多路复用,而且因为采用buffer机制,当读写buffer时不需要阻塞。
nio 也有缺点,因为nio需要很多代码去出去半包问题,而底层采用epoll也是有问题,这些问题在多并发是可能出现,因为这些问题,所以出现netty,netty能快速开发出稳定的通信框架,所以spark/kafka都有netty。
推荐书netty 权威指南
更多推荐
已为社区贡献1条内容
所有评论(0)