dataarchive.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import os
  2. import shutil
  3. from django.core.files import File
  4. from django.utils import timezone
  5. from django.utils.crypto import get_random_string
  6. from django.utils import six
  7. from misago.core.utils import slugify
  8. FILENAME_MAX_LEN = 50
  9. class DataArchive(object):
  10. def __init__(self, user, working_dir_path):
  11. self.user = user
  12. self.working_dir_path = working_dir_path
  13. self.tmp_dir_path = None
  14. self.data_dir_path = None
  15. self.file_path = None
  16. self.file = None
  17. def __enter__(self):
  18. self.tmp_dir_path = self.create_tmp_dir()
  19. self.data_dir_path = self.create_data_dir()
  20. return self
  21. def __exit__(self, *args):
  22. self.delete_file()
  23. self.delete_tmp_dir()
  24. def create_tmp_dir(self):
  25. tmp_dir_name = get_tmp_filename(self.user)
  26. tmp_dir_path = os.path.join(self.working_dir_path, tmp_dir_name)
  27. os.mkdir(tmp_dir_path)
  28. return tmp_dir_path
  29. def create_data_dir(self):
  30. data_dir_name = get_tmp_filename(self.user)
  31. data_dir_path = os.path.join(self.tmp_dir_path, data_dir_name)
  32. os.mkdir(data_dir_path)
  33. return data_dir_path
  34. def delete_tmp_dir(self):
  35. if self.tmp_dir_path:
  36. shutil.rmtree(self.tmp_dir_path)
  37. self.tmp_dir_path = None
  38. self.data_dir_path = None
  39. def get_file(self):
  40. file_name = get_tmp_filename(self.user)
  41. file_path = os.path.join(self.working_dir_path, file_name)
  42. self.file_path = shutil.make_archive(file_path, 'zip', self.tmp_dir_path)
  43. self.file = open(self.file_path, 'rb')
  44. return File(self.file)
  45. def delete_file(self):
  46. if self.file:
  47. self.file.close()
  48. self.file = None
  49. if self.file_path:
  50. os.remove(self.file_path)
  51. self.file_path = None
  52. def add_text(self, name, value, date=None, directory=None):
  53. clean_filename = slugify(str(name))
  54. file_dir_path = self.make_final_path(date=date, directory=directory)
  55. file_path = os.path.join(file_dir_path, '{}.txt'.format(clean_filename))
  56. with open(file_path, 'w+') as fp:
  57. fp.write(six.text_type(value))
  58. return file_path
  59. def add_dict(self, name, value, date=None, directory=None):
  60. text_lines = []
  61. for key, value in value.items():
  62. text_lines.append(u"{}: {}".format(key, value))
  63. text = u'\n'.join(text_lines)
  64. return self.add_text(name, text, date=date, directory=directory)
  65. def add_model_file(self, model_file, prefix=None, date=None, directory=None):
  66. if not model_file:
  67. return None
  68. target_dir_path = self.make_final_path(date=date, directory=directory)
  69. filename = os.path.basename(model_file.name)
  70. if prefix:
  71. prefixed_filename = u"{}-{}".format(prefix, filename)
  72. clean_filename = trim_long_filename(prefixed_filename)
  73. target_path = os.path.join(target_dir_path, clean_filename)
  74. else:
  75. clean_filename = trim_long_filename(filename)
  76. target_path = os.path.join(target_dir_path, clean_filename)
  77. with open(target_path, 'wb') as fp:
  78. for chunk in model_file.chunks():
  79. fp.write(chunk)
  80. return target_path
  81. def make_final_path(self, date=None, directory=None):
  82. # fixme: os.path.isdir test can be avoided in py37k
  83. if date and directory:
  84. raise ValueError("date and directory arguments are mutually exclusive")
  85. data_dir_path = self.data_dir_path
  86. if date:
  87. final_path = data_dir_path
  88. path_items = [date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')]
  89. for path_item in path_items:
  90. final_path = os.path.join(final_path, six.text_type(path_item))
  91. if not os.path.isdir(final_path):
  92. os.mkdir(final_path)
  93. return final_path
  94. if directory:
  95. final_path = os.path.join(data_dir_path, six.text_type(directory))
  96. if not os.path.isdir(final_path):
  97. os.mkdir(final_path)
  98. return final_path
  99. return data_dir_path
  100. def get_tmp_filename(user):
  101. filename_bits = [
  102. user.slug,
  103. timezone.now().strftime('%Y%m%d-%H%M%S'),
  104. get_random_string(6),
  105. ]
  106. return '-'.join(filename_bits)
  107. def trim_long_filename(filename):
  108. # fixme: consider moving this utility to better place?
  109. # eg. to trim too long attachment filenames on upload
  110. if len(filename) < FILENAME_MAX_LEN:
  111. return filename
  112. name, extension = os.path.splitext(filename)
  113. name_len = FILENAME_MAX_LEN - len(extension)
  114. return u'{}{}'.format(name[:name_len], extension)