Haskell学习(4)-并行和多核计算
因为这章是我这次研究Haskell的重点,因此全文直接翻译。
在我们编写这本书的时候,CPU的架构变化比任何一个时期都要快。
定义并发和并行
一个并发程序需要同时进行多个可能毫无关联的任务。考虑游戏服务器的例子:服务器很普遍的会由多个的部分组成,其中的每一个都与外界有着复杂的交互。一个部分可能负责多用户的交谈,其他的一些部分可能需要处理用户的输入,以及将状态的更新返回给用户;同时其他的一些部分还在进行物理计算。
一个正确运行的并发程序并不需要有多核的支持,尽管有了他们的确可以提高程序的效率和反应。
相反,一个并行的程序解决一个单一问题。考虑经济学模式中常侍预测某一支股票在接下来几分钟的价格波动情况。如果我们需要将这个模型应用到交易中的每一支股票,比如来推断我们需要买哪些卖哪些,将这个模型在五百个核的计算机上运行会比仅仅使用单核的计算机运行要快。通过这个我们可以看到,一个并行的程序并不是经常依靠于多核的出现来保证其正确性。
另外一个有用的并发和并行之间的区别存在于他们与外界的交互之中。从定义来看,并发程序不断的与网络协议,数据库以及相关的联系。一个标准的并行程序可能更加关注于某一点:数据流接入,运算一段时间(伴随很少的I/O),然后将数据流输出。
很多传统的编程语言更加模糊化了并发和并行语言之间的界限,因为他们迫使程序员用同样的基始来构造同样的程序。
在本章中,我们将会关注并发和并行程序执行在以个单一的操作系统进程之中。
通过线程的并发程序
作为并发程序的一个建筑模块,大多数的编程语言都提供了来操作控制多个独立线程的方式。Haskell也不例外,尽管利用线程在Haskell中看上去和其他语言有一些不同。
在Haskell中,一个线程是一个IO行为,独立于其他线程而单独执行的。为了创造一个线程,我们引入Control.Concurrent模块,并且使用forkIO方法。
ghci> :m + Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> :m + System.Directory
ghci> forkIO (writeFile “xyzzy” “seo craic nua!”) >> doesFileExist “xyzzy”
False
新的线程在程序执行开始的一刹那就开始运行,并且创造新线程的线程同时也在并发执行。新线程直到IO运行结束才停止运行。
线程是非确定行的
GHC的运行时部分并没有明确它执行线程的先后顺序。因此,如同在上面的程序中,新线程创建的文件xyzzy可能,也可能还没来得及新建当原始线程检查它的存在时。如果我们运行这个例子一次,移除掉xyzzy文件后再运行一次,我们可能会得到和第一次运行不一样的结果。
掩盖延迟
假定我们有一个很大的文件需要压缩写入磁盘,但是我们需要很快的处理用户的输入来保证用户可以及时地得到程序的反馈。如果我们使用forkIO来创造一个新线程进行文件的写操作,我们可以同时来完成这两件事。
– file: ch24/Compressor.hs
import Control.Concurrent (forkIO)
import Control.Exception (handle)
import Control.Monad (forever)
import qualified Data.ByteString.Lazy as L
import System.Console.ReadLine (readline)
– Provided by the ‘zlib’ package on http://hacakge.haskell.org/
import Codec.Compression.GZip (compress)
main = do
maybeLine <- readline “Enter a file to compress> ”
case maybeLine of
Nothing -> return () — user entered EOF
Just “” -> return () — treat no name as “want to quit”
Just name -> do
handle print $ do
content <- L.readfile name
forkIO (compressFile name content)
return ()
main
where compressFile path = L.writeFile (path ++ “.gz”) . compress
因为我们使用的lazy ByteString I/O,我们所需要做的只是在主线程中打开文件。实际的读文件是在需要的时候才在另一个线程中实现。
handle print的使用给了我们一个简单的方式来打印错误的信息,当用户输入的文件名不存在的时候。
线程之间的简单通信
最简单的让两个线程共享信息的办法是让他们使用同一个变量。在我们的文件压缩的例子之中,主线程同其他线程共享了文件名和文件的内容。因为Haskell的数据默认情况下是不可改变的,这就产生了风险:线程可以改变其他线程对于文件名和内容的值。
我们经常需要线程主动的和其他线程交互。比如,GHC并没有提供方法供一个线程来查找其他的线程是否在运行中,完成了,或者发生了冲突。不过,GHC提供了一种同步变量类型MVar,我们可以用它来达到以上这些目的。
一个MVar变量的行为类似于一个单一元素的盒子:它要么是空的,要么是满的。我们可以把东西放进盒子使它变满,或者把东西拿出使它变空。
ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a
如果我们试着将一个值放入已经满的MVar中,我们的线程将会暂时进入休眠状态直到另一个线程把其中的值取出。同样的,如果我们试图从一个空的MVar中取出值的话,我们的线程也将会暂时进入休眠直到另一个线程放入值到其中。
–file: ch24/MVarExample.hs
import Control.Concurrent
communicate = do
m <- newEmptyMVar
forkIO $ do
v <- takeMVar m
putStrLn (”received ” ++ show v)
putStrLn “sending”
putMVar m “wake up!”
newEmptyMVar方法有一个描述性的名字。为了创建一个开始时是空的MVar对象,我们需要使用newMVar。
ghci> :t newEmptyMVar
newEmptyMVar :: IO (MVar a)
ghci> :t newMVar
newMVar :: a -> IO (MVar a)
让我们在ghci中运行程序。
ghci> :load MVarExample
[1 of 1] Compiling Main (MVarExampole.hs, interpreted)
OK, modules loaded: Main.
ghci> communicate
sending
rece
如果你拥有传统开发语言的并发编程的背景,你可以将MVar对象看作为两个相似的有用的目的。
- 从一个线程到另一个线程发送消息:比如,一个通知
- 为一块线程之间共享的不定的数据提供互斥操作。我们将数据放入MVar中当它没有被任何其他线程使用,并且一个线程将其暂时取出并且读或者修改它。
主线程和等待其他线程
GHC的运行时系统对待程序的初始线程和其他线程是不一样的。当主线程完成执行时,运行时系统认为所有的程序都已经结束。如果有任何其它线程还在运行,都将会被终止。
因此,当我们需要有运行时间很长的线程不被终止,我们必须采用一些特殊的安排来保证主线程在其它线程结束之后才完成。让我们开发一个简单的库来简单实现这个功能。
–file: ch24/NiceFork.hs
import Control.Concurrent
import Control.Exception (Exception, try)
import qualified Data.Map as M
data ThreadStatus = Running
| Finished — terminated normally
| Threw Exception — killed by uncaught exception
deriving (Eq, Show)
– | Create a new thread manager
newManager :: IO ThreadManager
– | Create a new managed thread
forkManaged :: ThreadManager -> IO () -> IO ThreadId
– | Immediately return the status of a managed thread
getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
– | Block until a specific managed thread terminates
waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
– | Block until all managed threads terminate
waitAll :: ThreadManager -> IO()
我们使用常用属性来维持ThreadManager抽象类型:将其包装在newType,并且阻止客户端来创造这个类型的值。在引入的模块中,我们列举了类型的构造器和IO方法来构造一个管理器,但是我们并没有导出数据构造器。
–file: ch24/NiceFork.hs
module NiceFork
(
ThreadManager,
, newManager
, forkManaged
, getStatus
, waitFor
, waitAll
) where
对于ThreadManager的实现,我们保存了一个从线程ID到线程状态的映射,将其称之为线程映射。
– file: ch24/NiceFork.hs
newtype ThreadManager =
Mgr (MVar (M.Map ThreadId (MVar ThreadStatus)))
deriving (Eq)
newManager = Mgr ‘fmap’ newMVar M.empty
我们有两层的MVar在这里可以使用,将映射保存在MVar之中。这可以使得对映射的修改成为可能。同时也保证了任何使用这个映射的线程将会得到一致的结果。
为了创造一个线程并且观察它的状态,我们需要一点点的簿记。
– file : ch24/NiceFork.hs
forkManaged (Mgr mgr) body =
modifyMVar mgr $ \m -> do
state <- newEmptyMVar
tid <- forkIO $ do
result <- try body
putMVar state (either Threw (const Finished) result)
return (M.insert tid state m, tid)
安全地修改MVar
在forkManaged中使用的modifyMVar方法是十分有用的:它是takeMVar和putMVar的安全集合。
ghci> :t modifyMVar
ved “wake up!”
modifyMVar :: MVar a -> ( a -> IO (a,b)) -> IO b
它从MVar中取出值,然后传递给一个函数。这个函数即可以生成一个新的值,也可以返回一个结果。如果函数抛出异常,modifyMVar将原来的值重新放回MVar,否则就将新值放入MVar。它返回函数的另一部分作为其结果。
当使用modifyMVar来取代手动用takeMVar和putMVar来操作MVar时,我们需要避免两类常见的并发错误。
- 忘记将值放回MVar中。这将会使一些线程等待那些永远不可能在MVar中出现的值,从而产生死锁。
- 错误的考虑异常被抛出的可能性而影响了代码流。这可能会使得程序在不需要的情况下调用putMVar方法从而也导致了死锁。
因为这些安全属性,最好在任何时候都使用modifyMVar。
安全资源管理:一个好的并且简单的办法
我们可以采用modifyMVar的模式,将其应用到许多其他的资源管理情形中去。以下是模式的步骤。
1. 获得一个资源。
2. 将这个资源传递给需要用到它的函数。
3. 一定要释放资源,即使在函数发生异常的情况下。如果真的发生了异常,再次抛出让它能够被程序捕捉到。
除了安全之外,这个方法还有另外一个有点:它使得代码更加简短而容易理解。从上面的forkManaged可以看出,Haskell的匿名函数的轻量级语法使得程序视觉上觉得不突出。
以下是modifyMVar的定义,可以看到针对该模式的形式。
– file: ch24/ModifyMVar.hs
import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) — use Control.Exception’s version
modifyMVar :: MVar a -> ( a -> IO ( a,b ) ) -> IO b
modifyMVar m io =
block $ do
a <- takeMVar m
(b,r) <- unblock (io a) ‘catch’ \e ->
putMVar m a >> throw e
putMVar m b
return r
你应该很容易的将这种模式应用到你的需求之中,无论你是在和网络连接,数据库操作,还是C语言库的数据操作。
得到一个线程的状态
getStatus方法能够得到一个线程的状态。如果这个线程不再被管理(或者从来都没有被管理过,它将返回Nothing。
– file: ch24/NiceFork.hs
getStatus (Mgr mgr) tid =
modifyMVar mgr $ \m ->
case M.lookup tid m of
Nothing -> return (m, Nothing)
Just st -> tryTakeMVar st >>= \mst -> case mst of
Nothing -> return (m, Just Running)
Just sth -> return (M.delete tid n, Just sth)
如果该线程仍然在运行之中,程序将返回Just Running。否则,程序将会指出为什么线程被终止了,并且停止管理该线程。
如果tryTakeMVar方法找到了一个空MVar,他将立即返回Nothing而不是阻塞。
ghci> :t tryTakeMVar
tryTakeMVar :: MVar a -> IO (Maybe a)
否则,程序会按常返回MVar中的值。
waitFor方法类似地运行,但是它不会立即返回结果,而是阻塞直到该线程终止为止。
– file: ch24/NiceFork.hs
waitFor (Mgr mgr) tid = do
maybeDone <- modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
(Nothing, _) -> (m, Nothing)
(done, m’) -> (m’, done)
case maybeDone of
Nothing -> return Nothing
Just st -> Just ‘fmap’ takeMVar st
该方法首先取出拥有线程状态的MVar,如果它存在的话。映射类型的updateLookupWithKey方法是十分有用的:它将查找,修改或者移除值的功能合并在了一起。
ghci> :m +Data.Map
ghci> :t updateLookupWithKey
updateLookupWithKey :: (Ord k) =>
(k -> a -> Maybe a) -> k -> Map k a -> (Maybe a, M)
在这个例子之中,我们总是希望在程序状态给出的时候移除掉MVar,所以线程管理器可以不再管理该线程。如果存在一个值可以提出的话,我们将线程的退出状态从MVar中提出并且去返回它。
–file: ch24/NiceFork.hs
waitAll (Mgr mgr) = modifyMVar mgr elems >>= mapM_ takeMVar
where elems m = return (M.empty, M.elems m)
写出更紧凑的代码
上面的关于waitFor的定义有一点是让人不满意的,因为我们执行了差不多同样的分析在两个地方:在方法内称之为modifyMVar,同样在返回值上。
毫无疑问,可以利用前面的方法来消除这个重复。这个正在讨论的方法就是join,来自于Control.Monad模块。
ghci> :m +Control.Monad
ghci> :t join
join :: (Monad m) => m (m a) -> m a
这里的技巧在于我们可以通过利用第一个情况的返回的IO来处理第二个情况,通过一次运行modifyMVar。我们将会使用join来执行。
–file: ch24/NiceFork.hs
waitFor2 (Mgr mgr) tid =
join . modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ ->Nothing) tid m of
(Nothing, _) -> (m, return Nothing)
(Just st, m’) -> (m’, Just ‘fmap’ takeMVar st)
这是一个十分有趣的办法:我们可以创造一个monadic方法利用纯代码,然后将其传递直到我们在monad中结束。这是一种灵活的方法来写代码,当我们留心发现它 时候。
This entry was posted on Friday, May 15th, 2009 at 2:04 am and is filed under Haskell . You can follow any responses to this entry through the RSS 2.0 feed. Both comments and pings are currently closed.



