引领先锋|携程基于Quasar协程的NIO实践( 三 )


2.2 声明挂起方法
Quasar需要织入字节码接管挂起方法的调度 , 在项目主pom下添加quasar-maven-plugin插件 , 该插件将在编译后的class文件中修改字节码 。
com.vlkanquasar-maven-plugin0.7.9instrumentQuasar通过识别方法是否抛出了该框架定义的SuspendExecution异常决定是否修改字节码 。 Quasar框架在AsyncCompletionStage.get方法上声明了SuspendExceution异常 , 该异常是捕获异常 , 但仅作为识别挂起方法的声明 , 在运行时不会实际抛出 。 使用者必须逐层抛出该异常直至新建协程的一层 。 当方法内部存在try/catch语句时 , 也必须抛出该异常 。
public void startFiber() throws ExecutionException, InterruptedException {Fiber fiber = new Fiber(() -> {//不用继续抛出异常Response response = waitNextLayer1();deal(response);}).start();}private Response waitNextLayer1() throws SuspendExecution {return waitNextLayer2();}private Response waitNextLayer2() throws SuspendExecution {CompletableFuture future = httpClient.executeRequest(request).toCompletableFuture();try {// Quasar框架工具类抛出SuspendExecutionreturn AsyncCompletionStage.get(future);} catch (Exception e) {return null;}}2.3 异步RPC调用
目前主流的RPC框架都基于NIO实现 , 支持异步回调 , 有的RPC框架已经直接提供了返回CompletableFuture或ListenableFuture(Guava工具类提供)的异步接口 , 通过使用ComplatableFuture , 可以按前文类似的方法将Quasar与RPC框架结合起来 。 当RPC框架没有该返回类型时 , 一般会提供如下类似的带泛型的异步回调接口:
interface Callback {void callback(TResponse TResponse, Exception e);}这种情况 , 可以使用者自己创建ComplatableFuture , 在回调中设置其状态 , 并调用AsyncCompletionStage.get等待这个future 。
CompletableFuture future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback() {public void callback(Response response, Exception e) {if (e == null) future.complete(response);else future.completeExceptionally(e);}});//在此处调用Quasar的API , 挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);上述代码依然具有异步回调不直观的缺点 , 通过JDK8的函数式接口可以实现一个通用的调用模板 , 将异步回调变为同步等待的形式 。
@FunctionalInterfaceprivate interface RpcAsyncCall {void request(TRequest request, Callback callback);}publicTResponse waitRpc(RpcAsyncCall call, TRequest request) throws SuspendExecution {CompletableFuture future = new CompletableFuture<>();call.request(request, (response, e) -> {if (e == null) future.complete(response);else future.completeExceptionally(e);});try {//使用Quasar等待Future结果return AsyncCompletionStage.get(future);} catch (Exception e) {return null;}}最后的调用可简化一行代码 , 该方法适用于所有该Rpc框架提供的异步接口 。
Response response= waitRpc(new RpcClient()::helloAsync, request);2.4 阻塞操作的处理
Quasar协程使用的时候有一定的限制 , 由于调度器线程池大小固定 , 在协程中不能阻塞线程 , 执行线程将被占用 。 对于某些暂时只能依靠阻塞IO的调用 , 如数据库 , 消息队列等 , 无法使用协程等待其结果 , 当这些阻塞操作量不大的情况下 , 可使用另一个可伸缩的线程池等待结果 , 避免对协程调度器的影响 。


推荐阅读