Observer

@observer(signal, **kwargs)[source]

Note

Should be used as a method decorator eg: @observer(user_logged_in)

The wrapped method will be called once for each consumer that has subscribed.

class AdminPortalLoginConsumer(AsyncAPIConsumer):
     async def accept(self, **kwargs):
         await self.handle_user_logged_in.subscribe()
         await super().accept()

     @observer(user_logged_in)
     async def handle_user_logged_in(self, message, observer=None, **kwargs):
         await self.send_json(message)

If the signal you are using supports filtering with args or kwargs these can be passed to the @observer(signal, args..).

Parameters:

signal (Signal) –

@model_observer(model, **kwargs)[source]

Note

Should be used as a method decorator eg: @model_observer(BlogPost)

The resulted wrapped method body becomes the handler that is called on each subscribed consumer. The method itself is replaced with an instance of djangochannelsrestframework.observer.model_observer.ModelObserver

# consumers.py

from djangochannelsrestframework.consumers import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer import model_observer
from djangochannelsrestframework.decorators import action

from .serializers import UserSerializer, CommentSerializer
from .models import User, Comment

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.send_json({"message": message, "request_id": request_id})

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs):
        return CommentSerializer(instance).data

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)

If you only need to use a regular Django Rest Framework Serializer class then there is a shorthand:

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment, serializer_class=CommentSerializer)
    async def comment_activity(self, message, action, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.reply(data=message, action=action, request_id=request_id)

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)
Parameters:

model (Type[Model]) –

class BaseObserver(func, partition='*')[source]

This is the Base Observer class that Observer and ModelObserver inherit from.

The decorators @model_observer and @observer replaced the wrapped method with an instance of these classes. You can then access the methods of this class using the method name that you wrapped.

Parameters:

partition (str) –

groups_for_consumer(func)[source]

Note

Should be used as a method decorator eg: @observed_handler.groups_for_consumer

The decorated method is used when subscribe() and unsubscribe() are called to enumerate the corresponding groups to un/subscribe to.

The args and kwargs providing to subscribe() and unsubscribe() are passed here to enable this.

@classroom_change_handler.groups_for_consumer
def classroom_change_handler(self, school=None, classroom=None, **kwargs):
    # This is called when you subscribe/unsubscribe
    if school is not None:
        yield f'-school__{school.pk}'
    if classroom is not None:
        yield f'-pk__{classroom.pk}'

@action()
async def subscribe_to_classrooms_in_school(self, school_pk, request_id, **kwargs):
    # check user has permission to do this
    await self.classroom_change_handler.subscribe(school=school, request_id=request_id)

@action()
async def subscribe_to_classroom(self, classroom_pk, request_id, **kwargs):
    # check user has permission to do this
    await self.classroom_change_handler.subscribe(classroom=classroom, request_id=request_id)

It is important that a corresponding groups_for_signal() method is provided that enumerates the groups that each event is sent to.

Parameters:

func (Callable[[BaseObserver, AsyncAPIConsumer], Generator[str, None, None]]) –

groups_for_signal(func)[source]

Note

Should be used as a method decorator eg: @observed_handler.groups_for_signal

The decorated method is used whenever an event happens that the observer is observing (even if nothing is subscribed).

The role of this method is to enumerate the groups that the event should be sent over.

@classroom_change_handler.groups_for_signal
def classroom_change_handler(self, instance: models.Classroom, **kwargs):
    yield f'-school__{instance.school_id}'
    yield f'-pk__{instance.pk}'

It is important that a corresponding groups_for_consumer() method is provided to enable the consumers to correctly select which groups to subscribe to.

Parameters:

func (Callable[[...], Generator[str, None, None]]) –

serializer(func)[source]

Note

Should be used as a method decorator eg: @observed_handler.serializer

The method that this wraps is evaluated just after the observer is triggered before the result is sent over the channel layer. That means you DO NOT have access to user or other request information.

The result of this method is what is sent over the channel layer. If you need to modify that with user specific information then you need to do that in the observer handler method.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs):
        return CommentSerializer(instance).data

The advantage of doing serialization at this point is that it happens only once even if 1000s of consumers are subscribed to the event.

async subscribe(consumer, *args, request_id=None, **kwargs)[source]

This should be called to subscribe the current consumer.

args and kwargs passed here are provided to the groups_for_consumer() method to enable custom partitioning of events.

If the request_id is passed to the subscribe method then the observer will track that request id and provide it to the handling method when an event happens.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)
Parameters:

consumer (AsyncAPIConsumer) –

Return type:

Iterable[str]

async unsubscribe(consumer, *args, request_id=None, **kwargs)[source]

This should be called to unsubscribe the current consumer.

args and kwargs passed here are provided to the groups_for_consumer() method to enable custom partitioning of events.

If the request_id is passed to the un-subscribe method then this will un-subscribe the requests with the same id that called the subscribe() method. If no request_id is provided then all subscribed requests for this consumer are un-subscribed.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @action()
    async def unsubscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.unsubscribe(request_id=request_id)
Parameters:

consumer (AsyncAPIConsumer) –

Return type:

Iterable[str]