dataarchive.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import os
  2. import shutil
  3. from django.core.files import File
  4. from django.utils import timezone
  5. from django.utils.crypto import get_random_string
  6. from ...core.utils import slugify
  7. FILENAME_MAX_LEN = 50
  8. class DataArchive(object):
  9. def __init__(self, user, working_dir_path):
  10. self.user = user
  11. self.working_dir_path = working_dir_path
  12. self.tmp_dir_path = None
  13. self.data_dir_path = None
  14. self.file_path = None
  15. self.file = None
  16. def __enter__(self):
  17. self.tmp_dir_path = self.create_tmp_dir()
  18. self.data_dir_path = self.create_data_dir()
  19. return self
  20. def __exit__(self, *args):
  21. self.delete_file()
  22. self.delete_tmp_dir()
  23. def create_tmp_dir(self):
  24. tmp_dir_name = get_tmp_filename(self.user)
  25. tmp_dir_path = os.path.join(self.working_dir_path, tmp_dir_name)
  26. os.mkdir(tmp_dir_path)
  27. return tmp_dir_path
  28. def create_data_dir(self):
  29. data_dir_name = get_tmp_filename(self.user)
  30. data_dir_path = os.path.join(self.tmp_dir_path, data_dir_name)
  31. os.mkdir(data_dir_path)
  32. return data_dir_path
  33. def delete_tmp_dir(self):
  34. if self.tmp_dir_path:
  35. shutil.rmtree(self.tmp_dir_path)
  36. self.tmp_dir_path = None
  37. self.data_dir_path = None
  38. def get_file(self):
  39. file_name = get_tmp_filename(self.user)
  40. file_path = os.path.join(self.working_dir_path, file_name)
  41. self.file_path = shutil.make_archive(file_path, "zip", self.tmp_dir_path)
  42. self.file = open(self.file_path, "rb")
  43. return File(self.file)
  44. def delete_file(self):
  45. if self.file:
  46. self.file.close()
  47. self.file = None
  48. if self.file_path:
  49. os.remove(self.file_path)
  50. self.file_path = None
  51. def add_text(self, name, value, date=None, directory=None):
  52. clean_filename = slugify(str(name))
  53. file_dir_path = self.make_final_path(date=date, directory=directory)
  54. file_path = os.path.join(file_dir_path, "%s.txt" % clean_filename)
  55. with open(file_path, "w") as fp:
  56. fp.write(str(value))
  57. return file_path
  58. def add_dict(self, name, value, date=None, directory=None):
  59. text_lines = []
  60. for key, value in value.items():
  61. text_lines.append("%s: %s" % (key, value))
  62. text = "\n".join(text_lines)
  63. return self.add_text(name, text, date=date, directory=directory)
  64. def add_model_file(self, model_file, prefix=None, date=None, directory=None):
  65. if not model_file:
  66. return None
  67. target_dir_path = self.make_final_path(date=date, directory=directory)
  68. filename = os.path.basename(model_file.name)
  69. if prefix:
  70. prefixed_filename = "%s-%s" % (prefix, filename)
  71. clean_filename = trim_long_filename(prefixed_filename)
  72. target_path = os.path.join(target_dir_path, clean_filename)
  73. else:
  74. clean_filename = trim_long_filename(filename)
  75. target_path = os.path.join(target_dir_path, clean_filename)
  76. with open(target_path, "wb") as fp:
  77. for chunk in model_file.chunks():
  78. fp.write(chunk)
  79. return target_path
  80. def make_final_path(self, date=None, directory=None):
  81. # fixme: os.path.isdir test can be avoided in py37k
  82. if date and directory:
  83. raise ValueError("date and directory arguments are mutually exclusive")
  84. data_dir_path = self.data_dir_path
  85. if date:
  86. final_path = data_dir_path
  87. path_items = [date.strftime("%Y"), date.strftime("%m"), date.strftime("%d")]
  88. for path_item in path_items:
  89. final_path = os.path.join(final_path, str(path_item))
  90. if not os.path.isdir(final_path):
  91. os.mkdir(final_path)
  92. return final_path
  93. if directory:
  94. final_path = os.path.join(data_dir_path, str(directory))
  95. if not os.path.isdir(final_path):
  96. os.mkdir(final_path)
  97. return final_path
  98. return data_dir_path
  99. def get_tmp_filename(user):
  100. filename_bits = [
  101. user.slug,
  102. timezone.now().strftime("%Y%m%d-%H%M%S"),
  103. get_random_string(6),
  104. ]
  105. return "-".join(filename_bits)
  106. def trim_long_filename(filename):
  107. # fixme: consider moving this utility to better place?
  108. # eg. to trim too long attachment filenames on upload
  109. if len(filename) < FILENAME_MAX_LEN:
  110. return filename
  111. name, extension = os.path.splitext(filename)
  112. name_len = FILENAME_MAX_LEN - len(extension)
  113. return "%s%s" % (name[:name_len], extension)