2009年12月16日水曜日

zope.sqlalchemyの紹介

中西さんよりZope/Ploneアドベントカレンダーのバトンを頂戴したので、Zope/Ploneネタを書いて16日目を担当したいと思います。

普段の仕事ではZope/PloneよりもSQLAlchemyなどを使いつつRDBMSを使用するようなフレームワークを使うことの方が多いです。
どんなネタにしようかとすこし悩みましたが、Zope/PloneからSQLAlchemy越しにRDBMSにアクセスする方法について調べてみました。

Zopeは自前のオブジェクトデータベースZODBを持っているのでZope/PloneからRDBMSを使用する機会はあまりありませんが、既存のデータベースとやりとりが必要になることは、たまにあります。
ZopeからRDBMSを操作する方法としては、
を使用するが昔からあって、
select * from products where id=<dtml-sqlvar type="string">
このような感じで使用するようですが、ちょっと古い感じがしないでもないです。

現在PythonでSQLとなれば、
The Python SQL Toolkit and Object Relational Mapperであるところの
SQLAlchemyを使うのが一般的です。
ということで、zope.sqlalchemyを使う方法を調べてみました。
zope.sqlalchemyは、"Minimal Zope/SQLAlchemy transaction integration"ということで
ZopeのトランザクションとSQLAlchemyのトランザクションを統合してくれる
ツールのようで、その際、きちんと二相コミットもしてくれるようです。
zope.sqlalchemyの守備範囲は、トランザクションに関してだけで、データベースエンジンやデータモデルの記述については特別なサポートはありません。
つまり、SQLAlchemyの作法に則って、DBエンジンやモデルについての記述を行うことはできるけれども、ZMIからデータベースエンジンの設定を行ったり、zcmlで定義したり、Zopeのオブジェクトモデルとマッピングしたり、といった機能はなく、あくまでトランザクションの統合機能だけが提供されます。
>>> from sqlalchemy import create_engine
>>> engine = create_engine("mysql://hoge", convert_unicode=True)
>>> from sqlalchemy.orm import scoped_session, sessionmaker]
>>> from zope.sqlalchemy import ZopeTransactionExtension
>>> Session = scoped_session(sessionmaker(bind=engine,
... twophase=True, extension=ZopeTransactionExtension()))
>>> session = Session()
>>> session.add(Hoge(name='fuga'))
>>> session.query(Hoge).all()
[]
>>> import transaction
>>> transaction.commit()

とこんな感じの使い方になるようです。
(詳しくはzope.sqlalchemyを参照してください。)
ポイントは、
sessionmaker(bind=engine,
twophase=True, extension=ZopeTransactionExtension())

の部分と
>>> import transaction
>>> transaction.commit()

のところです。


コードの説明に入るまえに、そもそもなぜトランザクションの統合が必要になるか、ということについて簡単に説明します。

Wikipediaのトランザクションの項によると、
"トランザクション (transaction) とは、分ける事の出来ない一連の情報処理の単位である"とあります。
つまりひとつのトランザクションに含まれる処理であれば、全部が成功もしくは全部が失敗のどちらかになるべきで、一部が失敗して一部が成功した、といった状態になったら困るわけです。これを実現するために、ひとつの
トランザクション処理の途中で、なんらかの不都合が発生した場合、そこまでの処理をすべてなかったことにするロールバックや、すべての処理が成功した段階で、変更結果を永続化させるコミットの機能がZODBやRDBMSにより提供されます。

ZopeのZODBはトランザクションをサポートしており、作法に則ったやり方でZODBにアクセスすれば、処理の途中で例外が発生した場合ロールバックされ、一貫性が保たれます。
しかし、そこでロールバックされるのは当然ZODBに対する書き込み操作のみで、もし直接外部のRDBMSに対して書き込みを行っていた場合、その処理まで自動でロールバックされるわけではありません。すると、Zope/Ploneとしては処理が途中で失敗したのでロールバックを行い、ZODBの中身的には変更が無かったことになっているのに、外部のRDBMS には処理が成功したかのように変更が書き込まれてしまう、といった結果になる可能性があります。多くの場合において、これは望んだ結果ではないでしょう。
これを防ぐには、ZODBとRDBMSに対するふたつのトランザクションを統合し、ZODBに対する処理とRDBMSに対する処理が、両方成功するか、両方失敗するか、どちらかであるようにする必要があります。

