Commit 7cf43b49 authored by Taddeus Kroes's avatar Taddeus Kroes

Added (untested) functions for cache headers.

parent ee28c5c8
import web
from time import gmtime
from datetime import datetime, timedelta
from os import mkdir
from os.path import getmtime, exists
from cPickle import load, dump
ADMINISTRATION_FILE = 'administration.py'
def to_dir(path):
......@@ -15,29 +23,41 @@ def to_dir(path):
return path
def assert_file_exists(path):
if not exists(path):
raise IOError('File "%s" does not exist.' % path)
def seconds_to_datetime(seconds):
return datetime(gmtime(seconds)[:7])
class Cache:
"""
A Cache instance represents a single file in the cache directory, which is
a concatenation of the file list.
"""
def __init__(self, root='', files=[], cached='cached'):
def __init__(self, root='', files=[], cached='cached',
expires={'days': 365}):
"""
The constructor takes the following arguments (all are optional):
1. The directory in which the cached files are located (empty by
default).
2. An initial list of number of files to include in the cahce file.
3. The cached files directory (defaults to 'cached').
4. A dictionary containing arguments to the timedelta constructor, that
indicates how long the cache object sould live.
"""
self.root = to_dir(root)
self.cached = to_dir(cached)
self.files = map(lambda f: self.root + f, files)
self.expires = expires
self.assert_files_exist()
def assert_files_exist(self):
for path in self.files:
if not exists(path):
raise IOError('File "%s" does not exist.' % path)
map(assert_file_exists, self.files)
def add(self, path, absolute=False):
"""
......@@ -48,6 +68,7 @@ class Cache:
if not absolute:
path = self.root + path
assert_file_exists(path)
self.files.append(path)
def remove(self, path, absolute=False):
......@@ -61,11 +82,87 @@ class Cache:
self.files.remove(path)
def get_modification_dates(self):
def assert_modification_dates_exist(self):
"""
Get the latest modification dates fo each file in the cache object.
"""
if hasattr(self, 'modified'):
return
self.modified = {}
for path in self.files:
self.modified[path] = getmtime(path)
def assert_cached_folder_exist(self):
"""
Assert that the folder for cached files is created.
"""
if not exists(self.cached):
mkdir(self.cached)
def save_administration(self):
"""
Generate a Python file containing the modification dates of the cached
file list.
"""
self.assert_modification_dates_exist()
self.assert_cached_folder_exist()
f = open(self.cached + ADMINISTRATION_FILE, 'w')
dump(self.modified, f)
f.close()
def last_modified(self):
self.assert_modification_dates_exist()
return self.modified[max(self.modified)]
def etag(self):
"""
Generate an Etag for the cache object, using the names of the files
included and the latest modification date.
"""
return self.filename() + str(self.last_modified())
def filename(self):
return ','.join(self.files)
def concatenate(self):
contents = ''
for path in self.files:
f = open(path, 'r')
contents += f.read()
f.close()
return contents
def output(self):
""""""
# Update cached file
last_modified = self.last_modified()
path = self.filename()
server_modified = not exists(path) or getmtime(path) < last_modified
if server_modified:
content = self.concatenate()
f = open(path, 'w')
f.write(content)
f.close()
#try:
web.http.modified(seconds_to_datetime(last_modified), self.etag())
web.http.expires(timedelta(**self.expires))
if not server_modified:
# Concatenated content has not been loaded yet, read the cached
# file
f = open(path, 'r')
content = f.read()
f.close()
return content
#except web.NotModified as e:
# web.header('Status', e.message)
import unittest
import os
from shutil import rmtree
from copy import copy
from cPickle import dumps
from src.cache import to_dir, Cache
from src.cache import ADMINISTRATION_FILE, assert_file_exists, to_dir, Cache
class TestCache(unittest.TestCase):
def setUp(self):
if os.path.exists('baz'):
rmtree('baz')
os.mkdir('baz')
for name in ('foo', 'bar'):
for name in ('foo', 'bar', 'baz'):
f = open('baz/' + name, 'w')
f.write(name)
f.close()
......@@ -19,6 +24,13 @@ class TestCache(unittest.TestCase):
def tearDown(self):
rmtree('baz')
if os.path.exists('cached'):
rmtree('cached')
def test_assert_file_exists(self):
assert_file_exists('baz/foo')
self.assertRaises(IOError, assert_file_exists, 'foobar')
def test_to_dir(self):
self.assertEqual(to_dir(''), '')
self.assertEqual(to_dir('foo'), 'foo/')
......@@ -61,8 +73,26 @@ class TestCache(unittest.TestCase):
self.c.remove('baz/foo', absolute=True)
self.assertNotIn('baz/foo', self.c.files)
def test_get_modification_dates(self):
self.c.get_modification_dates()
def test_assert_modification_dates_exist(self):
self.c.assert_modification_dates_exist()
self.assertIn('baz/foo', self.c.modified)
self.assertEqual(self.c.modified['baz/foo'],
os.path.getmtime('baz/foo'))
modified = copy(self.c.modified)
self.c.assert_modification_dates_exist()
self.assertEqual(self.c.modified, modified)
def test_assert_cached_folder_exist(self):
self.assertFalse(os.path.exists('cached'))
self.c.assert_cached_folder_exist()
self.assertTrue(os.path.exists('cached'))
def test_save_administration(self):
self.c.save_administration()
path = 'cached/' + ADMINISTRATION_FILE
self.assertTrue(os.path.exists(path))
f = open(path, 'r')
pickled = f.read()
f.close()
self.assertEqual(pickled, dumps(self.c.modified))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment