protected virtual async Task ProcessReads(){var pipe = In;Task writing = FillPipeAsync(pipe.Writer);Task reading = ReadPipeAsync(pipe.Reader);await Task.WhenAll(reading, writing);}8当Pipe有数据写入后,通知Pipe读线程去解析数据,这里通知用的方法是
ManualResetValueTaskSourceCore,写线程写入数据后执行 _taskSourceCore.SetResult(target);
就会触发读线程去读,读的时候会根据你设置的协议模版去解析,这个过程会去处理粘包和拆包的过程,
因为Pipe是可以定向的从流中取部分数据的
内置的协议模板如下;
TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility)
9 读取到的数据解析成packageInfo 后继续往下执行
await foreach (var p in packageChannel.RunAsync()){if(_packageHandlingContextAccessor!=null){_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);}await packageHandlingScheduler.HandlePackage(session, p);}10再执行到我们定义的command即可
ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package){return HandlePackage(session, PackageMapper.Map(package));}11 执行我们预制的command代码
[Command("add")]public class ADD : IAsyncCommand<StringPackageInfo>{public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package){var result = package.Parameters.Select(p => int.Parse(p)).Sum();await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "rn"));}}
推荐阅读
- asp.net core源码如何实现监听Http请求,分析Kestrel看一下过程
- 基于 CoreDNS 和 K8s 构建云原生场景下的企业级 DNS
- CoreDNS粗解
- 一个适合于.NET Core的超轻量级工作流引擎:Workflow-Core
- CDR教程—教你如何使用CorelDRAW文字排版技巧 cdr入门教程cdr排版教程
- HMS Core音频编辑服务,助力快速进入3D音频的世界
- 微软|.NET Core 3.1正式结束使命:微软官宣于年底终止支持
- HMS Core FIDO无密码的身份验证,比密码更安全?
- ARM|ARM发布Coretx-X3 /A715/A510 CPU:最大12核、全面迈向64位
- hmscore是什么意思?可以关闭吗?
