The code here essentially works only if you plan to rewrite the file from scratch; appending and similar operations are not supported. Reading is not supported either (it would not make sense, since reading cannot leave the filesystem in a dirty state). Note that this code does not provide the intended semantics on Windows, since rename is not guaranteed to be atomic there (as far as I know).
'''Simple module containing a file-like object that actually saves the
file only when it is closed. Until then, data is stored in a temporary
file; thus, if a file with the same name already exists, it is never
replaced by a partially written file.'''
import os
import tempfile
class AtomicFile(object):
    '''A write-only file-like object whose content is committed on close.

    Data is written to a named temporary file. On a successful close the
    temporary file is renamed to ``name``, so an existing file with that
    name is never replaced by a partially written one.

    AtomicFiles are meant to be used with context managers: leaving the
    ``with`` block normally commits the data, while leaving it because
    of an exception discards the temporary file and leaves ``name``
    untouched.
    '''

    # Unfortunately the TemporaryFile interface uses a dir parameter
    # pylint: disable-msg=R0913
    # pylint: disable-msg=W0622
    def __init__(self, name, bufsize=-1, suffix='', prefix='', dir=None):
        '''Create the temporary file that backs the pending write.

        name -- final path; the temporary file is renamed to it on commit.
        bufsize -- buffering argument forwarded to NamedTemporaryFile.
        suffix, prefix -- forwarded to NamedTemporaryFile.
        dir -- directory for the temporary file. Defaults to the directory
            of ``name`` so that the final rename stays on one filesystem.
        '''
        if dir is None:
            # A rename across filesystems either fails or is not atomic,
            # so keep the temporary file next to its destination.
            dir = os.path.dirname(os.path.abspath(name))
        self.name = name
        # True once the temporary file has been committed or discarded.
        self._finished = False
        # mode and bufsize are passed positionally because the second
        # parameter is named ``bufsize`` on Python 2 and ``buffering`` on
        # Python 3.
        self.tempfile = tempfile.NamedTemporaryFile(
            'r+b', bufsize, suffix=suffix, prefix=prefix,
            dir=dir, delete=False)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, _exc_val, _exc_tb):
        '''Commit on a clean exit, discard when an exception is in flight.

        Always returns False, so an exception is never suppressed.
        '''
        if exc_type is None:
            self.swap()
        else:
            self.discard()
        return False

    def swap(self):
        '''Explicitly closes and renames the temporary file.

        Calling it again after a commit or a discard does nothing.
        '''
        if self._finished:
            return
        self.tempfile.close()
        # os.replace (Python 3.3+) also overwrites an existing target on
        # Windows; fall back to os.rename where it is unavailable.
        rename = getattr(os, 'replace', os.rename)
        rename(self.tempfile.name, self.name)
        self._finished = True

    close = swap

    def discard(self):
        '''Closes and deletes the temporary file without touching ``name``.

        Calling it after a commit or a previous discard does nothing.
        '''
        if self._finished:
            return
        self._finished = True
        self.tempfile.close()
        try:
            os.remove(self.tempfile.name)
        except OSError:
            # Best-effort cleanup: this usually runs while another
            # exception is propagating and must not mask it.
            pass

    def write(self, what):
        'Writes a string to the file'
        self.tempfile.write(what)
And now a bit of unit-testing (and example usage):
import unittest
import tempfile
import atomic_file
import os
from os import path
class TestAtomicFile(unittest.TestCase):
    '''Unit tests (and example usage) for atomic_file.AtomicFile.'''

    def setUp(self):
        # A private scratch directory holds both the temporary file and
        # the target, so the rename never crosses filesystems and no
        # predictable name from the deprecated tempfile.mktemp() is used.
        self.directory = tempfile.mkdtemp()
        self.name = path.join(self.directory, 'target')

    def tearDown(self):
        # Remove whatever the test left behind, then the directory itself.
        for entry in os.listdir(self.directory):
            os.remove(path.join(self.directory, entry))
        os.rmdir(self.directory)

    def _read_target(self):
        'Returns the raw content of the target file, closing it afterwards.'
        with open(self.name, 'rb') as stream:
            return stream.read()

    def testCreate(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.swap()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testClose(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.close()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testContext(self):
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            self.assertFalse(path.exists(self.name))
            self.assertTrue(path.exists(af.tempfile.name))
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testWrite(self):
        # The temporary file is opened in binary mode, hence bytes literals.
        text = b'THE TEXT\n'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            af.write(text)
        self.assertEqual(self._read_target(), text)

    def testMoreWrite(self):
        lines = [b'THE TEXT', b'MORE TEXT', b'AGAIN!']
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            for line in lines:
                af.write(line + b'\n')
        self.assertEqual(self._read_target(), b'\n'.join(lines) + b'\n')

    def hasExplosion(self):
        'Raises RuntimeError from inside the with block.'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            raise RuntimeError()
        # NOTE(review): the assertions below are never executed, because
        # the RuntimeError propagates out of the with block above.
        self.assertFalse(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testBoom(self):
        self.assertRaises(RuntimeError, self.hasExplosion)
No comments:
Post a Comment