2 Twisted解剖図鑑

Twistedは強力なフレームワークですが,基本となる考え方や仕組みはそれほど多くありません.ここでは,Twistedの基本構造,APIを眺めてみます.

Deferred/Reactorの基本


非同期プログラミングでは,コールバック関数によってアプリケーション固有の処理を実行します.Twistedでは,コールバックの順番はtwisted.internet.deferredオブジェクトが管理します.クライアントは,非同期リクエストが利用可能になると,順番にdeferredオブジェクトのメソッドを実行していきます.この実行されるメソッドをコールバックやコールバックチェーンと言います.

非同期処理の中でエラーが発生した場合もコールバック関数が呼ばれます.エラーが発生したときに実行されるコールバックをエラーバックやエラーバックチェーンと言います.deferredには,複数のコールバック関数やエラーバック関数が登録できます.複数登録した場合は,最初に登録したものから順番に実行されます.


プログラムには,さまざまな処理の遅延が発生します.deferredクラスは遅延の問題にも対処できるようにデザインされています.遅延には,ハードディスクへのアクセスやデータベース処理,ネットワーク上の遅延などありますが,これらの遅延に対し統一的に対処できるようになっています.今回はネットワークについての非同期処理を扱いますが,Twistedはファイルアクセスやコンソールへの入出力,データベースなど,遅延が発生しうるものはすべて非同期に扱うことができるように設計されています.

Twistedでは,deferredオブジェクトを返す関数が沢山あります.取得したdeferredは2つの意味/役割を持っています.データを取得するための待ち状態にあるという意味(待ちフラグ),データ取得やエラー発生時に実行するコールバック関数を登録する役割です.


簡単なクライアント


データの取得を待つdeferredの簡単な例を見てみましょう.次のコードは,twisted.web.client.getPageを用いて,Webページのデータを取得して標準出力に出力する例です.getPageはdeferredオブジェクトを返します.取得したdeferredオブジェクトには,Webページが取得できたときに実行するコールバック関数を指定しています.エラーが発生したときは,addErrbackで登録した関数がコールされてエラー内容を出力します.
from twisted.web.client import getPage
from twisted.internet import reactor

def printContent(content):
    print content
    # Twistedのイベントループの終了
    reactor.stop()

def handleError(error):
    print "Error: " + str(error)
    # Twistedのイベントループの終了
    reactor.stop()

deferred = getPage("http://www.liris.org/")
deferred.addCallback(printContent).addErrback(handleError)

reactor.run()


reactor


1つのスレッドで複数の接続を非同期処理するために,Twistedには独自のイベントループがあります.Twistedでは,このイベントループを回さなければいけません.この独自のイベントループを処理するモジュールは,twisted.internet.reactorです.イベントループは次のように実行します.この関数を実行すると,reactor.stop()関数をコールするまでブロックされます.次のコードでは,ページを取得するか,エラーが発生するまでイベントループが回ります.
reactor.run()

Twistedは,サーバ以外にもBitTorrentなどのクライアントGUIアプリケーションで多く利用されています.一般的に独自のイベントループを持つフレームワークは,既存の他のGUIライブラリやフレームワーク内で使用することは難しいと言えます.

たとえば,MicrosoftのWindowsでWin32 APIを用いたプログラミングの場合,Win32用のイベントループと独自のイベントループの両方を同時に動かす必要があります.その場合,たいていのフレームワークではそれぞれを別々のスレッドで動かし,非同期プログラミングのメリット(シングルスレッドのメリット)を十分に活かしきれないケースもあります.

Twistedでは,複数の種類のreactorがあります.reactorの種類によっては,一部実現できない機能などもありますが,既存のフレームワークとの親和性が高いことは特徴の1つです.また,Win32プログラミング用としてtwisted.internet.win32eventreactorがあります.win32eventreactorを利用する場合は,最初にreactorをインポートする前に,次のように記述します.
from twisted.internet imort win32eventreactor
win32eventreactor.install()
from twisted.internet import reactor


あとは,通常のreactorと同じようにreactor.run()を実行します.これによって,Win32イベントループがwin32eventreactorの中で実行されます.ネットワークの非同期処理には,通常のselect/pollではなく,Win32API(Winsock)が使用されます.

reactorには,win32eventreactor以外にもGTK1.2用のgtkreactor,GTK2用のgtk2reactor,Qt用のqtreactor,wxWidgets用のwxreactorなどさまざまなreactorが用意されています.


サーバの構築


