import hashlib
from collections import defaultdict
from copy import deepcopy
from typing import Any, Dict, Generator, Callable, Iterable, Optional
from djangochannelsrestframework.consumers import AsyncAPIConsumer
from djangochannelsrestframework.observer.utils import ObjPartial
[docs]class BaseObserver:
"""
This is the Base Observer class that `Observer` and `ModelObserver` inherit from.
The decorators `@model_observer` and `@observer` replaced the wrapped method with an instance of these classes.
You can then access the methods of this class using the method name that you wrapped.
"""
def __init__(self, func, partition: str = "*"):
self.func = func
self._serializer = None
self._group_names_for_signal = None
self._group_names_for_consumer = None
self._stable_observer_id = (
f"{partition}-"
f"{self.__class__.__name__}-"
f"{self.func.__module__}."
f"{self.func.__name__}"
)
async def __call__(
self, message, consumer: Optional[AsyncAPIConsumer] = None, **kwargs
):
message = deepcopy(message)
message_body = message.pop("body", {})
message_type = message.pop("type")
group = message.get("group")
if consumer is not None:
requests = consumer._observer_group_to_request_id[self._stable_observer_id][
group
]
return await self.func(
consumer,
message_body,
observer=self,
message_type=message_type,
subscribing_request_ids=list(requests),
**message,
**kwargs,
)
return await self.func(
consumer,
message_body,
observer=self,
message_type=message_type,
subscribing_request_ids=[],
**message,
**kwargs,
)
def __get__(self, parent, objtype):
if parent is None:
return self
return ObjPartial(self, consumer=parent)
def serialize(self, signal, *args, **kwargs) -> Dict[str, Any]:
message_body = {}
if self._serializer:
message_body = self._serializer(self, signal, *args, **kwargs)
message = dict(type=self.func.__name__.replace("_", "."), body=message_body)
return message
[docs] def serializer(self, func):
"""
.. note::
Should be used as a method decorator eg: `@observed_handler.serializer`
The method that this wraps is evaluated just after the observer is triggered before the result is sent over
the channel layer. That means you **DO NOT** have access to user or other request information.
The result of this method is what is sent over the channel layer.
If you need to modify that with user specific information then you need to do that in the observer handler method.
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comments)
async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
...
@comment_activity.serializer
def comment_activity(self, instance: Comment, action, **kwargs):
return CommentSerializer(instance).data
The advantage of doing serialization at this point is that it happens only once even if 1000s of consumers are
subscribed to the event.
"""
self._serializer = func
return self
[docs] async def subscribe(
self, consumer: AsyncAPIConsumer, *args, request_id=None, **kwargs
) -> Iterable[str]:
"""
This should be called to subscribe the current consumer.
args and kwargs passed here are provided to the :meth:`groups_for_consumer` method to enable custom
partitioning of events.
If the request_id is passed to the subscribe method then the observer will track that request id and provide it
to the handling method when an event happens.
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comments)
async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
...
@action()
async def subscribe_to_comment_activity(self, request_id, **kwargs):
await self.comment_activity.subscribe(request_id=request_id)
"""
groups = list(self.group_names_for_consumer(*args, consumer=consumer, **kwargs))
for group_name in groups:
# add request id to mapping
if request_id is not None:
consumer._observer_group_to_request_id[self._stable_observer_id][
group_name
].add(request_id)
await consumer.add_group(group_name)
return groups
[docs] async def unsubscribe(
self, consumer: AsyncAPIConsumer, *args, request_id=None, **kwargs
) -> Iterable[str]:
"""
This should be called to unsubscribe the current consumer.
args and kwargs passed here are provided to the :meth:`groups_for_consumer` method to enable custom
partitioning of events.
If the request_id is passed to the un-subscribe method then this will un-subscribe the requests with the same
id that called the :meth:`subscribe` method. If no `request_id` is provided then all subscribed requests for
this consumer are un-subscribed.
.. code-block:: python
class MyConsumer(GenericAsyncAPIConsumer):
queryset = User.objects.all()
serializer_class = UserSerializer
@model_observer(Comments)
async def comment_activity(self, message, observer=None, subscribing_request_ids=[], **kwargs):
...
@action()
async def unsubscribe_to_comment_activity(self, request_id, **kwargs):
await self.comment_activity.unsubscribe(request_id=request_id)
"""
groups = list(self.group_names_for_consumer(*args, consumer=consumer, **kwargs))
for group_name in groups:
# remove group to request mappings
if (
group_name
in consumer._observer_group_to_request_id[self._stable_observer_id]
):
# unsubscribe all requests to this group
if request_id is None:
consumer._observer_group_to_request_id[
self._stable_observer_id
].pop(group_name)
else:
consumer._observer_group_to_request_id[self._stable_observer_id][
group_name
].remove(request_id)
if (
len(
consumer._observer_group_to_request_id[self._stable_observer_id][
group_name
]
)
> 0
):
await consumer.remove_group(group_name)
return groups
def group_names_for_consumer(
self, consumer: AsyncAPIConsumer, *args, **kwargs
) -> Generator[str, None, None]:
if self._group_names_for_consumer:
for group in self._group_names_for_consumer(
self, *args, consumer=consumer, **kwargs
):
yield self.clean_group_name(
"{}-{}".format(self._stable_observer_id, group)
)
return
for group in self.group_names(*args, **kwargs):
yield self.clean_group_name(group)
def group_names_for_signal(self, *args, **kwargs) -> Generator[str, None, None]:
if self._group_names_for_signal:
for group in self._group_names_for_signal(self, *args, **kwargs):
yield self.clean_group_name(
"{}-{}".format(self._stable_observer_id, group)
)
return
for group in self.group_names(*args, **kwargs):
yield self.clean_group_name(group)
def group_names(self, *args, **kwargs):
raise NotImplementedError()
[docs] def groups_for_consumer(
self,
func: Callable[["BaseObserver", AsyncAPIConsumer], Generator[str, None, None]],
):
"""
.. note::
Should be used as a method decorator eg: `@observed_handler.groups_for_consumer`
The decorated method is used when :meth:`subscribe` and :meth:`unsubscribe` are called to enumerate the
corresponding groups to un/subscribe to.
The `args` and `kwargs` providing to :meth:`subscribe` and :meth:`unsubscribe` are passed here to enable this.
.. code-block:: python
@classroom_change_handler.groups_for_consumer
def classroom_change_handler(self, school=None, classroom=None, **kwargs):
# This is called when you subscribe/unsubscribe
if school is not None:
yield f'-school__{school.pk}'
if classroom is not None:
yield f'-pk__{classroom.pk}'
@action()
async def subscribe_to_classrooms_in_school(self, school_pk, request_id, **kwargs):
# check user has permission to do this
await self.classroom_change_handler.subscribe(school=school, request_id=request_id)
@action()
async def subscribe_to_classroom(self, classroom_pk, request_id, **kwargs):
# check user has permission to do this
await self.classroom_change_handler.subscribe(classroom=classroom, request_id=request_id)
It is important that a corresponding :meth:`groups_for_signal` method is provided that enumerates the groups
that each event is sent to.
"""
self._group_names_for_consumer = func
return self
[docs] def groups_for_signal(self, func: Callable[..., Generator[str, None, None]]):
"""
.. note::
Should be used as a method decorator eg: `@observed_handler.groups_for_signal`
The decorated method is used whenever an event happens that the observer is observing
(even if nothing is subscribed).
The role of this method is to enumerate the groups that the event should be sent over.
.. code-block:: python
@classroom_change_handler.groups_for_signal
def classroom_change_handler(self, instance: models.Classroom, **kwargs):
yield f'-school__{instance.school_id}'
yield f'-pk__{instance.pk}'
It is important that a corresponding :meth:`groups_for_consumer` method is provided to enable the consumers to
correctly select which groups to subscribe to.
"""
self._group_names_for_signal = func
return self
def groups(self, func):
self._group_names_for_consumer = func
self._group_names_for_signal = func
return self
def clean_group_name(self, name):
# Some chanel layers have a max group name length.
return f"DCRF-{hashlib.sha256(name.encode()).hexdigest()}"