protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {try {if (close) {cancelledKey(sk, socketWrapper);} else if (sk.isValid()) {if (sk.isReadable() || sk.isWritable()) {if (socketWrapper.getSendfileData() != null) {processSendfile(sk, socketWrapper, false);} else {unreg(sk, socketWrapper, sk.readyOps());boolean closeSocket = false;// Read goes before writeif (sk.isReadable()) {//这里如果是异步的操作,就会走这里if (socketWrapper.readOperation != null) {if (!socketWrapper.readOperation.process()) {closeSocket = true;}} else if (socketWrapper.readBlocking) {// readBlocking默认为falsesynchronized (socketWrapper.readLock) {socketWrapper.readBlocking = false;socketWrapper.readLock.notify();}} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {// 处理正常的事件,这里的processSocket就要正式开始处理请求了 。// 将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务closeSocket = true;}}if (!closeSocket && sk.isWritable()) {if (socketWrapper.writeOperation != null) {if (!socketWrapper.writeOperation.process()) {closeSocket = true;}} else if (socketWrapper.writeBlocking) {synchronized (socketWrapper.writeLock) {socketWrapper.writeBlocking = false;socketWrapper.writeLock.notify();}} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {closeSocket = true;}}if (closeSocket) {cancelledKey(sk, socketWrapper);}}}} else {// Invalid keycancelledKey(sk, socketWrapper);}} catch (CancelledKeyException ckx) {cancelledKey(sk, socketWrapper);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error(sm.getString("endpoint.nio.keyProcessingError"), t);}}请求具体处理上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理 。本节就是从工作线程封装开始,org.apache.tomcat.util.net.SocketProcessorBase为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例
以下为org.apache.tomcat.util.net.AbstractEndpoint#processSocket方法源码
/*** Process the given SocketWrapper with the given status. Used to trigger* processing as if the Poller (for those endpoints that have one)* selected the socket.** @param socketWrapper The socket wrapper to process* @param eventThe socket event to be processed* @param dispatchShould the processing be performed on a new*container thread** @return if processing was triggered successfully*/public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {try {if (socketWrapper == null) {return false;}// 优先使用已经存在的线程SocketProcessorBase<S> sc = null;if (processorCache != null) {sc = processorCache.pop();}if (sc == null) {sc = createSocketProcessor(socketWrapper, event);} else {sc.reset(socketWrapper, event);}// 获取线程池 。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建// tomcat使用了自定义的org.apache.tomcat.util.threads.TaskQueue,这块tomcat也进行了小的适配开发// 核心线程为10个,最大200线程Executor executor = getExecutor();if (dispatch && executor != null) {executor.execute(sc);} else {sc.run();}} catch (RejectedExecutionException ree) {getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);return false;} catch (Throwable t) {ExceptionUtils.handleThrowable(t);// This means we got an OOM or similar creating a thread, or that// the pool and its queue are fullgetLog().error(sm.getString("endpoint.process.fail"), t);return false;}return true;}上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类org.apache.tomcat.util.net.NioEndpoint.SocketProcessor继承了这个抽象类,也就是具体的业务处理逻辑在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun方法中,最终调用到我们的Servlet
protected void doRun() {/** Do not cache and re-use the value of socketWrapper.getSocket() in* this method. If the socket closes the value will be updated to* CLOSED_NIO_CHANNEL and the previous value potentially re-used for* a new connection. That can result in a stale cached value which* in turn can result in unintentionally closing currently active* connections.*/Poller poller = NioEndpoint.this.poller;if (poller == null) {socketWrapper.close();return;}try {int handshake = -1;try {// 握手相关判断逻辑...} catch (IOException x) {...}// 三次握手成功了if (handshake == 0) {SocketState state = SocketState.OPEN;// Process the request from this socket// event为SocketEvent.OPEN_READ,这个变量是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法赋值if (event == null) {state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);} else {// 这里就开始正式处理请求了state = getHandler().process(socketWrapper, event);}if (state == SocketState.CLOSED) {poller.cancelledKey(getSelectionKey(), socketWrapper);}} else if (handshake == -1 ) {getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);poller.cancelledKey(getSelectionKey(), socketWrapper);} else if (handshake == SelectionKey.OP_READ){socketWrapper.registerReadInterest();} else if (handshake == SelectionKey.OP_WRITE){socketWrapper.registerWriteInterest();}} catch (CancelledKeyException cx) {poller.cancelledKey(getSelectionKey(), socketWrapper);} catch (VirtualmachineError vme) {ExceptionUtils.handleThrowable(vme);} catch (Throwable t) {log.error(sm.getString("endpoint.processing.fail"), t);poller.cancelledKey(getSelectionKey(), socketWrapper);} finally {socketWrapper = null;event = null;//return to cacheif (running && !paused && processorCache != null) {processorCache.push(this);}}}
推荐阅读
- 红茶的产地和特征,功夫红茶的质量特征
- 百色古树红茶如何,古树红茶水温多少
- 红茶好坏如何区分,川红茶怎么样
- 深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力
- 如何让代码管理更简单高效?
- 架构师自诉:如何做到百万数据半小时跑批结束
- 红茶保存方法,红茶老茶如何保存方法
- 红茶茶的制作,红茶的红汤如何形成
- 如何评价宋太宗赵光义?宋太宗赵光义的后代在哪里_1
- 喝红茶失眠了如何解,7茶网红茶
