Django는 높은 생산성과 인기 있는 웹 프레임워크로서, 강력한 기능을 제공합니다. 하지만 복잡한 비즈니스 로직이나 대량의 데이터 처리 작업을 수행할 때는 일반적인 웹 요청 흐름의 외부에서 비동기 처리를 시행할 필요가 있습니다. 이 과정에서 많은 개발자들이 선호하는 도구가 바로 Celery입니다. 이 글에서는 Django에서 Celery를 설정하고 사용하는 방법에 대해 자세히 설명하고, 실습 예제를 통해 비동기 작업 처리의 실제적인 적용 방법을 알아보겠습니다.
1. Celery란?
Celery는 Python으로 작성된 비동기 작업 큐 시스템입니다. 작업(queue) 및 작업자(worker) 개념을 바탕으로 하여, 분산 환경에서 태스크를 비동기적으로 처리할 수 있습니다. 이를 통해 웹 애플리케이션에서 사용자 요청에 대한 응답 속도를 개선하고, 시간 소모적인 작업을 별개로 처리할 수 있습니다.
1.1 주요 특징
- 비동기 처리: Celery를 사용하여 웹 요청과는 별도로 작업을 실행하여 응답 속도를 개선할 수 있습니다.
- 분산 시스템 지원: 여러 대의 서버에서 작업을 분산 처리하여 확장성 및 성능을 높일 수 있습니다.
- 입력 및 결과 저장: 작업의 결과를 데이터베이스나 파일 등 외부 시스템에 저장할 수 있습니다.
- 스케줄러 기능: 주기적으로 반복되는 작업을 쉽게 관리할 수 있습니다.
- 다양한 브로커 지원: RabbitMQ, Redis 등 다양한 메시지 브로커와 연동하여 사용할 수 있습니다.
2. Django와 Celery 통합하기
2.1 Celery 설치
Celery를 사용하기 위해서는 먼저 프로젝트에 설치해야 합니다. pip를 사용하여 Celery와 메시지 브로커(여기서는 Redis를 예시로 사용합니다)를 설치합니다. 다음 명령어를 사용하세요:
pip install celery redis
2.2 Django 프로젝트 설정
이제 Django 프로젝트를 설정해야 합니다. 새로운 Django 프로젝트를 생성합니다.
django-admin startproject myproject
그리고 생성된 디렉토리로 이동합니다.
cd myproject
2.3 Redis 설치 및 실행
Redis를 설치하는 방법은 운영 체제에 따라 다릅니다. 만약 Homebrew가 설치되어 있다면, 다음과 같은 명령으로 Redis를 설치하고 실행할 수 있습니다:
brew install redis
brew services start redis
2.4 Celery 설정 파일 생성
Django 프로젝트 디렉토리에 `celery.py`라는 파일을 생성하고 다음과 같이 설정합니다.
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
여기서 `namespace=’CELERY’`는 Django 설정 파일에서 Celery 관련 설정을 찾을 수 있도록 도와주며, `autodiscover_tasks`는 앱 내의 `tasks.py`에서 자동으로 작업을 발견하고 등록할 수 있게 합니다.
2.5 Django 설정 변경
이제 `settings.py` 파일을 열어 다음과 같이 Celery와 Redis를 설정합니다.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
2.6 비동기 작업 생성
Celery를 사용할 준비가 되었습니다. 다음으로, 실제 비동기 작업을 생성해보겠습니다. 앱 내에 `tasks.py` 파일을 생성하고 다음과 같이 작성합니다.
from celery import shared_task
import time
@shared_task
def sleep_task(seconds):
time.sleep(seconds)
return f"Slept for {seconds} seconds!"
2.7 Django 뷰와 연결하기
이제 비동기 작업을 호출할 Django 뷰를 생성합니다. `views.py` 파일을 열고 다음과 같이 작성합니다.
from django.http import JsonResponse
from .tasks import sleep_task
def trigger_sleep_task(request, seconds):
sleep_task.delay(seconds) # 비동기적으로 작업 호출
return JsonResponse({"status": "Task has been started!"})
2.8 URL 설정
마지막으로, 생성한 뷰를 URL에 연결해야 합니다. `urls.py` 파일을 다음과 같이 수정합니다.
from django.urls import path
from .views import trigger_sleep_task
urlpatterns = [
path('sleep//', trigger_sleep_task, name='trigger_sleep_task'),
]
3. Celery 작업 실행하기
모든 설정이 완료되었으므로, 이제 서버를 실행하고 Celery 작업을 실행할 수 있습니다. 먼저 Django 서버를 실행합니다.
python manage.py runserver
그 다음, 새로운 터미널에서 Celery worker를 실행합니다:
celery -A myproject worker --loglevel=info
이제 브라우저를 열고 `http://localhost:8000/sleep/10/`에 접속하면, 10초 동안 대기하는 비동기 작업이 시작됩니다. 작업이 완료되면 로그에서 결과를 확인할 수 있습니다.
4. 비동기 작업의 응답 처리
Celery는 비동기 작업이 완료되었을 때 작업 결과를 처리할 수 있는 여러 방법을 제공합니다. 예를 들어, 작업 ID를 통해 결과를 조회할 수 있습니다. 이를 위해 Django에서 작업 ID를 반환하도록 뷰를 수정할 수 있습니다.
from django.http import JsonResponse
from .tasks import sleep_task
def trigger_sleep_task(request, seconds):
task = sleep_task.delay(seconds) # 비동기적으로 작업 호출
return JsonResponse({"status": "Task has been started!", "task_id": task.id})
4.1 작업 ID를 통한 상태 확인
작업 ID를 가진 결과를 확인하기 위해 다음과 같은 뷰를 작성할 수 있습니다.
from celery.result import AsyncResult
def check_task_status(request, task_id):
result = AsyncResult(task_id)
return JsonResponse({"task_id": task_id, "status": result.status, "result": result.result})
4.2 URL 추가
이제 작업 상태를 조회할 수 있도록 URL을 추가합니다.
urlpatterns = [
path('sleep//', trigger_sleep_task, name='trigger_sleep_task'),
path('check_task//', check_task_status, name='check_task_status'),
]
5. Celery Beat: 주기적인 작업 수행
Celery는 주기적으로 작업을 수행할 수 있는 스케줄러 기능인 Celery Beat를 제공합니. 이를 통해 특정 시간 간격으로 지속적으로 작업을 실행할 수 있습니다. 예를 들어, 매 10초마다 특정 함수를 호출하고 싶다면 아래와 같이 설정합니다.
from celery import shared_task
from celery.schedules import crontab
from celery import Celery
app = Celery('myproject')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, my_periodic_task.s(), name='Add every 10 seconds')
@shared_task
def my_periodic_task():
print("This task runs every 10 seconds.")
5.1 기동하기
Celery Beat를 실행하려면 다음 명령어를 사용합니다:
celery -A myproject beat
5.2 주기적인 작업 확인
작업의 로깅을 통해 주기적으로 실행되고 있는 작업들을 확인할 수 있습니다.
6. 결론
이번 포스트에서는 Django와 Celery를 통합하여 비동기 작업 처리 시스템을 구축하는 방법에 대해 살펴보았습니다. 비동기 처리 및 분산 시스템을 통해 웹 애플리케이션의 성능을 극대화할 수 있으며, 주기적인 작업을 통해 자동화된 시스템 구축 또한 가능해집니다. Celery는 많은 Python 프로젝트에서 널리 사용되고 있으며, 이를 통해 더 나은 사용자 경험과 효율적인 시스템 운영이 가능합니다.
이제 여러분은 Django에서 Celery를 설정하고 활용하는 방법을 알게 되었습니다. 비즈니스 요구에 따라 Celery를 적극적으로 사용해보시길 바랍니다!