1. 五、CLR 线程池的 I/O 线程

在前一节所介绍的线程都属于 CLR 线程池的工作者线程,这一节开始为大家介绍一下 CLR 线程池的 I/O 线程

I/O 线程是.NET 专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET 为多个 I/O 操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService 等等,而且每个异步方法的使用方式都非常类似,都是以 BeginXXX 为开始,以 EndXXX 结束,下面为大家一一解说。

1.1.1. 5.1 异步读写 FileStream

需要在 FileStream 异步调用 I/O 线程,必须使用以下构造函数建立 FileStream 对象,并把 useAsync 设置为 true。

FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;

其中 path 是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为 8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync 代表是否启动异步 I/O 线程。

注意:当使用 BeginRead 和 BeginWrite 方法在执行大量读或写时效果更好,但对于少量的读/写,这些方法速度可能比同步读取还要慢,因为进行线程间的切换需要大量时间。

1.1.2. 5.1.1 异步写入

FileStream 中包含 BeginWrite、EndWrite 方法可以启动 I/O 线程进行异步写入。

public override IAsyncResult BeginWrite (byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
public override void EndWrite (IAsyncResult asyncResult)

BeginWrite 返回值为 IAsyncResult, 使用方式与委托的 BeginInvoke 方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数 AsyncCallback 用于绑定回调函数; 参数 Object 用于传递外部数据。要注意一点:AsyncCallback 所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。

在例子中,把 FileStream 作为外部数据传递到回调函数当中,然后在回调函数中利用 IAsyncResult.AsyncState 获取 FileStream 对象,最后通过 FileStream.EndWrite(IAsyncResult)结束写入。

class Program
{
    static void Main(string[] args)
    {
        //把线程池的最大值设置为1000
        ThreadPool.SetMaxThreads(1000, 1000);
        ThreadPoolMessage("Start");
        //新立文件File.sour
        FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,
                                  FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
        byte[] bytes = new byte[16384];
        string message = "An operating-system ThreadId has no fixed relationship........";
        bytes = Encoding.Unicode.GetBytes(message);
        //启动异步写入
        stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
        stream.Flush();

        Console.ReadKey();
    }
    static void Callback(IAsyncResult result)
    {
        //显示线程池现状
        Thread.Sleep(200);
        ThreadPoolMessage("AsyncCallback");
        //结束异步写入
        FileStream stream = (FileStream)result.AsyncState;
        stream.EndWrite(result);
        stream.Close();
    }
    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

由输出结果可以看到,在使用 FileStream.BeginWrite 方法后,系统将自动启动 CLR 线程池中 I/O 线程。

img

5.1.2 异步读取

FileStream 中包含 BeginRead 与 EndRead 可以异步调用 I/O 线程进行读取。

public override IAsyncResult BeginRead (byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)

其使用方式与 BeginWrite 和 EndWrite 相似,AsyncCallback 用于绑定回调函数; Object 用于传递外部数据。在回调函数只需要使用 IAsyncResut.AsyncState 就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。

首先定义 FileData 类,里面包含 FileStream 对象,byte[] 数组和长度。然后把 FileData 对象作为外部数据传到回调函数,在回调函数中,把 IAsyncResult.AsyncState 强制转换为 FileData,然后通过 FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。

class Program
{
    public class FileData
    {
        public FileStream Stream;
        public int Length;
        public byte[] ByteData;
    }
      static void Main(string[] args)
    {
        //把线程池的最大值设置为1000
        ThreadPool.SetMaxThreads(1000, 1000);
        ThreadPoolMessage("Start");
        ReadFile();
        Console.ReadKey();
    }
    static void ReadFile()
    {
        byte[] byteData=new byte[80961024];
        FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate,
                                FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);

        //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数
        FileData fileData = new FileData();
        fileData.Stream = stream;
        fileData.Length = (int)stream.Length;
        fileData.ByteData = byteData;

        //启动异步读取
        stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
    }
    static void Completed(IAsyncResult result)
    {
        ThreadPoolMessage("Completed");
        //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取
        FileData fileData = (FileData)result.AsyncState;
        int length=fileData.Stream.EndRead(result);
        fileData.Stream.Close();
        //如果读取到的长度与输入长度不一致,则抛出异常
        if (length != fileData.Length)
            throw new Exception("Stream is not complete!");
        string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
        Console.WriteLine(data.Substring(2,22));
    }
    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
                      "WorkerThreads is:{2}  CompletionPortThreads is :{3}",
                      data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
        Console.WriteLine(message);
    }

}

由输出结果可以看到,在使用 FileStream.BeginRead 方法后,系统将自动启动 CLR 线程池中 I/O 线程。

img

注意:如果你看到的测试结果正好相反:工作者线程为 999,I/O 线程为 1000,这是因为 FileStream 的文件容量小于缓冲值 1024 所致的。此时文件将会一次性读取或写入,而系统将启动工作者线程而非 I/O 线程来处理回调函数。**

1.1.3. 5.2 异步操作 TCP/IP 套接字

在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中 BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于 CLR 线程池的 I/O 线程。

public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)

public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)

public override IAsyncResult BeginRead (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)

public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size,  AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)

若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET 中使用 TCP/IP 套接字不需要直接与 Socket 打交道,因为.NET 把 Socket 的大部分操作都放在 System.Net.TcpListener 和 System.Net.Sockets.TcpClient 里面,这两个类大大地简化了 Socket 的操作。一般套接字对象 Socket 包含一个 Accept()方法,此方法能产生阻塞来等待客户端的请求,而在 TcpListener 类里也包含了一个相似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个 TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取 NetworkStream 对象,控制套接字数据的发送与接收。

下面以一个例子说明异步调用 TCP/IP 套接字收发数据的过程。

