Defining Tasks¶
django-ray uses Django 6's native Tasks framework. Tasks are defined using the @task decorator and enqueued using .enqueue().
Basic Task Definition¶
# myapp/tasks.py
from django.tasks import task


@task(queue_name="default")
def send_email(to: str, subject: str, body: str) -> dict:
    """Send an email and echo back its recipient and subject.

    The returned dict is JSON-serializable, as task return values must be.
    """
    # Your email sending logic here
    summary = {"sent_to": to, "subject": subject}
    return summary
Task Decorator Options¶
@task(
    queue_name="default",  # Queue to use; must be allowed by the backend's QUEUES option
    priority=0,  # Integer from -100 to 100 (default 0); higher = processed first
    backend="default",  # Backend alias from the TASKS setting
    takes_context=False,  # If True, the function receives a TaskContext as its first argument
)
def my_task():
    pass
Enqueueing Tasks¶
Basic Enqueueing¶
from myapp.tasks import send_email

# Positional arguments are passed through to the task function in order
result = send_email.enqueue("user@example.com", "Hello", "World")

# Keyword arguments work the same way and are easier to read
result = send_email.enqueue(
    to="user@example.com",
    subject="Hello",
    body="World",
)

# Every enqueue returns a result object carrying the task's ID
task_id = result.id
Queue Selection¶
# Use a specific queue. .using() returns a copy of the task bound to that
# queue; the queue name must be allowed by the backend's QUEUES option,
# otherwise the task is rejected as invalid.
result = send_email.using(queue_name="high-priority").enqueue(
    to="vip@example.com",
    subject="Urgent",
    body="Important message",
)
Checking Task Status¶
import time

from django.tasks import TaskResultStatus

# Enqueue; the returned TaskResult is a snapshot taken at enqueue time
result = send_email.enqueue(to="user@example.com", subject="Hi", body="Hello")

# Check status
print(result.status)  # TaskResultStatus.READY, RUNNING, SUCCESSFUL, FAILED

# Wait for completion (blocking). The TaskResult does not update itself:
# refresh() re-reads its latest state from the backend. Give up after a
# deadline so a stuck task cannot hang the caller forever.
deadline = time.monotonic() + 60
while (
    result.status in (TaskResultStatus.READY, TaskResultStatus.RUNNING)
    and time.monotonic() < deadline
):
    time.sleep(1)
    result.refresh()

if result.status == TaskResultStatus.SUCCESSFUL:
    return_value = result.return_value
elif result.status == TaskResultStatus.FAILED:
    errors = result.errors  # details of the exception(s) raised by the task
Task Arguments¶
JSON-Serializable Arguments¶
Task arguments must be JSON-serializable:
from typing import Any

from django.tasks import task


@task(queue_name="default")
def process_data(
    user_id: int,
    items: list[str],
    options: dict[str, Any],
) -> dict:
    """Count ``items``; every argument is a plain JSON-serializable value."""
    return {"processed": len(items)}


# Valid: an int, a list of str and a dict with str keys are all JSON-serializable
process_data.enqueue(
    user_id=123,
    items=["a", "b", "c"],
    options={"verbose": True},
)
What's Serializable¶
✅ Supported:
- Strings, integers, floats, booleans
- Lists and tuples (tuples become lists)
- Dictionaries with string keys
- None
- Nested combinations of the above
❌ Not Supported:
- Django model instances (pass IDs instead)
- Datetime objects (pass ISO strings or timestamps)
- Custom class instances (convert them to a dict or another supported type first; the standard `json` encoder does not recognize a `__json__` method)
- Functions, lambdas
Working with Models¶
from django.tasks import task

from myapp.models import User


# ❌ Don't pass model instances: a User is not JSON-serializable, so the
# task cannot be enqueued with one.
@task(queue_name="default")
def bad_task(user: User):  # Won't work!
    pass


# ✅ Pass IDs and fetch in the task
@task(queue_name="default")
def good_task(user_id: int) -> dict:
    """Load the user by primary key inside the task and return their email."""
    # Raises User.DoesNotExist if the row was deleted after enqueueing.
    user = User.objects.get(id=user_id)
    return {"email": user.email}
