Subscriptions
In this section, we'll cover how you can add subscriptions to your schema.
Subscriptions are a way to get real-time updates from your server through
your GraphQL Schema.
Setup
To use subscriptions, you'll need to turn on Undine's async support,
and use the channels integration. This will set you up
with a web server capable of GraphQL over WebSocket protocol.
You'll also need a client capable of using the protocol.
Now, you can create a new RootType called Subscription
and add Entrypoints that return an AsyncIterable, usually
an AsyncGenerator.
AsyncGenerators
Let's take a look at a simple example of a subscription that counts down from 10 to 0.
This subscription is set up using an AsyncGenerator function.
| import asyncio
from collections.abc import AsyncGenerator
from undine import Entrypoint, GQLInfo, RootType, create_schema
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
for i in range(10, 0, -1):
await asyncio.sleep(1)
yield i
schema = create_schema(query=Query, subscription=Subscription)
|
About method signature
A method decorated with @Entrypoint is treated as a static method by the Entrypoint.
The self argument is not an instance of the RootType,
but root argument of the GraphQLField resolver. To clarify this,
it's recommended to change the argument's name to root,
as defined by the RESOLVER_ROOT_PARAM_NAME
setting.
The value of the root argument for an Entrypoint is None by default,
but can be configured using the ROOT_VALUE
setting if desired.
The info argument can be left out, but if it's included, it should always
have the GQLInfo type annotation.
This will create the following subscription in the GraphQL schema:
| type Subscription {
countdown: Int!
}
|
Using this subscription, you'll receive the following response 10 times on 1 second intervals,
while the value of the countdown field is decreases from 10 to 1.
| {
"data": {
"countdown": 10
}
}
|
The subscription's output type will be determined based on the first generic type parameter
on the AsyncGenerator return type (in this case int), so typing it is required.
To add arguments for the subscription, you can add them to the function signature.
Typing these arguments is also required to determine their input type.
| import asyncio
from collections.abc import AsyncGenerator
from undine import Entrypoint, GQLInfo, RootType, create_schema
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo, start: int = 10) -> AsyncGenerator[int, None]:
for i in range(start, 0, -1):
await asyncio.sleep(1)
yield i
schema = create_schema(query=Query, subscription=Subscription)
|
This will create the following subscription in the GraphQL schema:
| type Subscription {
countdown(start: Int! = 10): Int!
}
|
AsyncIterables
You can also use an AsyncIterable instead of creating an AsyncGenerator function.
Note that the AsyncIterable needs to be returned from the Entrypoint function,
not used as the Entrypoint reference itself. Otherwise, they work similarly to
AsyncGenerators.
| import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator
from undine import Entrypoint, GQLInfo, RootType, create_schema
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Countdown:
def __aiter__(self) -> AsyncIterator[int]:
return self.gen()
async def gen(self) -> AsyncGenerator[int, None]:
for i in range(10, 0, -1):
await asyncio.sleep(1)
yield i
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo) -> AsyncIterable[int]:
return Countdown()
schema = create_schema(query=Query, subscription=Subscription)
|
Exceptions
If an exception is raised in the subscription, the subscription will be closed
and an error message will be sent to the client. You should raise exceptions
subclassing GraphQLError for better error messages, or use the GraphQLErrorGroup
to raise multiple errors at once.
| import asyncio
from collections.abc import AsyncGenerator
from graphql import GraphQLError
from undine import Entrypoint, GQLInfo, RootType, create_schema
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
for i in range(10, 0, -1):
if i == 5:
msg = "Something went wrong"
raise GraphQLError(msg)
await asyncio.sleep(1)
yield i
schema = create_schema(query=Query, subscription=Subscription)
|
You can also yield a GraphQLError from the subscription, which will send
an error while keeping the subscription open. Furthermore, adding the error to the return
type does not change the return type of the subscription.
| import asyncio
from collections.abc import AsyncGenerator
from graphql import GraphQLError
from undine import Entrypoint, GQLInfo, RootType, create_schema
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo) -> AsyncGenerator[int | GraphQLError, None]:
for i in range(10, 0, -1):
if i == 5:
msg = "Something went wrong"
yield GraphQLError(msg)
continue
await asyncio.sleep(1)
yield i
schema = create_schema(query=Query, subscription=Subscription)
|
Permissions
As subscriptions use Entrypoints, you can use their permission checks
to set per-value permissions for the subscription. Raising an exception from
a permission check will close the subscription and send an error message
to the client.
| import asyncio
from collections.abc import AsyncGenerator
from undine import Entrypoint, GQLInfo, RootType, create_schema
from undine.exceptions import GraphQLPermissionError
class Query(RootType):
@Entrypoint
def task(self, info: GQLInfo) -> str:
return "Hello World"
class Subscription(RootType):
@Entrypoint
async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
for i in range(10, 0, -1):
await asyncio.sleep(1)
yield i
@countdown.permissions
def countdown_permissions(self, info: GQLInfo, value: int) -> None:
if value > 10 and not info.context.user.is_superuser:
msg = "Countdown value is too high"
raise GraphQLPermissionError(msg)
schema = create_schema(query=Query, subscription=Subscription)
|
You can also configure permission checks for establishing a websocket connection
using the WEBSOCKET_CONNECTION_INIT_HOOK
setting.
| from typing import Any
from undine.exceptions import GraphQLPermissionError
from undine.utils.graphql.websocket import WebSocketRequest
def connection_init_hook(request: WebSocketRequest) -> dict[str, Any] | None:
if not request.user.is_superuser:
msg = "Only superusers can establish a connection"
raise GraphQLPermissionError(msg)
|