首先在服务器端建立默认地址 127.0.0.1 用于收发信息,使用此地址与端口 500 新建 TcpListener 对象,调用 TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。

在 ChatClient 类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数 ReceiveAsyncCallback 中输出信息内容,若接收到的信息的大小小于 1 时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端

class Program
{
    static void Main(string[] args)
    {
        //设置CLR线程池最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);

        //默认地址为127.0.0.1
        IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        TcpListener tcpListener = new TcpListener(ipAddress, 500);
        tcpListener.Start();

        //以一个死循环来实现监听
        while (true)
        {   //调用一个ChatClient对象来实现监听
            ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());
        }
    }
}

public class ChatClient
{
    static TcpClient tcpClient;
    static byte[] byteMessage;
    static string clientEndPoint;

    public ChatClient(TcpClient tcpClient1)
    {
        tcpClient = tcpClient1;
        byteMessage = new byte[tcpClient.ReceiveBufferSize];

        //显示客户端信息
        clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();
        Console.WriteLine("Client's endpoint is " + clientEndPoint);

        //使用NetworkStream.BeginRead异步读取信息
        NetworkStream networkStream = tcpClient.GetStream();
        networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
                                    new AsyncCallback(ReceiveAsyncCallback), null);
    }

    public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)
    {
        //显示CLR线程池状态
        Thread.Sleep(100);
        ThreadPoolMessage("\nMessage is receiving");

        //使用NetworkStream.EndRead结束异步读取
        NetworkStream networkStreamRead = tcpClient.GetStream();
        int length=networkStreamRead.EndRead(iAsyncResult);

        //如果接收到的数据长度少于1则抛出异常
        if (length < 1)
        {
            tcpClient.GetStream().Close();
            throw new Exception("Disconnection!");
        }

        //显示接收信息
        string message = Encoding.UTF8.GetString(byteMessage, 0, length);
        Console.WriteLine("Message:" + message);

        //使用NetworkStream.BeginWrite异步发送信息
        byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");
        NetworkStream networkStreamWrite=tcpClient.GetStream();
        networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,
                                        new AsyncCallback(SendAsyncCallback), null);
    }

    //把信息转换成二进制数据,然后发送到客户端
    public void SendAsyncCallback(IAsyncResult iAsyncResult)
    {
        //显示CLR线程池状态
        Thread.Sleep(100);
        ThreadPoolMessage("\nMessage is sending");

        //使用NetworkStream.EndWrite结束异步发送
        tcpClient.GetStream().EndWrite(iAsyncResult);

        //重新监听
        tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
                                          new AsyncCallback(ReceiveAsyncCallback), null);
    }

    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

而在客户端只是使用简单的开发方式,利用 TcpClient 连接到服务器端,然后调用 NetworkStream.Write 方法发送信息,最后调用 NetworkStream.Read 方法读取回馈信息

static void Main(string[] args)
{
    //连接服务端
    TcpClient tcpClient = new TcpClient("127.0.0.1", 500);

    //发送信息
    NetworkStream networkStream = tcpClient.GetStream();
    byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!");
    networkStream.Write(sendMessage, 0, sendMessage.Length);
    networkStream.Flush();

    //接收信息
    byte[] receiveMessage=new byte[1024];
    int count=networkStream.Read(receiveMessage, 0,1024);
    Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));
    Console.ReadKey();
}

注意观察运行结果,服务器端的异步操作线程都是来自于 CLR 线程池的 I/O 线程

img

1.1.4. 5.3 异步 WebRequest

System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用 WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路径” 的 URI 方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用 FTP 文件传输协议实现文件请求/响应、HttpWebRequest 用于处理 HTTP 的页面请求/响应。由于使用方法相类似,下面就以常用的 HttpWebRequest 为例子介绍一下异步 WebRequest 的使用方法。

在使用 ASP.NET 开发网站的时候,往往会忽略了 HttpWebRequest 的使用,因为开发都假设客户端是使用浏览器等工具去阅读页面的。但如果你对 REST 开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:

public override Stream GetRequestStream ()
public override WebResponse GetResponse ()

public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )

其中 BeginGetRequestStream、EndGetRequestStream 用于异步向 HttpWebRequest 对象写入请求信息; BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作 Internet 的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自 CLR 线程池的 I/O 线程。

请求与响应不能使用同步与异步混合开发模式,即当请求写入使用 GetRequestStream 同步模式,即使响应使用 BeginGetResponse 异步方法,操作也与 GetRequestStream 方法在于同一线程内。

下面以简单的例子介绍一下异步请求的用法。

首先为 Person 类加上可序列化特性,在服务器端建立 Hanlder.ashx,通过 Request.InputStream 获取到请求数据并把数据转化为 String 对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据 Id 查找对应的 Person 对象,并把 Person 对象写入 Response.OutStream 中返还到客户端。

在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过 BeginGetRequireStream 获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法 BeginGetResponse 获取回复数据,最后把数据反序列化为 Person 对象显示出来。

HttpWebRequire.Method 默认为 get,在写入请求前必须把 HttpWebRequire.Method 设置为 post,否则在使用 BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。

Model

namespace Model
{
    [Serializable]
    public class Person
    {
        public int ID
        {
            get;
            set;
        }
        public string Name
        {
            get;
            set;
        }
        public int Age
        {
            get;
            set;
        }
    }
}

服务器端

public class Handler : IHttpHandler {

    public void ProcessRequest(HttpContext context)
    {
        //把信息转换为String,找出输入条件Id
        byte[] bytes=new byte[1024];
        int length=context.Request.InputStream.Read(bytes,0,1024);
        string condition = Encoding.Default.GetString(bytes);
        int id = int.Parse(condition.Split(new string[] { ":" },
                          StringSplitOptions.RemoveEmptyEntries)[1]);

        //根据Id查找对应Person对象
        var person = GetPersonList().Where(x => x.ID == id).First();

        //所Person格式化为二进制数据写入OutputStream
        BinaryFormatter formatter = new BinaryFormatter();
        formatter.Serialize(context.Response.OutputStream, person);
    }

