Tutorial
In this tutorial we’re going to create a couple of services that demonstrate some of the good stuff about working with Nameko.
A Simple Service
Let’s start with a simple service that uses one of the community extensions, nameko-grpc — a set of extensions that let Nameko services speak gRPC.
Here is a Nameko service that implements the Python Quickstart from the official gRPC docs:
# greeter.py
from nameko_grpc.entrypoint import Grpc
import grpc

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)

grpc = Grpc.implementing(helloworld_pb2_grpc.GreeterStub)


class Greeter:
    name = "greeter"

    @grpc
    def say_hello(self, request, context):
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")

    @grpc
    def say_hello_again(self, request, context):
        return helloworld_pb2.HelloReply(message=f"Hello again, {request.name}!")
This is the protobuf definition from the official gRPC quickstart, except that the method names have been snake_cased to be more Pythonic.
// helloworld.proto
syntax = "proto3";

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc say_hello (HelloRequest) returns (HelloReply) {}
  // Sends another greeting
  rpc say_hello_again (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
This is the client from the official gRPC quickstart, adapted for the snake_cased method names.
The protobuf codegen is again run inline for convenience, and the client prompts for the name as input.
# greeter_client.py
import logging

import grpc

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)


def run():
    name = input("Enter a name: ")
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.say_hello(helloworld_pb2.HelloRequest(name=name))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()
Let’s see if it works.
Mix and Match Protocols
One of Nameko’s superpowers is being able to use different network protocols cohesively inside one service.
Let’s extend our Greeter service by adding some asynchronous message passing. We’ll use the built-in EventDispatcher extension to dispatch a Nameko Event whenever one of these methods is called.
And since asynchronous message passing isn’t much use unless there’s something consuming those messages, we’ll add a second service, Listener, to do that using the built-in event_handler entrypoint.
# greeter_with_events.py
from nameko_grpc.entrypoint import Grpc
import grpc
from nameko.events import EventDispatcher

# run the codegen at runtime for convenience
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)

grpc = Grpc.implementing(helloworld_pb2_grpc.GreeterStub)


class Greeter:
    name = "greeter"

    dispatch = EventDispatcher()

    @grpc
    def say_hello(self, request, context):
        self.dispatch("hello", {"name": request.name})
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")

    @grpc
    def say_hello_again(self, request, context):
        self.dispatch("hello_again", {"name": request.name})
        return helloworld_pb2.HelloReply(message=f"Hello again, {request.name}!")
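The Listener service described above is not shown here. A minimal sketch of what it could look like, assuming it mirrors the handler used in the persistence example further on. The file name listener.py is an assumption, and the ImportError fallback exists only so the sketch can be exercised without Nameko installed:

```python
# listener.py (hypothetical file name)
try:
    from nameko.events import event_handler
except ImportError:
    # Fallback for running this sketch without Nameko installed:
    # a no-op decorator with the same call shape as event_handler.
    def event_handler(source_service, event_type):
        def decorator(method):
            return method
        return decorator


class Listener:
    name = "listener"

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        # payload is the dict dispatched by Greeter.say_hello
        message = f"Greeter met {payload['name']}"
        print(message)
        return message
```

Only the "hello" event is handled in this sketch; a handler for "hello_again" would follow the same pattern.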
Now when we run them:
Add Persistence
Let’s add some persistence to our service using another community extension, nameko-sqlalchemy.
We’ll create a SQLAlchemy model to store each greeting that we see, and update the listener to persist them using the Database dependency provider.
Define a simple SQLAlchemy model.
# models.py
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base

DeclarativeBase = declarative_base(name="Greetings")


class Greeting(DeclarativeBase):
    __tablename__ = "greetings"

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    name = Column(String())
Update the Listener service to save an instance of the model for each event that it handles.
# listener_with_persistence.py
from nameko.events import event_handler
from nameko_sqlalchemy import Database

from models import DeclarativeBase, Greeting
import bootstrap  # create the sqlite db if it doesn't exist


class Listener:
    name = "listener"

    db = Database(DeclarativeBase)

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        print(f"Greeter met {payload['name']} ❤️")
        with self.db.get_session() as session:
            session.add(Greeting(name=payload['name']))
Let’s run it and check that the greetings are being saved.
Run the updated listener
Info
You’ll need the SQLite CLI for this step.
Query the database
Use Dependencies as Abstraction Layers
The Database object we used in the example above is a Dependency Provider.
Nameko is opinionated about using dependency providers as an abstraction layer in your services. They are an opportunity to hide complexity from the rest of your service.
To explore this idea, let’s add a new HTTP API that returns analytics about our greetings. We want it to return:
- The number of times a person has been greeted
- When their first greeting was
- How long it’s been since they were last greeted
As JSON:
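As an illustration only, the snippet below builds a payload of that shape. The keys follow the HTTP handler shown later in this section; the values are made up:

```python
import json

# Illustrative values only; the keys match the analytics handler's result dict.
example = {
    "total_greetings": 3,
    "first_greeted": "Friday, 01 Jan 2021",
    "seconds_since_last_greeting": 3600,
}

body = json.dumps(example, indent=2)
print(body)
```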
These analytics are derived from the data stored in the Greeting model. If we wrap the Database dependency provider, we can group the behaviour together into something that has a narrower interface.
For each model we care about we define a new wrapper class that exposes a narrow interface for that model.
We want to hide as many of the internals here as possible, so they don’t leak into the service implementation.
# repository.py
from datetime import datetime

from models import Greeting


class GreetingsRepository:
    name = "greetings"

    def __init__(self, db):
        self.db = db

    def add(self, name):
        with self.db.get_session() as session:
            session.add(Greeting(name=name))

    def count(self, name):
        return self.db.session.query(Greeting).filter_by(name=name).count()

    def first_greeted(self, name):
        first = (
            self.db.session.query(Greeting)
            .filter(Greeting.name == name)
            .order_by(Greeting.created_at)
            .first()
        )
        # returns None if this name has never been greeted
        if first:
            return first.created_at

    def since_last_greeting(self, name):
        last = (
            self.db.session.query(Greeting)
            .filter(Greeting.name == name)
            .order_by(Greeting.created_at.desc())
            .first()
        )
        if not last:
            return
        # total_seconds() covers the whole interval, including whole days
        return int((datetime.utcnow() - last.created_at).total_seconds())
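A note on the elapsed-time calculation: timedelta.seconds holds only the seconds component of an interval (0 to 86399) and ignores whole days, while total_seconds() covers the full span. A small standard-library illustration, with made-up timestamps:

```python
from datetime import datetime, timedelta

# Made-up timestamps: the last greeting was two days and one hour ago.
last_greeted = datetime(2021, 1, 1, 12, 0, 0)
now = last_greeted + timedelta(days=2, hours=1)
elapsed = now - last_greeted

seconds_component = elapsed.seconds       # only the sub-day part: 3600
full_span = int(elapsed.total_seconds())  # the whole interval: 176400
```

For a "seconds since last greeting" figure, the full span is usually what is wanted.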
We make a new Storage Dependency Provider that wraps the Database one provided by nameko-sqlalchemy.
It has a generic structure to support multiple Repository classes, but we only have one here.
# storage.py
from nameko_sqlalchemy import Database

from models import DeclarativeBase
# absolute import, consistent with how the other modules are imported
from repository import GreetingsRepository


class Storage(Database):
    def __init__(self):
        super().__init__(declarative_base=DeclarativeBase)

    def get_dependency(self, worker_ctx):
        db = super().get_dependency(worker_ctx)
        return Repositories(db)


class Repositories:
    repos = [
        GreetingsRepository,
        # add new repositories here
    ]

    def __init__(self, db):
        self.db = db
        self.register_repos()

    def register_repos(self):
        # expose each repository as an attribute named after repo.name
        for repo in self.repos:
            setattr(self, repo.name, repo(self.db))
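To see the registration loop in isolation, here is a minimal standard-library sketch. FakeDb and FakeGreetingsRepository are hypothetical stand-ins for the real database wrapper and repository, used only so the loop can run without a database:

```python
class FakeDb:
    """Hypothetical stand-in for the object returned by Database.get_dependency."""


class FakeGreetingsRepository:
    name = "greetings"  # becomes the attribute name on Repositories

    def __init__(self, db):
        self.db = db


class Repositories:
    repos = [FakeGreetingsRepository]

    def __init__(self, db):
        self.db = db
        self.register_repos()

    def register_repos(self):
        # instantiate each repository with the shared db and expose it by name
        for repo in self.repos:
            setattr(self, repo.name, repo(self.db))


db = FakeDb()
storage = Repositories(db)
```

After construction, storage.greetings is the repository instance, bound to the same db object that was passed in.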
Now we can implement our new HTTP API, and make use of the narrow interface exposed by the new dependency provider.
# listener_with_analytics.py
import json

from nameko.events import event_handler
from nameko.web.handlers import http

from storage import Storage
import bootstrap  # create the sqlite db if it doesn't exist


class Listener:
    name = "listener"

    storage = Storage()

    @http("GET", "/analytics/<string:name>")
    def get_analytics(self, request, name):
        greetings = self.storage.greetings
        result = {
            "total_greetings": greetings.count(name),
            "first_greeted": greetings.first_greeted(name).strftime("%A, %d %b %Y"),
            "seconds_since_last_greeting": greetings.since_last_greeting(name),
        }
        return 200, {"Content-Type": "application/json"}, json.dumps(result)

    @event_handler("greeter", "hello")
    def handle_greeting(self, payload):
        print(f"Greeter met {payload['name']} ❤️")
        self.storage.greetings.add(payload["name"])
It’s easier to reason about services that are written this way, and to test them, as we’ll see in the next section.
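One edge case in the handler above: first_greeted returns None for a name that has never been greeted, so calling strftime on the result would raise AttributeError. A minimal sketch of one way to guard against that. The helper format_first_greeted is hypothetical and not part of the tutorial code:

```python
from datetime import datetime


def format_first_greeted(value, fmt="%A, %d %b %Y"):
    """Format a datetime for the analytics payload, or pass None through."""
    if value is None:
        return None
    return value.strftime(fmt)


formatted = format_first_greeted(datetime(2021, 1, 1))
missing = format_first_greeted(None)
```

json.dumps serialises None as null, so the response stays valid JSON for unknown names.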
Testing
Nameko includes some utilities to make testing services easier.
Unit Testing Service Methods
Using dependencies to encapsulate chunks of logic makes it possible to strip them out when unit testing service methods.
The worker_factory utility returns an instance of a service class, with its dependencies replaced by mocks, or by alternative objects if you provide them. You can use this to test the logic of service methods independently from their dependencies, or with specific stub or fake dependencies.
worker_factory returns a worker without starting the underlying service. To exercise a service method you call it as a normal Python method, rather than triggering whatever entrypoint is attached to it.
For example:
…
from unittest.mock import Mock
from datetime import date
import json

from nameko.testing.services import worker_factory

from listener_with_analytics import Listener


def test_get_analytics():
    worker = worker_factory(Listener)

    # configure mock
    worker.storage.greetings.count.return_value = 3
    worker.storage.greetings.first_greeted.return_value = date(2021, 1, 1)
    worker.storage.greetings.since_last_greeting.return_value = 3600

    # call service method as Python method
    result = worker.get_analytics(request=Mock(), name="Matt")

    # make assertions about result
    status_code, headers, payload = result
    data = json.loads(payload)
    assert data['total_greetings'] == 3
    assert data['first_greeted'] == "Friday, 01 Jan 2021"
    assert data['seconds_since_last_greeting'] == 3600
…
from unittest.mock import Mock
from datetime import date
import json

import pytest
from nameko.testing.services import worker_factory

from listener_with_analytics import Listener


@pytest.fixture
def storage_stub():
    class GreetingsRepositoryStub:
        count = lambda self, name: 3
        first_greeted = lambda self, name: date(2021, 1, 1)
        since_last_greeting = lambda self, name: 3600

    class StorageStub:
        greetings = GreetingsRepositoryStub()

    return StorageStub()


def test_get_analytics(storage_stub):
    # create worker with stub dependency
    worker = worker_factory(Listener, storage=storage_stub)

    # call service method as Python method
    result = worker.get_analytics(request=Mock(), name="Matt")

    # make assertions about result
    status_code, headers, payload = result
    data = json.loads(payload)
    assert data['total_greetings'] == 3
    assert data['first_greeted'] == "Friday, 01 Jan 2021"
    assert data['seconds_since_last_greeting'] == 3600
In both cases we called worker.get_analytics directly, rather than making an HTTP request.
Todo
worker_factory is a utility whereas container_factory is a fixture. We should try to resolve this, either by making a fixture for worker_factory, or by moving the container_factory logic into a callable utility, or both.
Integration Testing
The container_factory is a pytest fixture that lets you start service containers. Similar to the worker factory, you can opt to replace certain dependencies with stubs or fake implementations.
Todo
this doesn’t actually exist yet… gotta back-port it
Triggering Entrypoints
… obvs you can send a real request … if that’s not possible, use the entrypoint hook
Waiting For Entrypoints
… we need this to test asynchronous actions