控件中国网现已改版,您看到的是老版本网站的镜像,系统正在为您跳转到新网站首页,请稍候.......
中国最专业的商业控件资讯网产品咨询电话:023-67870900 023-67871946
产品咨询EMAIL:SALES@COMPONENTCN.COM

C#实现线程安全的网络流

作者:佚名 出处:互联网 2011年11月09日 阅读:

C#实现线程安全的网络流 

一.线程安全

如果一个类是线程安全的,表明该类的任何一个实例是线程安全的。也就是说,在多线程的环境中,对任何一个这样的实例的并发访问将被自动同步。由于,同步是由类自己完成的,所以,类的使用者不需要再使用额外的同步机制。

二.线程安全网络流的意义

按照前面讲述的通信的一般模式,我们也许觉得NetworkStream不需要线程安全机制,因为好像其总是只被单线程访问。简单的情况下确实如此。但是,更常见的需求是,NetworkStream必须支持并发访问。

设想这样一个场景,当一个Client通过Tcp连接上Server后,需要每隔一分钟定时向服务器发送一个Check消息,以表明自己还在线上,这通常是在一个后台线程中完成的。假设某个时刻,主线程正在向Server发送某个功能请求,而同时,后台线程也在发送Check消息。如果网络流是非线程安全的,那么Server端接收到的将是一堆无法解析的数据。有两种方法可以解决这个问题,一是仍然使用System.Net.Sockets.NetworkStream类,只不过由该类的使用者提供同步机制,就像这样,每次调用的NetworkStream方法时先lock:

lock(this.networkStream)
{

    this.networkStream.Write(buffer ,offset ,size ) ;
}     从客户端开发者的角度来看,这个方法是可以勉强一用的。但是,如果你是一个服务器开发者了,你需要管理的可能是成千上万个NetworkStream,并且,任何时刻都可能有新的NetworkStream建立,也会有NetworkStream被销毁。在这种情况下,仍然使用第一种方法将会死的很难看!
      “所有的软件问题都可以通过引入一个间接层来得到解决”,这是我经常引用的一句话,它可以导出上述问题的第二种解决方案,那就是设计一个线程安全的网络流类SafeNetworkStream,它将自动同步所有的并发访问,它把所有分散的同步代码(比如,lock、Monitor等)全部简化并集中在自己的身体里,极大的方便了使用者。

三.线程安全的网络流的接口与实现

线程安全的网络流的接口ISafeNetworkStream中的方法覆盖了类NetworkStream的几个常用方法,在我们的通信框架中,这个接口中的内容已经够我们使用了。ISafeNetworkStream接口定义如下:

