OpenStack Marconi API

Filed in Product & Development by Oz Akan | August 29, 2013 3:00 pm

Rackspace Cloud Queues[1] is backed by the open source project, Marconi.  Below, Oz Akan,  Development Manager for Rackspace Cloud Queues and an active contributor to Marconi, walks us through the project. Want to try out Marconi without managing your own environment?  Want to explore open source code with the comfort of Fanatical Support behind you? Rackspace Cloud Queues is currently accepting Early Access participants.  Sign up here[2].  

What is Marconi?

Marconi is an open source message queue implementation that utilizes a RESTful HTTP interface to provide an asynchronous communications protocol, which is one of the main requirements in today’s scalable applications. Using a queue as a communication layer, the sender and receiver of the message do not need to interact with the message queue at the same time. As a result, these can scale independently and be less prone to individual failures.

Marconi supports publisher-subscriber and producer-consumer patterns. I will focus on producer-consumer pattern and under the section “Python Way” I will give an example using python requests library. First, let’s look at the terminology and our old friend, curl samples.

[3]Terminology

[4]cURL Way

Since there is nothing abstracted in curl and available on Linux servers, I find curl is a good tool to practice restful interfaces and like keeping these commands handy.

[5]Get Authentication Token

If you run Keystone middleware with Marconi then you will have to get authentication token in order to use with the following calls.

First, I assign username, api key and endpoint to shell variables to make getting a token just a copy-paste.

$ USERNAME=my-username
$ APIKEY=my-long-api-key
$ ENDPOINT=ord.queues.api.rackspacecloud.com
$ echo $USERNAME
my-username
$ echo $APIKEY
my-long-api-key
$ echo ENDPOINT
ord.queues.api.rackspacecloud.com

Let’s do curl magic to populate TOKEN variable.

[6]Request
$ TOKEN=`curl -s https://identity.api.rackspacecloud.com/v2.0/tokens -X 'POST' -d '{"auth":{"RAX-\
KSKEY:apiKeyCredentials":{"username":"'$USERNAME'", "apiKey":"'$APIKEY'"}}}' -H "Content-Type: application/json" \
| python -c "import sys,json;print json.loads(sys.stdin.readlines()[0])['access']['token']['id']"`

Check if TOKEN value is set

$ echo $TOKEN
0998e7a6des3344f91184f213eaacbe7

Do a quick check if TOKEN works, by listing the queues we have.

[7]Request
$ curl -i -X GET https://$ENDPOINT:443/v1/queues -H "X-Auth-Token: $TOKEN"
[8]Response
HTTP/1.1 204 No Content

We don’t have a queue yet so we got the expected 204 No Content result. If there was a problem with the TOKEN we would get 401 Unauthorized.

[9]Get Node Health

Let’s see if there is a service we can talk to.

[10]Request
$ curl -i -X GET https://$ENDPOINT:443/v1/health -H "X-Auth-Token: $TOKEN"
[11]Response
HTTP/1.1 204 No Content

Here, we get a 204 response, even though it may seem like something is wrong with the service, this call is just to see if service can reply back. So, 204 is good. Maybe this is an indication that Marconi is brief, to the point and doesn’t like chatter.

[12]Create a Queue

We will have to create a queue in order to be able to post messages into. Queues are not created with the first message, so we need to send the request below to create a queue named “samplequeue.”

[13]Request
$ curl -i -X PUT https://$ENDPOINT:443/v1/queues/samplequeue -H "X-Auth-Token: $TOKEN"
[14]Response
HTTP/1.1 201 Created
Content-Length: 0
Location: /v1/queues/samplequeue

[15]List Queues

So far we have one queue, let’s list our queues.

[16]Request
$ curl -i -X GET https://$ENDPOINT:443/v1/queues -H "X-Auth-Token: $TOKEN"
[17]Response
HTTP/1.1 200 OK
Content-Length: 140
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues

{"queues": [{"href": "/v1/queues/samplequeue", "name": "samplequeue"}], "links": [{"href": "/v1/queues?marker=samplequeue", "rel": "next"}]}

[18]Post a Message

As we have a queue named samplequeue we can not post a message to the queue. We will post a message with a TTL value of 300 and it will have a key-value pair in the body as "event" : "one".

[19]Request
$ curl -i -X POST https://$ENDPOINT:443/v1/queues/samplequeue/messages -d '
[{"ttl": 300, "body": {"event": "one"}}]
' -H "Content-type: application/json" -H "Client-ID: QClient" -H "X-Auth-Token: $TOKEN"
[20]Response
HTTP/1.1 201 Created
Content-Length: 93
Content-Type: application/json; charset=utf-8
Location: /v1/queues/samplequeue/51e840b61d10b20570d56ff4

{"partial": false, "resources": ["/v1/queues/samplequeue/messages/51e840b61d10b20570d56ff4"]}

[21]Post Messages

Marconi supports posting 10 messages at the same time, so let’s try to post two within the same request.

[22]Request
$ curl -i -X POST https://$ENDPOINT:443/v1/queues/samplequeue/messages -d '
[   
    {"ttl": 300, "body": {"event": "two"}},
    {"ttl": 60, "body": {"event": "three"}}
]
' -H "Content-type: application/json" -H "Client-ID: QClient" -H "X-Auth-Token: $TOKEN"
[23]Response
HTTP/1.1 201 Created
Content-Length: 153
Content-Type: application/json; charset=utf-8
Location: /v1/queues/samplequeue/messages?ids=51e840e71d10b2055fd565fb,51e840e71d10b2055fd565fc

{"partial": false, "resources": ["/v1/queues/samplequeue/messages/51e840e71d10b2055fd565fb", "/v1/queues/samplequeue/messages/51e840e71d10b2055fd565fc"]}

Above, if you check the response, you will see that Marconi returned two ids. It is always a good practice to post messages in batches as network latency will be a smaller factor in overall performance compared to sending one message at a time.

[24]Get Messages

We can get 10 messages with a call.

[25]Request
$ curl -i -X GET https://$ENDPOINT:443/v1/queues/samplequeue/messages?echo=true -H "X-Auth-Token: $TOKEN" -H "Content-type: application/json" -H "Client-ID: QClient"
[26]Response
HTTP/1.1 200 OK
Content-Length: 461
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues/samplequeue/messages?echo=true

{"messages": [
{"body": {"event": "two"}, "age": 230, "href": "/v1/queues/samplequeue/messages/51e84e8b1d10b2055fd565fd", "ttl": 300}, 
{"body": {"event": "two"}, "age": 3, "href": "/v1/queues/samplequeue/messages/51e84f6e1d10b20571d56f0e", "ttl": 300}, 
{"body": {"event": "three"}, "age": 3, "href": "/v1/queues/samplequeue/messages/51e84f6e1d10b20571d56f0f", "ttl": 60}], "links": [{"href": "/v1/queues/samplequeue/messages?marker=9&echo=true", "rel": "next"}
]}

If we wait for a while, 60 seconds to be exact, we will see two messages instead of three as the message with a TTL value of 60 seconds will expire.

We can also get a specific message by providing the message id.

[27]Request
$ curl -i -X GET https://$ENDPOINT:443/v1/queues/samplequeue/messages?echo=true&ids=51e84e8b1d10b2055fd565fd -H "X-Auth-Token: $TOKEN" -H "Content-type: application/json" -H "Client-ID: QClient"
[28]Response
HTTP/1.1 200 OK
Content-Length: 261
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues/samplequeue/messages?echo=true

{"messages": [
{"body": {"event": "two"}, "age": 230, "href": "/v1/queues/samplequeue/messages/51e84e8b1d10b2055fd565fd", "ttl": 300}
]}

[29]Claim Messages

Claiming a message is pretty much like marking a message, so it will be invisible when another worker wants to claim messages. By default, 10 messages are claimed. In the sample request below, we will get two messages claimed as we use pass two as limit.

[30]Request
$ curl -i -X POST https://$ENDPOINT:443/v1/queues/samplequeue/claims?limit=2 -d '
{ "ttl" : 60, "grace": 60}
' -H "X-Auth-Token: $TOKEN" -H "Content-type: application/json" -H "Client-ID: QClient"
[31]Response
HTTP/1.1 200 OK
Content-Length: 306
Content-Type: application/json; charset=utf-8
Location: /v1/queues/samplequeue/claims/51e852d01d10b2056dd5703c

[{"body": {"event": "two"}, "age": 5, "href": "/v1/queues/samplequeue/messages/51e852cb1d10b20571d56f10?claim_id=51e852d01d10b2056dd5703c", "ttl": 300}, 
{"body": {"event": "three"}, "age": 5, "href": "/v1/queues/samplequeue/messages/51e852cb1d10b20571d56f11?claim_id=51e852d01d10b2056dd5703c", "ttl": 120}]

[32]Delete Message

[33]Request
$ curl -i -X DELETE curl -i -X POST https://$ENDPOINT:443/v1/queues/samplequeue/messages/51e852cb1d10b20571d56f10?claim_id=51e852d01d10b2056dd5703c -H "X-Auth-Token: $TOKEN" -H "Content-type: application/json" -H "Client-ID: QClient"
[34]Response
HTTP/1.1 204 No Content

204 is a valid response which validates that there isn’t a message with the given message and claim id. It doesn’t necessarily say that message is deleted though.

[35]Python Way

Curl provides a convenient way to test the marconi restful interface, but likely not the tool to develop an application. Now let’s see how these requests would be used in an application written in Python.

Most of the applications will have a logic similar to this:

Below, I created three classes. Queue_Connection handles http calls. Producer handles queue creation and posts messages to the queue. Consumer claims messages from the queue and deletes afterwards.

Import requests import json from time import sleep.

class Queue_Connection(object):

    def __init__(self, username, apikey):
        url = 'https://identity.api.rackspacecloud.com/v2.0/tokens'
        payload  = {"auth":{"RAX-KSKEY:apiKeyCredentials":{"username": username , "apiKey": apikey }}}
        headers = {'Content-Type': 'application/json'}
        r = requests.post(url, data=json.dumps(payload), headers=headers)
        self.token = r.json()['access']['token']['id']
        self.headers = {'X-Auth-Token' : self.token, 'Content-Type': 'application/json', 'Client-ID': 'QClient1'}

    def token(self):
        return self.token

    def get(self, url, payload=None):
        r = requests.get(url, data=json.dumps(payload), headers=self.headers)
        return [r.status_code, r.headers, r.content]

    def post(self, url, payload=None):
        r = requests.post(url, data=json.dumps(payload), headers=self.headers)
    return [r.status_code, r.headers, r.content]

    def put(self, url, payload=None):
        r = requests.put(url, data=json.dumps(payload), headers=self.headers)
    return [r.status_code, r.headers, r.content]

    def delete(self, url, payload=None):
        r = requests.delete(url, data=json.dumps(payload), headers=self.headers)
        return [r.status_code, r.headers, r.content]

class Producer(Queue_Connection):

    def __init__(self, url, username, apikey):
        super(Producer, self).__init__(username, apikey)
        self.base_url = url

    def queue_name():
        def fget(self):
            return self._queue_name
        def fset(self, value):
            self._queue_name = value
        def fdel(self):
            del self._queue_name
            return locals()
    queue_name = property(**queue_name())

    def queue_exists(self):
        url = self.base_url + '/v1/queues/' + self.queue_name + '/stats'
        if self.get(url)[0] == 200:
            return True
        return False

    def create_queue(self, payload=None):
        url = self.base_url + "/v1/queues/" + self.queue_name
        res =  self.put(url, payload)
    if res[0] == 200:
            print '%s created' % self.queue_name
    elif res[0] == 204:
            print 'A queue named %s is present' % self.queue_name
        else:
            print 'Problem with queue creation,'

    def post_messages(self, payload):
        url = self.base_url + '/v1/queues/' + self.queue_name + '/messages'
        res = self.post(url, payload)
    if res[0] == 201:
            return json.loads(res[2])['resources']
        else:
            print "Couldn't post messages"