    //模拟源数据
    private IList<Person> GetPersonList()
    {
        var personList = new List<Person>();

        var person1 = new Person();
        person1.ID = 1;
        person1.Name = "Leslie";
        person1.Age = 30;
        personList.Add(person1);
        ...........
        return personList;
    }

    public bool IsReusable
    {
        get { return true;}
    }
}

客户端

class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(1000, 1000);
        Request();
        Console.ReadKey();
    }

    static void Request()
    {
        ThreadPoolMessage("Start");
        //使用WebRequest.Create方法建立HttpWebRequest对象
        HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(
                                        "http://localhost:5700/Handler.ashx");
        webRequest.Method = "post";

        //对写入数据的RequestStream对象进行异步请求
        IAsyncResult result=webRequest.BeginGetRequestStream(
            new AsyncCallback(EndGetRequestStream),webRequest);
    }

    static void EndGetRequestStream(IAsyncResult result)
    {
        ThreadPoolMessage("RequestStream Complete");
        //获取RequestStream
        HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
        Stream stream=webRequest.EndGetRequestStream(result);

        //写入请求条件
        byte[] condition = Encoding.Default.GetBytes("Id:1");
        stream.Write(condition, 0, condition.Length);

        //异步接收回传信息
        IAsyncResult responseResult = webRequest.BeginGetResponse(
            new AsyncCallback(EndGetResponse), webRequest);
    }

    static void EndGetResponse(IAsyncResult result)
    {
        //显出线程池现状
        ThreadPoolMessage("GetResponse Complete");

        //结束异步请求,获取结果
        HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
        WebResponse webResponse = webRequest.EndGetResponse(result);

        //把输出结果转化为Person对象
        Stream stream = webResponse.GetResponseStream();
        BinaryFormatter formatter = new BinaryFormatter();
        var person=(Person)formatter.Deserialize(stream);
        Console.WriteLine(string.Format("Person    Id:{0} Name:{1} Age:{2}",
            person.ID, person.Name, person.Age));
    }

    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

从运行结果可以看到,BeginGetRequireStream、BeginGetResponse 方法是使用 CLR 线程池的 I/O 线程。

img

1.1.5. 5.4 异步调用 WebService

相比 TCP/IP 套接字,在使用 WebService 的时候,服务器端需要更复杂的操作处理,使用时间往往会更长。为了避免客户端长期处于等待状态,在配置服务引用时选择 “生成异步操作”,系统可以自动建立异步调用的方式。

以.NET 2.0 以前,系统都是使用 ASMX 来设计 WebService,而近年来 WCF 可说是火热登场,下面就以 WCF 为例子简单介绍一下异步调用 WebService 的例子。

由于系统可以自动生成异步方法,使用起来非常简单,首先在服务器端建立服务 ExampleService,里面包含方法 Method。客户端引用此服务时,选择 “生成异步操作”。然后使用 BeginMethod 启动异步方法, 在回调函数中调用 EndMethod 结束异步调用。

服务端

[ServiceContract]
public interface IExampleService
{
    [OperationContract]
    string Method(string name);
}

public class ExampleService : IExampleService
{
    public string Method(string name)
    {
        return "Hello " + name;
    }
}

class Program
{
    static void Main(string[] args)
    {
        ServiceHost host = new ServiceHost(typeof(ExampleService));
        host.Open();
        Console.ReadKey();
        host.Close();
      }
}

<configuration>
    <system.serviceModel>
        <services>
            <service name="Example.ExampleService">
                <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService">
                    <identity>
                        <dns value="localhost" />
                    </identity>
                </endpoint>
                <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
                <host>
                    <baseAddresses>
                        <add baseAddress="http://localhost:7200/Example/ExampleService/" />
                    </baseAddresses>
                </host>
            </service>
        </services>
    </system.serviceModel>
</configuration>

客户端

class Program
{
    static void Main(string[] args)
    {
        //设置最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);
        ThreadPoolMessage("Start");

        //建立服务对象,异步调用服务方法
        ExampleServiceReference.ExampleServiceClient exampleService = new
                                ExampleServiceReference.ExampleServiceClient();
        exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),
                                    exampleService);
        Console.ReadKey();
    }

    static void AsyncCallbackMethod(IAsyncResult result)
    {
        Thread.Sleep(1000);
        ThreadPoolMessage("Complete");
        ExampleServiceReference.ExampleServiceClient example =
            (ExampleServiceReference.ExampleServiceClient)result.AsyncState;
        string data=example.EndMethod(result);
        Console.WriteLine(data);
    }

    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  " +
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

<configuration>
    <system.serviceModel>
        <bindings>
            <wsHttpBinding>
                <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00"
                    openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00"
                    bypassProxyOnLocal="false" transactionFlow="false"
                    hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288"
                    maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8"
                    useDefaultWebProxy="true" allowCookies="false">
                    <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384"
                        maxBytesPerRead="4096" maxNameTableCharCount="16384" />
                    <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" />
                    <security mode="Message">
                        <transport clientCredentialType="Windows" proxyCredentialType="None"
                          realm="" />
                        <message clientCredentialType="Windows" negotiateServiceCredential="true"
                            algorithmSuite="Default" />
                    </security>
                </binding>
            </wsHttpBinding>
        </bindings>
        <client>
            <endpoint address="http://localhost:7200/Example/ExampleService/"
                binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService"
                contract="ExampleServiceReference.IExampleService"
                name="WSHttpBinding_IExampleService">
                <identity>
                    <dns value="localhost" />
                </identity>
            </endpoint>
        </client>
    </system.serviceModel>
