# Source code for djangochannelsrestframework.observer.base_observer

import hashlib
from copy import deepcopy
from typing import Any, Dict, Generator, Callable, Iterable

from djangochannelsrestframework.consumers import AsyncAPIConsumer
from djangochannelsrestframework.observer.utils import ObjPartial


class BaseObserver:
    """Base class for observers that route messages to channel-layer groups.

    An observer wraps a consumer handler (``func``) and knows how to:

    * build the message that is sent for a signal (:meth:`serialize`),
    * compute the group names a consumer should join or leave
      (:meth:`group_names_for_consumer`) and the group names a signal should
      be sent to (:meth:`group_names_for_signal`),
    * dispatch an incoming message to the wrapped handler (:meth:`__call__`).

    Subclasses must implement :meth:`group_names`.
    """

    def __init__(self, func, partition: str = "*"):
        """Wrap ``func`` and derive a stable identifier for this observer.

        Args:
            func: The handler this observer wraps. It is awaited by
                :meth:`__call__`, and its ``__module__`` and ``__name__`` are
                used to build the observer id and the message type.
            partition: Prefix of the stable observer id.
        """
        self.func = func
        self._serializer = None
        self._group_names_for_signal = None
        self._group_names_for_consumer = None
        # Built only from the partition, the observer class name and the
        # handler's module and name; used as the prefix of custom group names.
        self._stable_observer_id = (
            f"{partition}-"
            f"{self.__class__.__name__}-"
            f"{self.func.__module__}."
            f"{self.func.__name__}"
        )

    async def __call__(self, message, consumer=None, **kwargs):
        """Dispatch ``message`` to the wrapped handler.

        Args:
            message: Mapping with a mandatory ``"type"`` key and an optional
                ``"body"`` key. Any other keys are forwarded to the handler
                as keyword arguments.
            consumer: Passed to the handler as its first positional argument.
            **kwargs: Extra keyword arguments forwarded to the handler.

        Returns:
            Whatever the wrapped handler returns.

        Raises:
            KeyError: If ``message`` has no ``"type"`` key.
        """
        # Work on a copy so the caller's message is not mutated by the pops.
        message = deepcopy(message)
        message_body = message.pop("body", {})
        message_type = message.pop("type")
        return await self.func(
            consumer,
            message_body,
            observer=self,
            message_type=message_type,
            **message,
            **kwargs,
        )

    def __get__(self, parent, objtype):
        """Descriptor hook: return the observer itself on class access.

        On instance access an ``ObjPartial`` constructed with
        ``consumer=parent`` is returned instead.
        """
        if parent is None:
            return self
        return ObjPartial(self, consumer=parent)

    def serialize(self, signal, *args, **kwargs) -> Dict[str, Any]:
        """Build the message to send for ``signal``.

        The body is produced by the function registered with
        :meth:`serializer` (an empty dict when none is registered). The
        message type is the handler's name with ``_`` replaced by ``.``.
        """
        message_body = {}
        if self._serializer:
            message_body = self._serializer(self, signal, *args, **kwargs)
        message = dict(type=self.func.__name__.replace("_", "."), body=message_body)
        return message

    def serializer(self, func):
        """Register ``func`` as the serializer used to build the message body.

        .. note::
            This is meant to be used as a decorator.

        Examples:

            .. code-block:: python

                # models.py
                from django.db import models
                from django.contrib.auth.models import AbstractUser

                class User(AbstractUser):
                    pass

                class Comment(models.Model):
                    text = models.TextField()
                    user = models.ForeignKey(
                        User, related_name="comments", on_delete=models.CASCADE
                    )
                    date = models.DateTimeField(auto_now_add=True)

            .. code-block:: python

                # serializers.py
                from rest_framework import serializers
                from .models import User, Comment

                class UserSerializer(serializers.ModelSerializer):
                    class Meta:
                        model = User
                        fields = ["id", "username", "email"]

                class CommentSerializer(serializers.ModelSerializer):
                    class Meta:
                        model = Comment
                        fields = ["id", "text", "user"]

            .. code-block:: python

                # consumers.py
                from djangochannelsrestframework.consumers import GenericAsyncAPIConsumer
                from djangochannelsrestframework.observer import model_observer
                from djangochannelsrestframework.decorators import action

                from .serializers import UserSerializer, CommentSerializer
                from .models import User, Comment

                class MyConsumer(GenericAsyncAPIConsumer):
                    queryset = User.objects.all()
                    serializer_class = UserSerializer

                    @model_observer(Comment)
                    async def comment_activity(self, message, observer=None, **kwargs):
                        await self.send_json(message)

                    @comment_activity.serializer
                    def comment_activity(self, instance: Comment, action, **kwargs):
                        return CommentSerializer(instance).data

                    @action()
                    async def subscribe_to_comment_activity(self, **kwargs):
                        await self.comment_activity.subscribe()

            Now we will have a websocket client in JavaScript listening to the
            messages after subscribing to the comment activity. This code
            block can be used in the browser console.

            .. code-block:: javascript

                const ws = new WebSocket("ws://localhost:8000/ws/my-consumer/")
                ws.onopen = function () {
                    ws.send(JSON.stringify({
                        action: "subscribe_to_comment_activity",
                        request_id: new Date().getTime(),
                    }))
                }
                ws.onmessage = function (e) {
                    console.log(e)
                }

            In the IPython shell we will create some comments for different
            users, and in the browser console we will see the log.

            .. note::
                At this point we should have some users in our database;
                otherwise create them.

            >>> from my_app.models import User, Comment
            >>> user_1 = User.objects.get(pk=1)
            >>> user_2 = User.objects.get(pk=2)
            >>> Comment.objects.create(text="user 1 creates a new comment", user=user_1)

            In the console log we will see something like this:

            .. code-block:: javascript

                {
                    action: "subscribe_to_comment_activity",
                    errors: [],
                    response_status: 200,
                    request_id: 15606042,
                    data: {
                        id: 1,
                        text: "user 1 creates a new comment",
                        user: 1,
                    },
                }

            Now we will create a comment with user 2.

            >>> Comment.objects.create(text="user 2 creates a second comment", user=user_2)

            In the console log we will see something like this:

            .. code-block:: javascript

                {
                    action: "subscribe_to_comment_activity",
                    errors: [],
                    response_status: 200,
                    request_id: 15606042,
                    data: {
                        id: 2,
                        text: "user 2 creates a second comment",
                        user: 2,
                    },
                }

            As you can see in this example, we are subscribed to
            **ALL ACTIVITY** of the comment model.
        """
        self._serializer = func
        return self

    async def subscribe(
        self, consumer: AsyncAPIConsumer, *args, **kwargs
    ) -> Iterable[str]:
        """Add ``consumer`` to every group computed for it.

        Extra positional and keyword arguments are forwarded to
        :meth:`group_names_for_consumer`.

        Returns:
            The list of group names the consumer was added to.
        """
        # ``consumer`` goes first positionally: passing it as a keyword after
        # ``*args`` made any positional argument collide with the ``consumer``
        # parameter of ``group_names_for_consumer`` (TypeError).
        groups = list(self.group_names_for_consumer(consumer, *args, **kwargs))
        for group_name in groups:
            await consumer.add_group(group_name)
        return groups

    async def unsubscribe(
        self, consumer: AsyncAPIConsumer, *args, **kwargs
    ) -> Iterable[str]:
        """Remove ``consumer`` from every group computed for it.

        Extra positional and keyword arguments are forwarded to
        :meth:`group_names_for_consumer`.

        Returns:
            The list of group names the consumer was removed from.
        """
        # Same positional forwarding as in ``subscribe``.
        groups = list(self.group_names_for_consumer(consumer, *args, **kwargs))
        for group_name in groups:
            await consumer.remove_group(group_name)
        return groups

    def group_names_for_consumer(
        self, consumer: AsyncAPIConsumer, *args, **kwargs
    ) -> Generator[str, None, None]:
        """Yield the cleaned group names ``consumer`` should (un)subscribe to.

        When a function was registered through :meth:`groups_for_consumer` or
        :meth:`groups`, it is called as
        ``func(self, *args, consumer=consumer, **kwargs)`` and each name it
        produces is prefixed with the stable observer id. Otherwise the names
        come from :meth:`group_names` (which does not receive ``consumer``).
        """
        if self._group_names_for_consumer:
            for group in self._group_names_for_consumer(
                self, *args, consumer=consumer, **kwargs
            ):
                yield self.clean_group_name(
                    "{}-{}".format(self._stable_observer_id, group)
                )
            return
        for group in self.group_names(*args, **kwargs):
            yield self.clean_group_name(group)

    def group_names_for_signal(self, *args, **kwargs) -> Generator[str, None, None]:
        """Yield the cleaned group names a signal should be sent to.

        When a function was registered through :meth:`groups_for_signal` or
        :meth:`groups`, it is called as ``func(self, *args, **kwargs)`` and
        each name it produces is prefixed with the stable observer id.
        Otherwise the names come from :meth:`group_names`.
        """
        if self._group_names_for_signal:
            for group in self._group_names_for_signal(self, *args, **kwargs):
                yield self.clean_group_name(
                    "{}-{}".format(self._stable_observer_id, group)
                )
            return
        for group in self.group_names(*args, **kwargs):
            yield self.clean_group_name(group)

    def group_names(self, *args, **kwargs):
        """Return the default group names; must be provided by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def groups_for_consumer(
        self,
        func: Callable[["BaseObserver", AsyncAPIConsumer], Generator[str, None, None]],
    ):
        """Register ``func`` as the group-name source for consumers.

        Returns the observer itself, so this can be used as a decorator.
        """
        self._group_names_for_consumer = func
        return self

    def groups_for_signal(self, func: Callable[..., Generator[str, None, None]]):
        """Register ``func`` as the group-name source for signals.

        Returns the observer itself, so this can be used as a decorator.
        """
        self._group_names_for_signal = func
        return self

    def groups(self, func):
        """Register ``func`` as the group-name source for consumers and signals.

        Returns the observer itself, so this can be used as a decorator.
        """
        self._group_names_for_consumer = func
        self._group_names_for_signal = func
        return self

    def clean_group_name(self, name):
        """Return ``"DCRF-"`` followed by the SHA-256 hex digest of ``name``."""
        # Some channel layers have a max group name length.
        return f"DCRF-{hashlib.sha256(name.encode()).hexdigest()}"