Socket请求轮询上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller的队列上,并可能唤醒了Selector,本小节就来看看,Poller是如何进行socket的轮询的 。
首先org.apache.tomcat.util.net.NioEndpoint.Poller也是实现了Runnable接口,是一个可以单独启动的线程
初始化及启动是在org.apache.tomcat.util.net.NioEndpoint#startInternal
重要的属性:
- JAVA.nio.channels.Selector:在Poller对象初始化的时候,就会启动轮询器
- SynchronizedQueue<PollerEvent>:同步的事件队列
public void run() {// Loop until destroy() is calledwhile (true) {boolean hasEvents = false;try {if (!close) {// 去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true// 如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为SelectionKey.OP_READ,添加处理函数attachment(为Accetpor添加到Poller时的// NioSocketWrapper)hasEvents = events();if (wakeupCounter.getAndSet(-1) > 0) {// If we are here, means we have other stuff to do// Do a non blocking selectkeyCount = selector.selectNow();} else {keyCount = selector.select(selectorTimeout);}wakeupCounter.set(0);}if (close) {events();timeout(0, false);try {selector.close();} catch (IOException ioe) {log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);}break;}// Either we timed out or we woke up, process events firstif (keyCount == 0) {hasEvents = (hasEvents | events());}} catch (Throwable x) {ExceptionUtils.handleThrowable(x);log.error(sm.getString("endpoint.nio.selectorLoopError"), x);continue;}Iterator<SelectionKey> iterator =keyCount > 0 ? selector.selectedKeys().iterator() : null;// Walk through the collection of ready keys and dispatch// any active event.// selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件while (iterator != null && iterator.hasNext()) {SelectionKey sk = iterator.next();// 获取到事件后,从迭代器删除事件,防止事件重复轮询iterator.remove();// 获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();// Attachment may be null if another thread has called// cancelledKey()if (socketWrapper != null) {processKey(sk, socketWrapper);}}// Process timeoutstimeout(keyCount,hasEvents);}getStopLatch().countDown();}在这里,有一个很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events(),他是从Poller的事件队列中获取Acceptor接收到的可用socket,并将其注册到Selector上/*** Processes events in the event queue of the Poller.** @return <code>true</code> if some events were processed,*<code>false</code> if queue was empty*/public boolean events() {boolean result = false;PollerEvent pe = null;// 如果Acceptor将socket添加到队列中,那么events.poll()方法就能拿到对应的事件,否则拿不到就返回falsefor (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {result = true;NioSocketWrapper socketWrapper = pe.getSocketWrapper();SocketChannel sc = socketWrapper.getSocket().getIOChannel();int interestOps = pe.getInterestOps();if (sc == null) {log.warn(sm.getString("endpoint.nio.nullSocketChannel"));socketWrapper.close();} else if (interestOps == OP_REGISTER) {// 如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTERtry {,// 将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrappersc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);} catch (Exception x) {log.error(sm.getString("endpoint.nio.registerFail"), x);}} else {// ??这里的逻辑,不清楚什么情况下会进入到这个分支里面final SelectionKey key = sc.keyFor(getSelector());if (key == null) {// The key was cancelled (e.g. due to socket closure)// and removed from the selector while it was being// processed. Count down the connections at this point// since it won't have been counted down when the socket// closed.socketWrapper.close();} else {final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();if (attachment != null) {// We are registering the key to start with, reset the fairness counter.try {int ops = key.interestOps() | interestOps;attachment.interestOps(ops);key.interestOps(ops);} catch (CancelledKeyException ckx) {cancelledKey(key, socketWrapper);}} else {cancelledKey(key, socketWrapper);}}}if (running && !paused && eventCache != null) {pe.reset();eventCache.push(pe);}}return result;}还有一个重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase,扔到线程池里开始处理
推荐阅读
- 红茶的产地和特征,功夫红茶的质量特征
- 百色古树红茶如何,古树红茶水温多少
- 红茶好坏如何区分,川红茶怎么样
- 深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力
- 如何让代码管理更简单高效?
- 架构师自诉:如何做到百万数据半小时跑批结束
- 红茶保存方法,红茶老茶如何保存方法
- 红茶茶的制作,红茶的红汤如何形成
- 如何评价宋太宗赵光义?宋太宗赵光义的后代在哪里_1
- 喝红茶失眠了如何解,7茶网红茶