class Consumer(Queue_Connection):

    def __init__(self, url, username, apikey):
        super(Consumer, self).__init__(username, apikey)
        self.base_url = url

    def claim_messages(self, payload, limit=1):
        url = self.base_url + '/v1/queues/' + self.queue_name + '/claims?limit=' + str(limit)
        res = self.post(url, payload)
    if res[0] == 201:
            return json.loads(res[2])
        else:
            print "Couldn't claim messages"

    def delete_message(self, url):
        url = self.base_url + url

    res = self.delete(url)
        if res[0] == 204:
            return True
        else:
            return False

if __name__ == "__main__":

    username = 'my-username'
    apikey = 'my-long-apikey'
    url = 'https://ord.queues.api.rackspacecloud.com:443'
    queue_name = 'testqueue1'

    """ create a Producer instance """
    pub = Producer(url, username, apikey)
    pub.queue_name = queue_name

    if not pub.queue_exists():
    print "Creating queue", pub.queue_name
        pub.create_queue({"metadata": "My Queue"})
    else:
        print "Queue exists"

    """ create and post two messages """
    data = [{"ttl": 60,"body": {"task":"task one"}},{"ttl": 60,"body": {"task":"task two"}}]
    for message in pub.post_messages(data):
        print "message: ", message

    """ create a Consumer instance """
    con = Consumer(url, username, apikey)

    """ define ttl and grace times for the claim """
    data = {"ttl":60, "grace":60}
    con.queue_name = queue_name
    """ claim 2 messages """
    messages = con.claim_messages(data, 2)
    for message in messages:
        print "I am working on message : ", message['body']['task']
        """
        pretend to be doing something with the message
        """
        sleep(1)
        """
        when done delete
        """
        print "finished working on message : ", message['body']['task']
        if con.delete_message(message['href']):
            print message['href'], " deleted"

This is a very primitive example. A real application would require handling exceptions but still it wouldn’t be very far from this.

I believe the effort to start using queues in an application is well worth the benefits. With Marconi, we are going to have queue as a service, which means someone will manage it for us and we will just enjoy the benefits. In a matter of weeks there is going to be a python client ready and it will get just easier to talk to Marconi.

Happy queuing.

Endnotes:
  1. Rackspace Cloud Queues: http://www.rackspace.com/blog/join-the-cloud-queues-early-access-program-today/
  2. here:
  3. : https://gist.github.com/ozgurakan/b409978317b1c3075780#terminology
  4. : https://gist.github.com/ozgurakan/b409978317b1c3075780#curl-way
  5. : https://gist.github.com/ozgurakan/b409978317b1c3075780#get-authentication-token
  6. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request
  7. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-1
  8. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response
  9. : https://gist.github.com/ozgurakan/b409978317b1c3075780#get-node-health
  10. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-2
  11. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-1
  12. : https://gist.github.com/ozgurakan/b409978317b1c3075780#create-a-queue
  13. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-3
  14. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-2
  15. : https://gist.github.com/ozgurakan/b409978317b1c3075780#list-queues
  16. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-4
  17. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-3
  18. : https://gist.github.com/ozgurakan/b409978317b1c3075780#post-a-message
  19. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-5
  20. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-4
  21. : https://gist.github.com/ozgurakan/b409978317b1c3075780#post-messages
  22. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-6
  23. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-5
  24. : https://gist.github.com/ozgurakan/b409978317b1c3075780#get-messages
  25. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-7
  26. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-6
  27. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-8
  28. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-7
  29. : https://gist.github.com/ozgurakan/b409978317b1c3075780#claim-messages
  30. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-9
  31. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-8
  32. : https://gist.github.com/ozgurakan/b409978317b1c3075780#delete-message
  33. : https://gist.github.com/ozgurakan/b409978317b1c3075780#request-10
  34. : https://gist.github.com/ozgurakan/b409978317b1c3075780#response-9
  35. : https://gist.github.com/ozgurakan/b409978317b1c3075780#python-way

Source URL: http://www.rackspace.com/blog/openstack-marconi-api/