Observer

@observer(signal, **kwargs)

Note

Should be used as a method decorator, e.g. @observer(user_logged_in)

The wrapped method will be called once for each consumer that has subscribed.

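from django.contrib.auth.signals import user_logged_in
from djangochannelsrestframework.consumers import AsyncAPIConsumer
from djangochannelsrestframework.observer import observer

class AdminPortalLoginConsumer(AsyncAPIConsumer):
    async def accept(self, **kwargs):
        # Subscribe this consumer to the signal before accepting the connection
        await self.handle_user_logged_in.subscribe()
        await super().accept()

    @observer(user_logged_in)
    async def handle_user_logged_in(self, message, observer=None, **kwargs):
        await self.send_json(message)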

If the signal you are using supports filtering with args or kwargs, these can be passed to the decorator after the signal: @observer(signal, args...).

Parameters:

signal (Signal)

@model_observer(model, serializer_class=None, many_to_many=False, **kwargs)

Note

Should be used as a method decorator, e.g. @model_observer(BlogPost)

The body of the wrapped method becomes the handler that is called on each subscribed consumer. The method itself is replaced with an instance of ModelObserver.

Parameters:
  • model (Type[Model]) – The Django model class to observe.

  • serializer_class (Type[Serializer] | None) – Django REST framework serializer class to use.

  • many_to_many (bool) – Whether the observer should track many-to-many relationships.

# consumers.py

from djangochannelsrestframework.consumers import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer import model_observer
from djangochannelsrestframework.decorators import action

from .serializers import UserSerializer, CommentSerializer
from .models import User, Comment

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.send_json({"message": message, "request_id": request_id})

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs):
        return CommentSerializer(instance).data

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)

If you only need a regular Django REST framework Serializer class, there is a shorthand:

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment, serializer_class=CommentSerializer)
    async def comment_activity(self, message, action, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.reply(data=message, action=action, request_id=request_id)

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)

If you want to include updates on many-to-many relationships, you can also pass many_to_many=True:

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment, many_to_many=True)
    async def comment_activity(self, message, action, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.reply(data=message, action=action, request_id=request_id)

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)

You can also use @model_observer to subscribe to a collection of models by configuring the group names used.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        for request_id in subscribing_request_ids:
            await self.send_json({"message": message, "request_id": request_id})

    @comment_activity.groups_for_signal
    def comment_activity(self, instance, **kwargs):
        yield f'comment__{instance.user_id}'

    @comment_activity.groups_for_consumer
    def comment_activity(self, user_pk, **kwargs):
        if user_pk:
            yield f'comment__{user_pk}'

    @action()
    async def subscribe_to_comment_activity(self, request_id, user_pk, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id, user_pk=user_pk)

Here the groups_for_signal method is called whenever a comment is updated/created/deleted to figure out which groups to send a message to.

The groups_for_consumer method is used when subscribing to determine the groups to subscribe to.
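
How the two hooks fit together can be illustrated without the library. The sketch below is plain Python, not djangochannelsrestframework code: FakeComment and is_delivered are hypothetical names, the two generator functions only mirror the decorated methods above, and the sketch assumes that an event reaches a consumer when at least one group name produced for the event matches a group the consumer subscribed to.

```python
from dataclasses import dataclass


@dataclass
class FakeComment:
    """Hypothetical stand-in for a Comment model instance."""
    pk: int
    user_id: int


def groups_for_signal(instance):
    # Groups that an event about this comment is sent to.
    yield f"comment__{instance.user_id}"


def groups_for_consumer(user_pk=None):
    # Groups a consumer joins when it subscribes with user_pk.
    if user_pk:
        yield f"comment__{user_pk}"


def is_delivered(instance, **subscribe_kwargs):
    # Assumed rule: delivery requires at least one shared group name.
    event_groups = set(groups_for_signal(instance))
    consumer_groups = set(groups_for_consumer(**subscribe_kwargs))
    return bool(event_groups & consumer_groups)


comment = FakeComment(pk=1, user_id=7)
print(is_delivered(comment, user_pk=7))  # True
print(is_delivered(comment, user_pk=8))  # False
```

If the two methods build group names from different values, no group is shared and the consumer receives nothing, which is why they are normally written as a pair.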

class BaseObserver(func, partition='*')

This is the Base Observer class that Observer and ModelObserver inherit from.

The decorators @model_observer and @observer replace the wrapped method with an instance of one of these classes. You can then access the methods of this class through the name of the method that you wrapped.

Parameters:

partition (str)

groups_for_consumer(func)

Note

Should be used as a method decorator, e.g. @observed_handler.groups_for_consumer

The decorated method is used when subscribe() and unsubscribe() are called to enumerate the groups to subscribe to or unsubscribe from.

The args and kwargs provided to subscribe() and unsubscribe() are passed here to enable this.

from channels.db import database_sync_to_async

@classroom_change_handler.groups_for_consumer
def classroom_change_handler(self, school=None, classroom=None, **kwargs):
    # This is called when you subscribe/unsubscribe
    if school is not None:
        yield f'-school__{school.pk}'
    if classroom is not None:
        yield f'-pk__{classroom.pk}'

@action()
async def subscribe_to_classrooms_in_school(self, school_pk, request_id, **kwargs):
    # check user has permission to do this
    # look up the instance (assumes a models.School model exists)
    school = await database_sync_to_async(models.School.objects.get)(pk=school_pk)
    await self.classroom_change_handler.subscribe(school=school, request_id=request_id)

@action()
async def subscribe_to_classroom(self, classroom_pk, request_id, **kwargs):
    # check user has permission to do this
    # look up the instance before subscribing
    classroom = await database_sync_to_async(models.Classroom.objects.get)(pk=classroom_pk)
    await self.classroom_change_handler.subscribe(classroom=classroom, request_id=request_id)

It is important that a corresponding groups_for_signal() method is provided that enumerates the groups that each event is sent to.

Parameters:

func (Callable[[BaseObserver, AsyncAPIConsumer], Generator[str, None, None]])

groups_for_signal(func)

Note

Should be used as a method decorator, e.g. @observed_handler.groups_for_signal

The decorated method is called whenever an event happens that the observer is observing (even if nothing is subscribed).

The role of this method is to enumerate the groups that the event should be sent to.

@classroom_change_handler.groups_for_signal
def classroom_change_handler(self, instance: models.Classroom, **kwargs):
    yield f'-school__{instance.school_id}'
    yield f'-pk__{instance.pk}'

It is important that a corresponding groups_for_consumer() method is provided to enable the consumers to correctly select which groups to subscribe to.

Parameters:

func (Callable[[...], Generator[str, None, None]])

serializer(func)

Note

Should be used as a method decorator, e.g. @observed_handler.serializer

The method that this wraps is evaluated just after the observer is triggered and before the result is sent over the channel layer. That means you DO NOT have access to the user or other request information.

The result of this method is what is sent over the channel layer. If you need to modify it with user-specific information, do that in the observer handler method.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs):
        return CommentSerializer(instance).data

The advantage of doing serialization at this point is that it happens only once, even if thousands of consumers are subscribed to the event. The disadvantage is that it is called on every model update/create/delete, even if there are no active subscribers.
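
The trade-off can be pictured with a small library-free sketch. Everything here is hypothetical (serialize, broadcast, and the list-based inboxes stand in for the serializer method, the channel layer, and the subscribed consumers); it only assumes, as described above, that serialization runs once per event regardless of how many consumers are listening.

```python
serialize_calls = 0


def serialize(instance):
    # Stand-in for the method decorated with @observed_handler.serializer.
    global serialize_calls
    serialize_calls += 1
    return {"id": instance["id"], "text": instance["text"]}


def broadcast(instance, subscribers):
    # Serialize once, then hand the same payload to every subscriber.
    message = serialize(instance)
    for inbox in subscribers:
        inbox.append(message)


subscribers = [[] for _ in range(1000)]
broadcast({"id": 1, "text": "hello"}, subscribers)
print(serialize_calls)  # 1: one call for 1000 subscribers

broadcast({"id": 2, "text": "unread"}, [])
print(serialize_calls)  # 2: still called with no subscribers
```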

async subscribe(consumer, *args, request_id=None, **kwargs)

This should be called to subscribe the current consumer.

args and kwargs passed here are provided to the groups_for_consumer() method to enable custom partitioning of events.

If the request_id is passed to the subscribe method then the observer will track that request id and provide it to the handling method when an event happens.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @action()
    async def subscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.subscribe(request_id=request_id)

Parameters:

consumer (AsyncAPIConsumer)

Return type:

Iterable[str]

async unsubscribe(consumer, *args, request_id=None, **kwargs)

This should be called to unsubscribe the current consumer.

args and kwargs passed here are provided to the groups_for_consumer() method to enable custom partitioning of events.

If a request_id is passed to the unsubscribe method, only the subscription that was created by calling subscribe() with the same request_id is removed. If no request_id is provided, all subscribed requests for this consumer are unsubscribed.

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
        ...

    @action()
    async def unsubscribe_to_comment_activity(self, request_id, **kwargs):
        await self.comment_activity.unsubscribe(request_id=request_id)

Parameters:

consumer (AsyncAPIConsumer)

Return type:

Iterable[str]
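
The request_id handling described for subscribe() and unsubscribe() can be pictured with a small registry. This is a hypothetical illustration in plain Python, not the library's internal data structure: RequestIdRegistry and its methods are invented names, and the sketch only mirrors the documented behaviour (a given request_id removes that one subscription, no request_id removes them all).

```python
from collections import defaultdict


class RequestIdRegistry:
    """Hypothetical tracker of request ids per group (illustration only)."""

    def __init__(self):
        self._ids = defaultdict(set)

    def subscribe(self, group, request_id=None):
        # Remember which request asked for this group, if one was given.
        if request_id is not None:
            self._ids[group].add(request_id)

    def unsubscribe(self, group, request_id=None):
        if request_id is None:
            # No request_id: drop every tracked request for this group.
            self._ids.pop(group, None)
        else:
            self._ids[group].discard(request_id)

    def request_ids(self, group):
        return sorted(self._ids.get(group, ()))


registry = RequestIdRegistry()
registry.subscribe("comment__7", request_id=1)
registry.subscribe("comment__7", request_id=2)

registry.unsubscribe("comment__7", request_id=1)
print(registry.request_ids("comment__7"))  # [2]

registry.unsubscribe("comment__7")
print(registry.request_ids("comment__7"))  # []
```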

class ObserverModelInstanceMixin(*args, **kwargs)

Use this as a mixin with GenericAsyncAPIConsumer.

You can also set the observer_many_to_many_relationships = True class property to ensure many-to-many relationship changes are tracked by the subscription.

# consumers.py
from djangochannelsrestframework.consumers import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer.generics import ObserverModelInstanceMixin

from .serializers import UserSerializer
from .models import User

class MyConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    observer_many_to_many_relationships = True

async handle_observed_action(action, group=None, **kwargs)

Runs the action.

Parameters:
  • action (str)

  • group (str | None)

async subscribe_instance(request_id=None, **kwargs)

Subscribes the current consumer to updates for a specific model instance.

This method retrieves the model instance based on the provided lookup parameters (kwargs), then subscribes the consumer to receive real-time updates related to that instance. The subscription is identified by a request_id, which must be provided.

Parameters:
  • request_id (str) – A unique identifier for the subscription request.

  • **kwargs – Lookup parameters used to retrieve the model instance.

Raises:

ValueError – If request_id is not provided.
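
As a rough sketch of the client side, the helper below builds the JSON text a WebSocket client might send to trigger this action. It assumes the consumer routes incoming messages by an "action" key and that the instance is looked up by pk; build_subscribe_message is a hypothetical helper, not part of the library, and its ValueError only mirrors the documented requirement that request_id be provided.

```python
import json


def build_subscribe_message(pk, request_id):
    """Hypothetical helper: JSON payload asking to subscribe to one instance."""
    if request_id is None:
        # Mirrors the documented requirement that request_id is mandatory.
        raise ValueError("request_id must be provided")
    return json.dumps(
        {"action": "subscribe_instance", "pk": pk, "request_id": request_id}
    )


payload = build_subscribe_message(pk=42, request_id=4)
print(json.loads(payload)["action"])  # subscribe_instance
```

The same request_id can later be passed to unsubscribe_instance to remove only this subscription.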

async unsubscribe_instance(request_id=None, **kwargs)

Unsubscribes the current consumer from updates for a specific model instance.

This method removes the consumer’s subscription to real-time updates for the given model instance. If a request_id is provided, only that specific subscription is removed. Otherwise, all subscriptions related to the instance are unsubscribed.

Parameters:
  • request_id (str, optional) – A unique identifier for the subscription request.

  • **kwargs – Lookup parameters used to retrieve the model instance.