BIO 是一种阻塞的io,主要体现在:
1)accept 时候或者客户端尝试连接时是阻塞的,
2)数据读写是阻塞的,即使是没有读到数据,而且每次都是读写一个字节。

对于服务端一般系统中常用的方式是没接收一个请求new 一个thread,然后由这个handler去处理,如果请求数目过多new 的thread 将会很多,有的时候会建立一个线程池,将由线程池管理。
如果线程过多,在线程中相互切换会消耗大量的性能,而实际上这些线程并没有做什么事情。同时因为重连接到数据读取结束是一个阻塞过程,如果网络,或者服务器性能问题,会极大消耗客户端的性能去等待服务端的返回结果。

基于以上问题,nio出来了,

Buffer
nio定义了很多的Buffer,buffer是一个对象,存储需要写入或者读出的数据。底层就是一个数组。
下面是一个位的buffer。
   
   
  1. public abstract class ByteBuffer
  2. extends Buffer
  3. implements Comparable<ByteBuffer>
  4. {
  5. //数组,数据直接放在堆区。
  6. final byte[] hb; // Non-null only for heap buffers
  7. final int offset;
  8. boolean isReadOnly; // Valid only for heap buffers
  9. ByteBuffer(int mark, int pos, int lim, int cap, // package-private
  10. byte[] hb, int offset)
  11. {
  12. super(mark, pos, lim, cap);
  13. this.hb = hb;
  14. this.offset = offset;
  15. }
下面是个floatBuffer的定义的数组。
final float[] hb;
其分配方法:   ByteBuffer buffer = ByteBuffer.allocate( 10 ); 
   
   
  1. public static ByteBuffer allocate(int capacity) {
  2. if (capacity < 0)
  3. throw new IllegalArgumentException();
  4. return new HeapByteBuffer(capacity, capacity);
  5. }
直接调用,在这里new 的数组,
    
    
  1. super(-1, 0, lim, cap, new byte[cap], 0);

channel
channel定义了一个管道(pile),用于数据读写,双向的,可以同时读写,其 实现可以分为两大类,FileChannel (文件) 和SelectorChannel (网络) 。nio 会用到ServiceSocketChannel 和SocketChannel。
1)ServerSocketChannel serverChannel = ServerSocketChannel.open();  
   
   
  1. public static ServerSocketChannel open() throws IOException {
  2. return SelectorProvider.provider().openServerSocketChannel();
  3. }
这个SelectorProvider.provider() 会调用
   
   
  1. provider = sun.nio.ch.DefaultSelectorProvider.create();
而这个就是
   
   
  1. public static SelectorProvider create() {
  2. return new WindowsSelectorProvider();
  3. }
最后 openServerSocketChannel 会new 一个 ServerSocketChannelImpl
   
   
  1. new ServerSocketChannelImpl(this);

这个会删除一个FileDescriptor
   
   
  1. ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
  2. super(var1);
  3. this.fd = Net.serverSocket(true);
  4. this.fdVal = IOUtil.fdVal(this.fd);
  5. this.state = 0;
  6. }
selector
选择器(多路复用器),通过channel注册个selector ,selector去查询是是否有channel准备好数据了,如果有数据准备好了,将启动新的线程处理。一般selector会轮询方式查询。

selector底层代码参考的openjdk。selector 同步open生存,
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
底层是一个 WindowsSelectorImpl
public AbstractSelector openSelector() throws IOException {
    return new WindowsSelectorImpl(this);
}
底层会定义一个 pollWrapper 同时会启动一个管道pipe
   
   
  1. WindowsSelectorImpl(SelectorProvider sp) throws IOException {
  2. super(sp);
  3. pollWrapper = new PollArrayWrapper(INIT_CAP);
  4. wakeupPipe = Pipe.open();
  5. wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
  6. // Disable the Nagle algorithm so that the wakeup is more immediate
  7. SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
  8. (sink.sc).socket().setTcpNoDelay(true);
  9. wakeupSinkFd = ((SelChImpl)sink).getFDVal();
  10. pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
  11. }
