Queueing jobs with RabbitMQ and Symfony2

@peterpeerdeman

Problem space

  • API rate limits
  • number crunching
  • PDF generation
  • web scraping
  • HTML fragment creation

RabbitMQ

"RabbitMQ is a complete and highly reliable enterprise messaging system based on the emerging AMQP standard."

RabbitMQ Ansible

                		
---
# Ansible tasks that install RabbitMQ from the rabbitmq.com apt repository,
# enable the management plugin, create an `admin` user and remove the default
# `guest` user. The tasks are order-dependent: repository and key must be in
# place before the apt cache is refreshed and the package is installed.

# `creates=` makes this task a no-op once the list file exists.
# NOTE(review): no `sudo: yes` here although the target is under /etc/apt --
# presumably privileges are raised at play level; confirm.
- name: rabbitmq | add repository
  shell: echo 'deb http://www.rabbitmq.com/debian/ testing main' > /etc/apt/sources.list.d/rabbitmq.list creates=/etc/apt/sources.list.d/rabbitmq.list

# NOTE(review): the signing key is fetched over plain http; consider https.
- name: rabbitmq | download key
  get_url: url=http://www.rabbitmq.com/rabbitmq-signing-key-public.asc dest=/tmp/rabbitmq-signing-key-public.asc

- name: rabbitmq | add key
  sudo: yes
  command: apt-key add /tmp/rabbitmq-signing-key-public.asc

# Refresh the package index so the newly added repository is visible.
- name: rabbitmq | update apt
  sudo: yes
  apt:  update_cache=yes

- name: rabbitmq | install
  sudo: yes
  apt:  package=rabbitmq-server state=latest

# Enables the rabbitmq_management plugin.
- name: rabbitmq | enable plugins
  shell: rabbitmq-plugins enable rabbitmq_management

# The user-management tasks below all set `ignore_errors: true` --
# presumably so that re-running the play does not fail when the user already
# exists (or is already gone); verify.
# NOTE(review): the literal `password` looks like a placeholder; confirm it
# is replaced by a variable/secret in real deployments.
- name: rabbitmq | add admin user
  shell: rabbitmqctl add_user admin password
  ignore_errors: true

- name: rabbitmq | set admin tags
  shell: rabbitmqctl set_user_tags admin administrator
  ignore_errors: true

# Grants `admin` configure/write/read permissions (".*" each) on vhost "/".
- name: rabbitmq | set admin permissions
  shell: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  ignore_errors: true

# Notifies the `rabbitmq | restart` handler, which is not defined in this
# snippet.
- name: rabbitmq | delete guest user
  shell: rabbitmqctl delete_user guest
  notify: rabbitmq | restart
  ignore_errors: true
    					
    				
                        
/**
 * Refresh Facebook shares for every enabled user, in-process.
 *
 * Looks up the last 'shares' update timestamp once, then passes each enabled
 * user to the share update service one after another.
 */
public function updateFacebookShares() {
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');
    $enabledUsers = $userRepository->findAllEnabledUsers();

    foreach ($enabledUsers as $enabledUser) {
        // fetch facebook posts since last update
        $this->shareupdateservice->updateShares($enabledUser, $since);
    }
}
                        
                    
                            
/**
 * Queue a Facebook shares refresh for every enabled user.
 *
 * Looks up the last 'shares' update timestamp once, then asks the job
 * producer to queue one job per enabled user instead of doing the work here.
 */
public function updateFacebookShares() {
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');
    $enabledUsers = $userRepository->findAllEnabledUsers();

    foreach ($enabledUsers as $enabledUser) {
        // queue retrieval of facebook posts since last update
        $this->facebookjobproducer->queueFacebookShares($enabledUser, $since);
    }
}
                            
                        

JobProducer

                    	
/**
 * Publishes Facebook jobs through the injected producer.
 */
class FacebookJobProducer {
    /** Producer that the share jobs are published through. */
    private $sharesproducer;

    /**
     * @param Producer $sharesproducer producer used by queueFacebookShares()
     */
    public function __construct(Producer $sharesproducer) {
        $this->sharesproducer = $sharesproducer;
    }

    /**
     * Publish one "update shares" job for the given user.
     *
     * The job is a serialized array holding the user's id and the
     * $lastupdate value exactly as it was passed in.
     *
     * @param object $user       object exposing getId()
     * @param mixed  $lastupdate forwarded unchanged in the job payload
     */
    public function queueFacebookShares($user, $lastupdate) {
        $payload = serialize(array(
            'user_id' => $user->getId(),
            'lastupdate' => $lastupdate
        ));

        $this->sharesproducer->publish($payload);
    }
}
                        
    				

JobConsumer

                    	
/**
 * Consumes Facebook jobs and runs them through the share update service.
 */
class FacebookJobConsumer {
    private $em;
    private $shareupdateservice;

    /**
     * @param EntityManager $entityManager      used to load the user referenced by a job
     * @param mixed         $shareupdateservice service exposing updateShares($user, $lastupdate)
     */
    public function __construct(EntityManager $entityManager, $shareupdateservice) {
        $this->em = $entityManager;
        $this->shareupdateservice = $shareupdateservice;
    }

    /**
     * Decode one message and dispatch it based on the exchange it arrived on.
     *
     * Messages whose body does not decode to an array, and messages from an
     * exchange other than 'shares', are left unprocessed.
     *
     * @param AMQPMessage $msg message whose body is a serialized job array
     */
    public function execute(AMQPMessage $msg) {
        // NOTE(review): unserialize() on a message body can instantiate
        // arbitrary classes if anything untrusted can publish to the queue.
        // Consider moving producer and consumer to JSON together.
        $message = unserialize($msg->body);

        if (!is_array($message)) {
            // Corrupt or unexpected body: there is no job to run.
            return;
        }

        switch ($msg->delivery_info['exchange']) {
            case 'shares':
                $this->processShares($message);
                break;
        }
    }

    /**
     * Run a "update shares" job.
     *
     * @param array $message job payload with 'user_id' and 'lastupdate' keys
     */
    private function processShares($message) {
        if (!isset($message['user_id'])) {
            // Without a user id there is nothing to look up.
            return;
        }

        // NOTE(review): the bundle alias here ('LifelyTFSSiteBundle') differs from
        // the 'LifelyBundle' alias used where jobs are queued -- confirm both
        // resolve to the same User entity.
        $user = $this->em->getRepository('LifelyTFSSiteBundle:User')->findOneById($message['user_id']);

        if ($user === null) {
            // The user may have been removed after the job was queued.
            return;
        }

        // 'lastupdate' may legitimately be null, so only guard against a missing key.
        $lastupdate = isset($message['lastupdate']) ? $message['lastupdate'] : null;

        $this->shareupdateservice->updateShares($user, $lastupdate);
    }
}
    					
    				

RabbitMQ config

                    	
# One connection, one producer and one consumer, all using the direct
# exchange 'shares'.
old_sound_rabbit_mq:
    connections:
        default:
            # Connection settings come from container parameters.
            host:     "%rabbitmq_host%"
            port:     "%rabbitmq_port%"
            user:     "%rabbitmq_user%"
            password: "%rabbitmq_password%"
            vhost:    "%rabbitmq_vhost%"
            lazy:     true
    producers:
        shares:
            connection:       default
            exchange_options: { name: 'shares', type: direct }
    consumers:
        shares:
            connection:       default
            exchange_options:
                name: 'shares'
                type: direct
            queue_options:
                name: 'shares'
            # Service id of the consumer callback -- presumably the
            # FacebookJobConsumer service; confirm in the service definitions.
            callback:         tfs.consumer.facebookjob
            idle_timeout:     5
    					
    				

RabbitMQ REST API

                    	
[{
    "memory": 1038768,
    "message_stats": {
        "ack": 4816716,
        "ack_details": {
            "rate": 0
        },
        "deliver": 9460951,
        "deliver_details": {
            "rate": 0
        },
        "deliver_get": 9460951,
        "deliver_get_details": {
            "rate": 0
        },
        "publish": 4817957,
        "publish_details": {
            "rate": 0
        },
        "redeliver": 4644385,
        "redeliver_details": {
            "rate": 0
        }
    },
    "messages": 1391,
    "messages_details": {
        "rate": 0
    },
    "messages_ready": 1391,
    "messages_ready_details": {
        "rate": 0
    },
    "messages_unacknowledged": 0,
    "messages_unacknowledged_details": {
        "rate": 0
    },
    "idle_since": "2014-10-14 23:46:05",
    "consumer_utilisation": "",
    "policy": "",
    "exclusive_consumer_tag": "",
    "consumers": 0,
    "backing_queue_status": {
        "q1": 0,
        "q2": 0,
        "delta": ["delta", "undefined", 0, "undefined"],
        "q3": 0,
        "q4": 1391,
        "len": 1391,
        "pending_acks": 0,
        "target_ram_count": "infinity",
        "ram_msg_count": 1391,
        "ram_ack_count": 0,
        "next_seq_id": 15680549,
        "persistent_count": 1391,
        "avg_ingress_rate": 162.49305731486965,
        "avg_egress_rate": 8.60980992660947e-21,
        "avg_ack_ingress_rate": 8.60980992660947e-21,
        "avg_ack_egress_rate": 1.427091082672728e-13
    },
    "state": "running",
    "name": "sharelikes",
    "vhost": "tfs",
    "durable": true,
    "auto_delete": false,
    "arguments": {},
    "node": "rabbit@lfy-production"
}, {
    ...
}]
    					
    				

Findings

  • extra moving part in the stack
  • configuration overhead
  • lightweight alternatives (JMSJobQueueBundle / ZeroMQ)
  • feature complete (dashboard, api, exchanges)
  • advocates application modularisation

Queueing jobs with RabbitMQ and Symfony2

@peterpeerdeman