Twistedは高機能なサーバを構築できるように設計されています.高機能なサーバを統一的に構築できるようにするために,いくつかの決まりごとがあります.この決まりごとの中核にあるのがtwisted.internet.protocol.Protocolクラスです.

Protocolレイヤ


Protocolクラスでは,ネットワークから送信されてきたデータ(プロトコル)を解析して処理するためのクラスです.通常はProtocolクラスの派生クラスを作成して,プロトコルのハンドリングを行います.このクラスは,twisted.internet.protocol.Factoryクラス,またはそのサブクラスによって必要に応じてインスタンス化されます.インスタンス化されたオブジェクトは,接続が終了すると破棄されます.

Factoryクラス


Factoryクラス,またはそのサブクラスは,サーバとして使用するポート番号やネットワークインターフェースなど,ネットワークに関する情報は持っていません.Protocolクラスのオブジェクトのライフサイクルを管理するだけです.Factoryクラスとネットワークの情報を結びつけるのが,twisted.internet.interfaces.IReactorTCPクラスのlistenTCP関数です.

listenTCP関数

listenTCPでは,指定したProtocolファクトリとTCP/IPのポート番号,ネットワークインターフェースを結び付けて,リモートのクライアントから接続されたときにProtocolクラスをインスタンス化します.このため,同一のサービスを複数のポートや複数のネットワークインターフェースに対して同時に提供することができます.

インスタンス化されたProtocolオブジェクトが,インスタンス化されるときに使用したFactoryクラスは,Protocolオブジェクトのfactoryフィールドで参照できます.

Protocolクラスは,直接ネットワークからのデータの受信を行いません.ネットワークからデータが受信されると,それに応じてProtocolクラスのdataRecievedメソッドがコールされます.Protocolクラスには,dataRecieved以外にも接続が確立したときにコールされるconnectionMadeや,接続が切断されたときにコールされるconnectionLostなどのメソッドを定義できます.


Protocolを使ったサーバ構築

次にProtocolを使った簡単なEchoサーバのコードを見てみます.
from twisted.internet.protocol import Protocol

class Echo(Protocol):
    def connectionMade(self):
        self.factory.numProtocols = self.factory.numProtocols+1
        if self.factory.numProtocols > 100:
            self.transport.write("Too many connections, try later")
            self.transport.loseConnection()
   
    def connectionLost(self, reason):
        self.factory.numProtocols = self.factory.numProtocols-1
   
    def dataReceived(self, data):
        self.transport.write(data)


こののコードではまず,接続が確立するとEchoクラスのconnectionMadeがコールされます.このメソッドの中では,現在のProtocolオブジェクトの数をself.factory.numProtocolsで管理しています.
複数のProtocolオブジェクトは,単一のFactoryオブジェクトから作成されるので,Protocolオブジェクトのライフサイクルを超えるような永続的なデータはFactoryオブジェクトに保存します.ここでは同時接続数を100に限定して,それを超える接続は,「Too many connections, try later」を返信して接続を閉じます.

接続が終了するとconnectionLostというメソッドがコールされます.ここでは,現在のアクティブな接続を1つ減らしています.

接続が確立してデータをリモートから受信すると,dataRecievedがコールされます.dataRecievedの引数には,リモートから受信したデータがセットされています.ここでは,リモートのクライアントから受信したデータをそのままクライアントに送信し直しています.

Factoryオブジェクトは,Protocolクラスをインスタンス化するときにtransportオブジェクトをセットします.transportオブジェクトは,リモートのクライアントにデータを送信するために利用します.transportオブジェクトに対して,writeメソッドをコールすることで,リモートのクライアントにデータを送信できます.また,サーバ側から接続を終了したい場合は,transportオブジェクトのloseConnectionをコールします.

Protocolクラスがインスタンス化されると,Protocolクラス,Factoryクラスとネットワークとを関連付けます.ポート番号1234でEchoサーバを起動したい場合は,次のようになります.
from twisted.internet.protocol import Factory
from twisted.internet import reactor

factory = Factory()     # Factoryオブジェクトの生成
factory.protocol = Echo # インスタンス化するProtocolクラスを指定
factory.numProtocols = 0# Echoオブジェクトが参照するフィールドの初期化

# ポート番号1234でリッスンして,接続が確立したときに使用する
# Factoryオブジェクトを設定
reactor.listenTCP(1234, factory)
# Twistedのイベントループの実行
reactor.run()


ここでは,まず,Factoryオブジェクトを初期化しています.listenTCPでは,ポート番号1234で接続を待ち受け,Factoryオブジェクトの設定をしています.

ヘルパプロトコル


HTTPやSMTP,FTPなどの多くのインターネットプロトコルは,CR-LFで区切られた行単位のプロトコルです.これらの行単位のプロトコルを処理するために,通常のProtocolクラスのサブクラスでLineRecieverクラスが定義されています.それ以外にもdjbのNetstringなどを扱うためのNetstringRecieverなどのヘルパープロトコルが定義されています.

LineRecieverクラスは,行単位での処理だけでなくモードを設定することで,受信したデータを加工せずにそのまま扱うことができます.行単位の処理に特化したLineOnlyRecieverも存在します.ここでは,LineOnlyRecieverを使って,先ほどのEchoクラスを書き換えてみます.
from twisted.protocols.basic import LineReceiver

class Echo(LineReciever):
    def connectionMade(self):
        self.factory.numProtocols = self.factory.numProtocols+1
        if self.factory.numProtocols > 100:
            self.transport.write("Too many connections, try later")
            self.transport.loseConnection()
   
    def connectionLost(self, reason):
        self.factory.numProtocols = self.factory.numProtocols-1
   
    def lineReceived(self, line):
        self.transport.write(line + "\r\n")


ここでは,dataRecievedの代わりにlineRecievedを実装しています.リモートから受信したデータは,CR-LFで区切られた各行に対して,lineRecievedメソッドがコールされます.lineRecievedがコールされるときに改行コードCRLFは取り除かれています.

クライアントの構築


Twistedはサーバだけなくクライアントアプリケーションの作成にも威力を発揮します.インターネットプロトコルを処理するための高レベルなクライアント用APIを提供しています.

サーバを構築するときと同様に,クライアントサイドでもデータを処理するための中核はtwisted.internet.protocol.Protocolを継承したプロトコルハンドラです.ほとんどのプロトコルハンドラは,Protocolクラスから直接派生したクラスになっていますが,Protocolクラスから派生したヘルパープロトコルクラスを継承することもできます.

クライアントでのProtocolオブジェクトのライフサイクルもサーバのものと同様です.クライアントがサーバに接続したときにインスタンス化されて,接続が終了すると破棄されます.

永続化するような情報もサーバと同じようにFactoryクラスが持ちます.サーバは通常twisted.internet.protocol.Factoryクラスからプロトコルオブジェクトが作成されますが,クライアントでは,twisted.internet.protocol.ClientFactoryクラスのオブジェクトがプロトコルオブジェクトをインスタンス化します.


プロトコル


まず,Echoクライアントでのプロトコルの例を見てください.
from twisted.internet.protocol import Protocol

class EchoClient(Protocol):
    def dataRecieved(self, data):
        print data
   
    def sendMessage(self, data):
        self.transport.write(data)


クライアントでもサーバとほとんど同じようにdataRecievedによって,リモートのサーバから受信したデータを非同期処理します.サーバでは,受信したデータをそのまま送信しなおしていましたが,クライアントでは受信したデータを標準出力に出力するようにしています.

クライアントからのデータは,sendMessageによってリモートのサーバに送信します.

sendMessageでは,プロトコルオブジェクトにセットされているtransportオブジェクトに対してwriteメソッドで書き込みを行っています.

clientFactory


リモートのサーバに接続するときは,reactorのconnectTCP関数を使用します.connectTCP関数には,接続するリモートのホスト名,ポート番号とプロトコルオブジェクトを作成するためのClientFactoryオブジェクトを指定します.

ClientFactoryは,サーバの構築で説明したプロトコルオブジェクトのインスタンス化を行います.これ以外に,接続状態に関するイベントを受け取って処理します.接続状態を管理したい場合は,ClientFactoryクラスのサブクラスを作成します.

次にEchoクライアントのClientFactoryの例を示します.プロトコルには,前述のEchoClientを使用しています.
from twisted.internet.protocol import ClientFactory
from twisted.internet import defer

class EchoClientFactory(ClientProtocol):
    def __init__(self, deferred):
        self.deferred = deferred
       
    def startedConnecting(self, connector):
        print "接続開始"
   
    def buildProtocol(self, addr):
        print "接続完了"
        p = self.protocol()
        p.factory = this
        reactor.callLater(0, self.deferred.callback, p)
        return p
     
    def clientConnectionLost(self, connector, reason):
        print "接続終了: ", reason
       
    def clientConnectionFailed(self, connector, reason):
        reactor.callLater(0, self.deferred.errback, reason)
        print "接続失敗: ", reason


