tasks.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import requests
  2. from celery import shared_task
  3. from requests.exceptions import RequestException
  4. from ..cache import clear_theme_cache
  5. from ..models import Theme, Css
  6. from .css import get_theme_media_map, rebuild_css
  @shared_task
  def update_remote_css_size(pk):
      """Refresh the stored ``size`` of a remote stylesheet.

      Looks up the ``Css`` row with primary key ``pk`` whose ``url`` is not
      null. If no such row exists the task does nothing. Otherwise the size
      returned by ``get_remote_css_size`` for that URL is written to the
      ``size`` field, and only that field is saved.
      """
      try:
          stylesheet = Css.objects.get(pk=pk, url__isnull=False)
      except Css.DoesNotExist:
          # No matching row with a URL; there is nothing to update.
          return

      stylesheet.size = get_remote_css_size(stylesheet.url)
      stylesheet.save(update_fields=["size"])
  def get_remote_css_size(url, timeout=10):
      """Return the ``Content-Length`` reported for ``url``, or 0.

      Issues an HTTP ``HEAD`` request and reads the ``content-length``
      response header.

      Args:
          url: Address of the remote stylesheet.
          timeout: Seconds to wait for the server before giving up. It is
              passed straight to ``requests.head``.

      Returns:
          The header value as an ``int``. Returns 0 when the request fails
          (including on timeout), when the response has an error status,
          or when the header is missing or not a valid integer.
      """
      try:
          # requests has no default timeout: without one, an unresponsive
          # host would block the calling worker indefinitely. A timeout is
          # raised as a RequestException subclass, so it is handled below.
          response = requests.head(url, timeout=timeout)
          response.raise_for_status()
      except RequestException:
          return 0

      try:
          return int(response.headers.get("content-length", 0))
      except (TypeError, ValueError):
          # Header present but not parseable as an integer.
          return 0
  @shared_task
  def build_single_theme_css(pk):
      """Rebuild one stylesheet that is flagged as needing a build.

      Looks up the ``Css`` row with primary key ``pk`` and
      ``source_needs_building=True``. If no such row exists the task does
      nothing. Otherwise the stylesheet is rebuilt against the media map of
      its theme, and the theme cache is cleared afterwards.
      """
      try:
          stylesheet = Css.objects.get(pk=pk, source_needs_building=True)
      except Css.DoesNotExist:
          # Row is gone or no longer needs building.
          return

      theme_media = get_theme_media_map(stylesheet.theme)
      rebuild_css(theme_media, stylesheet)
      # NOTE(review): the cache is cleared only on the success path; confirm
      # this matches the intended nesting of the original try/else.
      clear_theme_cache()
  @shared_task
  def build_theme_css(pk):
      """Rebuild every stylesheet of a theme that is flagged for building.

      Looks up the ``Theme`` with primary key ``pk``. If it does not exist
      the task does nothing. Otherwise the theme's media map is computed
      once, each related ``css`` row with ``source_needs_building=True`` is
      rebuilt against it, and the theme cache is cleared afterwards.
      """
      try:
          theme = Theme.objects.get(pk=pk)
      except Theme.DoesNotExist:
          # Theme no longer exists; nothing to build.
          return

      theme_media = get_theme_media_map(theme)
      pending = theme.css.filter(source_needs_building=True)
      for stylesheet in pending:
          rebuild_css(theme_media, stylesheet)
      # NOTE(review): the cache is cleared once after the loop, on the success
      # path only; confirm this matches the original try/else nesting.
      clear_theme_cache()