</configuration>

注意观察运行结果,异步调用服务时,回调函数都是运行于 CLR 线程池的 I/O 线程当中。

img

2. 六、异步 SqlCommand

从 ADO.NET 2.0 开始,SqlCommand 就新增了几个异步方法执行 SQL 命令。相对于同步执行方式,它使主线程不需要等待数据库的返回结果,在使用复杂性查询或批量插入时将有效提高主线程的效率。使用异步 SqlCommand 的时候,请注意把 ConnectionString 的 Asynchronous Processing 设置为 true 。

SqlCommand 异步操作的特别之处在于线程并不依赖于 CLR 线程池,而是由 Windows 内部提供,这比使用异步委托更有效率。但如果需要使用回调函数的时候,回调函数的线程依然是来自于 CLR 线程池的工作者线程。

SqlCommand 有以下几个方法支持异步操作:

public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)

public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)

public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)

由于使用方式相似,此处就以 BeginExecuteNonQuery 为例子,介绍一下异步 SqlCommand 的使用。首先建立 connectionString,注意把 Asynchronous Processing 设置为 true 来启动异步命令,然后把 SqlCommand.CommandText 设置为 WAITFOR DELAY "0:0:3" 来虚拟数据库操作。再通过 BeginExecuteNonQuery 启动异步操作,利用轮询方式监测操作情况。最后在操作完成后使用 EndExecuteNonQuery 完成异步操作。

class Program
{
    //把Asynchronous Processing设置为true
    static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;"+
                                    "Integrated Security=True;Asynchronous Processing=true";

    static void Main(string[] args)
    {
        //把CLR线程池最大线程数设置为1000
        ThreadPool.SetMaxThreads(1000, 1000);
        ThreadPoolMessage("Start");

        //使用WAITFOR DELAY命令来虚拟操作
        SqlConnection connection = new SqlConnection(connectionString);
        SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
        connection.Open();

        //启动异步SqlCommand操作,利用轮询方式监测操作
        IAsyncResult result = command.BeginExecuteNonQuery();
        ThreadPoolMessage("BeginRead");
        while (!result.AsyncWaitHandle.WaitOne(500))
            Console.WriteLine("Main thread do work........");

        //结束异步SqlCommand
        int count= command.EndExecuteNonQuery(result);
        ThreadPoolMessage("\nCompleted");
        Console.ReadKey();
    }

    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

注意运行结果,SqlCommand 的异步执行线程并不属于 CLR 线程池。

img

如果觉得使用轮询方式过于麻烦,可以使用回调函数,但要注意当调用回调函数时,线程是来自于 CLR 线程池的工作者线程。

class Program
{
    //把Asynchronous Processing设置为true
    static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;"+
                                      "Integrated Security=True;Asynchronous Processing=true";
    static void Main(string[] args)
    {
        //把CLR线程池最大线程数设置为1000
        ThreadPool.SetMaxThreads(1000, 1000);
        ThreadPoolMessage("Start");

        //使用WAITFOR DELAY命令来虚拟操作
        SqlConnection connection = new SqlConnection(connectionString);
        SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
        connection.Open();

        //启动异步SqlCommand操作,并把SqlCommand对象传递到回调函数
        IAsyncResult result = command.BeginExecuteNonQuery(
                                    new AsyncCallback(AsyncCallbackMethod),command);
        Console.ReadKey();
    }

    static void AsyncCallbackMethod(IAsyncResult result)
    {
        Thread.Sleep(200);
        ThreadPoolMessage("AsyncCallback");
        SqlCommand command = (SqlCommand)result.AsyncState;
        int count=command.EndExecuteNonQuery(result);
        command.Connection.Close();
    }

    //显示线程池现状
    static void ThreadPoolMessage(string data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
              "WorkerThreads is:{2}  CompletionPortThreads is :{3}\n",
              data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

运行结果:

img

3. 七、并行编程与 PLINQ

要使用多线程开发,必须非常熟悉 Thread 的使用,而且在开发过程中可能会面对很多未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库 System.Threading.Tasks,它可以简化并行开发,你无需直接跟线程或线程池打交道,就可以简单建立多线程应用程序。此外,.NET 还提供了新的一组扩展方法 PLINQ,它具有自动分析查询功能,如果并行查询能提高系统效率,则同时运行,如果查询未能从并行查询中受益,则按原顺序查询。下面将详细介绍并行操作的方式。

3.1.1. 7.1 泛型委托

使用并行编程可以同时操作多个委托,在介绍并行编程前先简单介绍一下两个泛型委托 System.Func<>与 System.Action<>。

Func<>是一个能接受多个参数和一个返回值的泛型委托,它能接受 0 个到 16 个输入参数, 其中 T1,T2,T3,T4......T16 代表自定的输入类型,TResult 为自定义的返回值。

public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
..............
public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

Action<>与Func<>十分相似,不同在于Action<>的返回值为void,Action能接受0~16个参数
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

3.1.2. 7.2 任务并行库(TPL)

System.Threading.Tasks 中的类被统称为任务并行库(Task Parallel Library,TPL),TPL 使用 CLR 线程池把工作分配到 CPU,并能自动处理工作分区、线程调度、取消支持、状态管理以及其他低级别的细节操作,极大地简化了多线程的开发。

TPL 比 Thread 更具智能性,当它判断任务集并没有从并行运行中受益,就会选择按顺序运行。但并非所有的项目都适合使用并行开发,创建过多并行任务可能会损害程序的性能,降低运行效率。

TPL 包括常用的数据并行与任务并行两种执行方式:

3.1.3. 7.2.1 数据并行

数据并行的核心类就是 System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与 for、foreach 相仿。通过这两个方法可以并行处理 System.Func<>、System.Action<>委托。

以下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Action) 方法对 List进行并行查询。假设使用单线程方式查询 3 个 Person 对象,需要用时大约 6 秒,在使用并行方式,只需使用 2 秒就能完成查询,而且能够避开 Thread 的繁琐处理。

class Program
{
    static void Main(string[] args)
    {
        //设置最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);
        //并行查询
        Parallel.For(0, 3,n =>
            {
                Thread.Sleep(2000);  //模拟查询
                ThreadPoolMessage(GetPersonList()[n]);
            });
        Console.ReadKey();
    }

