from functools import partial
from typing import Type, Optional
from django.db.models import Model
from django.dispatch import Signal
from rest_framework.serializers import Serializer
from djangochannelsrestframework.observer.observer import Observer
from djangochannelsrestframework.observer.model_observer import ModelObserver
[docs]
def observer(signal: Signal, **kwargs):
"""
.. note::
Should be used as a method decorator eg: `@observer(user_logged_in)`
The wrapped method will be called once for each consumer that has subscribed.
.. code-block:: python
class AdminPortalLoginConsumer(AsyncAPIConsumer):
async def accept(self, **kwargs):
await self.handle_user_logged_in.subscribe()
await super().accept()
@observer(user_logged_in)
async def handle_user_logged_in(self, message, observer=None, **kwargs):
await self.send_json(message)
If the signal you are using supports filtering with `args` or `kwargs` these can be passed
to the `@observer(signal, args..)`.
"""
return partial(Observer, signal=signal, kwargs=kwargs)
[docs]
def model_observer(
model: Type[Model],
serializer_class: Optional[Type[Serializer]] = None,
many_to_many: bool = False,
**kwargs
):
"""
.. note::
Should be used as a method decorator eg: `@model_observer(BlogPost)`
The resulted wrapped method body becomes the handler that is called on each subscribed consumer.
The method itself is replaced with an instance of :class:`~djangochannelsrestframework.observer.model_observer.ModelObserver`
Args:
model (Type[Model]): The django model class to observe.
serializer_class (Type[Serializer] | None): Django rest-framework serializer class to use.
many_to_many (bool): Should the observer track many-to-many relationships.
.. code-block:: python
# consumers.py
from djangochannelsrestframework.consumers import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer import model_observer
from djangochannelsrestframework.decorators import action
from .serializers import UserSerializer, CommentSerializer
from .models import User, Comment
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comment)
async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
for request_id in subscribing_request_ids:
await self.send_json({"message": message, "request_id": request_id})
@comment_activity.serializer
def comment_activity(self, instance: Comment, action, **kwargs):
return CommentSerializer(instance).data
@action()
async def subscribe_to_comment_activity(self, request_id, **kwargs):
await self.comment_activity.subscribe(request_id=request_id)
If you only need to use a regular Django Rest Framework Serializer class then there is a shorthand:
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comment, serializer_class=CommentSerializer)
async def comment_activity(self, message, action, subscribing_request_ids=[], **kwargs):
for request_id in subscribing_request_ids:
await self.reply(data=message, action=action, request_id=request_id)
@action()
async def subscribe_to_comment_activity(self, request_id, **kwargs):
await self.comment_activity.subscribe(request_id=request_id)
If you want to include updates on many-to-many relationships you can also pass:
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comment, many_to_many=True)
async def comment_activity(self, message, action, subscribing_request_ids=[], **kwargs):
for request_id in subscribing_request_ids:
await self.reply(data=message, action=action, request_id=request_id)
@action()
async def subscribe_to_comment_activity(self, request_id, **kwargs):
await self.comment_activity.subscribe(request_id=request_id)
You can also use ``@model_observer`` to subscribe to a collection of models by configuring the group names used.
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comment)
async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
for request_id in subscribing_request_ids:
await self.send_json({"message": message, "request_id": request_id})
@comment_activity.groups_for_signal
def comment_activity(self, instance, **kwargs):
yield f'comment__{instance.user_id}'
@comment_activity.groups_for_consumer
def comment_activity(self, user_pk, **kwargs):
if user_pk:
yield f'comment__{user_pk}'
@action()
async def subscribe_to_comment_activity(self, request_id, user_pk, **kwargs):
await self.comment_activity.subscribe(request_id=request_id, user_pk=user_pk)
Here the ``groups_for_signal`` method is called whenever a comment is updated/created/deleted to figure out which
groups to send a message to.
The ``groups_for_consumer`` method is used when subscribing to determine the groups to subscribe to.
"""
return partial(
ModelObserver,
model_cls=model,
serializer_class=serializer_class,
many_to_many=many_to_many,
**kwargs
)