"RabbitMQ is a complete and highly reliable enterprise messaging system based on the emerging AMQP standard."
---
# Ansible tasks: install RabbitMQ from the rabbitmq.com apt repository, enable
# the management plugin, create an "admin" user and remove the "guest" user.
#
# NOTE(review): only the apt-key/apt tasks request `sudo: yes`. The repository
# task writes under /etc/apt and the rabbitmq-plugins / rabbitmqctl tasks
# normally need root as well -- presumably the play itself runs privileged;
# confirm before relying on this file.

# Write the apt source entry. `creates=` makes the shell module skip the task
# once the list file exists.
- name: rabbitmq | add repository
  shell: echo 'deb http://www.rabbitmq.com/debian/ testing main' > /etc/apt/sources.list.d/rabbitmq.list creates=/etc/apt/sources.list.d/rabbitmq.list

# Fetch the repository signing key to /tmp.
# NOTE(review): the key is downloaded over plain HTTP -- consider HTTPS.
- name: rabbitmq | download key
  get_url: url=http://www.rabbitmq.com/rabbitmq-signing-key-public.asc dest=/tmp/rabbitmq-signing-key-public.asc

# Trust the downloaded key for apt.
- name: rabbitmq | add key
  sudo: yes
  command: apt-key add /tmp/rabbitmq-signing-key-public.asc

# Refresh the package index so the new repository is visible.
- name: rabbitmq | update apt
  sudo: yes
  apt: update_cache=yes

- name: rabbitmq | install
  sudo: yes
  apt: package=rabbitmq-server state=latest

# Enable the management plugin.
- name: rabbitmq | enable plugins
  shell: rabbitmq-plugins enable rabbitmq_management

# The rabbitmqctl tasks below set `ignore_errors: true`, so a failing command
# (for example on a repeated run) does not abort the play.
# NOTE(review): the admin password is the literal string "password" -- replace
# it with a variable/secret before using this outside a sandbox.
- name: rabbitmq | add admin user
  shell: rabbitmqctl add_user admin password
  ignore_errors: true

- name: rabbitmq | set admin tags
  shell: rabbitmqctl set_user_tags admin administrator
  ignore_errors: true

# Grant configure/write/read on every resource in vhost "/".
- name: rabbitmq | set admin permissions
  shell: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  ignore_errors: true

# Remove the default guest account and notify the "rabbitmq | restart" handler
# (the handler is defined outside this snippet).
- name: rabbitmq | delete guest user
  shell: rabbitmqctl delete_user guest
  notify: rabbitmq | restart
  ignore_errors: true
/**
 * Refreshes Facebook shares for every enabled user, in-process.
 *
 * Reads the last update timestamp recorded for the 'shares' type and hands
 * it, together with each enabled user, to the share update service.
 */
public function updateFacebookShares() {
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');

    foreach ($userRepository->findAllEnabledUsers() as $enabledUser) {
        // Only posts since the previous update are requested.
        $this->shareupdateservice->updateShares($enabledUser, $since);
    }
}
/**
 * Queues one Facebook-shares retrieval job per enabled user.
 *
 * Reads the last update timestamp recorded for the 'shares' type and passes
 * it, together with each enabled user, to the Facebook job producer.
 */
public function updateFacebookShares() {
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');

    foreach ($userRepository->findAllEnabledUsers() as $enabledUser) {
        // The job covers posts since the previous update.
        $this->facebookjobproducer->queueFacebookShares($enabledUser, $since);
    }
}
/**
 * Publishes Facebook job messages through the injected producer.
 */
class FacebookJobProducer {
    private $sharesproducer;

    /**
     * @param Producer $sharesproducer producer used to publish 'shares' jobs
     */
    public function __construct(Producer $sharesproducer) {
        $this->sharesproducer = $sharesproducer;
    }

    /**
     * Publishes a job describing which user's shares to fetch.
     *
     * The message is a PHP-serialized array with the keys 'user_id' and
     * 'lastupdate'.
     *
     * @param object $user       object exposing getId()
     * @param mixed  $lastupdate value forwarded unchanged as 'lastupdate'
     */
    public function queueFacebookShares($user, $lastupdate) {
        $payload = serialize(array(
            'user_id' => $user->getId(),
            'lastupdate' => $lastupdate
        ));

        $this->sharesproducer->publish($payload);
    }
}
/**
 * Consumer callback that processes queued Facebook jobs.
 *
 * A message body is expected to be a PHP-serialized array containing
 * 'user_id' and 'lastupdate'.
 */
class FacebookJobConsumer {
    private $em;
    private $shareupdateservice;