    //模拟源数据
    static IList<Person> GetPersonList()
    {
        var personList = new List<Person>();

        var person1 = new Person();
        person1.ID = 1;
        person1.Name = "Leslie";
        person1.Age = 30;
        personList.Add(person1);
        ...........
        return personList;
    }

    //显示线程池现状
    static void ThreadPoolMessage(Person person)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +
              "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +
              "  CompletionPortThreads is :{5}\n",
              person.ID, person.Name, person.Age,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

观察运行结果,对象并非按照原排列顺序进行查询,而是使用并行方式查询。

img

若想停止操作,可以利用 ParallelLoopState 参数,下面以 ForEach 作为例子。 public static ParallelLoopResult ForEach( IEnumerable source, Action action) 其中 source 为数据集,在 Action委托的 ParallelLoopState 参数当中包含有 Break()和 Stop()两个方法都可以使迭代停止。Break 的使用跟传统 for 里面的使用方式相似,但因为处于并行处理当中,使用 Break 并不能保证所有运行能立即停止,在当前迭代之前的迭代会继续执行。若想立即停止操作,可以使用 Stop 方法,它能保证立即终止所有的操作,无论它们是处于当前迭代的之前还是之后。

class Program
{
    static void Main(string[] args)
    {
        //设置最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);

        //并行查询
        Parallel.ForEach(GetPersonList(), (person, state) =>
            {
                if (person.ID == 2)
                    state.Stop();
                ThreadPoolMessage(person);
            });
        Console.ReadKey();
    }

    //模拟源数据
    static IList<Person> GetPersonList()
    {
        var personList = new List<Person>();

        var person1 = new Person();
        person1.ID = 1;
        person1.Name = "Leslie";
        person1.Age = 30;
        personList.Add(person1);
        ..........
        return personList;
    }

    //显示线程池现状
    static void ThreadPoolMessage(Person person)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +
              "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +
              "  CompletionPortThreads is :{5}\n",
              person.ID, person.Name, person.Age,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

观察运行结果,当 Person 的 ID 等于 2 时,运行将会停止。

img

当要在多个线程中调用本地变量,可以使用以下方法: public static ParallelLoopResult ForEach(IEnumerable, Func, Func, Action) 其中第一个参数为数据集; 第二个参数是一个 Func 委托,用于在每个线程执行前进行初始化; 第 三个参数是委托 Func,它能对数据集的每个成员进行迭代,当中 T1 是数据集的成员,T2 是一个 ParallelLoopState 对 象,它可以控制迭代的状态,T3 是线程中的本地变量; 第四个参数是一个 Action 委托,用于对每个线程的最终状态进行最终操作。

在以下例子中,使用 ForEach 计算多个 Order 的总体价格。在 ForEach 方法中,首先把参数初始化为 0f,然后用把同一个 Order 的多个 OrderItem 价格进行累加,计算出 Order 的价格,最后把多个 Order 的价格进行累加,计算出多个 Order 的总体价格。

public class Order
{
    public int ID;
    public float Price;
}

public class OrderItem
{
    public int ID;
    public string Goods;
    public int OrderID;
    public float Price;
    public int Count;
}

class Program
{
    static void Main(string[] args)
    {
        //设置最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);
        float totalPrice = 0f;
        //并行查询
        var parallelResult = Parallel.ForEach(GetOrderList(),
                () => 0f,   //把参数初始值设为0
                (order, state, orderPrice) =>
                {
                    //计算单个Order的价格
                    orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
                          .Sum(item => item.Price * item.Count);
                    order.Price = orderPrice;
                    ThreadPoolMessage(order);

                    return orderPrice;
                },
                (finallyPrice) =>
                {
                    totalPrice += finallyPrice;//计算多个Order的总体价格
                }
            );

        while (!parallelResult.IsCompleted)
            Console.WriteLine("Doing Work!");

        Console.WriteLine("Total Price is:" + totalPrice);
        Console.ReadKey();
    }
    //虚拟数据
    static IList<Order> GetOrderList()
    {
        IList<Order> orderList = new List<Order>();
        Order order1 = new Order();
        order1.ID = 1;
        orderList.Add(order1);
        ............
        return orderList;
    }
    //虚拟数据
    static IList<OrderItem> GetOrderItem()
    {
        IList<OrderItem> itemList = new List<OrderItem>();

        OrderItem orderItem1 = new OrderItem();
        orderItem1.ID = 1;
        orderItem1.Goods = "iPhone 4S";
        orderItem1.Price = 6700;
        orderItem1.Count = 2;
        orderItem1.OrderID = 1;
        itemList.Add(orderItem1);
        ...........
        return itemList;
    }

