dataarchive.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import io # fixme: remove explicit io imports after going py3k-only
  2. import os
  3. import shutil
  4. from django.core.files import File
  5. from django.utils import timezone
  6. from django.utils.crypto import get_random_string
  7. from django.utils import six
  8. from misago.core.utils import slugify
  9. FILENAME_MAX_LEN = 50
  10. class DataArchive(object):
  11. def __init__(self, user, working_dir_path):
  12. self.user = user
  13. self.working_dir_path = working_dir_path
  14. self.tmp_dir_path = None
  15. self.data_dir_path = None
  16. self.file_path = None
  17. self.file = None
  18. def __enter__(self):
  19. self.tmp_dir_path = self.create_tmp_dir()
  20. self.data_dir_path = self.create_data_dir()
  21. return self
  22. def __exit__(self, *args):
  23. self.delete_file()
  24. self.delete_tmp_dir()
  25. def create_tmp_dir(self):
  26. tmp_dir_name = get_tmp_filename(self.user)
  27. tmp_dir_path = os.path.join(self.working_dir_path, tmp_dir_name)
  28. os.mkdir(tmp_dir_path)
  29. return tmp_dir_path
  30. def create_data_dir(self):
  31. data_dir_name = get_tmp_filename(self.user)
  32. data_dir_path = os.path.join(self.tmp_dir_path, data_dir_name)
  33. os.mkdir(data_dir_path)
  34. return data_dir_path
  35. def delete_tmp_dir(self):
  36. if self.tmp_dir_path:
  37. shutil.rmtree(self.tmp_dir_path)
  38. self.tmp_dir_path = None
  39. self.data_dir_path = None
  40. def get_file(self):
  41. file_name = get_tmp_filename(self.user)
  42. file_path = os.path.join(self.working_dir_path, file_name)
  43. self.file_path = shutil.make_archive(file_path, 'zip', self.tmp_dir_path)
  44. self.file = open(self.file_path, 'rb')
  45. return File(self.file)
  46. def delete_file(self):
  47. if self.file:
  48. self.file.close()
  49. self.file = None
  50. if self.file_path:
  51. os.remove(self.file_path)
  52. self.file_path = None
  53. def add_text(self, name, value, date=None, directory=None):
  54. clean_filename = slugify(str(name))
  55. file_dir_path = self.make_final_path(date=date, directory=directory)
  56. file_path = os.path.join(file_dir_path, '{}.txt'.format(clean_filename))
  57. with io.open(file_path, 'w', encoding='utf-8') as fp:
  58. fp.write(six.text_type(value))
  59. return file_path
  60. def add_dict(self, name, value, date=None, directory=None):
  61. text_lines = []
  62. for key, value in value.items():
  63. text_lines.append("{}: {}".format(key, value))
  64. text = '\n'.join(text_lines)
  65. return self.add_text(name, text, date=date, directory=directory)
  66. def add_model_file(self, model_file, prefix=None, date=None, directory=None):
  67. if not model_file:
  68. return None
  69. target_dir_path = self.make_final_path(date=date, directory=directory)
  70. filename = os.path.basename(model_file.name)
  71. if prefix:
  72. prefixed_filename = "{}-{}".format(prefix, filename)
  73. clean_filename = trim_long_filename(prefixed_filename)
  74. target_path = os.path.join(target_dir_path, clean_filename)
  75. else:
  76. clean_filename = trim_long_filename(filename)
  77. target_path = os.path.join(target_dir_path, clean_filename)
  78. with open(target_path, 'wb') as fp:
  79. for chunk in model_file.chunks():
  80. fp.write(chunk)
  81. return target_path
  82. def make_final_path(self, date=None, directory=None):
  83. # fixme: os.path.isdir test can be avoided in py37k
  84. if date and directory:
  85. raise ValueError("date and directory arguments are mutually exclusive")
  86. data_dir_path = self.data_dir_path
  87. if date:
  88. final_path = data_dir_path
  89. path_items = [date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')]
  90. for path_item in path_items:
  91. final_path = os.path.join(final_path, six.text_type(path_item))
  92. if not os.path.isdir(final_path):
  93. os.mkdir(final_path)
  94. return final_path
  95. if directory:
  96. final_path = os.path.join(data_dir_path, six.text_type(directory))
  97. if not os.path.isdir(final_path):
  98. os.mkdir(final_path)
  99. return final_path
  100. return data_dir_path
  101. def get_tmp_filename(user):
  102. filename_bits = [
  103. user.slug,
  104. timezone.now().strftime('%Y%m%d-%H%M%S'),
  105. get_random_string(6),
  106. ]
  107. return '-'.join(filename_bits)
  108. def trim_long_filename(filename):
  109. # fixme: consider moving this utility to better place?
  110. # eg. to trim too long attachment filenames on upload
  111. if len(filename) < FILENAME_MAX_LEN:
  112. return filename
  113. name, extension = os.path.splitext(filename)
  114. name_len = FILENAME_MAX_LEN - len(extension)
  115. return '{}{}'.format(name[:name_len], extension)