selector.select();
底层主要操作
这个方法主要是查询channel是否准备好,如果准备就将准备好的channel写到publicSelectedKeys。
   
   
  1. protected int doSelect(long timeout) throws IOException {
  2. if (channelArray == null)
  3. throw new ClosedSelectorException();
  4. this.timeout = timeout; // set selector timeout
  5. processDeregisterQueue();
  6. if (interruptTriggered) {
  7. resetWakeupSocket();
  8. return 0;
  9. }
  10. // Calculate number of helper threads needed for poll. If necessary
  11. // threads are created here and start waiting on startLock
  12. adjustThreadsCount();
  13. finishLock.reset(); // reset finishLock
  14. // Wakeup helper threads, waiting on startLock, so they start polling.
  15. // Redundant threads will exit here after wakeup.
  16. startLock.startThreads();
  17. // do polling in the main thread. Main thread is responsible for
  18. // first MAX_SELECTABLE_FDS entries in pollArray.
  19. try {
  20. begin();
  21. try {
  22. subSelector.poll();
  23. } catch (IOException e) {
  24. finishLock.setException(e); // Save this exception
  25. }
  26. // Main thread is out of poll(). Wakeup others and wait for them
  27. if (threads.size() > 0)
  28. finishLock.waitForHelperThreads();
  29. } finally {
  30. end();
  31. }
  32. // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
  33. finishLock.checkForException();
  34. processDeregisterQueue();
  35. int updated = updateSelectedKeys();
  36. // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
  37. resetWakeupSocket();
  38. return updated;
  39. }

updateSelectedKeys最后会调用 processFDSet,将 SelectionKeyImpl 放入 publicSelectedKeys集合中,然后selector 通过迭代器遍历 SelectionKeyImpl,因为 SelectionKeyImpl包含channel信息,可以重channel读取数据。
   
   
  1. private int processFDSet(long updateCount, int[] fds, int rOps,
  2. boolean isExceptFds)
  3. {
  4. int numKeysUpdated = 0;
  5. for (int i = 1; i <= fds[0]; i++) {
  6. int desc = fds[i];
  7. if (desc == wakeupSourceFd) {
  8. synchronized (interruptLock) {
  9. interruptTriggered = true;
  10. }
  11. continue;
  12. }
  13. MapEntry me = fdMap.get(desc);
  14. // If me is null, the key was deregistered in the previous
  15. // processDeregisterQueue.
  16. if (me == null)
  17. continue;
  18. SelectionKeyImpl sk = me.ski;
  19. // The descriptor may be in the exceptfds set because there is
  20. // OOB data queued to the socket. If there is OOB data then it
  21. // is discarded and the key is not added to the selected set.
  22. if (isExceptFds &&
  23. (sk.channel() instanceof SocketChannelImpl) &&
  24. discardUrgentData(desc))
  25. {
  26. continue;
  27. }
  28. if (selectedKeys.contains(sk)) { // Key in selected set
  29. if (me.clearedCount != updateCount) {
  30. if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
  31. (me.updateCount != updateCount)) {
  32. me.updateCount = updateCount;
  33. numKeysUpdated++;
  34. }
  35. } else { // The readyOps have been set; now add
  36. if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
  37. (me.updateCount != updateCount)) {
  38. me.updateCount = updateCount;
  39. numKeysUpdated++;
  40. }
  41. }
  42. me.clearedCount = updateCount;
  43. } else { // Key is not in selected set yet
  44. if (me.clearedCount != updateCount) {
  45. sk.channel.translateAndSetReadyOps(rOps, sk);
  46. if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
  47.                             //
  48. selectedKeys.add(sk);
  49. me.updateCount = updateCount;
  50. numKeysUpdated++;
  51. }
  52. } else { // The readyOps have been set; now add
  53. sk.channel.translateAndUpdateReadyOps(rOps, sk);
  54. if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
  55. selectedKeys.add(sk);
  56. me.updateCount = updateCount;
  57. numKeysUpdated++;
  58. }
  59. }
  60. me.clearedCount = updateCount;
  61. }
  62. }
  63. return numKeysUpdated;
  64. }
  65. }

分析到此结束。

优点,采用多路复用,而且因为采用buffer机制,当读写buffer时不需要阻塞。
nio 也有缺点,因为nio需要很多代码去出去半包问题,而底层采用epoll也是有问题,这些问题在多并发是可能出现,因为这些问题,所以出现netty,netty能快速开发出稳定的通信框架,所以spark/kafka都有netty。

推荐书netty 权威指南


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