メモ > TCPIP

Selector


androidだとSelector.openが使えない。

W/dalvikvm(  540): No implementation found for native org/apache/harmony/luni/platform/OSNetworkSystem.createSocketImpl (Ljava/io/FileDescriptor;Z)V
W/***(  540): java.lang.UnsatisfiedLinkError: createSocketImpl
W/***(  540): 	at org.apache.harmony.luni.platform.OSNetworkSystem.createSocketImpl(Native Method)
W/***(  540): 	at org.apache.harmony.luni.platform.OSNetworkSystem.createSocket(OSNetworkSystem.java:80)
W/***(  540): 	at org.apache.harmony.nio.internal.SocketChannelImpl.<init>(SocketChannelImpl.java:151)
W/***(  540): 	at org.apache.harmony.nio.internal.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:79)
W/***(  540): 	at org.apache.harmony.nio.internal.PipeImpl$SinkChannelImpl.<init>(PipeImpl.java:160)
W/***(  540): 	at org.apache.harmony.nio.internal.PipeImpl.<init>(PipeImpl.java:46)
W/***(  540): 	at org.apache.harmony.nio.internal.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:55)
W/***(  540): 	at org.apache.harmony.nio.internal.SelectorImpl.<init>(SelectorImpl.java:99)
W/***(  540): 	at org.apache.harmony.nio.internal.SelectorProviderImpl.openSelector(SelectorProviderImpl.java:63)
W/***(  540): 	at java.nio.channels.Selector.open(Selector.java:48)

SocketChannel

SocketChannel単体だと使えるのかどうかは未確認。

Socket

普通のソケットは使える。非同期アクセスできないのでスレッドを余計に消費する書き方にならざるを得ない。

おまけ


メインスレッドをブロックせずにソケットを使う例。
コード中で使っている InvokeKnockについては Looper を参照。

abstract class SocketManager{

   // 派生クラスが実装するべきメソッド
   abstract protected void onError(Throwable e);
   abstract protected void onConnect();
   abstract protected void onDisconnect(String reason);
   abstract protected void onRecvLine(String line);

   // 接続先の情報
   protected String mRemoteHost;
   protected int    mRemotePort;

   // 文字コード
   protected String mCharset;

   // メインスレッドのメッセージハンドラ
   protected Handler mHandler; 

   // 受信スレッド
   protected Thread  mThread;

   // ソケット
   protected Socket mSocket;

   // 進行状況
   protected CountDownLatch mCloseWait = new CountDownLatch(3);

   // 切断理由
   protected String mDisconnectReason;

   // 次に送信キューがカラになったら出力を閉じる
   protected boolean mOutputCloseGraceful = false; 

   // シャットダウン関連の処理
   protected void setDisconnectReason(final boolean bShutdownInput,final boolean bShutdownOutput,final String reason){
       InvokeKnock.call( mHandler,new Runnable(){

           @Override public void run(){
               // 切断理由がまだ設定されていないなら覚える
               if( mDisconnectReason == null )
                   mDisconnectReason = reason;

               // 入力のシャットダウン
               if( bShutdownInput && !mSocket.isInputShutdown()){
                   try{ mSocket.shutdownInput();          }catch(Throwable e){}
                   mCloseWait.countDown();
               }

               // 出力のシャットダウン
               if( bShutdownOutput && !mSocket.isOutputShutdown()){
                   try{ mSocket.shutdownOutput();          }catch(Throwable e){}
                   mCloseWait.countDown();
               }

               // シャットダウンを指定しなかった場合、
               // この後に出力キューがカラになったら閉じる
               if( !bShutdownInput && !bShutdownOutput )
                   mOutputCloseGraceful = true;
           }
       });
   }

   //////////////////////////////////////////

   // 出力処理
   protected OutputStream mOutputStream;

   // 出力キューは行単位で、高優先と低優先の2つ
   protected ConcurrentLinkedQueue<byte[]> mSendQ1 = new ConcurrentLinkedQueue<byte[]>();
   protected ConcurrentLinkedQueue<byte[]> mSendQ2 = new ConcurrentLinkedQueue<byte[]>();

   // 行の末尾に付与する改行文字
   public static final byte[] crlf = new byte[]{ 0x0d,0x0a };

   // IRC用のFlood protection
   protected long timePreCall;
   protected long timePenaltyLeft;

   // Handler経由で定期的に出力
   protected Runnable mWriter = new Runnable(){
       @Override public void run(){
           try{
               // ソケットの出力が完了している
               if( mSocket.isOutputShutdown() ) return;

               // 前回呼ばれてから経過した時間
               long now = SystemClock.uptimeMillis();
               long delta = now - timePreCall;
               if( delta > 0){
                   // ペナルティ時間を減らす
                   timePenaltyLeft -= delta;
                   if( timePenaltyLeft < 0 ) timePenaltyLeft =0;
               }

               // キューの内容を出力
               int nWrite =0;
               for(;;){
                   byte[] line = null;
                   if( !mSendQ1.isEmpty() ){
                       if( timePenaltyLeft > 0 ) break;
                       line = mSendQ1.poll();

                   }else if( !mSendQ2.isEmpty() ){
                       if( timePenaltyLeft > 0 ) break;
                       line = mSendQ2.poll();

                   }else{
                       // キューがカラ
                       if( mOutputCloseGraceful ){
                           // 出力完了
                           setDisconnectReason(false,true,null);
                           return;
                       }
                       break;
                   }
                   mOutputStream.write(line);
                   mOutputStream.write(crlf);
                   mOutputStream.flush();
                   ++nWrite;
                   timePenaltyLeft += 1000 + (line.length*1000/45);
               }
               // 少し待ってから再実行
               mHandler.postDelayed(mWriter,300);

           }catch(final Throwable e){
               setDisconnectReason(true,true,e.getMessage());
               InvokeKnock.call( mHandler,new Runnable(){ 
                   @Override public void run(){ onError(e); }
               });
           }
       }
   };

   // 受信はreadがブロックするので別スレッド
   protected Runnable mReader = new Runnable(){
       @Override public void run(){
           try{
               mSocket = new Socket(mRemoteHost,mRemotePort);
               try{
                   InputStream in = mSocket.getInputStream();
                   mOutputStream  = new BufferedOutputStream(mSocket.getOutputStream());

                   // 接続成功
                   mCloseWait.countDown();
                   InvokeKnock.call( mHandler,new Runnable(){ 
                       @Override public void run(){
                           onConnect();
                       }
                   });

                   mHandler.post(mWriter);

                   int buf_size = 4096;
                   int buf_used = 0;
                   byte[] buf  = new byte[buf_size];
                   byte[] buf2 = new byte[buf_size];
                   while( mDisconnectReason == null ){
                       if( buf_used >= buf_size ){
                           setDisconnectReason(true,true,"too long line received.");
                           break;
                       }
                       // 読めるだけ読むか、ブロックするかもしれないけど1バイト読む
                       int delta = in.available();
                       delta = (delta<=0?1:delta>(buf_size-buf_used)?(buf_size-buf_used):delta);
                       delta = in.read(buf,buf_used,delta);
                       if( delta == -1 ){
                           setDisconnectReason(true,false,"end of input stream.");
                           break;
                       }
                       // 改行で区切って文字コード変換
                       int line_start   = 0;
                       int buf_used_pre = buf_used;
                       buf_used += delta;
                       for(int i=buf_used_pre;i<buf_used;++i){
                           if( buf[i] != 0x0d && buf[i] != 0x0a ) continue;
                           if( i-line_start >0 ){
                               final byte[] fBuffer = buf;
                               final int    fStart  = line_start;
                               final int    fLength = i-line_start;
                               InvokeKnock.call( mHandler,new Runnable(){ 
                                   @Override public void run(){
                                       try{
                                           // 受信データの文字コード変換
                                           String line = new String(fBuffer,fStart,fLength,mCharset);
                                           // 通知
                                           onRecvLine(line);
                                       }catch(UnsupportedEncodingException e){
                                           onError(e);
                                       }
                                   }
                               });
                           }
                           line_start = i+1;
                       }
                       int new_used = buf_used - line_start;
                       if( new_used > 0 ){
                           // 残った部分をbuf2の先頭に移動する
                           System.arraycopy( buf,line_start,buf2,0,new_used);
                           // bufとbuf2をswap
                           byte[] tmp=buf;buf=buf2;buf2=tmp;
                       }
                       buf_used = new_used;
                   }
                   // 受信ループ終了。
                   // 出力側も閉じられるのを待つ
                   mCloseWait.await();
               }catch(final Throwable e){
                   setDisconnectReason(true,true,e.getMessage());
                   InvokeKnock.call( mHandler,new Runnable(){ 
                       @Override public void run(){
                           onError(e);
                       }
                   });
               }finally{
                   // 出力コールバックを停止する
                   mHandler.removeCallbacks(mWriter);

                   // ソケットを閉じる
                   try{ mSocket.close(); }catch(Throwable e){}

                   // 接続停止を通知する
                   InvokeKnock.call( mHandler,new Runnable(){ 
                       @Override public void run(){
                           onDisconnect(mDisconnectReason);
                       }
                   });
               }
               mSocket = null;
           }catch(final Throwable e){
               InvokeKnock.call( mHandler,new Runnable(){ 
                   @Override public void run(){ onError(e); }
               });
               return;
           }
       }
   };

   // コンストラクタ
   public SocketManager(){}

   // 接続開始
   public boolean start( Looper looper,String thread_name,String host,int port,String charset){
       mRemoteHost = host;
       mRemotePort = port;
       mCharset = charset;
       mHandler = new Handler(looper);
       mThread  = new Thread(mReader,thread_name);
       mThread.start();
       return true;
   }

   // 接続停止
   public void stop(String reason){
       if( isAlive() ){
           setDisconnectReason(true,true,reason);
           mThread.interrupt();
       }
   }

   // 接続中か調べる
   public boolean isAlive(){
       return mThread != null && mThread.getState() != Thread.State.TERMINATED;
   }

   // 行をバイト配列に変換して、送信キューに追加
   public void sendLine(boolean bPrior,String line){
       try{
           (bPrior?mSendQ1:mSendQ2).add( line.getBytes(mCharset));
       }catch(final Throwable e){
           InvokeKnock.call( mHandler,new Runnable(){ 
               @Override public void run(){ onError(e); }
           });
       }
   }
}
利用側
mSocketManager = new SocketManager(){
   @Override protected void onError(Throwable e){
       Log.e("hoge","onError",e);
   }
   @Override protected void onConnect(){
       Log.i("hoge","onConnect");
       mSocketManager.sendLine(true,"NICK :"+ conf_nickname);
       mSocketManager.sendLine(true,"USER "+conf_username+" 4 * :"+conf_realname);
   }
   @Override protected void onDisconnect(String reason){
       Log.i("hoge","onDisconnect reason="+reason);
   }
   @Override protected void onRecvLine(String line){
       Log.i("hoge","onRecvLine "+line);
   }
};
mSocketManager.start( Looper.myLooper(),"hoge SocketManager",host,port,charset);

タグ:

+ タグ編集
  • タグ:
最終更新:2007年11月18日 12:40
ツールボックス

下から選んでください:

新しいページを作成する
ヘルプ / FAQ もご覧ください。