Saturday, August 14, 2010

Python example on atomic files

This is a very simple example of how to write atomically to a file in Python. We already dealt with the subject here.

The code here essentially works only if you plan to rewrite the file from scratch. Appending and similar operations are not supported. Reading is not supported either (it would not make sense here, since reading cannot leave the filesystem in a dirty state). Notice that this code does not work on Windows with the intended semantics, since rename is not guaranteed to be atomic there (as far as I know).

'''Simple module containing a file-like object that actually saves
the file only when it is closed. Before that, data is stored in a
temporary file: thus, when creating a file, an existing file with the
same name is never replaced by a partially written one.'''

import os
import tempfile

class AtomicFile(object):
    '''A file-like object where writes are committed only when closed.

    A temporary named file is used in order to store the results.
    On close, the temporary is renamed according to the name parameter.
    If the body of a ``with`` block raises, the temporary is deleted
    instead and whatever already exists at ``name`` is left untouched.

    AtomicFiles are meant to be used with context managers.'''

    # Unfortunately the TemporaryFile interface uses a dir parameter
    # pylint: disable-msg=R0913
    # pylint: disable-msg=W0622
    def __init__(self, name, bufsize=-1, suffix='', prefix='', dir=None):
        '''Creates the temporary file that will later become ``name``.

        name    -- destination path the data is committed to.
        bufsize -- buffering policy forwarded to NamedTemporaryFile.
        suffix  -- suffix of the temporary file name.
        prefix  -- prefix of the temporary file name.
        dir     -- directory holding the temporary file; defaults to
                   the directory of ``name``.
        '''
        mode = 'r+b'
        self.name = name
        if dir is None:
            # Keep the temporary next to the destination: os.rename
            # can fail when source and target are on different
            # filesystems.
            dir = os.path.dirname(os.path.abspath(name))
        # mode and bufsize are passed positionally because the second
        # parameter is called ``bufsize`` on Python 2 and ``buffering``
        # on Python 3.
        self.tempfile = tempfile.NamedTemporaryFile(
            mode, bufsize, suffix=suffix, prefix=prefix,
            dir=dir, delete=False)
        # True once the temporary has been either renamed or removed.
        self._finished = False

    def __enter__(self):
        return self

    def __exit__(self, _exc_type, _exc_val, _exc_tb):
        '''Commits on a clean exit, discards the temporary on error.

        Always returns False so that exceptions keep propagating.'''
        if _exc_type is None:
            self.swap()
        else:
            self._discard()
        return False

    def swap(self):
        '''Explicitly closes and renames the temporary file.

        Calling it again after a successful commit (or after the
        temporary has been discarded) is a no-op.'''
        if self._finished:
            return
        self.tempfile.close()
        os.rename(self.tempfile.name, self.name)
        # Only mark as done once the rename succeeded, so that a
        # failed commit can still be retried or discarded.
        self._finished = True
    close = swap

    def _discard(self):
        'Closes and deletes the temporary file without committing it.'
        if self._finished:
            return
        self._finished = True
        self.tempfile.close()
        try:
            os.remove(self.tempfile.name)
        except OSError:
            # Best effort: this runs while another exception is being
            # propagated, which must not be masked by a cleanup error.
            pass

    def write(self, what):
        'Writes a string to the file'
        self.tempfile.write(what)

And now a bit of unit-testing (and example usage):

import unittest
import tempfile
import atomic_file

import os
from os import path

class TestAtomicFile(unittest.TestCase):
    '''Unit tests (and usage examples) for atomic_file.AtomicFile.'''

    def setUp(self):
        # A private scratch directory holds both the temporary file and
        # the destination: the rename never crosses filesystems, and no
        # guessable path is handed out (tempfile.mktemp is race-prone).
        self.directory = tempfile.mkdtemp()
        self.name = path.join(self.directory, 'target')

    def tearDown(self):
        # Remove whatever a test left behind, then the directory itself.
        for leftover in os.listdir(self.directory):
            os.remove(path.join(self.directory, leftover))
        os.rmdir(self.directory)

    def _read_target(self):
        'Returns the raw content of the destination file.'
        with open(self.name, 'rb') as handle:
            return handle.read()

    def testCreate(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.swap()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testClose(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.close()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testContext(self):
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            self.assertFalse(path.exists(self.name))
            self.assertTrue(path.exists(af.tempfile.name))
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testWrite(self):
        # The temporary is opened in binary mode, hence the bytes literal.
        text = b'THE TEXT\n'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            af.write(text)
        self.assertEqual(self._read_target(), text)

    def testMoreWrite(self):
        lines = [b'THE TEXT', b'MORE TEXT', b'AGAIN!']
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            for line in lines:
                af.write(line + b'\n')
        self.assertEqual(self._read_target(), b'\n'.join(lines) + b'\n')

    def hasExplosion(self):
        'Raises inside the with block; keeps the object for inspection.'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            self.exploded = af
            raise RuntimeError()

    def testBoom(self):
        self.assertRaises(RuntimeError, self.hasExplosion)
        # These checks must live here: nothing placed after the with
        # block in hasExplosion can run once the exception propagates.
        # A failed block must neither create the destination nor leave
        # its temporary file behind.
        self.assertFalse(path.exists(self.name))
        self.assertFalse(path.exists(self.exploded.tempfile.name))

No comments: