Tiny Framework to build micro services. Currently only support amqp, rpc over amqp and http stream based micro services.
We declare a publisher:
the method decorated takes one parameter publish (farine.amqp.publisher.Publisher)
import farine.amqp
class Publish(object):
@farine.amqp.publish(exchange='consume', routing_key='routing_key')
def publish_dummy(self, publish):
"""
:param publish: Send the data through AMQP.
:type publish: farine.amqp.publisher.Publisher
"""
publish({'result':0})
And then a consumer :
the method decorated takes two parameters body (dict) and message (kombu.message.Message).
import farine.amqp
class Consume(object):
@farine.amqp.consume(exchange='publish', routing_key='routing_key')
def consume_dummy(self, body, message):
"""
:param body: The message's data.
:type body: dict
:param message: Message class.
:type message: kombu.message.Message
"""
message.ack()
We need declare two services :
the server : Wait for a call(consumer), and answer(publisher).
The method decorated just takes args and kwargs
import farine.rpc
class Server(object):
@farine.rpc.method()
def dummy(self, *args, **kwargs):
return True
And the client : Send a call(publisher), and wait for an answer(consumer).
the method decorated takes one argument rpc (farine.rpc.client.Client).
The result will be a dictionnary.
import farine.rpc
class Client(object):
@farine.rpc.client('myotherservice')
def dummy(self, rpc):
"""
:param rpc: The RPC client.
:type rpc: farine.rpc.client.Client
"""
result = rpc.dummy()
We can also do streaming RPC call.
All you need to do is to add __stream__ = True* to your RPC call.
Also, a generator is returned.
Example:
import farine.rpc
class Server(object):
@farine.rpc.method()
def dummy(self, *args, **kwargs):
yield 'a'
yield 'b'
import farine.rpc
class Client(object):
@farine.rpc.client('myotherservice')
def dummy(self, rpc):
"""
:param rpc: The RPC client.
:type rpc: farine.rpc.client.Client
"""
for result in rpc.dummy(__stream__=True):
print result
We can declare a service that will listen to an HTTP SSE event :
the method decorated takes one argument data (dict).
In the configuration file, we just need to setup the endpoint to listen.
import farine.stream
class Client(object):
@farine.stream.http()
def listen_event(self, data):
"""
:param data: The event sent.
:type data: dict
"""
return True
If all we need is to execute a method, we can use farine.execute.method.
It will restart it, if it ends (customizable).
import farine.execute
class Client(object):
@farine.execute.method()
def my_own_stuff(self):
"""
Your own code.
"""
baguette-messaging is using peewee to manage databases connections (only postgresql is supported for the moment)
In order for it to detect that you are using a database, you need to create a models.py module.
Then, each time we enter into a method (amqp,rpc,stream) a database connection will be created and closed.
You can use the database connection using self.db, to manage transactions for example.
models.py:
from farine.connectors.sql import *
class User(Model):
name = CharField()
service.py:
import farine.amqp
from models import User
class Client(object):
@farine.amqp.consume(exchange='exchange', routing_key='routing_key')
def select(self, body, message):
return User.select().where(User.id==1)
You can mix in a service, everything:
it can be a consumer of an HTTP stream, and send back the result in RPC, etc.
import farine.rpc
import farine.stream
class Client(object):
@farine.stream.http()
def get(self, data):
return self.send(data)
@farine.rpc.client('myotherservice')
def send(self, rpc, data):
return rpc.process(data)
By default the configuration file is located in /etc/farine.ini. You can override this path using the environment variable FARINE_INI.
It must contains one section by service (using the lowercase class name).
a DEFAULT section can also be present.
[DEFAULT] amqp_uri = amqp://baguette:baguette@127.0.0.1:5672/baguette [consume] enabled = true
If you use a database connection, you have to add in the [DEFAULT] section the db parameters:
[DEFAULT] db_connector = postgres (required) db_name = name (required) db_user = user (required) db_host = host (required) db_password = password (required) db_port = port (optional) db_max_conn = max_connections (optional) db_stale_timeout = stale timeout (optional) db_timeout = timeout (optional)
To launch a service, just run:
farine --start=mymodule
It will try to import mymodule.service and launch it.