The code here essentially works only if you plan to rewrite the file from scratch. Appending and similar operations are not supported. Reading is not supported either (it would not make sense, since reading has no risk of leaving the filesystem in a dirty state). Note that this code does not work on Windows with the intended semantics, since rename is not guaranteed to be atomic there (as far as I know).
'''Simple module containing a file-like object whose behavior is to actually
save the file only when closed.

Before that, data is stored in a temporary file: thus when creating a file, if
another file with the same name exists, it is never substituted with a partial
file.
'''

import os
import tempfile

# os.replace (Python 3.3+) silently replaces an existing destination file on
# every platform; older interpreters only provide os.rename.
_replace = getattr(os, 'replace', os.rename)


class AtomicFile(object):
    '''A write-only file-like object whose writes are committed only on close.

    Data is written to a named temporary file.  On a successful close the
    temporary file is renamed to ``name``.  If the ``with`` block raises, the
    temporary file is deleted instead and ``name`` is left untouched.

    AtomicFiles are meant to be used with context managers.
    '''

    # Unfortunately the TemporaryFile interface uses a dir parameter
    # pylint: disable-msg=R0913
    # pylint: disable-msg=W0622
    def __init__(self, name, bufsize=-1, suffix='', prefix='', dir=None):
        '''Create the temporary file that will later become ``name``.

        name    -- final path of the file.
        bufsize -- buffer size handed to tempfile.NamedTemporaryFile.
        suffix  -- suffix of the temporary file name.
        prefix  -- prefix of the temporary file name.
        dir     -- directory for the temporary file; defaults to the
                   directory of ``name`` so that the final rename never has
                   to cross a filesystem boundary.
        '''
        self.name = name
        # True once the temporary file has been either committed or discarded.
        self._finished = False
        if dir is None:
            dir = os.path.dirname(os.path.abspath(name))
        # The buffer size is passed positionally because its keyword is
        # ``bufsize`` on Python 2 but ``buffering`` on Python 3.
        self.tempfile = tempfile.NamedTemporaryFile(
            'r+b', bufsize, suffix=suffix, prefix=prefix, dir=dir,
            delete=False)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, _exc_val, _exc_tb):
        '''Commit on a clean exit, discard if the block raised.

        Always returns False, so exceptions are never suppressed.
        '''
        if exc_type is None:
            self.swap()
        else:
            self.discard()
        return False

    def swap(self):
        'Explicitly closes and renames the temporary file.'
        if self._finished:
            # Already committed or discarded: a second close is a no-op.
            return
        self.tempfile.close()
        _replace(self.tempfile.name, self.name)
        # Only flagged after the rename succeeded, so a failed commit can
        # still be retried or discarded.
        self._finished = True

    close = swap

    def discard(self):
        'Closes and deletes the temporary file without touching the target.'
        if self._finished:
            return
        self._finished = True
        # Best-effort cleanup: errors are ignored on purpose so that they
        # cannot mask the exception that caused the discard.
        try:
            self.tempfile.close()
        except (IOError, OSError):
            pass
        try:
            os.remove(self.tempfile.name)
        except OSError:
            pass

    def write(self, what):
        'Writes a (byte) string to the temporary file.'
        self.tempfile.write(what)
And now a bit of unit-testing (and example usage):
'''Unit tests (and example usage) for atomic_file.AtomicFile.'''

import os
import shutil
import tempfile
import unittest
from os import path

import atomic_file


class TestAtomicFile(unittest.TestCase):
    '''Checks that the target only appears once an AtomicFile is closed.'''

    def setUp(self):
        # A private scratch directory holds both the target and the temporary
        # file: they share a filesystem (so the rename cannot fail for being
        # cross-device) and no race-prone tempfile.mktemp() name is needed.
        self.directory = tempfile.mkdtemp()
        self.name = path.join(self.directory, 'target')

    def tearDown(self):
        shutil.rmtree(self.directory, ignore_errors=True)

    def readTarget(self):
        'Returns the bytes stored in the target file, closing it afterwards.'
        with open(self.name, 'rb') as handle:
            return handle.read()

    def testCreate(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.swap()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testClose(self):
        af = atomic_file.AtomicFile(name=self.name, dir=self.directory)
        self.assertFalse(path.exists(self.name))
        self.assertTrue(path.exists(af.tempfile.name))
        af.close()
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testContext(self):
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            self.assertFalse(path.exists(self.name))
            self.assertTrue(path.exists(af.tempfile.name))
        self.assertTrue(path.exists(self.name))
        self.assertFalse(path.exists(af.tempfile.name))

    def testWrite(self):
        text = b'THE TEXT\n'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            af.write(text)
        self.assertEqual(self.readTarget(), text)

    def testMoreWrite(self):
        lines = [b'THE TEXT', b'MORE TEXT', b'AGAIN!']
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            for line in lines:
                # Same two write calls a Python 2 "print >> af, line" issues.
                af.write(line)
                af.write(b'\n')
        self.assertEqual(self.readTarget(), b'\n'.join(lines) + b'\n')

    def hasExplosion(self):
        'Raises inside the with block, remembering the AtomicFile involved.'
        with atomic_file.AtomicFile(name=self.name, dir=self.directory) as af:
            self.exploded = af
            raise RuntimeError()

    def testBoom(self):
        '''An exception inside the with block must propagate and leave neither
        the target nor the temporary file behind.'''
        self.assertRaises(RuntimeError, self.hasExplosion)
        # These checks live here, after assertRaises, because nothing placed
        # after the raise in hasExplosion would ever be executed.
        self.assertFalse(path.exists(self.name))
        self.assertFalse(path.exists(self.exploded.tempfile.name))
No comments:
Post a Comment