Tutorial

In this tutorial we’re going to create a couple of services that demonstrate some of the good stuff about working with Nameko.

A Simple Service

Let’s start with a simple service that uses one of the community extensions, nameko-grpc — a set of extensions that let Nameko services speak gRPC.

Here is a Nameko service that implements the Python Quickstart from the official gRPC docs:

# greeter.py

from nameko_grpc.entrypoint import Grpc
import grpc

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)
# the gRPC entrypoint decorator (note: this rebinds the name `grpc`)
grpc = Grpc.implementing(helloworld_pb2_grpc.GreeterStub)


class Greeter:
    name = "greeter"

    @grpc
    def say_hello(self, request, context):
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")

    @grpc
    def say_hello_again(self, request, context):
        return helloworld_pb2.HelloReply(message=f"Hello again, {request.name}!")

This is the protobuf definition from the official gRPC quickstart, except that the method names have been snake_cased to be more Pythonic.

// helloworld.proto

syntax = "proto3";

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc say_hello (HelloRequest) returns (HelloReply) {}
  // Sends another greeting
  rpc say_hello_again (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
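
If you’d rather generate the gRPC modules ahead of time instead of at import time, the equivalent grpcio-tools command (as used in the official quickstart) is:

$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

This writes helloworld_pb2.py and helloworld_pb2_grpc.py next to the proto file, which you can then import directly.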

This is the client from the official gRPC quickstart, adjusted for the snake_cased method names.

The protobuf codegen is inlined here too for convenience, and the client prompts for the name to greet.

# greeter_client.py

import logging
import grpc

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)


def run():

    name = input("Enter a name: ")

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.say_hello(helloworld_pb2.HelloRequest(name=name))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

This example requires the following libraries.

# requirements.txt
nameko>=3
nameko-grpc>=1.2.0
grpcio-tools  # for runtime codegen
nameko-sqlalchemy   # for later example

Let’s see if it works.

Install the dependencies:

$ pip install -r requirements.txt
...

Run the service:

$ nameko run greeter
starting services: greeter

From another terminal, run the client:

$ python greeter_client.py
Enter a name: Matt
Greeter client received: Hello, Matt!

🎉

Mix and Match Protocols

One of Nameko’s superpowers is being able to use different network protocols cohesively inside one service.

Let’s extend our Greeter service by adding some asynchronous message passing. We’ll use the built-in EventDispatcher extension to dispatch a Nameko Event whenever one of these methods is called.

And since asynchronous message passing isn’t much use unless there’s something consuming those messages, we’ll add another Listener service to do that using the built-in event_handler entrypoint.

# greeter_with_events.py

from nameko_grpc.entrypoint import Grpc
import grpc
from nameko.events import EventDispatcher

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)
grpc = Grpc.implementing(helloworld_pb2_grpc.GreeterStub)


class Greeter:
    name = "greeter"

    dispatch = EventDispatcher()

    @grpc
    def say_hello(self, request, context):
        self.dispatch("hello", {"name": request.name})
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")

    @grpc
    def say_hello_again(self, request, context):
        self.dispatch("hello_again", {"name": request.name})
        return helloworld_pb2.HelloReply(message=f"Hello again, {request.name}!")

# listener.py

from nameko.events import event_handler


class Listener:
    name = "listener"

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        print(f"Greeter met {payload['name']} ❤️")

Now when we run them:

Start the greeter:

$ nameko run greeter_with_events --define AMQP_URI=pyamqp://guest:guest@localhost:5672
starting services: greeter

Info

We need a RabbitMQ broker in order to use the Events extensions. You can quickly start one on the default ports with Docker:

$ docker run -d -p 5672:5672 rabbitmq:3

Start the listener:

$ nameko run listener --define AMQP_URI=pyamqp://guest:guest@localhost:5672
starting services: listener
Connected to amqp://guest:**@127.0.0.1:5672//

Run the client:

$ python greeter_client.py
Enter a name: Matt
Greeter client received: Hello, Matt!

Check the listener’s terminal again:

$ nameko run listener --define AMQP_URI=pyamqp://guest:guest@localhost:5672
starting services: listener
Greeter met Matt ❤️

Add Persistence

Let’s add some persistence to our service using another community extension, nameko-sqlalchemy.

We’ll create a SQLAlchemy model to store each greeting that we see, and update the listener to persist them using the Database dependency provider.

Define a simple SQLAlchemy model.

# models.py

from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base


DeclarativeBase = declarative_base(name="Greetings")


class Greeting(DeclarativeBase):
    __tablename__ = "greetings"

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    name = Column(String())

Update the Listener service to save an instance of the model for each event that it handles.

# listener_with_persistence.py

from nameko.events import event_handler
from nameko_sqlalchemy import Database

from models import DeclarativeBase, Greeting
import bootstrap  # create the sqlite db if it doesn't exist


class Listener:
    name = "listener"

    db = Database(DeclarativeBase)

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        print(f"Greeter met {payload['name']} ❤️")

        with self.db.get_session() as session:
            session.add(Greeting(name=payload['name']))
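
The bootstrap module isn’t shown in this tutorial. A minimal sketch, assuming a local SQLite database file called greetings.db, just creates the tables defined by the models:

# bootstrap.py  (hypothetical sketch)

from sqlalchemy import create_engine

from models import DeclarativeBase

# create greetings.db and the greetings table if they don't already exist
engine = create_engine("sqlite:///greetings.db")
DeclarativeBase.metadata.create_all(engine)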

We need to tell the dependency provider where the database lives, using the Nameko config file.
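
The config file isn’t reproduced here, but something along these lines should work: nameko-sqlalchemy reads the database URI from the DB_URIS key, keyed by "<service name>:<declarative base name>", and the listener still needs the AMQP_URI.

# config.yaml

AMQP_URI: pyamqp://guest:guest@localhost:5672

DB_URIS:
  "listener:Greetings": "sqlite:///greetings.db"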


Let’s run it and check that the greetings are being saved.

Run the updated listener:

$ nameko run listener_with_persistence --config config.yaml
starting services: listener
Connected to amqp://guest:**@127.0.0.1:5672//

Run the client again:

$ python greeter_client.py
Enter a name: Anush

Info

You’ll need the SQLite CLI for this step.

Query the database:

$ echo "select name from greetings" | sqlite3 greetings.db
Anush

🙌

Use Dependencies as Abstraction Layers

The Database object we used in the example above is a Dependency Provider.

Nameko is opinionated about using dependency providers as an abstraction layer in your services. They are an opportunity to hide complexity from the rest of your service.

To explore this idea, let’s add a new HTTP API that returns analytics about our greetings. We want it to return:

  • The number of times a person has been greeted
  • When their first greeting was
  • How long it’s been since they were last greeted

As JSON:

{
    "total_greetings": 2,
    "first_greeted": "2021-01-01",
    "seconds_since_last_greeting": 10
}

These analytics are derived from the data stored in the Greeting model. If we wrap the Database dependency provider, we can group this behaviour together behind a narrower interface.

For each model we care about we define a new wrapper class that exposes a narrow interface for that model.

We want to hide as many of the internals here as possible, so they don’t leak into the service implementation.

# repository.py

from datetime import datetime

from models import Greeting


class GreetingsRepository:
    name = "greetings"

    def __init__(self, db):
        self.db = db

    def add(self, name):
        with self.db.get_session() as session:
            session.add(Greeting(name=name))

    def count(self, name):
        return self.db.session.query(Greeting).filter_by(name=name).count()

    def first_greeted(self, name):
        first = (
            self.db.session.query(Greeting)
            .filter(Greeting.name == name)
            .order_by(Greeting.created_at)
            .first()
        )
        if first:
            return first.created_at

    def since_last_greeting(self, name):
        last = (
            self.db.session.query(Greeting)
            .filter(Greeting.name == name)
            .order_by(Greeting.created_at.desc())
            .first()
        )
        if not last:
            return

        # use total_seconds() so the result is correct even for gaps longer than a day
        return int((datetime.utcnow() - last.created_at).total_seconds())

We make a new Storage Dependency Provider that wraps the Database one provided by nameko-sqlalchemy.

It has a generic structure to support multiple Repository classes, but we only have one here.

# storage.py

from nameko_sqlalchemy import Database

from models import DeclarativeBase
from repository import GreetingsRepository


class Storage(Database):
    def __init__(self):
        super().__init__(declarative_base=DeclarativeBase)

    def get_dependency(self, worker_ctx):
        db = super().get_dependency(worker_ctx)
        return Repositories(db)


class Repositories:
    repos = [
        GreetingsRepository
        # add new repositories here
    ]

    def __init__(self, db):
        self.db = db
        self.register_repos()

    def register_repos(self):
        for repo in self.repos:
            setattr(self, repo.name, repo(self.db))

Now we can implement our new HTTP API, and make use of the narrow interface exposed by the new dependency provider.

# listener_with_analytics.py

import json

from nameko.events import event_handler
from nameko.web.handlers import http

from storage import Storage
import bootstrap  # create the sqlite db if it doesn't exist


class Listener:
    name = "listener"

    storage = Storage()

    @http("GET", "/analytics/<string:name>")
    def get_analytics(self, request, name):

        greetings = self.storage.greetings
        result = {
            "total_greetings": greetings.count(name),
            "first_greeted": greetings.first_greeted(name).strftime("%A, %d %b %Y"),
            "seconds_since_last_greeting": greetings.since_last_greeting(name),
        }
        return 200, {"Content-Type": "application/json"}, json.dumps(result)

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        print(f"Greeter met {payload['name']} ❤️")
        self.storage.greetings.add(payload["name"])
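
With the service running (nameko run listener_with_analytics --config config.yaml) and at least one greeting recorded, you can hit the new endpoint over HTTP. Nameko’s built-in web server listens on port 8000 by default, so the response will look something like this (values are illustrative):

$ curl http://localhost:8000/analytics/Matt
{"total_greetings": 1, "first_greeted": "Friday, 01 Jan 2021", "seconds_since_last_greeting": 42}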

It’s easier to reason about services that are written this way, and to test them, as we’ll see in the next section.

Testing

Nameko includes some utilities to make testing services easier.

Unit Testing Service Methods

Using dependencies to encapsulate chunks of logic makes it possible to strip them out when unit testing service methods. Nameko includes some utilities to make that process easier.

The worker_factory returns an instance of a service class with its dependencies replaced by mocks, or by alternative objects if you provide them. You can use this to test the logic of service methods independently of their dependencies, or against specific stub or fake dependencies.

worker_factory returns a worker without starting the underlying service. To exercise a service method you call it as a normal Python method, rather than triggering whatever entrypoint is attached to it.

For example:

from unittest.mock import Mock
from datetime import date
import json

from nameko.testing.services import worker_factory

from listener_with_analytics import Listener


def test_get_analytics():
    worker = worker_factory(Listener)

    # configure mock
    worker.storage.greetings.count.return_value = 3
    worker.storage.greetings.first_greeted.return_value = date(2021, 1, 1)
    worker.storage.greetings.since_last_greeting.return_value = 3600

    # call service method as Python method
    result = worker.get_analytics(request=Mock(), name="Matt")

    # make assertions about result 
    status_code, headers, payload = result
    data = json.loads(payload)
    assert data['total_greetings'] == 3
    assert data['first_greeted'] == "Friday, 01 Jan 2021"
    assert data['seconds_since_last_greeting'] == 3600

Instead of configuring mocks, you can supply a stub dependency to the worker factory:

from unittest.mock import Mock
from datetime import date
import json

import pytest
from nameko.testing.services import worker_factory

from listener_with_analytics import Listener


@pytest.fixture
def storage_stub():

    class GreetingsRepositoryStub:
        count = lambda self, name: 3
        first_greeted = lambda self, name: date(2021, 1, 1)
        since_last_greeting = lambda self, name: 3600

    class StorageStub:
        greetings = GreetingsRepositoryStub()

    return StorageStub()


def test_get_analytics(storage_stub):
    # create worker with stub dependency
    worker = worker_factory(Listener, storage=storage_stub)

    # call service method as Python method
    result = worker.get_analytics(request=Mock(), name="Matt")

    # make assertions about result
    status_code, headers, payload = result
    data = json.loads(payload)
    assert data['total_greetings'] == 3
    assert data['first_greeted'] == "Friday, 01 Jan 2021"
    assert data['seconds_since_last_greeting'] == 3600

In both cases we called worker.get_analytics directly, rather than making an HTTP request.
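
To run either test, save it in a pytest-discoverable file (test_analytics.py is just an example name) and invoke pytest:

$ pip install pytest
$ pytest test_analytics.py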

Todo

worker_factory is a utility whereas container factory is a fixture. should try to resolve this either by making a fixture for worker factory, or moving container factory logic into a callable utility. or both.

Integration Testing

The container_factory is a pytest fixture that lets you start service containers. Similar to the worker factory, you can opt to replace certain dependencies with stubs or fake implementations.

Todo

this doesn’t actually exist yet… gotta back-port it

Triggering Entrypoints

… obvs you can send a real request … if that’s not possible, use the entrypoint hook

Waiting For Entrypoints

… we need this to test asynchronous actions