123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- import os
- import shutil
- from django.core.files import File
- from django.utils import timezone
- from django.utils.crypto import get_random_string
- from misago.core.utils import slugify
- FILENAME_MAX_LEN = 50
- class DataArchive(object):
- def __init__(self, user, working_dir_path):
- self.user = user
- self.working_dir_path = working_dir_path
- self.tmp_dir_path = None
- self.data_dir_path = None
- self.file_path = None
- self.file = None
- def __enter__(self):
- self.tmp_dir_path = self.create_tmp_dir()
- self.data_dir_path = self.create_data_dir()
- return self
- def __exit__(self, *args):
- self.delete_file()
- self.delete_tmp_dir()
- def create_tmp_dir(self):
- tmp_dir_name = get_tmp_filename(self.user)
- tmp_dir_path = os.path.join(self.working_dir_path, tmp_dir_name)
- os.mkdir(tmp_dir_path)
- return tmp_dir_path
- def create_data_dir(self):
- data_dir_name = get_tmp_filename(self.user)
- data_dir_path = os.path.join(self.tmp_dir_path, data_dir_name)
- os.mkdir(data_dir_path)
- return data_dir_path
- def delete_tmp_dir(self):
- if self.tmp_dir_path:
- shutil.rmtree(self.tmp_dir_path)
- self.tmp_dir_path = None
- self.data_dir_path = None
-
- def get_file(self):
- file_name = get_tmp_filename(self.user)
- file_path = os.path.join(self.working_dir_path, file_name)
- self.file_path = shutil.make_archive(file_path, 'zip', self.tmp_dir_path)
- self.file = open(self.file_path, 'rb')
- return File(self.file)
- def delete_file(self):
- if self.file:
- self.file.close()
- self.file = None
- if self.file_path:
- os.remove(self.file_path)
- self.file_path = None
- def add_text(self, name, value, date=None, directory=None):
- clean_filename = slugify(str(name))
- file_dir_path = self.make_final_path(date=date, directory=directory)
- file_path = os.path.join(file_dir_path, '%s.txt' % clean_filename)
- with open(file_path, 'w') as fp:
- fp.write(str(value))
- return file_path
- def add_dict(self, name, value, date=None, directory=None):
- text_lines = []
- for key, value in value.items():
- text_lines.append("%s: %s" % (key, value))
- text = '\n'.join(text_lines)
- return self.add_text(name, text, date=date, directory=directory)
- def add_model_file(self, model_file, prefix=None, date=None, directory=None):
- if not model_file:
- return None
- target_dir_path = self.make_final_path(date=date, directory=directory)
- filename = os.path.basename(model_file.name)
- if prefix:
- prefixed_filename = "%s-%s" % (prefix, filename)
- clean_filename = trim_long_filename(prefixed_filename)
- target_path = os.path.join(target_dir_path, clean_filename)
- else:
- clean_filename = trim_long_filename(filename)
- target_path = os.path.join(target_dir_path, clean_filename)
- with open(target_path, 'wb') as fp:
- for chunk in model_file.chunks():
- fp.write(chunk)
- return target_path
- def make_final_path(self, date=None, directory=None):
- # fixme: os.path.isdir test can be avoided in py37k
- if date and directory:
- raise ValueError("date and directory arguments are mutually exclusive")
- data_dir_path = self.data_dir_path
- if date:
- final_path = data_dir_path
- path_items = [date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')]
- for path_item in path_items:
- final_path = os.path.join(final_path, str(path_item))
- if not os.path.isdir(final_path):
- os.mkdir(final_path)
- return final_path
- if directory:
- final_path = os.path.join(data_dir_path, str(directory))
- if not os.path.isdir(final_path):
- os.mkdir(final_path)
- return final_path
- return data_dir_path
- def get_tmp_filename(user):
- filename_bits = [
- user.slug,
- timezone.now().strftime('%Y%m%d-%H%M%S'),
- get_random_string(6),
- ]
- return '-'.join(filename_bits)
- def trim_long_filename(filename):
- # fixme: consider moving this utility to better place?
- # eg. to trim too long attachment filenames on upload
- if len(filename) < FILENAME_MAX_LEN:
- return filename
- name, extension = os.path.splitext(filename)
- name_len = FILENAME_MAX_LEN - len(extension)
- return '%s%s' % (name[:name_len], extension)
|