ClientFactoryでは,接続が開始,完了,終了,失敗の各状態に応じて,startedConnecting,buildProtocol,clientConnectionLost,clientConnectionFailedがそれぞれコールされます.接続が完了したときにコールされるbuildProtocoは,接続が完了したと言う状態の通知以外にもプロトコルオブジェクトのインスタンス化を行う役割があります.

次に実際にサーバに接続するためのコードを書く必要があります.
from twisted.internet import reactor

def gotProtocol(protocol):
    p.sendMessage("Hello, world\n")
    reactor.callLater(1, p.sendMessage, "Hello, world, again\n")
    reactor.callLater(2, p.transport.loseConnection)

deferred = defer.Deferred()
clientFactory = EchoClientFactory(deferred)
clientFactory.protocol = EchoClient
reactor.connectTCP("localhost", 1234, clientFactory)
deferred.addCallback(gotProtocol)

reactor.run()


ここでは,EchoClientFactoryを作成して,使用するプロトコルを指定しています.次にreactor.connectTCPでリモートのサーバに接続しています.connectTCPはブロックせずに処理をすぐに戻します.接続が確立するとEchoClientFacoryに指定したdeferredオブジェクトのコールバック関数gotProtocolがコールされます.gotProtocolの中でサーバに対して「Hello, world」というメッセージを送信し,その1秒後に「Hello, world, again」を送信しています.最後に接続が確立してから2秒後に接続を終了するようになっています.

クライアントの再接続


クライアントのネットワークの接続が切断されることはよくあります.たとえば,無線LANの環境で,ネットワーク状態が不安定で接続が途切れるなどです.Webページを取得するような接続と切断を繰り返すようなアプリケーションの場合は問題になりません.サーバと常時接続するクライアントでは接続が途切れた場合は,サーバに再接続する必要があります.クライアント同士が接続しあうP2Pアプリケーションでも,再接続の処理が必要です.

再接続する方法はいくつかあります.最も簡単な方法は,connector.connect()をコールすることです.ClientFactoryでは,接続が切断したときにclientConnectionLost関数がコールされます.この関数には二つの引数を受け取ります.再接続処理を行うのは,第2引数のconnectorオブジェクトです.connectorオブジェクトは,既にインスタンス化されているProtocolオブジェクトと接続情報(IPアドレスやポート番号など)との橋渡しをします.
from twisted.internet.protocol import ClientFactory

class MyClientFactory(ClientFactory):
    def clientConnectionLost(self, connector, reason):
        connector.connect()


twisted.internet.protocol.ReconnectingClientFactoryクラスを継承したFactoryクラスを作成することで,再接続を自動化することができます.ReconnectingClientFactoryを使用することで,再接続する間隔,再接続する最大試行回数を制御することができます.次のコードはReconnectingClientFactoryを使ったEchoFactoryクラスです.初期化関数の中で最大試行回数を3回に設定します(1). コードの(2)では,接続が完了したときに,試行回数などのフラグをリセットして初期状態に戻します.(3)で,接続が失敗したときの試行回数を確認して,試行回数が最大試行回数に達していたら終了します.再接続回数に達していない場合は,ReconnectingClientFactoryクラスのclientConnectionFailed関数をコールして,再接続します(4).
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import defer

class EchoClientFactory(ReconnectingClientProtocol):
    def __init__(self, deferred):
        self.deferred = deferred
        self.maxRetries = 3  # 最大試行回数を指定(1)
       
    def startedConnecting(self, connector):
        print "接続開始"
   
    def buildProtocol(self, addr):
        print "接続完了"
        p = self.protocol()
        p.factory = this
        reactor.callLater(0, self.deferred.callback, p)
        self.resetDelay() # 接続が完了したら、試行回数をリセット(2)
        return p
     
    def clientConnectionFailed(self, connector, reason):
        # 再接続回数が最大試行回数に達していたら終了(3)
        if self.retires == self.maxRetries:
            reactor.callLater(0, self.deferred.errback, reason)
            return
       
        print "接続失敗: ", reason
        # 最大試行回数に達していない場合は、ReconnectingClientProtocolの
        # clientConnectionFailedメソッドをコール(4)
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
                                                         reason)



再接続を行う場合,タイミングが重要になります.サーバの接続数が限界に達している状態ですぐに再接続を要求しても接続することができません.また,無制限に連続して再接続を行うことはDoS攻撃になります.ReconnectingClientFactoryでは,再接続のタイミングは計算されています.タイミングをプログラマが制御することもできますが,ネットワークに負荷がかかる危険があるので,細心の注意が必要です.特に理由がない限りは,再接続のタイミングはTwistedに任せたほうが安全です.

Comments