asp.net core supersocket介绍以及源码分析( 二 )


protected virtual async Task ProcessReads(){var pipe = In;Task writing = FillPipeAsync(pipe.Writer);Task reading = ReadPipeAsync(pipe.Reader);await Task.WhenAll(reading, writing);}8当Pipe有数据写入后,通知Pipe读线程去解析数据,这里通知用的方法是
ManualResetValueTaskSourceCore,写线程写入数据后执行 _taskSourceCore.SetResult(target);
就会触发读线程去读,读的时候会根据你设置的协议模版去解析,这个过程会去处理粘包和拆包的过程,
因为Pipe是可以定向的从流中取部分数据的
内置的协议模板如下;
TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility) 
9 读取到的数据解析成packageInfo 后继续往下执行
await foreach (var p in packageChannel.RunAsync()){if(_packageHandlingContextAccessor!=null){_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);}await packageHandlingScheduler.HandlePackage(session, p);}10再执行到我们定义的command即可
ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package){return HandlePackage(session, PackageMapper.Map(package));}11 执行我们预制的command代码
 
[Command("add")]public class ADD : IAsyncCommand<StringPackageInfo>{public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package){var result = package.Parameters.Select(p => int.Parse(p)).Sum();await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "rn"));}}





推荐阅读