Subscriptions🔗

In this section, we'll cover how you can add subscriptions to your schema. Subscriptions are a way to get real-time updates from your server through your GraphQL Schema.

Setup🔗

To use subscriptions, you'll need to turn on Undine's async support, and use the channels integration. This will set you up with a web server capable of GraphQL over WebSocket protocol. You'll also need a client capable of using the protocol.

Now, you can create a new RootType called Subscription and add Entrypoints that return an AsyncIterable, usually an AsyncGenerator.

AsyncGenerators🔗

Let's take a look at a simple example of a subscription that counts down from 10 to 0. This subscription is set up using an AsyncGenerator function.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
About method signature

A method decorated with @Entrypoint is treated as a static method by the Entrypoint.

The self argument is not an instance of the RootType, but root argument of the GraphQLField resolver. To clarify this, it's recommended to change the argument's name to root, as defined by the RESOLVER_ROOT_PARAM_NAME setting.

The value of the root argument for an Entrypoint is None by default, but can be configured using the ROOT_VALUE setting if desired.

The info argument can be left out, but if it's included, it should always have the GQLInfo type annotation.

This will create the following subscription in the GraphQL schema:

1
2
3
type Subscription {
    countdown: Int!
}

Using this subscription, you'll receive the following response 10 times on 1 second intervals, while the value of the countdown field is decreases from 10 to 1.

1
2
3
4
5
{
  "data": {
    "countdown": 10
  }
}

The subscription's output type will be determined based on the first generic type parameter on the function's return type, so typing it is required.

To add arguments for the subscription, you can add them to the function signature. Typing these arguments is required to determine their input type.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo, start: int = 10) -> AsyncGenerator[int, None]:
        for i in range(start, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

This will create the following subscription in the GraphQL schema:

1
2
3
type Subscription {
    countdown(start: Int! = 10): Int!
}

AsyncIterables🔗

You can also use an AsyncIterable instead of creating an AsyncGenerator function. Note that the AsyncIterable needs to be returned from the Entrypoint function, not used as the Entrypoint reference itself. Otherwise, they work similarly to AsyncGenerators.

import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Countdown:
    def __aiter__(self) -> AsyncIterator[int]:
        return self.gen()

    async def gen(self) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncIterable[int]:
        return Countdown()


schema = create_schema(query=Query, subscription=Subscription)

Exceptions🔗

If an exception is raised in the subscription, the subscription will be closed and an error message will be sent to the client. You should raise exceptions subclassing GraphQLError for better error messages, or use the GraphQLErrorGroup to raise multiple errors at once.

import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            if i == 5:
                msg = "Something went wrong"
                raise GraphQLError(msg)

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

You can also yield a GraphQLError from the subscription, which will send an error while keeping the subscription open. Adding the error to the return type does not change the return type of the subscription.

import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int | GraphQLError, None]:
        for i in range(10, 0, -1):
            if i == 5:
                msg = "Something went wrong"
                yield GraphQLError(msg)
                continue

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)

Permissions🔗

As subscriptions use Entrypoints, you can use their permission checks to set per-value permissions for the subscription. Raising an exception from a permission check will close the subscription and send an error message to the client.

import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema
from undine.exceptions import GraphQLPermissionError


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i

    @countdown.permissions
    def countdown_permissions(self, info: GQLInfo, value: int) -> None:
        if value > 10 and not info.context.user.is_superuser:
            msg = "Countdown value is too high"
            raise GraphQLPermissionError(msg)


schema = create_schema(query=Query, subscription=Subscription)

You can also configure permission checks for establishing a websocket connection using the WEBSOCKET_CONNECTION_INIT_HOOK setting.

from typing import Any

from undine.exceptions import GraphQLPermissionError
from undine.utils.graphql.websocket import WebSocketRequest


def connection_init_hook(request: WebSocketRequest) -> dict[str, Any] | None:
    if not request.user.is_superuser:
        msg = "Only superusers can establish a connection"
        raise GraphQLPermissionError(msg)