Example17.10

※上記の広告は60日以上更新のないWIKIに表示されています。更新することで広告が下部へ移動します。

17.10 メールボックス

メールボックスはプロセス同期と通信のための、高度で柔軟な構造物です。メッセージの送受信が可能です。ここで メッセージ とは任意のオブジェクトを指します。シグナルのタイムアウトに使う TIMEOUT という特別のメッセージがあります。

 case object TIMEOUT

メールボックスは次のシグネチャを実装します。

 class   MailBox {
   def   send(msg: Any)
   def   receive[A](f: PartialFunction[Any, A]): A
   def   receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A
 }

メールボックスの状態はメッセージのマルチセットから成ります。メッセージは send メソッドで メールボックスへ追加されます。メッセージは receive メソッドで メールボックスから取り除かれ、メッセージプロセッサ f の引数に渡されます。 f はメッセージから何らかの結果型への部分関数です。通常、この関数はパターンマッチ式で実装されます。 receive メソッドは、そのメッセージプロセッサが定義されたメールボックスにメッセージが届くまで、ブロックされます。マッチしたメッセージはメールボックスから取り除かれ、ブロックされていたスレッドは再スタートしてメッセージプロセッサをそのメッセージに適用します。送られたメッセージとレシーバの双方とも時間順に並べられます。レシーバ r は、マッチしたメッセージ m へ適用されますが、それは、各コンポーネントを時間順に並べた個別の順序中で、m, r に先立つ {メッセージ,レシーバ}ペアが他にないときに限ります。

メールボックスの使い方の簡単な例として one-place バッファを考えてみましょう。

 class OnePlaceBuffer {
   private val m = new MailBox            // An internal mailbox
   private case class Empty, Full(x: Int) // Types of messages we deal with 
   m send Empty                           // Initialization
   def write(x: Int)
     { m receive { case Empty => m send Full(x) } }
   def read: Int =
     m receive { case Full(x) => m send Empty; x }
 }

メールボックスクラスは、次のようにも実装できます。

 class MailBox {
   private abstract class Receiver extends Signal {
     def isDefined(msg: Any): Boolean
     var msg = null
   }

テストメソッド isDefined を備えたレシーバ用の内部クラスを定義し、与えられたメッセージに対してレシーバが定義されているかどうかを示すようにします。レシーバは Signal クラスから、レシーバスレッドを起動するのに使われる nofify メソッドを継承します。レシーバスレッドが起動されると、適用すべきメッセージは Reciever の msg 変数に保存されます。

   private   val   sent = new LinkedList[Any]
   private   var   lastSent = sent
   private   val   receivers = new LinkedList[Receiver]
   private   var   lastReceiver = receivers

メールボックスクラスは2つの連結リストを保持していて、一つは、送信されたけれど取り出されていないメッセージ用で、もう一つは、ウェイトしているレシーバ用のものです。

 def send(msg: Any) = synchronized {
   var r = receivers, r1 = r.next
   while (r1 != null && !r1.elem.isDefined(msg)) {
     r = r1; r1 = r1.next
   }
   if (r1 != null) {
     r.next = r1.next; r1.elem.msg = msg; r1.elem.notify
   } else {
     lastSent = insert(lastSent, msg)
   }
 }

send メソッドは最初に、ウェイトしているレシーバがその送信されたメッセージに適用可能かどうかチェックします。もしそうなら、レシーバに通知されます。そうでなければ、メッセージは送信されたメッセージの連結リストに追加されます。

 def receive[A](f: PartialFunction[Any, A]): A = {
   val msg: Any = synchronized {
     var s = sent, s1 = s.next
     while (s1 != null && !f.isDefinedAt(s1.elem)) {
       s = s1; s1 = s1.next
     }
     if (s1 != null) {
       s.next = s1.next; s1.elem
     } else {
       val r = insert(lastReceiver, new Receiver {
         def isDefined(msg: Any) = f.isDefinedAt(msg)
       })
       lastReceiver = r
       r.elem.wait()
       r.elem.msg
     }
   }
   f(msg)
 }

recieve メソッドは最初に、メッセージプロセッサ関数 f が、既に送信されたけれどもまだ取り出されていないメッセージに適用可能かどうかチェックします。もしそうなら、スレッドは続けてすぐに f をそのメッセージに適用します。そうでなければ、新しいレシーバが作られてレシーバリストへリンクされ、スレッドはそのレシーバ上の通知を待ちます。スレッドは再び起動されると、f をそのレシーバに保存されたメッセージに適用します。 連結リストの insert メソッドは次のように定義されています。

     def insert(l: LinkedList[A], x: A): LinkedList[A] = {
       l.next = new LinkedList[A]
       l.next.elem = x
       l.next.next = l.next
       l
     }

メールボックスクラスは、指定された最大時間だけブロックする receiveWithin メソッドも提供しています。メッセージを指定された時間(ミリ秒で与えられる)以内に受信しなければ、メッセージプロセッサ引数 f は TIMEOUT という特別のメッセージでアンブロックされます。 recieveWithin の実装は receive とほとんど同じです。

     def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A = {
       val msg: Any = synchronized {
         var s = sent, s1 = s.next
         while (s1 != null && !f.isDefinedAt(s1.elem)) {
           s = s1; s1 = s1.next
         }
         if (s1 != null) {
           s.next = s1.next; s1.elem
         } else {
           val r = insert(lastReceiver, new Receiver {
               def isDefined(msg: Any) = f.isDefinedAt(msg)
           })
           lastReceiver = r
           r.elem.wait(msec)
           if (r.elem.msg == null) r.elem.msg = TIMEOUT
           r.elem.msg
         }
       }
       f(msg)
     }
 } // end MailBox

違いは、制限時間つきの wait コールと、その後の文だけです。


名前:
コメント:
最終更新:2011年02月24日 09:16
ツールボックス

下から選んでください:

新しいページを作成する
ヘルプ / FAQ もご覧ください。