pip install cocaine-tools
cocaine-tool app upload
chmod u+x echo.py
{ "isolate": { "args": { "spool": "/var/spool/cocaine" }, "type": "process" } }
cocaine-tool profile upload --name profile@test --profile ./profile.json
cocaine-tool app start --name echo --profile profile@test
from cocaine.futures import chain
from cocaine.services import Service
from tornado.ioloop import IOLoop

# Alias for more readability.
asynchronous = chain.source

if __name__ == '__main__':
    io_loop = IOLoop.current()
    service = Service('echo')

    @asynchronous
    def invoke(message):
        result = yield service.enqueue('ping', message)
        print(result)

    invoke('Hello World!')
    invoke('Hello again!')
    io_loop.start()
We create a Service object, passing the application name (echo in our case) as a parameter. This object maps the methods and events of the cloud service or application onto class methods in the language being used. All we do with it is asynchronously call the enqueue method, which every application provides, passing the event name as the first parameter followed by whatever other parameters the application requires, and then asynchronously wait for the response. To do this, we get the event loop object from the Tornado library, start it, and wait. Note that the loop is not stopped explicitly once the answer is received.
The @asynchronous decorator adds additional actions to the decorated function, namely:
Service
class to Cocaine. In other languages, one has to write various stubs, which will be shown later.
Service
object is not
echo = Service('42')
0, Chunk ::= <Tuple>
1, Error ::= (<Number>, <String>)
2, Choke ::= ()
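The three message types above can be told apart by their leading numeric code. Below is a minimal sketch, assuming each decoded message arrives as a `(code, payload)` pair; that framing, the `collect` helper, and the `RemoteError` class are hypothetical and not taken from the Cocaine sources.

```python
# Message codes as listed in the grammar above.
CHUNK, ERROR, CHOKE = 0, 1, 2


class RemoteError(Exception):
    """Raised when the stream carries an Error message (hypothetical helper)."""

    def __init__(self, errno, reason):
        super().__init__('[%d] %s' % (errno, reason))
        self.errno = errno
        self.reason = reason


def collect(messages):
    """Gather Chunk payloads until Choke; raise RemoteError on Error.

    `messages` is any iterable of (code, payload) pairs. This framing is an
    assumption made for illustration, not the real wire format.
    """
    chunks = []
    for code, payload in messages:
        if code == CHUNK:
            chunks.append(payload)
        elif code == ERROR:
            errno, reason = payload
            raise RemoteError(errno, reason)
        elif code == CHOKE:
            return chunks
        else:
            raise ValueError('unknown message code: %r' % (code,))
    raise ValueError('stream ended without Choke')


stream = [
    (CHUNK, ('Hello World!',)),
    (CHUNK, ('Additional message',)),
    (CHOKE, ()),
]
received = collect(stream)
```

A stream may therefore carry any number of Chunk messages before the closing Choke, while a single Error ends it early.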
Choke
, strictly speaking, applications can
Chunk's, thereby organizing real stream processing of data and their asynchronous
from cocaine.worker import Worker

def echo(request, response):
    message = yield request.read()
    response.write(message)
    response.close()

W = Worker()
W.run({
    'ping': echo,
})
When the ping event arrives, the echo function is called; it receives the already prepared request and response objects as arguments. By calling the request.read() method we asynchronously read one Chunk of the request (in our case there is exactly one), and we respond using the response.write(...) method.
def echo(request, response):
    message = yield request.read()
    response.write(message)
    response.write('Additional message')
    response.close()
cocaine-tool app upload && cocaine-tool app restart --name echo --profile profile@test
invoke
function as follows:
@asynchronous
def invoke(message):
    result = yield service.enqueue('ping', message)
    another_result = yield
    print(result, another_result)
    io_loop.stop()
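How successive `yield`s can each receive the next chunk is easy to model with a plain generator. The sketch below is not the `chain.source` implementation: `drive`, `StreamExhausted`, and the prepared list of chunks are hypothetical stand-ins, and the real library resumes the coroutine from the event loop rather than from a synchronous loop.

```python
class StreamExhausted(Exception):
    """Stand-in for the error raised when no more chunks will arrive."""


def drive(coroutine_fn, chunks):
    """Run a generator-based coroutine, answering each yield with a chunk."""
    gen = coroutine_fn()
    pending = iter(chunks)
    try:
        next(gen)  # run up to the first yield
        while True:
            try:
                chunk = next(pending)
            except StopIteration:
                # Nothing left: signal end of stream inside the coroutine.
                gen.throw(StreamExhausted('no more chunks'))
            else:
                gen.send(chunk)  # resume the coroutine with the chunk
    except StopIteration:
        pass  # the coroutine finished normally


results = []


def invoke():
    first = yield   # receives the first chunk
    second = yield  # receives the second chunk
    results.append((first, second))
    try:
        yield       # a third read: nothing is left
    except StreamExhausted:
        results.append('exhausted')


drive(invoke, ['Hello World!', 'Additional message'])
```

Reading more chunks than the worker wrote surfaces as an exception inside the coroutine, which is why the third `yield` here is wrapped in `try`.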
ChokeEvent
will be thrown, which again
Service
object. What is going on with it? After encoding the message and sending it, it waits: for a response, for an error, or for a timeout. All of this waiting is ordinary IO that does not consume processor resources. So why let it sit idle when it could be given more work? Imagine that you are making requests to some application or service not from client code but from another application. If that application stopped the world on every such request, many more instances of it would have to be spawned, which would simply waste system resources.
#include <cocaine/framework/services/app.hpp>
#include <cocaine/framework/services/storage.hpp>

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <atomic>

namespace cf = cocaine::framework;

int main(int argc, char** argv) {
    auto manager = cf::service_manager_t::create(cf::service_manager_t::endpoint_t("localhost", 10053));

    // Get application service object.
    auto app = manager->get_service<cf::app_service_t>("echo");

    std::atomic<int> counter(0);
    std::condition_variable cv;

    // Call application.
    auto g1 = app->enqueue("ping", "Hello from C++");
    auto g2 = app->enqueue("ping", "Hello again!");

    auto handler = [&counter, &cv](cf::generator<std::string>& g) {
        counter++;
        try {
            // Always packed data.
            std::cout << "result: " << g.next() << std::endl;
        } catch (const std::exception& e) {
            std::cout << "error: " << e.what() << std::endl;
        }
        cv.notify_all();
    };

    g1.then(handler);
    g2.then(handler);

    std::mutex m;
    std::unique_lock<std::mutex> guard(m);
    while (counter < 2) {
        cv.wait(guard);
    }

    return 0;
}
clang++ -std=c++11 -stdlib=libc++ -lcocaine-core -lcocaine-framework -lmsgpack -lboost_system -lev ../src/clients/echo-client.cpp
-stdlib=libc++
eventmachine
framework, which provides the opportunity
require 'cocaine'
require 'cocaine/synchrony/service'
require 'em-synchrony'

class EchoClient
  EM.synchrony do
    results = []
    service = Cocaine::Synchrony::Service.new 'echo'
    channel = service.enqueue('ping', 'Hello from Ruby!')
    channel.each do |result|
      results.push result
    end
    puts results
    EM.stop
  end
end
resolve
Storage
service, using secondary requests from the application back to Cocaine.
generate. It will return the generated QR code in binary form as a byte array, which we will feed to a special object to convert it into a real picture.
PIL or pillow, which in turn requires many binary dependencies, such as libpng. Make sure you have them, and then install qrcode using the same pip:
pip install qrcode
#!/usr/bin/env python

import StringIO

import msgpack
import qrcode

from cocaine.exceptions import ServiceError
from cocaine.decorators import http
from cocaine.logging import Logger
from cocaine.services import Service
from cocaine.worker import Worker

storage = Service('storage')


def generate_qr(message, size=10):
    # Both bounds are excluded: valid sizes are 1..19.
    if size <= 0 or size >= 20:
        raise ValueError('size argument must fit in (0; 20)')
    out = StringIO.StringIO()
    img = qrcode.make(message, box_size=size)
    img.save(out, 'png')
    return out.getvalue()


def generate(request, response):
    rq = yield request.read()
    message, size = msgpack.loads(rq)
    try:
        if size < 10:
            data = generate_qr(message, size)
        else:
            key = '{0}size={1}'.format(message, size)
            try:
                data = yield storage.read('qr-codes', key)
            except ServiceError:
                data = generate_qr(message, size)
                yield storage.write('qr-codes', key, data)
        response.write(data)
    except Exception as err:
        response.error(1, str(err))
    finally:
        response.close()

w = Worker()
w.run({
    'generate': generate
})
storage = Service('storage')
we declare a storage service global for the worker, which, let me remind you, we
w = Worker()
w.run({
    'generate': generate
})
generate
event handler along the way. Now any invoke message with the correct parameters will be redirected to the specified function. Let us look at it in more detail.
read
method for deferred reading of data sent
(Chunk, Error, Choke).
rq = yield request.read()
message, size = msgpack.loads(rq)
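What the handler gets back from `request.read()` is raw bytes, so the client and the worker must agree on an encoding. The round trip below uses the standard-library `json` module as a stand-in for msgpack so that it runs without extra packages; the `pack` and `unpack` helper names are hypothetical.

```python
import json


def pack(message, size):
    """Client side: encode the argument tuple into bytes."""
    return json.dumps([message, size]).encode('utf-8')


def unpack(raw):
    """Worker side: decode bytes back into (message, size)."""
    message, size = json.loads(raw.decode('utf-8'))
    return message, size


raw = pack('Hello World!', 10)
message, size = unpack(raw)
```

Swapping the codec only changes these two helpers; the handler logic that consumes `message` and `size` stays the same.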
message, size = yield request.read()
? Do not forget that we have to unpack a structure from raw bytes. Last time it was a string; this time it is a tuple. We need some kind of encoder/decoder. JSON, msgpack, or protobuf would all work for this purpose. We chose msgpack in this example because implementations exist for all popular languages and it is fast.
key = '{0}size={1}'.format(message, size)
try:
    data = yield storage.read('qr-codes', key)
except ServiceError:
    data = generate_qr(message, size)
    yield storage.write('qr-codes', key, data)
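The branch above is a read-through cache: try the storage first, and on a miss generate the value and store it. Here is a sketch of the same control flow with an in-memory dictionary in place of the Storage service. `MemoryStorage`, `MissingKey`, `fake_generate`, and `cached_generate` are hypothetical stand-ins, and the calls are synchronous, unlike the `yield`-based calls in the handler.

```python
class MissingKey(Exception):
    """Stand-in for the ServiceError raised on a failed read."""


class MemoryStorage(object):
    """Dictionary-backed replacement for the Storage service."""

    def __init__(self):
        self._data = {}

    def read(self, namespace, key):
        try:
            return self._data[(namespace, key)]
        except KeyError:
            raise MissingKey(key)

    def write(self, namespace, key, value):
        self._data[(namespace, key)] = value


calls = []  # records every real generation, to show when the cache is hit


def fake_generate(message, size):
    calls.append((message, size))
    return ('%s@%d' % (message, size)).encode('utf-8')


def cached_generate(storage, message, size):
    key = '{0}size={1}'.format(message, size)
    try:
        return storage.read('qr-codes', key)
    except MissingKey:
        data = fake_generate(message, size)
        storage.write('qr-codes', key, data)
        return data


storage = MemoryStorage()
first = cached_generate(storage, 'hello', 12)
second = cached_generate(storage, 'hello', 12)
```

The second call returns the stored bytes without invoking the generator again, which is the saving the handler is after for larger images.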
storage.read('qr-codes', key)
throw an exception. In this case, we still generate the image and save it.
The read and write operations are performed asynchronously. Every time our coroutine reaches the yield keyword, control returns to the event loop. It returns back only
read
finally
echo
Cocaine, cocaine-tool:
cd src/qr && cocaine-tool app upload && cocaine-tool app start --name qr --profile profile@test
echo:
import StringIO

import msgpack
from PIL import Image
from tornado.ioloop import IOLoop

from cocaine.futures import chain
from cocaine.services import Service

# Alias for more readability.
asynchronous = chain.source

if __name__ == '__main__':
    io_loop = IOLoop.current()
    service = Service('qr')

    @asynchronous
    def invoke(message):
        try:
            result = yield service.enqueue('generate', msgpack.dumps([message, 10]))
            print('Result:', result)
            out = StringIO.StringIO()
            out.write(result)
            out.seek(0)
            img = Image.open(out)
            img.save('qr.png', 'png')
        except Exception as err:
            print('Error: ', err)
        finally:
            io_loop.stop()

    invoke('What is best in life? To crush your enemies, see them driven before you, and to hear the lamentation of '
           'their women.')

    io_loop.start()
invoke
PIL
@http
request
response
read
cocaine.decorators.http._HTTPRequest
response
#!/usr/bin/env python

import StringIO

import msgpack
import qrcode

from cocaine.exceptions import ServiceError
from cocaine.decorators import http
from cocaine.logging import Logger
from cocaine.services import Service
from cocaine.worker import Worker

storage = Service('storage')
log = Logger()


def generate_qr(message, size=10):
    # Both bounds are excluded: valid sizes are 1..19.
    if size <= 0 or size >= 20:
        raise ValueError('size argument must fit in (0; 20)')
    out = StringIO.StringIO()
    img = qrcode.make(message, box_size=size)
    img.save(out, 'png')
    return out.getvalue()


@http
def generate(request, response):
    request = yield request.read()
    try:
        message = request.request['message']
        size = int(request.request.get('size', 10))
        if size < 10:
            data = generate_qr(message, size)
        else:
            key = '{0}size={1}'.format(message, size)
            try:
                data = yield storage.read('qr-codes', key)
            except ServiceError:
                data = generate_qr(message, size)
                yield storage.write('qr-codes', key, data)
        response.write_head(200, [('Content-type', 'image/png')])
        response.write(data)
    except KeyError:
        response.write_head(400, [('Content-type', 'text/plain')])
        response.write('Query field "message" is required')
    except Exception as err:
        response.write_head(400, [('Content-type', 'text/plain')])
        response.write(str(err))
    finally:
        response.close()

w = Worker()
w.run({
    'generate-http': generate
})
generate-http
cocaine-native-proxy
cocaine-tool
Python:
cocaine-tool proxy start --count=32
count
cocaine-native-proxy
<APP>/<METHOD>[?<Args>]
X-Cocaine-Service
X-Cocaine-Event
/qr/generate-http?message=Hello%20World!&size=10
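A request path of the form shown above can be split into an application name, an event name, and query arguments with the standard library. This only illustrates the URL shape, assuming the first path segment names the application and the second the event. `split_proxy_path` is a hypothetical helper, not part of the proxy.

```python
from urllib.parse import parse_qs, urlsplit


def split_proxy_path(url):
    """Split '/<app>/<event>?<args>' into (app, event, args)."""
    parts = urlsplit(url)
    app, event = parts.path.strip('/').split('/', 1)
    # parse_qs returns a list per argument; keep the first value of each.
    args = {name: values[0] for name, values in parse_qs(parts.query).items()}
    return app, event, args


app, event, args = split_proxy_path('/qr/generate-http?message=Hello%20World!&size=10')
```

Query values arrive as strings, which is why the HTTP handler converts `size` with `int(...)` before using it.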
cocaine-proxy
Storage
cocaine-runtime
cocaine-tool info
cocaine-runtime
cocaine-tool info
ab -n 100000 -c 32 'localhost:8080/qr/generate-http?message=Hello%20World!&size=10'
cocaine-runtime
6 QR-, cocaine-runtime:
[Tue Apr 8 15:36:18 2014] [INFO] app/qr: enlarging the pool from 5 to 6 slaves
cocaine-tool info
SIGTERM
package main

import (
    "net/http"

    "code.google.com/p/rsc/qr"
    "github.com/ugorji/go/codec"

    "github.com/cocaine/cocaine-framework-go/cocaine"
)

// msgpack specific
var (
    mh codec.MsgpackHandle
    h  = &mh
)

var (
    OkHeaders    cocaine.Headers = cocaine.Headers{[2]string{"Content-type", "image/png"}}
    ErrorHeaders cocaine.Headers = cocaine.Headers{[2]string{"Content-type", "text/plain"}}
    storage      *cocaine.Service
)

const (
    cacheNamepspace = "qr-code"
    cacheTag        = "qr-tag"
)

func qenerate(text string) (png []byte, err error) {
    res := <-storage.Call("read", cacheNamepspace, text)
    if res.Err() == nil {
        err = res.Extract(&png)
        return
    }
    c, err := qr.Encode(text, qr.L)
    if err != nil {
        return
    }
    png = c.PNG()
    <-storage.Call("write", cacheNamepspace, text, string(png), []string{cacheTag})
    return
}

func on_generate(request *cocaine.Request, response *cocaine.Response) {
    defer response.Close()
    inc := <-request.Read()
    var task struct {
        Text string
        Size int
    }
    err := codec.NewDecoderBytes(inc, h).Decode(&task)
    if err != nil {
        response.ErrorMsg(-100, err.Error())
        return
    }
    png, err := qenerate(task.Text)
    if err != nil {
        response.ErrorMsg(-200, err.Error())
        return
    }
    response.Write(png)
}

func on_http_generate(request *cocaine.Request, response *cocaine.Response) {
    defer response.Close()
    r, err := cocaine.UnpackProxyRequest(<-request.Read())
    if err != nil {
        response.ErrorMsg(-200, err.Error())
        return
    }
    message := r.FormValue("message")
    if len(message) == 0 {
        response.Write(cocaine.WriteHead(http.StatusBadRequest, ErrorHeaders))
        response.Write("Missing argument `message`")
        return
    }
    png, err := qenerate(message)
    if err != nil {
        response.Write(cocaine.WriteHead(http.StatusInternalServerError, ErrorHeaders))
        response.Write("Unable to generate QR")
        return
    }
    response.Write(cocaine.WriteHead(http.StatusOK, OkHeaders))
    response.Write(png)
}

func main() {
    binds := map[string]cocaine.EventHandler{
        "generate":      on_generate,
        "generate-http": on_http_generate,
    }
    Worker, err := cocaine.NewWorker()
    if err != nil {
        panic(err)
    }
    storage, err = cocaine.NewService("storage")
    if err != nil {
        panic(err)
    }
    Worker.Loop(binds)
}
Source: https://habr.com/ru/post/220243/