>>> from sqlalchemy import create_engine
>>> engine = create_engine("mysql://hoge", convert_unicode=True)
>>> from sqlalchemy.orm import scoped_session, sessionmaker]
>>> from zope.sqlalchemy import ZopeTransactionExtension
>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=True, extension=ZopeTransactionExtension()))
>>> session = Session()
>>> session.add(Hoge(name='fuga'))
>>> session.query(Hoge).all()
[]
>>> import transaction
>>> transaction.commit()
とこんな感じの使い方になるようです。
(詳しくはzope.sqlalchemyを参照してください。)
ポイントは、
sessionmaker(bind=engine,
twophase=True, extension=ZopeTransactionExtension())
の部分と
>>> import transaction
>>> transaction.commit()
のところです。
コードの説明に入るまえに、そもそもなぜトランザクションの統合が必要になるか、ということについて簡単に説明します。
Wikipediaのトランザクションの項によると、
"トランザクション (transaction) とは、分ける事の出来ない一連の情報処理の単位である"とあります。
つまりひとつのトランザクションに含まれる処理であれば、全部が成功もしくは全部が失敗のどちらかになるべきで、一部が失敗して一部が成功した、といった状態になったら困るわけです。これを実現するために、ひとつの
トランザクション処理の途中で、なんらかの不都合が発生した場合、そこまでの処理をすべてなかったことにするロールバックや、すべての処理が成功した段階で、変更結果を永続化させるコミットの機能がZODBやRDBMSにより提供されます。
ZopeのZODBはトランザクションをサポートしており、作法に則ったやり方でZODBにアクセスすれば、処理の途中で例外が発生した場合ロールバックされ、一貫性が保たれます。
しかし、そこでロールバックされるのは当然ZODBに対する書き込み操作のみで、もし直接外部のRDBMSに対して書き込みを行っていた場合、その処理まで自動でロールバックされるわけではありません。すると、Zope/Ploneとしては処理が途中で失敗したのでロールバックを行い、ZODBの中身的には変更が無かったことになっているのに、外部のRDBMS には処理が成功したかのように変更が書き込まれてしまう、といった結果になる可能性があります。多くの場合において、これは望んだ結果ではないでしょう。
これを防ぐには、ZODBとRDBMSに対するふたつのトランザクションを統合し、ZODBに対する処理とRDBMSに対する処理が、両方成功するか、両方失敗するか、どちらかであるようにする必要があります。
これを実現する中核となるのが、zope.sqlalchemy.ZopeTransactionExtensionです。これはSQLAlchemyの
extending-sessionという機能を利用したもので、
sqlalchemy.orm.interfaces.SessionExtensionを継承(or実装。ここらへんはPythonなので微妙なところですね。)したクラスです。このクラスを使用することにより、zope.sqlalchemyはセッションの開始や、セッションの中でオブジェクトの追加や変更、削除などをフックすることができます。
でもって、セッションの開始でTwoPhaseSessionDataManagerインスタンスを作成し、zope_transaction.get().join(TwoPhaseSessionDataManager(session, state))
のような感じでzopeのトランザクションに結びつけます。
そして、SQLAlchemyのセッションでオブジェクトの変更などをフックして、状態を初期状態のSTATUS_ACTIVEからSTATUS_CHANGEDに変更します。
そんでもって
>>> import transaction
>>> transaction.commit()
によりzopeのトランザクションがコミットされた場合は、TwoPhaseSessionDataManagerのメソッドが呼び出され、
もし状態がSTATUS_CHANGEDになっていれば、二相コミットを行います。
STATUS_ACTIVE/STATUS_CHANGEDと、2相コミットに関して補足します。
2相コミットとは、複数のデータベースが存在する時に、各トランザクションを統合して、全体として成功/失敗のどちらかにするための技術です。ちょっと説明が適当なので、
詳しくは
Wikipedia:"2相コミット"を参照してください。
例えば、データベースAとBがあった場合、よくありそうな問題としては、全部の処理がうまくいったので、
まずAにコミットして成功し、つぎにBに対してコミットを行おうとしたら、コミットに失敗してしまった、という状況です。
これを防ぐために2相コミットでは、トランザクションのコミットしようとするとき、
参加しているデータベースに対して、コミットが可能であるか、まず全員に質問します。ここで、全員から同意が得られれば、全体をあらためてコミットします。逆に、どれかひとつでも同意が得られなかった場合は、全体をロールバックさせます。この2相コミットを使用すれば、DBMSが落ちる、DBMSとのネットワークが切れるなどの事態が肝心なタイミングで発生しなければ、全体の一貫性を保つことができます。
この2相コミットは、それなりに面倒な処理です。RDBMSの方に書き込みを行わなかった場合、そもそも2相コミットを行う必要がありません。このためにzope.sqlalchemyでは、SQLAlchemyのセッションに変更操作を行ったかどうかを、STATUS_ACTIVE/STATUS_CHANGEDという状態により管理し、zopeのtransaction.commit()時にSTATUS_ACTIVEのままであれば2相コミットを省略するような動きになっているようです。
STATUS_ACTIVE/STATUS_CHANGEDの状態は、sqlalchemy.orm.interfaces.SessionExtensionのafter_bulk_updateメソッドなどが呼び出されるタイミングで、STATUS_CHANGEDに変更されます。
このフックメソッドは、次のようにORMを通さないでSQLが発行された場合は呼び出されません。
(よびだしてくれればいいような気がしますが。)
>>> conn = session.connection()
>>> conn.execute(update_expression)
この問題を防ぐため、このような場合には、つぎのように明示的にSTATUS_CHANGEDをセットします。
>>> from zope.sqlalchemy import mark_changed
>>> mark_changed(session)
<追記 2009-12-18>
ということで、ということで、次のバトンは昔からブログでお世話になっているPythonの大先輩のnakagamiさんにお願いします。