Source code for djangochannelsrestframework.decorators

import asyncio
from functools import wraps, partial
from typing import Optional

from channels.db import database_sync_to_async
from django.conf import settings
from django.db import transaction

from djangochannelsrestframework.consumers import AsyncAPIConsumer


[docs]def action(atomic: Optional[bool] = None, detached: Optional[bool] = None, **kwargs): """ Mark a method as an action. .. note:: Should be used as a method decorator eg: `@action()` It can be used on both `async` and `sync` methods. .. code-block:: python from djangochannelsrestframework.decorators import action class MyConsumer(AsyncAPIConsumer): @action() async def delete_user(self, request_id, user_pk, **kwargs): ... Methods decorated with `@action()` will be called when a json message arrives from the client with a matching `action` name. The default way of sending a message to call an action is: .. code-block:: javascript { action: "delete_user", request_id: 42, user_pk: 82 } You can alter how :class:`AsyncAPIConsumer` matches the action using the :meth:`get_action_name` method. ---- When using on `async` methods you can provide an additional option `detached=True` so that the method runs detached from the main run-loop of the consumer, allowing other actions on the consumer to be called while this action runs. This can be useful if the action needs to make further long-running async operations such as upstream network requests. .. code-block:: python from djangochannelsrestframework.decorators import action class MyConsumer(AsyncAPIConsumer): @action(detached=true) async def check_homepage(self, request_id, user_pk, **kwargs): async with aiohttp.ClientSession() as session: async with session.get('http://python.org') as response: return dict(response.headers), response.status ---- When using on `sync` methods you can provide an additional option `atomic=True` to forcefully wrap the method in a transaction. The default value for atomic is determined by django's default db `ATOMIC_REQUESTS` setting. """ def decorator(func): _atomic = False _detached = False if atomic is not None: _atomic = atomic if detached is not None: _detached = detached func.action = True func.kwargs = kwargs if asyncio.iscoroutinefunction(func): if _atomic: raise ValueError("Only synchronous actions can be atomic") if detached: return __detached_action(func) else: return func elif detached: raise ValueError("Only asynchronous actions can be detached") # Read out default atomic state from DB connection if atomic is None: databases = getattr(settings, "DATABASES", {}) database = databases.get("default", {}) _atomic = database.get("ATOMIC_REQUESTS", False) if _atomic: # wrap function in atomic wrapper func = transaction.atomic(func) @wraps(func) async def async_f(self: AsyncAPIConsumer, *args, **_kwargs): response = await database_sync_to_async(func)(self, *args, **_kwargs) return response async_f.action = True async_f.kwargs = kwargs async_f.__name__ = func.__name__ return async_f return decorator
[docs]def detached(func): """ Sets a method to run detached from the consumers main run-loop. You should only do this for methods were you expect the runtime to be long (such as awaiting an upstream network request) and what to be able to handle other messages using the consumer while waiting. If you need a detached :func:`action` then you should use `@action(detached=True)` instead. .. note:: Should be used as a method decorator eg: `@detached` This can **only** be applied to async methods: .. code-block:: python from djangochannelsrestframework.decorators import detached class MyConsumer(AsyncAPIConsumer): @detached async def on_message(self, *args, **kwargs): ... Methods decorated with `@detached` are canceled when the websocket connection closes. """ @wraps(func) async def wrapped_method(self: AsyncAPIConsumer, *args, **kwargs): task = asyncio.create_task(func(self, *args, **kwargs)) task.add_done_callback( lambda t: asyncio.create_task(self.handle_detached_task_completion(t)) ) self.detached_tasks.append(task) return wrapped_method
def __detached_action(func): @wraps(func) async def wrapped_detached_method(self: AsyncAPIConsumer, *args, **kwargs): @wraps(func) async def wrapped_action(): try: response = await func(self, *args, **kwargs) if isinstance(response, tuple): data, status = response await self.reply( data=data, status=status, action=kwargs.get("action"), request_id=kwargs.get("request_id"), ) except asyncio.CancelledError: raise except Exception as e: await self.handle_exception( exc=e, action=kwargs.get("action"), request_id=kwargs.get("request_id"), ) task = asyncio.create_task(wrapped_action()) task.add_done_callback( lambda t: asyncio.create_task(self.handle_detached_task_completion(t)) ) self.detached_tasks.append(task) return wrapped_detached_method