/// <summary>
    /// INetworkStreamSafe 线程安全的网络流 。
    /// 注意:如果调用的异步的begin方法,就一定要调用对应的End方法,否则锁将得不到释放。
    /// 作者:朱伟 sky.zhuwei@163.com
    /// 2005.04.22
    /// </summary>
   
    //用于在TCP连接上发送数据,支持同步和异步
    public interface ITcpSender
    {                                                     
        void Write(byte[] buffer ,int offset ,int size) ;
        IAsyncResult BeginWrite(byte[] buffer, int offset, int size, AsyncCallback callback, object state );
        void EndWrite(IAsyncResult asyncResult    );
    }

    //用于在TCP连接上接收数据,支持同步和异步
    public interface ITcpReciever
    {
        int Read (byte[] buffer ,int offset ,int size) ;
        IAsyncResult BeginRead(    byte[] buffer, int offset, int size, AsyncCallback callback, object state );
        int EndRead(IAsyncResult asyncResult );
    }

    public interface ISafeNetworkStream :ITcpSender ,ITcpReciever
    {       
        void Flush();
        void Close() ;   
   
        NetworkStream NetworkStream{get ;}
    }        接口中各方法的含义与NetworkStream类的方法含义相同。从接口的NetworkStream属性可以推断出,SafeNetworkStream是对NetworkStream的一层薄薄的封装,在这封装的夹层中存在的就是同步机制。
      ISafeNetworkStream接口的实现SafeNetworkStream主要是通过lock来进行同步处理的,其实现代码如下:    整个实现中,没有什么难的,所以就不多费口舌了。

    //SafeNetworkStream
    public class SafeNetworkStream : ISafeNetworkStream
    ...{
        private System.Net.Sockets.NetworkStream stream = null;
        private object lockForRead = null ;
        private object lockForWrite = null ;
        private volatile bool isReading = false ;
        private volatile bool isWriting = false ;
        private int retryCount = 10 ;       

        public SafeNetworkStream(NetworkStream netStream)
        ...{
            this.stream = netStream ;
            if(netStream == null)
            ...{
                throw new NullReferenceException("the wrapped netStream is null in SafeNetworkStream's ctor !") ;
            }

            this.lockForRead = this ;
            this.lockForWrite = this.stream ;           
        }

        StartReadAction ,EndReadAction ,StartWriteAction ,EndWriteAction#region StartReadAction ,EndReadAction ,StartWriteAction ,EndWriteAction
        private bool StartReadAction()
        ...{
            lock(this.lockForRead)
            ...{
                if(this.isReading)
                ...{
                    return false ;
                }

                this.isReading = true ;
                return true ;
            }
        }

        private void EndReadAction()
        ...{
            lock(this.lockForRead)
            ...{
                this.isReading = false ;
            }
        }

        private bool StartWriteAction()
        ...{
            lock(this.lockForWrite)
            ...{
                if(this.isWriting)
                ...{
                    return false ;
                }

                this.isWriting = true ;
                return true ;
            }
        }

        private void EndWriteAction()
        ...{
            lock(this.lockForWrite)
            ...{
                this.isWriting = false ;
            }
        }
        #endregion

        ISafeNetworkStream 成员#region ISafeNetworkStream 成员

        Write ,BeginWrite ,EndWrite#region Write ,BeginWrite ,EndWrite
        public void Write(byte[] buffer, int offset, int size)
        ...{
            int count = 0 ;
            while(! this.StartWriteAction())
            ...{
                if(count >= this.retryCount)
                ...{
                    return ;
                }
                Thread.Sleep(200) ;
                ++ count ;
            }           
           
            this.stream.Write(buffer ,offset ,size ) ;

            this.EndWriteAction() ;           
        }

        public IAsyncResult BeginWrite(byte[] buffer, int offset, int size, System.AsyncCallback callback, object state)
        ...{
            int count = 0 ;
            while(! this.StartWriteAction())
            ...{
                if(count >= this.retryCount)
                ...{
                    return null ;
                }
                Thread.Sleep(200) ;
                ++ count ;
            }

            return this.stream.BeginWrite(buffer, offset, size, callback, state) ;
           
        }

        public void EndWrite(IAsyncResult asyncResult)
        ...{
            this.EndWrite(asyncResult) ;
            this.EndWriteAction() ;
        }
        #endregion

        Read ,BeginRead ,EndRead#region Read ,BeginRead ,EndRead
        public int Read(byte[] buffer, int offset, int size)
        ...{
            int count = 0 ;
            while(! this.StartReadAction())
            ...{
                if(count >= this.retryCount)
                ...{
                    return -1 ;
                }
                Thread.Sleep(200) ;
                ++ count ;
            }

            int readCount = this.stream.Read(buffer ,offset ,size ) ;
           
            this.EndReadAction() ;

            return readCount ;
        }

        public IAsyncResult BeginRead(byte[] buffer, int offset, int size, System.AsyncCallback callback, object state)
        ...{
            int count = 0 ;
            while(! this.StartReadAction())
            ...{
                if(count >= this.retryCount)
                ...{
                    return null ;
                }
                Thread.Sleep(200) ;
                ++ count ;
            }

            return this.stream.BeginRead(buffer, offset, size, callback, state) ;
        }

        public int EndRead(IAsyncResult asyncResult)
        ...{
            int count = this.stream.EndRead(asyncResult) ;
            this.EndReadAction() ;

            return count ;
        }   
        #endregion

        Flush ,Close#region Flush ,Close
        public void Flush()
        ...{
            this.stream.Flush() ;
        }

        public void Close()
        ...{
            this.stream.Close() ;
        }
        #endregion

        property#region property
        public NetworkStream NetworkStream
        ...{
            get
            ...{
                return this.stream ;
            }
        }
        #endregion

        #endregion
    }  

热推产品

  • ActiveReport... 强大的.NET报表设计、浏览、打印、转换控件,可以同时用于WindowsForms谀坔攀戀Forms平台下......
  • AnyChart AnyChart使你可以创建出绚丽的交互式的Flash和HTML5的图表和仪表控件。可以用于仪表盘的创......
首页 | 新闻中心 | 产品中心 | 技术文档 | 友情连接 | 关于磐岩 | 技术支持中心 | 联系我们 | 帮助中心 Copyright-2006 ComponentCN.com all rights reserved.重庆磐岩科技有限公司(控件中国网) 版权所有 电话:023 - 67870900 传真:023 - 67870270 产品咨询:sales@componentcn.com 渝ICP备12000264号 法律顾问:元炳律师事务所 重庆市江北区塔坪36号维丰创意绿苑A座28-5 邮编:400020
在线客服
在线客服系统
在线客服
在线客服系统