在本文的前两篇文章里对MSMQ的相关知识点进行了介绍,很多阅读过这前两篇文章的朋友都曾问到过这样一些问题:
1、如何把MSMQ应用到实际的项目中去呢?
2、可不可以介绍一个实际的应用实例?
3、......
在前两篇文章里,关于MSMQ常用的技术点基本介绍完毕了,本文主要以MS开源项目PetShop中的MSMQ应用作为案例来介绍MSMQ在实际项目中的应用。在PetShop里,由于系统使用了多线程的专用应用程序来监控消息队列,在进入PetShop应用分析前,我们先来了解下关于多线程和MSMQ的相关知识点。
一、多线程和MSMQ
现在有这样一个需求,指定的消息队列里不管有无消息数据,我们通过一个多线程来监控这个队列,一但队列里的数据发生变化就做出相应的处理,比如把消息读取出来。根据这个需求,我们来做个示例,用一多线程把队列监控起来,如果队列里有消息数据,就把消息读取出来,如果没有则一直监视队列,当队列数据发生改变(有新的消息加入)的时候就作出处理(读取消息)。
首先定义一个线程数组用于存储线程数;
2static private Thread[] ThreadArray = new Thread[ThreadNumber];
我们把需要启动的线程装载入ThreadArray数组,通过一个遍历数组把所以的线程启动,实际这里只有5个线程。
2{
3 StartThreads();
4}
5
6private void StartThreads()
7{
8 int counter; //线程计数
9 for (counter = 0; counter < ThreadNumber; counter++)
10 {
11 ThreadArray[counter] = new Thread(new ThreadStart(MSMQListen));
12 ThreadArray[counter].Start();
13 this.richTextBox2.Text += (counter + 1).ToString() + "号线程开始!";
14 }
15}
16
17private void MSMQListen()
18{
19 while (true)
20 {
21 //取出队列里的消息
22 MessageBox.Show(MsgQueue.ReceiveMessage());
23 }
24}
如果上面这段代码阅读起存在问题,建议先去了解下多线程的相关知识点。在StartThreads方法里启动数组里存储的所以线程,并委托给MSMQListen方法进行处理,MSMQListen方法完成的就是读取队列里的消息,这里我使用了在第二篇文章里所使用的MsgQueue类和Book类,详细请阅读第二篇文章ASP.NET中进行消息处理(MSMQ) 二 。 |
动了5个线程,用来监视指定的消息队列,如上图。那好,我们现在就来测试一下,通过给队列里发送消息,看线程是否会有响应。从上面启动线程的代码上可以很清晰的看出,只要队列里有消息在多线程的监视下,线程就会把队列里的消息读取出来。
1private void button3_Click(object sender, EventArgs e)
2{ 3 Book book = new Book(); 4 book.BookId = 1; 5 book.BookName = "asp.net"; 6 book.BookAuthor = "abcd"; 7 book.BookPrice = 50.80; 8 9 MsgQueue.SendMessage(book); 10} 那么这里的测试,向队列里发送了一Book对象消息,根据上面分析,则多线程便会同时把此条消息读取出来: |
呵呵,测试结果出来,看来此测试达到了我们之前所提出的需求。!OK,关于MSMQ和多线程就简单介绍于此。 二、MSMQ在开塬项目PetShop中的应用分析。 在PetShop 4.0中,利用消息队列临时存放要插入的数据,来避免因为频繁访问数据库的操作。而队列中的消息,则等待系统的专用的应用程序来处理,最后将数据插入到数据库中。 PetShop 4.0中的消息处理,主要分为下面几大部分:订单策略接口IOrderStategy、消息接口IMessageing、消息工厂MessageFactory、MSMQ实现MSMQMessaging、后台处理应用程序OrderProessor。如下图: |
1、订单策略接口IOrderStategy PetShop 4.0的体系结构是非常庞大,在订单处理上有两种处理策略,这里也是策略模式的一个应用,IOrderStrategy接口作为订单策略的高层抽象,实现不同订单处理的具体策略去实现它,UML如下: |
这里需要注意的是,Order类既继承了基类PetShopQueue,同时还实现了接口IOrder,而在消息接口和基类中所定义的接收消息(Receive)方法在方法的签名上是相同的。所以在Order的Receive方法实现中,必须使用new而非override关键字来重写父类PetShopQueue的Receive虚方法。此时Order类的Receive方法代表两个含义,一是实现了消息接口IOrder中的Receive方法;二则是利用了new关键字重写了父类PetShopQueue的Receive虚方法。
在PetShop4.0中,将面向对象的知识点应用得非常精妙,如上分析,此时我门可以怎么来调用Order呢?是这样吗?
2PetShopQueue order = new Order();
3order.Receive();
4
5//2、使用消息接口IOrder
6IOrder order = new Order();
7order.Receive();
根据多态原理,上面这两种实现都是正确的,那我们那取谁舍谁呢?在PetShop4.0中正确的调用是第2种方法,这种调用方法也更符合“面向接口设计”的原则。----详细请查看消息工厂MessageFactory部分的介绍。
5、后台处理应用程序OrderProessor
前面一系列的操作,最终都会走向到这里,这里实现了最终的插入数据库的操作。在PetShop 4.0中OrderProessor是一个控制台应用程序,根据需求也可以将其设计为Windows Service。他完成的操作就是接收消息队列里的订单数据,将其插入到数据库。在OrderProessor里使用了多线程技术,监视队列里的订单信息,定期的将其处理。在主方法Main方法中用于控制线程序,核心的执行任何则委托给ProcessOrders方法去实现。
2{
3 TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));
4 Order order = new Order(); //逻辑层的PetShop.BLL.Order
5 while (true)
6 {
7 TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks);
8 double elapsedTime = 0;
9
10 int processedItems = 0;
11 ArrayList queueOrders = new ArrayList();
12
13 //首先验证事务
14 using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
15 {
16 //从队列中检索订单
17 for (int i = 0; i < batchSize; i++)
18 {
19 try
20 {
21 //在一定时间 类接收队列的订单
22 if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
23 {
24 queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
25 }
26 else
27 {
28 i = batchSize; // 结束循环
29 }
30 elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds;
31 }
32 catch (TimeoutException)
33 {
34 //没有可以等待的消息也结束循环
35 i = batchSize;
36 }
37 }
38
39 //处理队列的订单
40 for (int k = 0; k < queueOrders.Count; k++)
41 {
42 order.Insert((OrderInfo)queueOrders[k]);
43 processedItems++;
44 totalOrdersProcessed++;
45 }
46 //处理完毕或者是超时
47 ts.Complete();
48 }
49 Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");
50 }
51}
ProcessOrders方法首先通过业务逻辑层PetShop.BLL.Order类的方法ReceiveFromQueue去获取消息队列中的订单数据,并将其放入一个ArrayList对象,然后将其插入到数据库。 OrderProcessor的完整代码定义如下:
MSMQ技术除了用于异步处理之外,还可作为一种分布式处理技术应用。关于使用MSMQ进行分布式处理相关的内容,本人能力有限,还请大家查看相关的资料进行了解。
本文介绍了MSMQ和多线程以及对PetShop 4.0中对MSMQ的应用进行了分析,结合之前我写的两篇关于MSMQ的相关知识点的介绍文章,对MSMQ算是建立了一个全面的认识,希望这三篇文章对大家学习MSMQ有所帮助。