Return Values¶
Task return values are stored in the database and must be JSON-serializable:
@task(queue_name="default")
def analyze_data(data_id: int) -> dict:
    """Return a JSON-serializable summary of the analysis run."""
    # Process data...
    stats = {"mean": 42.5, "std": 3.2}
    return {
        "status": "completed",
        "records_processed": 1000,
        "summary": stats,
    }
Large Results¶
For large results, consider storing the data externally and returning only a reference to it, so the stored task result stays small:
import json


@task(queue_name="default")
def generate_report(report_id: int) -> dict:
    """Generate a large report, upload it, and return only a reference to it."""
    # Generate large report...
    report_data = generate_large_report()

    # Serialize once so the uploaded payload and the reported size agree;
    # len() of a str or dict counts characters or keys, not bytes.
    payload = json.dumps(report_data).encode("utf-8")

    # Store in S3/external storage
    s3_key = f"reports/{report_id}.json"
    upload_to_s3(s3_key, payload)

    # Return reference only
    return {"s3_key": s3_key, "size_bytes": len(payload)}
Error Handling¶
Exceptions in Tasks¶
Exceptions are captured and stored:
@task(queue_name="default")
def risky_task(value: int) -> int:
    """Double ``value``; negative input raises, which marks the task as failed."""
    if value < 0:
        # Zero is accepted, so the requirement is "non-negative", not "positive".
        raise ValueError("Value must be non-negative")
    return value * 2
The exception type, message, and traceback are stored in RayTaskExecution.
Custom Error Handling¶
import logging

logger = logging.getLogger(__name__)


@task(queue_name="default")
def task_with_cleanup(resource_id: int) -> dict:
    """Process a resource, always releasing it, and re-raise any failure."""
    resource = acquire_resource(resource_id)
    try:
        result = process(resource)
        return {"success": True, "result": result}
    except Exception:
        # logger.exception records the traceback as well as the message.
        logger.exception("Task failed for resource %s", resource_id)
        raise  # Re-raise to mark task as failed
    finally:
        # Runs on success and on failure, so the resource is never leaked.
        release_resource(resource)
Task Patterns¶
Batch Processing¶
@task(queue_name="default")
def process_batch(item_ids: list[int]) -> dict:
    """Process each item in order and return the count plus per-item results."""
    # process_item here is assumed to be a plain helper function, not a task
    results = [process_item(item_id) for item_id in item_ids]
    return {"processed": len(results), "results": results}
Chained Tasks¶
@task(queue_name="default")
def step_one(data: dict) -> dict:
    """First stage: transform ``data`` and hand the output to ``step_two``."""
    # Enqueue next step with the transformed payload
    step_two.enqueue(result=transform(data))
    return {"status": "step_one_complete"}


@task(queue_name="default")
def step_two(result: dict) -> dict:
    """Second stage: finalize the output of ``step_one``."""
    return {"status": "complete", "final": finalize(result)}
Fan-Out Pattern¶
@task(queue_name="default")
def coordinator(items: list[int]) -> dict:
    """Enqueue one ``process_item`` task per item and return their task IDs."""
    # Only the IDs are returned: they are JSON-serializable, the results are not
    task_ids = [process_item.enqueue(item_id=item).id for item in items]
    return {"spawned_tasks": task_ids}


@task(queue_name="default")
def process_item(item_id: int) -> dict:
    """Process a single item."""
    # Process single item
    return {"item_id": item_id, "processed": True}
Using Distributed Computing¶
When tasks run in local or cluster mode, you can also parallelize work inside a single task using Ray's distributed computing helpers:
from django.tasks import task
from django_ray.runtime.distributed import parallel_map


@task(queue_name="default")
def parallel_process(item_ids: list[int]) -> list[dict]:
    """Process items in parallel across the Ray cluster via ``parallel_map``."""

    def process_one(item_id: int) -> dict:
        # Executed on Ray workers, one call per item ID
        return {"id": item_id, "result": heavy_computation(item_id)}

    # parallel_map takes care of distributing the calls
    return parallel_map(process_one, item_ids)
See Also¶
- Queues - Queue configuration and priorities
- Retry & Error Handling - Configuring retries
- Worker Modes - Execution modes