博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
C#高性能大容量SOCKET并发(三):接收、发送
阅读量:7199 次
发布时间:2019-06-29

本文共 7396 字,大约阅读时间需要 24 分钟。

原文:

异步数据接收有可能收到的数据不是一个完整包,或者接收到的数据超过一个包的大小,因此我们需要把接收的数据进行缓存。异步发送我们也需要把每个发送的包加入到一个队列,然后通过队列逐个发送出去,如果每个都实时发送,有可能造成上一个数据包未发送完成,这时再调用SendAsync会抛出异常,提示SocketAsyncEventArgs正在进行异步操作,因此我们需要建立接收缓存和发送缓存。

接收

通过Completed事件响应后调用AsyncSocketInvokeElement.ProcessReceive,在ProcessReceive中,我们把收到数据先写入一个缓存,然后进行分包,分包后压给包处理函数ProcessPacket,ProcessPacket函数然后调用ProcessCommand处理具体的命令,也是各个协议实现业务逻辑的地方,具体代码如下:

public virtual bool ProcessReceive(byte[] buffer, int offset, int count) //接收异步事件返回的数据,用于对数据进行缓存和分包        {            m_activeDT = DateTime.UtcNow;            DynamicBufferManager receiveBuffer = m_asyncSocketUserToken.ReceiveBuffer;            receiveBuffer.WriteBuffer(buffer, offset, count);            if (receiveBuffer.DataCount > sizeof(int))            {                //按照长度分包                int packetLength = BitConverter.ToInt32(receiveBuffer.Buffer, 0); //获取包长度                if (NetByteOrder)                    packetLength = System.Net.IPAddress.NetworkToHostOrder(packetLength); //把网络字节顺序转为本地字节顺序                if ((packetLength > 10 * 1024 * 1024) | (receiveBuffer.DataCount > 10 * 1024 * 1024)) //最大Buffer异常保护                    return false;                if ((receiveBuffer.DataCount - sizeof(int)) >= packetLength) //收到的数据达到包长度                {                    bool result = ProcessPacket(receiveBuffer.Buffer, sizeof(int), packetLength);                    if (result)                        receiveBuffer.Clear(packetLength + sizeof(int)); //从缓存中清理                    return result;                }                else                {                    return true;                }            }            else            {                return true;            }        }        public virtual bool ProcessPacket(byte[] buffer, int offset, int count) //处理分完包后的数据,把命令和数据分开,并对命令进行解析        {            if (count < sizeof(int))                return false;            int commandLen = BitConverter.ToInt32(buffer, offset); //取出命令长度            string tmpStr = Encoding.UTF8.GetString(buffer, offset + sizeof(int), commandLen);            if (!m_incomingDataParser.DecodeProtocolText(tmpStr)) //解析命令              return false;            return ProcessCommand(buffer, offset + sizeof(int) + commandLen, count - sizeof(int) - commandLen); //处理命令        }        public virtual bool ProcessCommand(byte[] buffer, int offset, int count) //处理具体命令,子类从这个方法继承,buffer是收到的数据        {            return true;        }

发送

通过Completed事件响应后调用AsyncSocketInvokeElement.SendCompleted,在SendCompleted中我们需要在队列中清除已发送的包,并检测是否还有剩余需要发送的数据包,如果有,则继续发送,具体实现如下:

public virtual bool SendCompleted()        {            m_activeDT = DateTime.UtcNow;            m_sendAsync = false;            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;            asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包            int offset = 0;            int count = 0;            if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count))            {                m_sendAsync = true;                return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,                    asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count);            }            else                return SendCallback();        }        //发送回调函数,用于连续下发数据        public virtual bool SendCallback()        {            return true;        }
在AsyncSocketInvokeElement中提供函数给子类发送数据,业务逻辑是把当前数据包写入缓存,并检测当前是否正在发送包,如果正在发送,则等待回调,如果没有正在发送的数据包,则投递发送请求。

public bool DoSendResult()        {            string commandText = m_outgoingDataAssembler.GetProtocolText();            byte[] bufferUTF8 = Encoding.UTF8.GetBytes(commandText);            int totalLength = sizeof(int) + bufferUTF8.Length; //获取总大小            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;            asyncSendBufferManager.StartPacket();            asyncSendBufferManager.DynamicBufferManager.WriteInt(totalLength, false); //写入总大小            asyncSendBufferManager.DynamicBufferManager.WriteInt(bufferUTF8.Length, false); //写入命令大小            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(bufferUTF8); //写入命令内容            asyncSendBufferManager.EndPacket();            bool result = true;            if (!m_sendAsync)            {                int packetOffset = 0;                int packetCount = 0;                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))                {                    m_sendAsync = true;                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,                         asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);                }                            }            return result;        }        public bool DoSendResult(byte[] buffer, int offset, int count)        {            string commandText = m_outgoingDataAssembler.GetProtocolText();            byte[] bufferUTF8 = Encoding.UTF8.GetBytes(commandText);            int totalLength = sizeof(int) + bufferUTF8.Length + count; //获取总大小            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;            asyncSendBufferManager.StartPacket();            asyncSendBufferManager.DynamicBufferManager.WriteInt(totalLength, false); //写入总大小            asyncSendBufferManager.DynamicBufferManager.WriteInt(bufferUTF8.Length, false); //写入命令大小            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(bufferUTF8); //写入命令内容            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(buffer, offset, count); //写入二进制数据            asyncSendBufferManager.EndPacket();            bool result = true;            if (!m_sendAsync)            {                int packetOffset = 0;                int packetCount = 0;                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))                {                    m_sendAsync = true;                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,                         asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);                }            }            return result;        }        public bool DoSendBuffer(byte[] buffer, int offset, int count)        {            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;            asyncSendBufferManager.StartPacket();            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(buffer, offset, count);            asyncSendBufferManager.EndPacket();            bool result = true;            if (!m_sendAsync)            {                int packetOffset = 0;                int packetCount = 0;                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))                {                    m_sendAsync = true;                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,                         asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);                }            }            return result;        }
DEMO下载地址:

免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

转载地址:http://ujcum.baihongyu.com/

你可能感兴趣的文章
轻量级开源嵌入式关系数据库sqlite基本使用及接口初识
查看>>
Hibernate Criterion
查看>>
自定义定时组件
查看>>
当开发者产生一个伟大的想法之后应该做的10件事
查看>>
高仿微信5.2.1主界面架构 包含消息通知
查看>>
Struts2
查看>>
spftlayer 安装及简单使用
查看>>
桶排序
查看>>
URL中“#” “?” &“”号的作用
查看>>
指针强转和void*
查看>>
第40周三国庆
查看>>
Windows下FFmpeg快速入门
查看>>
RelativeLayout经常使用属性介绍
查看>>
图像处理——灰度化、二值化、膨胀算法、腐蚀算法以及开运算和闭运算
查看>>
Android 中PopupWindow使用 (转)
查看>>
Java-Junit 的Hello world
查看>>
文本文件与二进制文件区别
查看>>
Linux 基础命令
查看>>
获取项目
查看>>
Listview 异步加载图片之优化篇(有图有码有解释)
查看>>