    //显示线程池现状
    static void ThreadPoolMessage(Order order)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("OrderID:{0}  OrderPrice:{1}\n" +
              "  CurrentThreadId is {2}\n  WorkerThreads is:{3}" +
              "  CompletionPortThreads is:{4}\n",
              order.ID, order.Price,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }
}

运行结果

img

3.1.4. 7.2.2 任务并行

在 TPL 当中还可以使用 Parallel.Invoke 方法触发多个异步任务,其中 actions 中可以包含多个方法或者委托,parallelOptions 用于配置 Parallel 类的操作。

public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )

下面例子中利用了 Parallet.Invoke 并行查询多个 Person,actions 当中可以绑定方法、lambda 表达式或者委托,注意绑定方法时必须是返回值为 void 的无参数方法。

class Program
{
    static void Main(string[] args)
    {
        //设置最大线程数
        ThreadPool.SetMaxThreads(1000, 1000);

        //任务并行
        Parallel.Invoke(option,
            PersonMessage,
            ()=>ThreadPoolMessage(GetPersonList()[1]),
            delegate(){
                ThreadPoolMessage(GetPersonList()[2]);
            });
        Console.ReadKey();
    }

    static void PersonMessage()
    {
        ThreadPoolMessage(GetPersonList()[0]);
    }

    //显示线程池现状
    static void ThreadPoolMessage(Person person)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +
              "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +
              "  CompletionPortThreads is :{5}\n",
              person.ID, person.Name, person.Age,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());

        Console.WriteLine(message);
    }

    //模拟源数据
    static IList<Person> GetPersonList()
    {
        var personList = new List<Person>();

        var person1 = new Person();
        person1.ID = 1;
        person1.Name = "Leslie";
        person1.Age = 30;
        personList.Add(person1);
        ..........
        return personList;
    }
}

运行结果

img

3.1.5. 7.3 Task 简介

以 Thread 创建的线程被默认为前台线程,当然你可以把线程 IsBackground 属性设置为 true,但 TPL 为此提供了一个更简单的类 Task。 Task 存在于 System.Threading.Tasks 命名空间当中,它可以作为异步委托的简单替代品。通过 Task 的 Factory 属性将返回 TaskFactory 类,以 TaskFactory.StartNew(Action)方法可以创建一个新线程,所创建的线程默认为后台线程。

class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(1000, 1000);
        Task.Factory.StartNew(() => ThreadPoolMessage());
        Console.ReadKey();
    }

    //显示线程池现状
    static void ThreadPoolMessage()
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("CurrentThreadId is:{0}\n" +
            "CurrentThread IsBackground:{1}\n" +
            "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",
            Thread.CurrentThread.ManagedThreadId,
            Thread.CurrentThread.IsBackground.ToString(),
            a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

运行结果

img

若要取消处理,可以利用 CancellationTakenSource 对象,在 TaskFactory 中包含有方法

public Task StartNew( Action action, CancellationToken cancellationToken ) 在方法中加入 CancellationTakenSource 对象的 CancellationToken 属性,可以控制任务的运行,调用 CancellationTakenSource.Cancel 时任务就会自动停止。下面以图片下载为例子介绍一下 TaskFactory 的使用。

服务器端页面

<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
    <title></title>
    <script type="text/C#" runat="server">
        private static List<string> url=new List<string>();

        protected void Page_Load(object sender, EventArgs e)
        {
            if (!Page.IsPostBack)
            {
                url.Clear();
                Application["Url"] = null;
            }
        }

        protected void CheckBox_CheckedChanged(object sender, EventArgs e)
        {
            CheckBox checkBox = (CheckBox)sender;
            if (checkBox.Checked)
                url.Add(checkBox.Text);
            else
                url.Remove(checkBox.Text);
            Application["Url"]= url;
        }
</script>
</head>
<body>
    <form id="form1" runat="server" >
    <div align="left">
      <div align="center" style="float: left;">
        <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
        <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
              oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
      </div>
      <div align="center" style="float: left">
          <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
          <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
              oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
      </div>
      <div align="center" style="float: left">
          <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
          <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
              oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
      </div>
      <div align="center" style="float: left">
          <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
          <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
              oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
      </div>
      <div align="center" style="float: left">
          <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
          <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
              oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
      </div>
    </div>
    </form>
</body>
</html>

首先在服务器页面中显示多个*.jpg 图片,每个图片都有对应的 CheckBox 检测其选择情况。所选择图片的路径会记录在 Application["Url"]当中传递到 Handler.ashx 当中。

Application 是一个全局变量,此处只是为了显示 Task 的使用方式,在 ASP.NET 开发应该慎用 Application。

Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成 byte[]二进制数据。再把图片的数量,每副图片的二进制数据的长度记录在 OutputStream 的头部。最后把图片的二进制数据记入 OutputStream 一并输出。

public class Handler : IHttpHandler
{
    public void ProcessRequest(HttpContext context)
    {
        //获取图片名,把图片数量写OutputStream
        List<String> urlList = (List<string>)context.Application["Url"];
        context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);

        //把图片转换成二进制数据
        List<string> imageList = GetImages(urlList);

        //把每副图片长度写入OutputStream
        foreach (string image in imageList)
        {
            byte[] imageByte=Convert.FromBase64String(image);
            context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
        }

        //把图片写入OutputStream
        foreach (string image in imageList)
        {
            byte[] imageByte = Convert.FromBase64String(image);
            context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
        }
    }

    //获取多个图片的二进制数据
    private List<string> GetImages(List<string> urlList)
    {
        List<string> imageList = new List<string>();
        foreach (string url in urlList)
            imageList.Add(GetImage(url));
        return imageList;
    }

    //获取单副图片的二进制数据
    private string GetImage(string url)
    {
        string path = "E:/My Projects/Example/WebSite/Images/"+url;
        FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
        byte[] imgBytes = new byte[10240];
        int imgLength = stream.Read(imgBytes, 0, 10240);
        return Convert.ToBase64String(imgBytes,0,imgLength);
    }

    public bool IsReusable
    {
        get{ return false;}
    }
}

客户端

