Quickstart
Example
This will be a toy example, so the specific implementations details are not important here, rather the separation of logic.
We are going to build a pipeline that takes product reviews from users, saves them to our database, contacts some outside system for recommendations based on the given review, and responds with the user's review and recommendation.
It's a good idea to first define an InputSerializer, and an OutputSerializer that define the input and output of the pipeline respectively. This forces verification of the incoming and outcoming data, so that if something changes in the pipeline, or some unexpected values are produced, the endpoint will break instead of creating side effects in the application using the API. We'll also get better documentation in The Browsable API or any tool based on the OpenAPI specification, like Swagger.
More about automatic documentation for pipeline views here.
from django.contrib.auth.models import User
from rest_framework import serializers
class ReviewInputSerializer(serializers.Serializer):
product_id = serializers.UUIDField()
score = serializers.ChoiceField(choices=[1, 2, 3, 4, 5])
review = serializers.CharField()
user = serializers.SerializerMethodField()
# Current request object is always included
# in the serializer context
def get_user(self, obj) -> User:
return self.context["request"].user
class ReviewOutputSerializer(serializers.Serializer):
class RecommendationSerializer(serializers.Serializer):
product_id = serializers.CharField()
avg_score = serializers.FloatField()
score = serializers.ChoiceField(choices=[1, 2, 3, 4, 5])
review = serializers.CharField()
recommendations = RecommendationSerializer(many=True)
And now the main logic, the business logic
from uuid import UUID
import requests
from django.contrib.auth.models import User
from .models import Product, Review
class Recommencation(TypedDict):
product_id: str
avg_score: float
def review_product(product_id: UUID, score: int, review: str, user: User):
product = Product.objects.get(product_id)
user_review = Review.objects.add_review(product, user, score, review)
return {"product": product, "review": user_review}
def get_recommendations(product: Product, review: Review):
payload = {"product": str(product.id), "score": review.score}
response = requests.get("...", params=payload)
data: list[Recommencation] = response.json()
return {
"score": review.score,
"review": review.content,
"recommendations": data
}
Finally, let's put those together in the pipeline.
from pipeline_views import BasePipelineView
from .serializers import ReviewInputSerializer, ReviewOutputSerializer
from .services import review_product, get_recommendations
class SomeView(BasePipelineView):
pipelines = {
"POST": [
ReviewInputSerializer,
add_review_for_product,
get_recommendations,
ReviewOutputSerializer,
],
}
A Closer Look
Notice that the output from the previous function is used as the input of for the next function.
def review_product(product_id: UUID, score: int, review: str, user: User):
product = Product.objects.get(product_id)
user_review = Review.objects.add_review(product, user, score, review)
return {"product": product, "review": user_review}
def get_recommendations(product: Product, review: Review):
payload = {"product": str(product.id), "score": review.score}
response = requests.get("...", params=payload)
data: list[Recommencation] = response.json()
return {
"score": review.score,
"review": review.content,
"recommendations": data
}
Depending on your needs, you might want to reuse a logic fuction in a different context,
and you might not always give the same input. You can make the functions more generic
by specifying **kwargs
.
def review_product(**kwargs):
product_id: UUID = kwargs["product_id"]
score: int = kwargs["score"]
review: str = kwargs["review"]
user: User = kwargs["user"]
product = Product.objects.get(product_id)
user_review = Review.objects.add_review(product, user, score, review)
return {"product": product, "review": user_review}
def get_recommendations(**kwargs):
product: Product = kwargs["product"]
review: Review = kwargs["review"]
payload = {"product": str(product.id), "score": review.score}
response = requests.get("...", params=payload)
data: list[Recommencation] = response.json()
return {
"score": review.score,
"review": review.content,
"recommendations": data
}
You can even make functions that are only used to verify input, but do not modify the output at all (or only add data to it).
def validate_data(**kwargs):
# Validation goes here.
# Might raise an exception,
# which interrupts the pipeline.
return kwargs
Another point no note is that the functions are easily testable.
review_product
can be tested without mocking the GET request
to the outside system, get_recommendations
can be tested
without making database queries. Some functions, like validators,
have no side effects, which makes them easy and fast to unit test.