Subscriptions๐Ÿ”—

In this section, we'll cover how you can add subscriptions to your schema. Subscriptions are a way to get real-time updates from your server through your GraphQL Schema.

Setup๐Ÿ”—

To use subscriptions, you'll need to turn on Undine's async support, as subscription resolvers are always async. Then, you have two options for a transport protocol: WebSockets or Server-Sent Events.

WebSockets๐Ÿ”—

WebSockets use a persistent TCP connection between the client and server. They have broad client library support in the GraphQL ecosystem, making them a good choice when your client tooling expects WebSocket-based subscriptions.

To use WebSockets, you'll need use Undine's channels integration. See the GraphQL over WebSocket protocol for details on how the protocol works.

Server-Sent Events๐Ÿ”—

Server-Sent Events (SSE) use regular HTTP, which means they work through standard load balancers, proxies, and firewalls without special configuration. Since GraphQL subscriptions are inherently server-to-client, SSE is a natural fit and can be simpler to deploy than WebSockets.

SSE can operate in two modes: Distinct Connections mode and Single Connection mode.

Distinct Connections mode๐Ÿ”—

In Distinct Connections mode, each subscription opens its own SSE connection. This is the simpler mode and requires no extra setup beyond async support.

However, when using HTTP/1.1, browsers limit SSE connections to 6 per browser and domain, so you should use a web server capable of HTTP/2 in production. You can use USE_SSE_DISTINCT_CONNECTIONS_FOR_HTTP_1 to allow Distinct Connections mode over HTTP/1.1, if you know this isn't going to be an issue for your use case.

Single Connection mode๐Ÿ”—

In Single Connection mode, all operations are multiplexed over a single SSE connection, which avoids the HTTP/1.1 connection limit. This mode requires Undine's channels integration.

Unlike the reference implementation, which keeps state in-memory within a single process, Undine stores stream and operation state in Django sessions to guarantee a single connection in multi-worker deployments. This changes the implementation slightly compared to the reference implementation:

  1. Due to the possibility of session state becoming stale in case the client loses its stream connection, Undine's implementation allows creating a new stream even if one is already open. In this case, the existing stream is closed and replaced with a new one. The reference implementation always returns 409 Conflict if a stream is already open.

  2. Using sessions also means that Undine's implementation requires authentication, while the reference implementation does not enforce this.

Single Connection mode uses Django's cache framework and channel layers for state coordination. This requires both the cache backend and channel layer to work in multi-worker deployments. The cache backend should also support atomic cache.add. For example, using redis cache and channels-redis satisfies both requirements:

CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.redis.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/0",
    },
}

SESSION_ENGINE = "django.contrib.sessions.backends.cache"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

AsyncGenerators๐Ÿ”—

The simplest way of creating subscriptions is by using an AsyncGenerator function. Let's take a look at a simple example of a subscription that counts down from 10 to 0.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
About method signature

A method decorated with @Entrypoint is treated as a static method by the Entrypoint.

The self argument is not an instance of the RootType, but root argument of the GraphQLField resolver. To clarify this, it's recommended to change the argument's name to root, as defined by the RESOLVER_ROOT_PARAM_NAME setting.

The value of the root argument for an Entrypoint is None by default, but can be configured using the ROOT_VALUE setting if desired.

The info argument can be left out, but if it's included, it should always have the GQLInfo type annotation.

This will create the following subscription in the GraphQL schema:

1
2
3
type Subscription {
    countdown: Int!
}

Using this subscription, you'll receive the following response 10 times on 1 second intervals, while the value of the countdown field is decreases from 10 to 1.

1
2
3
4
5
{
  "data": {
    "countdown": 10
  }
}

The subscription's output type will be determined based on the first generic type parameter on the AsyncGenerator return type (in this case int), so typing it is required.

To add arguments for the subscription, you can add them to the function signature. Typing these arguments is also required to determine their input type.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo, start: int = 10) -> AsyncGenerator[int, None]:
        for i in range(start, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

This will create the following subscription in the GraphQL schema:

1
2
3
type Subscription {
    countdown(start: Int! = 10): Int!
}

If an exception is raised in the function, the subscription will be closed and an error message will be sent to the client. You should raise exceptions subclassing GraphQLError for better error messages, or use the GraphQLErrorGroup to raise multiple errors at once.

import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            if i == 5:
                msg = "Something went wrong"
                raise GraphQLError(msg)

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

You can also yield a GraphQLError from the function, which will send an error while keeping the subscription open. Furthermore, adding the error to the return type does not change the return type of the subscription.

import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int | GraphQLError, None]:
        for i in range(10, 0, -1):
            if i == 5:
                msg = "Something went wrong"
                yield GraphQLError(msg)
                continue

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

AsyncIterables๐Ÿ”—

You can also use an AsyncIterable instead of creating an AsyncGenerator function. Note that the AsyncIterable needs to be returned from the Entrypoint function, not used as the Entrypoint reference itself. Otherwise, they work similarly to AsyncGenerators.

import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Countdown:
    def __aiter__(self) -> AsyncIterator[int]:
        return self.gen()

    async def gen(self) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncIterable[int]:
        return Countdown()


schema = create_schema(query=Query, subscription=Subscription)

Signal subscriptions๐Ÿ”—

Undine also supports creating subscriptions for Django signals using SignalSubscriptions. For example, if you wanted to listen to new Tasks being created, you could add a ModelCreateSubscription for the Task Model like this.

from undine import Entrypoint, QueryType, RootType, create_schema
from undine.subscriptions import ModelCreateSubscription

from .models import Task


class TaskType(QueryType[Task], auto=True): ...


class Query(RootType):
    tasks = Entrypoint(TaskType, many=True)


class Subscription(RootType):
    created_tasks = Entrypoint(ModelCreateSubscription(TaskType))


schema = create_schema(query=Query, subscription=Subscription)

Similar subscriptions exists for Model updates (ModelUpdateSubscription), deletes (ModelDeleteSubscription), and overall saves (ModelSaveSubscription). These subscriptions return data through QueryTypes so queries to them are optimized just like any other query.

For delete subscriptions, note that the Model instance may have been deleted by the time the subscription is executed. You should not rely on the instance existing in the database or its relations being connected like you would with a normal query.

However, a copy of the instance is made just before deletion so that you can query its details, but not its relations since those have not been prefetched.

For other signals, you can create custom subscriptions by subclassing undine.subscriptions.SignalSubscription and adding the appropriate converters in order to use it in your schema. See the "Hacking Undine" section for more information on how to do this.

Permissions๐Ÿ”—

As subscriptions use Entrypoints, you can use their permission checks to set per-value permissions for the subscription. Raising an exception from a permission check will close the subscription and send an error message to the client.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema
from undine.exceptions import GraphQLPermissionError


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i

    @countdown.permissions
    def countdown_permissions(self, info: GQLInfo, value: int) -> None:
        if value > 10 and not info.context.user.is_superuser:
            msg = "Countdown value is too high"
            raise GraphQLPermissionError(msg)


schema = create_schema(query=Query, subscription=Subscription)

When using GraphQL over WebSocket, you can also configure permission checks for establishing a websocket connection using the WEBSOCKET_CONNECTION_INIT_HOOK setting.

from typing import Any

from undine.exceptions import GraphQLPermissionError
from undine.utils.graphql.websocket import WebSocketRequest


def connection_init_hook(request: WebSocketRequest) -> dict[str, Any] | None:
    if not request.user.is_superuser:
        msg = "Only superusers can establish a connection"
        raise GraphQLPermissionError(msg)