建立一个 WinForm 窗口,里面加入一个 WebBrowser 连接到服务器端的 Default.aspx 页面。当按下 Download 按键时,系统就会利用 TaskFactory.StartNew 的方法建立异步线程,使用 WebRequest 方法向 Handler.ashx 发送请求。接收到回传流时,就会根据头文件的内容判断图片的数量与每副图片的长度,把二进制数据转化为*.jpg 文件保存。

img

系统利用 TaskFactory.StartNew(action,cancellationToken) 方式异步调用 GetImages 方法进行图片下载。当用户按下 Cancel 按钮时,异步任务就会停止。值得注意的是,在图片下载时调用了 CancellationToken.ThrowIfCancellationRequested 方法,目的在检查并行任务的运行情况,在并行任务被停止时释放出 OperationCanceledException 异常,确保用户按下 Cancel 按钮时,停止所有并行任务。

public partial class Form1 : Form
{
    private CancellationTokenSource tokenSource = new CancellationTokenSource();

    public Form1()
    {
        InitializeComponent();
        ThreadPool.SetMaxThreads(1000, 1000);
    }

    private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
    {
        Task.Factory.StartNew(GetImages,tokenSource.Token);
    }

    private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
    {
        tokenSource.Cancel();
    }

    private void GetImages()
    {
        //发送请求,获取输出流
        WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
        Stream responseStream=webRequest.GetResponse().GetResponseStream();

        byte[] responseByte = new byte[81960];
        IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
        int responseLength = responseStream.EndRead(result);

        //获取图片数量
        int imageCount = BitConverter.ToInt32(responseByte, 0);

        //获取每副图片的长度
        int[] lengths = new int[imageCount];
        for (int n = 0; n < imageCount; n++)
        {
            int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
            lengths[n] = length;
        }
        try
        {
            //保存图片
            for (int n = 0; n < imageCount; n++)
            {
                string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
                FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);

                //计算字节偏移量
                int offset = (imageCount + 1) * 4;
                for (int a = 0; a < n; a++)
                    offset += lengths[a];

                file.Write(responseByte, offset, lengths[n]);
                file.Flush();

                //模拟操作
                Thread.Sleep(1000);

                //检测CancellationToken变化
                tokenSource.Token.ThrowIfCancellationRequested();
            }
        }
        catch (OperationCanceledException ex)
        {
            MessageBox.Show("Download cancel!");
        }
    }
}

3.1.6. 7.4 并行查询(PLINQ)

并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的所有处理器。 它利用所有处理器的方法,把数据源分成片段,然后在多个处理器上对单独工作线程上的每个片段并行执行查询, 在许多情况下,并行执行意味着查询运行速度显著提高。但这并不说明所有 PLINQ 都会使用并行方式,当系统测试要并行查询会对系统性能造成损害时,那将自动化地使用同步执行。在 System.Linq.ParallelEnumerable 类中,包含了并行查询的大部分方法。

方法成员  说明
AsParallel PLINQ 的入口点。 指定如果可能,应并行化查询的其余部分。
AsSequential(Of TSource) 指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行。
AsOrdered 指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。
AsUnordered(Of TSource) 指定查询的其余部分的 PLINQ 不需要保留源序列的排序。
WithCancellation(Of TSource) 指定 PLINQ 应定期监视请求取消时提供的取消标记和取消执行的状态。
WithDegreeOfParallelism(Of TSource) 指定 PLINQ 应当用来并行化查询的处理器的最大数目。
WithMergeOptions(Of TSource) 提供有关 PLINQ 应当如何(如果可能)将并行结果合并回到使用线程上的一个序列的提示。
WithExecutionMode(Of TSource) 指定 PLINQ 应当如何并行化查询(即使默认行为是按顺序运行查询)。
ForAll(Of TSource) 多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回到使用者线程的情况下并行处理结果。
Aggregate 重载 对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。

3.1.7. 7.4.1 AsParallel

通常想要实现并行查询,只需向数据源添加 AsParallel 查询操作即可。

class Program
{
    static void Main(string[] args)
    {
        var personList=GetPersonList().AsParallel()
              .Where(x=>x.Age>30);
        Console.ReadKey();
    }

    //模拟源数据
    static IList<Person> GetPersonList()
    {
        var personList = new List<Person>();

        var person1 = new Person();
        person1.ID = 1;
        person1.Name = "Leslie";
        person1.Age = 30;
        personList.Add(person1);
        ...........
        return personList;
    }
}

3.1.8. 7.4.2 AsOrdered

若要使查询结果必须保留源序列排序方式,可以使用 AsOrdered 方法。 AsOrdered 依然使用并行方式,只是在查询过程加入额外信息,在并行结束后把查询结果再次进行排列。

class Program
{
    static void Main(string[] args)
    {
        var personList=GetPersonList().AsParallel().AsOrdered()
            .Where(x=>x.Age<30);
        Console.ReadKey();
    }

    static IList<Person> GetPersonList()
    {......}
}

3.1.9. 7.4.3 WithDegreeOfParallelism

默认情况下,PLINQ 使用主机上的所有处理器,这些处理器的数量最多可达 64 个。通过使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定数量的处理器。

class Program
{
    static void Main(string[] args)
    {
        var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
            .Where(x=>x.Age<30);
        Console.ReadKey();
    }

    static IList<Person> GetPersonList()
    {.........}
}

3.1.10. 7.4.4 ForAll

如果要对并行查询结果进行操作,一般会在 for 或 foreach 中执行,执行枚举操作时会使用同步方式。有见及此,PLINQ 中包含了 ForAll 方法,它可以使用并行方式对数据集进行操作。

