# 原则37:构造并行算法的异常考量
**By D.S.Qiu**
**尊重他人的劳动,支持原创,转载请注明出处:[http://dsqiu.iteye.com](http://dsqiu.iteye.com)**
前面两个原则幸福地忽略了任何子线程运行出错的可能性。这显然不是现实世界所进行的。异常会在你的子线程发生,你不得不转向去收拾残局。当然,后台线程的异常在某种程度上增加复杂性。异常不能继续调用线程边界的函数栈。而是,如果在线程启动方法出现异常,这个线程就会终止。没有任何方式调用线程检索错误,或者对异常做任何事。更重要的是,如果出现异常你的并行算法就必须支持回滚,你不得不理解异常出现的副作用并且能从异常恢复过来。每个算法都有不同需求,所以在并行的环境中没有通用的答案处理异常。我只能提供的指导就是:对于特定的应用你可以使用最好的策略。
我们从上个原则的异步下载开始说。最简单策略就是没有副作用,并且从所有的 web 主机上持续下载不用考虑其中一个会失败。并行操作使用新的 AggregateException 类处理并行操作的异常。 AggregateException 的 InnerException 属性包含所有在并行操作可能产生的异常。在这个过程有好几个方式处理这个异常。首先,我会演示一个更普遍的情况,怎么在外部处理子任务产生的错误。
在上一个原则的 RunAsync() 方法在多个并行操作使用。这意味你要捕获 AggregateException 的 InnerException 集合的异常。越多并行操作,嵌套就越深。因为并行操作有不同的构成,你应该防止多次复制在异常集合的元素异常。我修改 RunAsync() 以处理可能的错误:
```
try
{
urls.RunAsync( url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
}
catch (AggregateException problems)
{
ReportAggregateError(problems);
}
private static void ReportAggregateError(AggregateException aggregate)
{
foreach (var exception in aggregate.InnerExceptions)
if (exception is AggregateException)
ReportAggregateError( exception as AggregateException);
else
Console.WriteLine(exception.Message);
}
```
ReportAggregateError 输出所有不是 AggregateException 自身异常的信息。当然,这会掩盖所有异常,不管你是否有没有预料到。这是相当危险的。相反,你需要处理你可以从中恢复的异常,或者重抛出其他异常。
这里有很多集合递归,所以有一个试用的函数式很有意义的。泛型方法必须知道哪些异常类你要处理,哪些异常你是不期望的并且你要处理那些你想要处理的异常。你需要为这个方法确定要处理的异常类型和处理异常的代码。这是简单的类型和 Action<T> lambda 表达式的字典。并且,如果处理没有处理 InnerException 集合的每个异常,清楚哪些异常出现异常。这说明你要重新抛出原来的异常。下面是新的 Runsync 代码:
```
try
{
urls.RunAsync(url => startDownload(url), task => finishDownload(task.AsyncState.ToString(), task.Result));
}
catch (AggregateException problems)
{
var handlers = new Dictionary<Type, Action<Exception>>();
handlers.Add(typeof(WebException),ex => Console.WriteLine(ex.Message));
if (!HandleAggregateError(problems, handlers))
throw;
}
```
HandleAggregateError 方法递归查看每个异常。如果异常是预料的,处理器会被调用。否则, HandleAggregateError 返回 false ,说明这类异常不能被正确处理:
```
private static bool HandleAggregateError(AggregateException aggregate, Dictionary<Type, Action<Exception>> exceptionHandlers)
{
foreach (var exception in aggregate.InnerExceptions)
if (exception is AggregateException)
return HandleAggregateError( exception as AggregateException, exceptionHandlers);
else if (exceptionHandlers.ContainsKey( exception.GetType()))
{
exceptionHandlers[exception.GetType()] (exception);
}
else
return false;
return true;
}
```
这点代码看着有些密集,但是并不难。当它传入一个 AggregateException ,它会对子列递归评估。当遍历到任何其他异常,它会查询字典。如果处理器 Action<> 已经被注册,就会调用这个处理器。如果没有,就会理解返回 false ,即发现一个不能处理的异常。
你会奇怪为什么当没有注册处理器抛出的是元素的 AggregateException 而不是单一的异常。抛出集合中的单一异常会丢失重要信息。 InnerException 可能包含很多异常。可能会有多个异常是没有预料到的。你必须返回这个集合而避免丢失太多信息。很多情况,AggregateException 的 InnerException 集合只有一个异常。然而,你不能那样写代码因为当你想要额外的信息,它却不在那。
当然,这会感觉有一点丑陋。还有没有更好的防止异常出现使得任务离开运行的后台工作的办法。在几乎所有情况,这是更好的。修改代码使得正在运行的后台任务确保没有异常能停止这个后台任务。当你使用 TaskCompletionSource<> 类,就说明你没有调用 TrySetException() ,而是确保每个任何调用 TrySetResult() 表示完成。这就有了下面对 startDownload() 的修改。但是,正如和我前面说的,你不能只是捕获每个异常。你应该只捕获可以从中恢复的异常。在这个例子中,你可以从 WebException 恢复,这个异常出现因为远程主机不可访问。其他异常类型可能表明更严重的问题。那些会持续产生的异常会终止所有处理。 startDownload 方法有了下面的修改:
```
private static Task<byte[]> startDownload(string url)
{
var tcs = new TaskCompletionSource<byte[]>(url);
var wc = new WebClient();
wc.DownloadDataCompleted += (sender, e) =>
{
if (e.UserState == tcs)
{
if (e.Cancelled)
tcs.TrySetCanceled();
else if (e.Error != null)
{
if (e.Error is WebException)
tcs.TrySetResult(new byte[0]);
else
tcs.TrySetException(e.Error);
}
else
tcs.TrySetResult(e.Result);
}
};
wc.DownloadDataAsync(new Uri(url), tcs);
return tcs.Task;
}
```
WebException 的返回说明0字节数组读取,而且所有其他异常会通过正常的通道抛出。对的,也就是说当 AggregateException 被抛出仍可以继续对正在发生进行处理。很可能你只是把它们当做致命错误,而你的后台任务可以继续处理其他错误。但是你需要理解所有不同类型的异常。
当然,如果你使用 LINQ 语法,后台任务的错误有引起其他问题。记得原则35我描述了三条和并行算法的区别。在所有情况下,使用 PLINQ 和正常的懒评估会有些变化,而这些变化是你在 PLINQ 算法处理异常时必须考虑的。请记住,通常,一个查询只有在其他代码请求这个查询产生的项时才执行。这当然不是 PLINQ 的工作。后台线程运行产生结果,而且另一个任务组合最后的结果序列。它不能立即评估。查询的结果不是立即产生的。然而,后台线程只要调用运行就会开始产生结果。现在,你意味着必须改变异常处理的代码。在典型的 LINQ 查询,你可以将使用查询结果的代码放在 try/catch 块内。这不需要包裹定义 LINQ 查询表达式的代码:
```
var nums = from n in data
where n < 150
select Factorial(n);
try
{
foreach (var item in nums)
Console.WriteLine(item);
}
catch (InvalidOperationException inv)
{
// elided
}
```
一旦涉及 PLINQ ,你必须在 tyr/cathc 块中封闭查询的定义。而且,当然,记住如果你使用 PLINQ ,你必须捕获 AggregateException 而不是无论你原来期望的是什么异常。不管你使用 Pipelining ,Stop&Go ,或者 Inverted Enumeration 算法都是正确的。
异常在任何算法中都是复杂的。并行任务引起更多并发症。 Parallel Task 库使用 AggregateExceptoion 类持有并行算法排除的所有异常。只要有一个后台线程抛出一个异常,其他后端操作都会被停止。你最好的计划是确保在你的并行任务执行代码时没有任何异常抛出。即使这,其他你没有期望的异常也有可能在某些地方抛出。这意味着你必须处理 AggregateException 以控制线程初始化所有后台工作。
小结:
异常,无论什么时候都要考虑进来,好吧,还没有用到 LINQ 和 PLINQ (工作没有这个需求),暂且没有深入的感受。
跑步去,加油,加油!
欢迎各种不爽,各种喷,写这个纯属个人爱好,秉持”分享“之德!
有关本书的其他章节翻译请[点击查看](/category/297763),转载请注明出处,尊重原创!
如果您对D.S.Qiu有任何建议或意见可以在文章后面评论,或者发邮件(gd.s.qiu@gmail.com)交流,您的鼓励和支持是我前进的动力,希望能有更多更好的分享。
转载请在**文首**注明出处:[http://dsqiu.iteye.com/blog/2088794](/blog/2088794)
更多精彩请关注D.S.Qiu的博客和微博(ID:静水逐风)
- 第一章 C# 语言习惯
- 原则1:使用 属性(Poperty)代替可直接访问的数据成员(Data Member)
- 原则2:偏爱 readonly 而不是 const
- 原则3:选择 is 或 as 而不是强制类型转换
- 原则4:使用条件特性(conditional attribute)代替 #if
- 原则5:总是提供 ToString()
- 原则6:理解几个不同相等概念的关系
- 原则7:明白 GetHashCode() 的陷阱
- 原则8:优先考虑查询语法(query syntax)而不是循环结构
- 原则9:在你的 API 中避免转换操作
- 原则10:使用默认参数减少函数的重载
- 原则11:理解小函数的魅力
- 第二章 .NET 资源管理
- 原则12:选择变量初始化语法(initializer)而不是赋值语句
- 原则13:使用恰当的方式对静态成员进行初始化
- 原则14:减少重复的初始化逻辑
- 原则15:使用 using 和 try/finally 清理资源
- 原则16:避免创建不需要的对象
- 原则17:实现标准的 Dispose 模式
- 原则17:实现标准的 Dispose 模式
- 原则18:值类型和引用类型的区别
- 原则19:确保0是值类型的一个有效状态
- 原则20:更倾向于使用不可变原子值类型
- 第三章 用 C# 表达设计
- 原则21:限制你的类型的可见性
- 原则22:选择定义并实现接口,而不是基类
- 原则23:理解接口方法和虚函数的区别
- 原则24:使用委托来表达回调
- 原则25:实现通知的事件模式
- 原则26:避免返回类的内部对象的引用
- 原则27:总是使你的类型可序列化
- 原则28:创建大粒度的网络服务 APIs
- 原则29:让接口支持协变和逆变
- 第四章 和框架一起工作
- 原则30:选择重载而不是事件处理器
- 原则31:用 IComparable&lt;T&gt; 和 IComparer&lt;T&gt; 实现排序关系
- 原则32:避免 ICloneable
- 原则33:只有基类更新处理才使用 new 修饰符
- 原则34:避免定义在基类的方法的重写
- 原则35:理解 PLINQ 并行算法的实现
- 原则36:理解 I/O 受限制(Bound)操作 PLINQ 的使用
- 原则37:构造并行算法的异常考量
- 第五章 杂项讨论
- 原则38:理解动态(Dynamic)的利与弊
- 原则39:使用动态对泛型类型参数的运行时类型的利用
- 原则40:使用动态接收匿名类型参数