Example17.10

「Example17.10」の編集履歴(バックアップ)一覧はこちら

Example17.10 - (2010/09/17 (金) 13:03:51) の1つ前との変更点

追加された行は緑色になります。

削除された行は赤色になります。

**17.10 Mailboxes **17.10 メールボックス Mailboxes are high-level, flexible constructs for process synchronization and communication. They allow sending and receiving of messages. A message in this context is an arbitrary object. There is a special message TIMEOUT which is used to signal a time-out. メールボックスはプロセス同期と通信のための、高度で柔軟な構造物です。メッセージの送受信が可能です。ここでメッセージとは任意のオブジェクトをさします。シグナルのタイムアウトに使う TIMEOUT という特別のメッセージがあります。 case object TIMEOUT Mailboxes implement the following signature. メールボックスは次のシグナチャを実装します。 class MailBox { def send(msg: Any) def receive[A](f: PartialFunction[Any, A]): A def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A } The state of a mailbox consists of a multi-set of messages. Messages are added to the mailbox with the send method. Messages are removed using the receive method, which is passed a message processor f as argument, which is a partial function from messages to some arbitrary result type. Typically, this function is implemented as a pattern matching expression. The receive method blocks until there is a message in the mailbox for which its message processor is defined. The matching message is then removed from the mailbox and the blocked thread is restarted by applying the message processor to the message. Both sent messages and receivers are ordered in time. A receiver r is applied to a matching message m only if there is no other {message, receiver} pair which precedes m, r in the partial ordering on pairs that orders each component in time. メールボックスの状態はメッセージのマルチセットから成ります。メッセージは send メソッドで メールボックスへ追加されます。メッセージは receive メソッドで メールボックスから取り除かれ、メッセージプロセッサ f の引数に渡されます。 f はメッセージから何らかの結果型への部分関数です。典型的にはこの関数はパターンマッチ式で実装されます。 receive メソッドは、そのメッセージプロセッサが定義されたメールボックスにメッセージが届くまで、ブロックされます。マッチしたメッセージはメールボックスから取り除かれ、ブロックされていたスレッドは再スタートしてメッセージプロセッサをそのメッセージに適用します。送られたメッセージとレシーバの双方とも時間順に並べられます。レシーバ r は、マッチしたメッセージ m へ適用されますが、それは、各コンポーネントを時間順に並べた個別の順序中で、m, r に先立つ {メッセージ,レシーバ}ペアが他にないときに限ります。 As a simple example of how mailboxes are used, consider a one-place buffer: メールボックスの使い方の簡単な例として one-place バッファを考えてみましょう。 class OnePlaceBuffer { private val m = new MailBox // An internal mailbox private case class Empty, Full(x: Int) // Types of messages we deal with m send Empty // Initialization def write(x: Int) { m receive { case Empty => m send Full(x) } } def read: Int = m receive { case Full(x) => m send Empty; x } } Here's how the mailbox class can be implemented: メールボックスクラスは次のようにも実装できます。 class MailBox { private abstract class Receiver extends Signal { def isDefined(msg: Any): Boolean var msg = null } We define an internal class for receivers with a test method isDefined, which indicates whether the receiver is defined for a given message. The receiver inherits from class Signal a notify method which is used to wake up a receiver thread. When the receiver thread is woken up, the message it needs to be applied to is stored in the msg variable of Receiver. レシーバ用にテストメソッド isDefined を備えた内部クラスを定義し、レシーバが与えられたメッセージに対して定義されているかをどうかを示すようにします。レシーバは Signalクラスから、レシーバスレッドを起動するのに使われる nofify メソッドを継承します。レシーバスレッドが起動されると、適用されるべきメッセージは Reciever の msg 変数に保存されます。 private val sent = new LinkedList[Any] private var lastSent = sent private val receivers = new LinkedList[Receiver] private var lastReceiver = receivers The mailbox class maintains two linked lists, one for sent but unconsumed messages, the other for waiting receivers. メールボックスクラスは2つの連結リストを保持していて、一つは送信されたけれど取り出されていないメッセージ用で、もう一つは待っているレシーバ用のものです。 def send(msg: Any) = synchronized { var r = receivers, r1 = r.next while (r1 != null && !r1.elem.isDefined(msg)) { r = r1; r1 = r1.next } if (r1 != null) { r.next = r1.next; r1.elem.msg = msg; r1.elem.notify } else { lastSent = insert(lastSent, msg) } } The send method first checks whether a waiting receiver is applicable to the sent message. If yes, the receiver is notified. Otherwise, the message is appended to the linked list of sent messages. send メソッドは最初に、待っているレシーバがその送信されたメッセージに適用可能かどうかチェックします。もしそうなら、レシーバに通知されます。そうでなければ、メッセージは送信されたメッセージの連結リストに追加されます。 def receive[A](f: PartialFunction[Any, A]): A = { val msg: Any = synchronized { var s = sent, s1 = s.next while (s1 != null && !f.isDefinedAt(s1.elem)) { s = s1; s1 = s1.next } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = insert(lastReceiver, new Receiver { def isDefined(msg: Any) = f.isDefinedAt(msg) }) lastReceiver = r r.elem.wait() r.elem.msg } } f(msg) } The receive method first checks whether the message processor function f can be applied to a message that has already been sent but that was not yet consumed. If yes, the thread continues immediately by applying f to the message. Otherwise, a new receiver is created and linked into the receivers list, and the thread waits for a notification on this receiver. Once the thread is woken up again, it continues by applying f to the message that was stored in the receiver. The insert method on linked lists is defined as follows. recieve メソッドは最初に、メッセージプロセッサ関数 f が既に送信されたけれどもまだ取り出されていないメッセージに適用可能かどうかチェックします。もしそうなら、スレッドは続けてすぐに f をそのメッセージに適用します。そうでなければ、新しいレシーバが作られてレシーバリストへリンクされ、スレッドはそのレシーバ上の通知を待ちます。スレッドは再び起動されると、f をそのレシーバに保存されたメッセージに適用します。 連結リストの insert メソッドは次のように定義されています。 def insert(l: LinkedList[A], x: A): LinkedList[A] = { l.next = new LinkedList[A] l.next.elem = x l.next.next = l.next l } The mailbox class also offers a method receiveWithin which blocks for only a specified maximal amount of time. If no message is received within the specified time interval (given in milliseconds), the message processor argument f will be unblocked with the special TIMEOUT message. The implementation of receiveWithin is quite similar to receive: メールボックスクラスは、指定された最大時間の間だけブロックする receiveWithinメソッドも提供しています。メッセージを指定された時間(ミリ秒で与えられる)以内に受信しなければ、メッセージプロセッサ引数 f は特別の TIMEOUT メッセージを持ってアンブロックされます。 recieveWithin の実装は receive とほとんど同じです。 def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A = { val msg: Any = synchronized { var s = sent, s1 = s.next while (s1 != null && !f.isDefinedAt(s1.elem)) { s = s1; s1 = s1.next } if (s1 != null) { s.next = s1.next; s1.elem } else { val r = insert(lastReceiver, new Receiver { def isDefined(msg: Any) = f.isDefinedAt(msg) }) lastReceiver = r r.elem.wait(msec) if (r.elem.msg == null) r.elem.msg = TIMEOUT r.elem.msg } } f(msg) } } // end MailBox The only differences are the timed call to wait, and the statement following it. 違いは、制限時間つきの wait コールとその後の文だけです。 ---- #comment

表示オプション

横に並べて表示:
変化行の前後のみ表示:
ツールボックス

下から選んでください:

新しいページを作成する
ヘルプ / FAQ もご覧ください。