class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(1000, 1000);
        GetPersonList().AsParallel().ForAll(person =>{
            ThreadPoolMessage(person);
        });
        Console.ReadKey();
    }

    static IList<Person> GetPersonList()
    {.......}

    //显示线程池现状
    static void ThreadPoolMessage(Person person)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +
              "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +
              "  CompletionPortThreads is :{5}\n",
              person.ID, person.Name, person.Age,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

运行结果

img

3.1.11. 7.4.5 WithCancellation

如果需要停止查询,可以使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例作为参数。与第三节 Task 的例子相似,如果标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并停止所有线程上的处理,然后引发 OperationCanceledException。这可以保证并行查询能够立即停止。

class Program
{
    static CancellationTokenSource tokenSource = new CancellationTokenSource();

    static void Main(string[] args)
    {
        Task.Factory.StartNew(Cancel);
        try
        {
            GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
                .ForAll(person =>
                {
                    ThreadPoolMessage(person);
                });
        }
        catch (OperationCanceledException ex)
        { }
        Console.ReadKey();
    }

    //在10~50毫秒内发出停止信号
    static void Cancel()
    {
        Random random = new Random();
        Thread.Sleep(random.Next(10,50));
        tokenSource.Cancel();
    }

    static IList<Person> GetPersonList()
    {......}

    //显示线程池现状
    static void ThreadPoolMessage(Person person)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("Person  ID:{0} Name:{1} Age:{2}\n" +
              "  CurrentThreadId is {3}\n  WorkerThreads is:{4}" +
              "  CompletionPortThreads is :{5}\n",
              person.ID, person.Name, person.Age,
              Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

八、定时器与锁

3.1.12. 8.1 定时器

若要长期定时进行一些工作,比如像邮箱更新,实时收听信息等等,可以利用定时器 Timer 进行操作。在 System.Threading 命名空间中存在 Timer 类与对应的 TimerCallback 委托,它可以在后台线程中执行一些长期的定时操作,使主线程不受干扰。 Timer 类中最常用的构造函数为 public Timer( timerCallback , object , int , int ) timerCallback 委托可以绑定执行方法,执行方法必须返回 void,它可以是无参数方法,也可以带一个 object 参数的方法。第二个参数是为 timerCallback 委托输入的参数对象。第三个参数是开始执行前等待的时间。第四个参数是每次执行之间的等待时间。

开发实例

class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(1000, 1000);

        TimerCallback callback = new TimerCallback(ThreadPoolMessage);
        Timer t = new Timer(callback,"Hello Jack! ", 0, 1000);
        Console.ReadKey();
    }

    //显示线程池现状
    static void ThreadPoolMessage(object data)
    {
        int a, b;
        ThreadPool.GetAvailableThreads(out a, out b);
        string message = string.Format("{0}\n   CurrentThreadId is:{1}\n" +
            "   CurrentThread IsBackground:{2}\n" +
            "   WorkerThreads is:{3}\n   CompletionPortThreads is:{4}\n",
            data + "Time now is " + DateTime.Now.ToLongTimeString(),
            Thread.CurrentThread.ManagedThreadId,
            Thread.CurrentThread.IsBackground.ToString(),
            a.ToString(), b.ToString());
        Console.WriteLine(message);
    }
}

注意观察运行结果,每次调用 Timer 绑定的方法时不一定是使用同一线程,但线程都会是来自工作者线程的后台线程。

img

3.1.13. 8.2 锁

在使用多线程开发时,存在一定的共用数据,为了避免多线程同时操作同一数据,.NET 提供了 lock、Monitor、Interlocked 等多个锁定数据的方式。

3.1.14. 8.2.1 lock

lock 的使用比较简单,如果需要锁定某个对象时,可以直接使用 lock(this)的方式。

private void Method()
{
    lock(this)
    {
        //在此进行的操作能保证在同一时间内只有一个线程对此对象操作
    }
}

如果操作只锁定某段代码,可以事先建立一个 object 对象,并对此对象进行操作锁定,这也是.net 提倡的锁定用法。

class Control
{
    private object obj=new object();

    public void Method()
    {
        lock(obj)
        {.......}
    }
}

3.1.15. 8.2.2 Montior

Montior 存在于 System.Thread 命名空间内,相比 lock,Montior 使用更灵活。它存在 Enter, Exit 两个方法,它可以对对象进行锁定与解锁,比 lock 使用更灵活。

class Control
{
    private object obj=new object();

    public void Method()
    {
        Monitor.Enter(obj);
        try
        {......}
        catch(Excetion ex)
        {......}
        finally
        {
            Monitor.Exit(obj);
        }
    }
}

使用 try 的方式,能确保程序不会因死锁而释放出异常!而且在 finally 中释放 obj 对象能够确保无论是否出现死锁状态,系统都会释放 obj 对象。而且 Monitor 中还存在 Wait 方法可以让线程等待一段时间,然后在完成时使用 Pulse、PulseAll 等方法通知等待线程。

3.1.16. 8.2.3 Interlocked

Interlocked 存在于 System.Thread 命名空间内,它的操作比 Monitor 使用更简单。它存在 CompareExchange、Decrement、Exchange、Increment 等常用方法让参数在安全的情况进行数据交换。

Increment、Decrement 可以使参数安全地加 1 或减 1 并返回递增后的新值。

class Example
{
    private int a=1;

    public void AddOne()
    {
            int newA=Interlocked.Increment(ref a);
    }
}

Exchange 可以安全地变量赋值。

public void SetData()
{
    Interlocked.Exchange(ref a,100);
}

CompareExchange 使用特别方便,它相当于 if 的用法,当 a 等于 1 时,则把 100 赋值给 a。

public void CompareAndExchange()
{
    Interlocked.CompareExchange(ref a,100,1);
}
Copyright © Guanghui Wang all right reserved,powered by GitbookFile Modified: 2019-08-25 13:56:34

results matching ""

    No results matching ""