Subscriptions
In this section, we'll cover how you can add subscriptions to your schema.
Subscriptions are a way to get real-time updates from your server through
your GraphQL Schema.
Setup
To use subscriptions, you'll need to turn on Undine's async support
and use the channels integration. This sets you up with a web server
that supports the GraphQL over WebSocket protocol. You'll also need
a client that can speak the protocol.
Now, you can create a new RootType called Subscription and add Entrypoints
that return an AsyncIterable, usually an AsyncGenerator.
AsyncGenerators
Let's take a look at a simple example of a subscription that counts down from 10 to 1.
This subscription is set up using an AsyncGenerator function.
| import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        # Yield 10, 9, ..., 1 with a one-second pause before each value.
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
|
About method signature
A method decorated with @Entrypoint is treated as a static method by the Entrypoint.
The self argument is not an instance of the RootType, but the root argument of the
GraphQLField resolver. To clarify this, it's recommended to change the argument's name
to root, as defined by the RESOLVER_ROOT_PARAM_NAME setting.
The value of the root argument for an Entrypoint is None by default,
but can be configured using the ROOT_VALUE setting if desired.
The info argument can be left out, but if it's included, it should always
have the GQLInfo type annotation.
This will create the following subscription in the GraphQL schema:
| type Subscription {
  countdown: Int!
}
|
Using this subscription, you'll receive the following response 10 times at 1-second intervals,
with the value of the countdown field decreasing from 10 to 1.
| {
  "data": {
    "countdown": 10
  }
}
|
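To see the whole sequence of responses, the same generator logic can be run with plain asyncio, outside Undine. This is only a sketch: the delay parameter and the collect_payloads helper are hypothetical additions so the example finishes quickly, and the wrapping into "data" merely mimics the response shape shown above.

```python
import asyncio
from collections.abc import AsyncGenerator


async def countdown(delay: float = 1.0) -> AsyncGenerator[int, None]:
    # Same loop as the subscription above, without the Undine decorator.
    for i in range(10, 0, -1):
        await asyncio.sleep(delay)
        yield i


async def collect_payloads(delay: float = 1.0) -> list[dict]:
    # Wrap each yielded value the way the response above is shaped.
    return [{"data": {"countdown": value}} async for value in countdown(delay)]


if __name__ == "__main__":
    # Use delay=0 so the demonstration does not take ten seconds.
    for payload in asyncio.run(collect_payloads(delay=0)):
        print(payload)
```

The first payload carries 10 and the last carries 1. After that the generator is exhausted, which ends the subscription.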
The subscription's output type will be determined based on the first generic type parameter
on the function's return type, so typing it is required.
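The idea of reading the output type from the first generic parameter can be illustrated with the standard typing helpers. This is a sketch of the general technique, not Undine's actual implementation. The first_type_parameter helper is hypothetical.

```python
from collections.abc import AsyncGenerator
from typing import get_args, get_type_hints


async def countdown() -> AsyncGenerator[int, None]:
    yield 1


def first_type_parameter(func) -> type:
    """Return the first generic parameter of the function's return annotation."""
    hints = get_type_hints(func)
    if "return" not in hints:
        msg = f"{func.__name__} needs a return type annotation"
        raise TypeError(msg)
    args = get_args(hints["return"])
    if not args:
        msg = f"{func.__name__} must parametrize its return type, e.g. AsyncGenerator[int, None]"
        raise TypeError(msg)
    # For AsyncGenerator[int, None] the first parameter is the yielded type.
    return args[0]
```

Here `first_type_parameter(countdown)` gives `int`, while a function without a return annotation raises a TypeError. That is why an untyped subscription cannot be given an output type.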
To add arguments for the subscription, you can add them to the function signature.
Typing these arguments is required to determine their input type.
| import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo, start: int = 10) -> AsyncGenerator[int, None]:
        # The typed "start" parameter becomes an argument of the subscription.
        for i in range(start, 0, -1):
            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
|
This will create the following subscription in the GraphQL schema:
| type Subscription {
  countdown(start: Int! = 10): Int!
}
|
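The link between the function signature and the argument definition can be sketched with the standard inspect module. The SCALARS mapping and the describe_arguments helper are hypothetical and cover only a few built-in types. This is not Undine's conversion logic, only an illustration of why annotations and defaults matter.

```python
import inspect
from collections.abc import AsyncGenerator

# Toy mapping for illustration only (an assumption, not Undine's type conversion).
SCALARS = {int: "Int", float: "Float", str: "String", bool: "Boolean"}


async def countdown(root, info, start: int = 10) -> AsyncGenerator[int, None]:
    for i in range(start, 0, -1):
        yield i


def describe_arguments(func, skip: tuple[str, ...] = ("self", "root", "info")) -> list[str]:
    """Render each remaining parameter roughly like a GraphQL argument definition."""
    described = []
    for name, param in inspect.signature(func).parameters.items():
        if name in skip:
            continue
        if param.annotation is inspect.Parameter.empty:
            msg = f"Argument '{name}' needs a type annotation"
            raise TypeError(msg)
        text = f"{name}: {SCALARS[param.annotation]}!"
        if param.default is not inspect.Parameter.empty:
            text += f" = {param.default}"
        described.append(text)
    return described
```

Calling `describe_arguments(countdown)` returns a single entry, `start: Int! = 10`, which matches the argument in the schema above.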
AsyncIterables
You can also use an AsyncIterable instead of creating an AsyncGenerator function.
Note that the AsyncIterable needs to be returned from the Entrypoint function,
not used as the Entrypoint reference itself. Otherwise, they work similarly to
AsyncGenerators.
| import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Countdown:
    def __aiter__(self) -> AsyncIterator[int]:
        return self.gen()

    async def gen(self) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncIterable[int]:
        # Return the AsyncIterable; the Entrypoint itself is a regular coroutine.
        return Countdown()


schema = create_schema(query=Query, subscription=Subscription)
|
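The difference between the two kinds of objects can be checked with plain Python, independent of Undine. The sketch below uses a shortened countdown and a hypothetical consume helper. It shows that an object defining only `__aiter__` is an AsyncIterable but not an AsyncGenerator, and that it can be iterated more than once because every `async for` asks it for a fresh iterator.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator


class Countdown:
    def __aiter__(self) -> AsyncIterator[int]:
        # A new generator is created for every iteration.
        return self.gen()

    async def gen(self) -> AsyncGenerator[int, None]:
        for i in range(3, 0, -1):
            await asyncio.sleep(0)
            yield i


async def consume(source: AsyncIterable[int]) -> list[int]:
    return [value async for value in source]


if __name__ == "__main__":
    countdown = Countdown()
    print(isinstance(countdown, AsyncIterable))   # has __aiter__
    print(isinstance(countdown, AsyncGenerator))  # lacks asend/athrow/aclose
    print(asyncio.run(consume(countdown)))
    print(asyncio.run(consume(countdown)))        # iterating again works
```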
Exceptions
If an exception is raised in the subscription, the subscription will be closed
and an error message will be sent to the client. You should raise exceptions
subclassing GraphQLError for better error messages, or use GraphQLErrorGroup
to raise multiple errors at once.
| import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            if i == 5:
                # Raising ends the generator, which closes the subscription.
                msg = "Something went wrong"
                raise GraphQLError(msg)

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
|
You can also yield a GraphQLError from the subscription, which will send
an error while keeping the subscription open. Adding GraphQLError to the
generator's type annotation does not change the output type of the subscription
in the schema.
| import asyncio
from collections.abc import AsyncGenerator

from graphql import GraphQLError

from undine import Entrypoint, GQLInfo, RootType, create_schema


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int | GraphQLError, None]:
        for i in range(10, 0, -1):
            if i == 5:
                # Yielding the error keeps the generator, and the subscription, alive.
                msg = "Something went wrong"
                yield GraphQLError(msg)
                continue

            await asyncio.sleep(1)
            yield i


schema = create_schema(query=Query, subscription=Subscription)
|
Permissions
As subscriptions use Entrypoints, you can use their permission checks to
set per-value permissions for the subscription. Raising an exception from
a permission check will close the subscription and send an error message
to the client.
| import asyncio
from collections.abc import AsyncGenerator

from undine import Entrypoint, GQLInfo, RootType, create_schema
from undine.exceptions import GraphQLPermissionError


class Query(RootType):
    @Entrypoint
    def task(self, info: GQLInfo) -> str:
        return "Hello World"


class Subscription(RootType):
    @Entrypoint
    async def countdown(self, info: GQLInfo) -> AsyncGenerator[int, None]:
        for i in range(10, 0, -1):
            await asyncio.sleep(1)
            yield i

    @countdown.permissions
    def countdown_permissions(self, info: GQLInfo, value: int) -> None:
        # Called with each value produced by the subscription.
        if value > 10 and not info.context.user.is_superuser:
            msg = "Countdown value is too high"
            raise GraphQLPermissionError(msg)


schema = create_schema(query=Query, subscription=Subscription)
|
You can also configure permission checks for establishing a websocket connection
using the WEBSOCKET_CONNECTION_INIT_HOOK setting.
| from typing import Any

from undine.exceptions import GraphQLPermissionError
from undine.utils.graphql.websocket import WebSocketRequest


def connection_init_hook(request: WebSocketRequest) -> dict[str, Any] | None:
    # Reject the connection before any subscription can be started.
    if not request.user.is_superuser:
        msg = "Only superusers can establish a connection"
        raise GraphQLPermissionError(msg)
    return None
|