これを実現する中核となるのが、zope.sqlalchemy.ZopeTransactionExtensionです。これはSQLAlchemyの
extending-sessionという機能を利用したもので、
sqlalchemy.orm.interfaces.SessionExtensionを継承(or実装。ここらへんはPythonなので微妙なところですね。)したクラスです。このクラスを使用することにより、zope.sqlalchemyはセッションの開始や、セッションの中でオブジェクトの追加や変更、削除などをフックすることができます。
でもって、セッションの開始でTwoPhaseSessionDataManagerインスタンスを作成し、zope_transaction.get().join(TwoPhaseSessionDataManager(session, state))
のような感じでzopeのトランザクションに結びつけます。
そして、SQLAlchemyのセッションでオブジェクトの変更などをフックして、状態を初期状態のSTATUS_ACTIVEからSTATUS_CHANGEDに変更します。
そんでもって
>>> import transaction
>>> transaction.commit()

によりzopeのトランザクションがコミットされた場合は、TwoPhaseSessionDataManagerのメソッドが呼び出され、
もし状態がSTATUS_CHANGEDになっていれば、二相コミットを行います。



STATUS_ACTIVE/STATUS_CHANGEDと、2相コミットに関して補足します。

2相コミットとは、複数のデータベースが存在する時に、各トランザクションを統合して、全体として成功/失敗のどちらかにするための技術です。ちょっと説明が適当なので、
詳しくは
Wikipedia:"2相コミット"を参照してください。
例えば、データベースAとBがあった場合、よくありそうな問題としては、全部の処理がうまくいったので、
まずAにコミットして成功し、つぎにBに対してコミットを行おうとしたら、コミットに失敗してしまった、という状況です。
これを防ぐために2相コミットでは、トランザクションのコミットしようとするとき、
参加しているデータベースに対して、コミットが可能であるか、まず全員に質問します。ここで、全員から同意が得られれば、全体をあらためてコミットします。逆に、どれかひとつでも同意が得られなかった場合は、全体をロールバックさせます。この2相コミットを使用すれば、DBMSが落ちる、DBMSとのネットワークが切れるなどの事態が肝心なタイミングで発生しなければ、全体の一貫性を保つことができます。
この2相コミットは、それなりに面倒な処理です。RDBMSの方に書き込みを行わなかった場合、そもそも2相コミットを行う必要がありません。このためにzope.sqlalchemyでは、SQLAlchemyのセッションに変更操作を行ったかどうかを、STATUS_ACTIVE/STATUS_CHANGEDという状態により管理し、zopeのtransaction.commit()時にSTATUS_ACTIVEのままであれば2相コミットを省略するような動きになっているようです。
STATUS_ACTIVE/STATUS_CHANGEDの状態は、sqlalchemy.orm.interfaces.SessionExtensionのafter_bulk_updateメソッドなどが呼び出されるタイミングで、STATUS_CHANGEDに変更されます。
このフックメソッドは、次のようにORMを通さないでSQLが発行された場合は呼び出されません。
(よびだしてくれればいいような気がしますが。)

>>> conn = session.connection()
>>> conn.execute(update_expression)

この問題を防ぐため、このような場合には、つぎのように明示的にSTATUS_CHANGEDをセットします。
>>> from zope.sqlalchemy import mark_changed
>>> mark_changed(session)

<追記 2009-12-18>
sessionmakerの作成の際に、twophase=Falseにしておけば、2相コミットをせずに
動いてくれます。当然不整合が起こる可能性は高くなりますが。
あと、twophase=Trueの場合は、当然ですがRDBMS側でも2相コミットが可能な設定になっている必要があります。

ということで、ということで、次のバトンは昔からブログでお世話になっているPythonの大先輩のnakagamiさんにお願いします。

0 件のコメント:

コメントを投稿