    /**
     * @param EntityManager $entityManager      used to load the user named in a message
     * @param mixed         $shareupdateservice service exposing updateShares($user, $lastupdate)
     */
    public function __construct(EntityManager $entityManager, $shareupdateservice) {
        $this->em = $entityManager;
        $this->shareupdateservice = $shareupdateservice;
    }

    /**
     * Decodes one message and dispatches on the exchange it was delivered
     * through. Only the 'shares' exchange is handled; anything else is ignored.
     *
     * Never returns a value, exactly like the unguarded code path.
     * NOTE(review): presumably the consumer framework acknowledges a message
     * unless the callback returns false, so skipped messages are dropped
     * rather than redelivered -- confirm against the bundle in use.
     *
     * @param AMQPMessage $msg
     */
    public function execute(AMQPMessage $msg) {
        // NOTE(review): unserialize() on data read from a broker can
        // instantiate arbitrary classes; safe only while every publisher is
        // trusted. Switching format must be done together with the producer.
        $message = unserialize($msg->body);

        // unserialize() returns false for a corrupt body; indexing that as an
        // array would silently yield nulls further down.
        if (!is_array($message)) {
            return;
        }

        switch ($msg->delivery_info['exchange']) {
            case 'shares':
                $this->processShares($message);
                break;
        }
    }

    /**
     * Loads the user referenced by the message and updates their shares.
     *
     * Messages without a user id, or naming a user that can no longer be
     * found, are skipped instead of passing null to the update service.
     *
     * @param array $message decoded message with 'user_id' and 'lastupdate'
     */
    private function processShares($message) {
        if (!isset($message['user_id'])) {
            return;
        }

        // NOTE(review): other code in this project refers to the entity as
        // 'LifelyBundle:User' -- verify this alias is the intended one.
        $user = $this->em->getRepository('LifelyTFSSiteBundle:User')->findOneById($message['user_id']);
        if (null === $user) {
            return;
        }

        $lastupdate = isset($message['lastupdate']) ? $message['lastupdate'] : null;
        $this->shareupdateservice->updateShares($user, $lastupdate);
    }
}
# RabbitMqBundle configuration: one connection, plus a producer and a consumer
# that both use the direct exchange 'shares'.
old_sound_rabbit_mq:
    connections:
        default:
            # Connection settings come from container parameters.
            host: "%rabbitmq_host%"
            port: "%rabbitmq_port%"
            user: "%rabbitmq_user%"
            password: "%rabbitmq_password%"
            vhost: "%rabbitmq_vhost%"
            lazy: true
    producers:
        shares:
            connection: default
            exchange_options:
                name: 'shares'
                type: direct
    consumers:
        shares:
            connection: default
            # Must match the producer's exchange so published jobs reach the queue.
            exchange_options: { name: 'shares', type: direct }
            queue_options: { name: 'shares' }
            # Service id of the consumer callback.
            callback: tfs.consumer.facebookjob
            # NOTE(review): presumably seconds without a message before the
            # consumer stops -- confirm against the bundle documentation.
            idle_timeout: 5
[{
"memory": 1038768,
"message_stats": {
"ack": 4816716,
"ack_details": {
"rate": 0
},
"deliver": 9460951,
"deliver_details": {
"rate": 0
},
"deliver_get": 9460951,
"deliver_get_details": {
"rate": 0
},
"publish": 4817957,
"publish_details": {
"rate": 0
},
"redeliver": 4644385,
"redeliver_details": {
"rate": 0
}
},
"messages": 1391,
"messages_details": {
"rate": 0
},
"messages_ready": 1391,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0
},
"idle_since": "2014-10-14 23:46:05",
"consumer_utilisation": "",
"policy": "",
"exclusive_consumer_tag": "",
"consumers": 0,
"backing_queue_status": {
"q1": 0,
"q2": 0,
"delta": ["delta", "undefined", 0, "undefined"],
"q3": 0,
"q4": 1391,
"len": 1391,
"pending_acks": 0,
"target_ram_count": "infinity",
"ram_msg_count": 1391,
"ram_ack_count": 0,
"next_seq_id": 15680549,
"persistent_count": 1391,
"avg_ingress_rate": 162.49305731486965,
"avg_egress_rate": 8.60980992660947e-21,
"avg_ack_ingress_rate": 8.60980992660947e-21,
"avg_ack_egress_rate": 1.427091082672728e-13
},
"state": "running",
"name": "sharelikes",
"vhost": "tfs",
"durable": true,
"auto_delete": false,
"arguments": {},
"node": "rabbit@lfy-production"
}, {
...
}]