import io # fixme: remove explicit io imports after going py3k-only import os import shutil from django.core.files import File from django.utils import timezone from django.utils.crypto import get_random_string from django.utils import six from misago.core.utils import slugify FILENAME_MAX_LEN = 50 class DataArchive(object): def __init__(self, user, working_dir_path): self.user = user self.working_dir_path = working_dir_path self.tmp_dir_path = None self.data_dir_path = None self.file_path = None self.file = None def __enter__(self): self.tmp_dir_path = self.create_tmp_dir() self.data_dir_path = self.create_data_dir() return self def __exit__(self, *args): self.delete_file() self.delete_tmp_dir() def create_tmp_dir(self): tmp_dir_name = get_tmp_filename(self.user) tmp_dir_path = os.path.join(self.working_dir_path, tmp_dir_name) os.mkdir(tmp_dir_path) return tmp_dir_path def create_data_dir(self): data_dir_name = get_tmp_filename(self.user) data_dir_path = os.path.join(self.tmp_dir_path, data_dir_name) os.mkdir(data_dir_path) return data_dir_path def delete_tmp_dir(self): if self.tmp_dir_path: shutil.rmtree(self.tmp_dir_path) self.tmp_dir_path = None self.data_dir_path = None def get_file(self): file_name = get_tmp_filename(self.user) file_path = os.path.join(self.working_dir_path, file_name) self.file_path = shutil.make_archive(file_path, 'zip', self.tmp_dir_path) self.file = open(self.file_path, 'rb') return File(self.file) def delete_file(self): if self.file: self.file.close() self.file = None if self.file_path: os.remove(self.file_path) self.file_path = None def add_text(self, name, value, date=None, directory=None): clean_filename = slugify(str(name)) file_dir_path = self.make_final_path(date=date, directory=directory) file_path = os.path.join(file_dir_path, '{}.txt'.format(clean_filename)) with io.open(file_path, 'w', encoding='utf-8') as fp: fp.write(six.text_type(value)) return file_path def add_dict(self, name, value, date=None, directory=None): text_lines = [] for key, value in value.items(): text_lines.append("{}: {}".format(key, value)) text = '\n'.join(text_lines) return self.add_text(name, text, date=date, directory=directory) def add_model_file(self, model_file, prefix=None, date=None, directory=None): if not model_file: return None target_dir_path = self.make_final_path(date=date, directory=directory) filename = os.path.basename(model_file.name) if prefix: prefixed_filename = "{}-{}".format(prefix, filename) clean_filename = trim_long_filename(prefixed_filename) target_path = os.path.join(target_dir_path, clean_filename) else: clean_filename = trim_long_filename(filename) target_path = os.path.join(target_dir_path, clean_filename) with open(target_path, 'wb') as fp: for chunk in model_file.chunks(): fp.write(chunk) return target_path def make_final_path(self, date=None, directory=None): # fixme: os.path.isdir test can be avoided in py37k if date and directory: raise ValueError("date and directory arguments are mutually exclusive") data_dir_path = self.data_dir_path if date: final_path = data_dir_path path_items = [date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')] for path_item in path_items: final_path = os.path.join(final_path, six.text_type(path_item)) if not os.path.isdir(final_path): os.mkdir(final_path) return final_path if directory: final_path = os.path.join(data_dir_path, six.text_type(directory)) if not os.path.isdir(final_path): os.mkdir(final_path) return final_path return data_dir_path def get_tmp_filename(user): filename_bits = [ user.slug, timezone.now().strftime('%Y%m%d-%H%M%S'), get_random_string(6), ] return '-'.join(filename_bits) def trim_long_filename(filename): # fixme: consider moving this utility to better place? # eg. to trim too long attachment filenames on upload if len(filename) < FILENAME_MAX_LEN: return filename name, extension = os.path.splitext(filename) name_len = FILENAME_MAX_LEN - len(extension) return '{}{}'.format(name[:name_len], extension)