-
Notifications
You must be signed in to change notification settings - Fork 56
/
ARedisChannel.php
executable file
·154 lines (147 loc) · 4.13 KB
/
ARedisChannel.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
<?php
/**
* Represents a redis pub/sub channel.
*
* Publishing messages:
* <pre>
* $channel = new ARedisChannel("myChannel");
* $channel->publish("hello world"); // sends a message to the channel
* </pre>
*
* Subscribing to channels:
* <pre>
* $channel = new ARedisChannel("myChannel");
* $channel->onReceiveMessage = function($event) {
* // the handler receives a single CEvent; the sender is this channel
* echo "Message Received:".$event->sender->getLastMessage()."\n";
* };
* $channel->subscribe(); // blocks, the callback is triggered when a message is received
* </pre>
* @author Charles Pick
* @package packages.redis
*/
class ARedisChannel extends ARedisIterableEntity {
    /**
     * Messages recorded by this instance, in the order they were published or received.
     * This is a purely local log: nothing in this class reads it back from redis.
     * @var array
     */
    protected $_data = array();

    /**
     * Subscribes to the channel.
     * Registers {@link receiveMessage} as the callback for messages on this channel.
     * @return ARedisChannel $this subscribed to the channel
     * @throws CException if the channel has no name
     */
    public function subscribe() {
        if ($this->name === null) {
            throw new CException(get_class($this)." requires a name!");
        }
        $this->getConnection()->getClient()->subscribe(array($this->name),array($this,"receiveMessage"));
        return $this;
    }

    /**
     * Unsubscribes from the channel
     * @return ARedisChannel $this unsubscribed from the channel
     * @throws CException if the channel has no name
     */
    public function unsubscribe() {
        if ($this->name === null) {
            throw new CException(get_class($this)." requires a name!");
        }
        $this->getConnection()->getClient()->unsubscribe(array($this->name));
        return $this;
    }

    /**
     * Publishes a message to the channel and records it in the local message log.
     * @param string $message The message to publish
     * @return integer the number of clients that received the message
     * @throws CException if the channel has no name
     */
    public function publish($message) {
        if ($this->name === null) {
            throw new CException(get_class($this)." requires a name!");
        }
        $receivers = $this->getConnection()->getClient()->publish($this->name,$message);
        // Record the message only after the client call returned, so a publish that
        // throws does not leave an unsent message reported by getLastMessage().
        $this->_data[] = $message;
        return $receivers;
    }

    /**
     * Receives a message from a subscribed channel.
     * Records the message locally, then raises {@link onReceiveMessage}. The event's
     * params array carries the keys 'redis', 'channel' and 'message' so that handlers
     * can read what was received directly from the event.
     * @param Redis $redis the redis client instance
     * @param string $channel the name of the channel
     * @param string $message the message content
     */
    public function receiveMessage($redis, $channel, $message) {
        $this->_data[] = $message;
        $event = new CEvent($this, array(
            'redis' => $redis,
            'channel' => $channel,
            'message' => $message,
        ));
        $this->onReceiveMessage($event);
    }

    /**
     * Gets the last received / sent message
     * @return mixed the most recently recorded message, or null if none has been recorded yet
     */
    public function getLastMessage() {
        if (count($this->_data) === 0) {
            return null;
        }
        // Work on a copy so the internal array pointer of $_data is left alone.
        // end() is used instead of indexing by count-1 because keys are not
        // guaranteed to be sequential once offsetSet()/offsetUnset() have been used.
        $messages = $this->_data;
        return end($messages);
    }

    /**
     * This event is raised after a message is received
     * @param CEvent $event the event parameter
     */
    public function onReceiveMessage($event)
    {
        $this->raiseEvent('onReceiveMessage',$event);
    }

    /**
     * Gets the number of messages recorded locally by this instance
     * @return integer the number of recorded messages
     */
    public function getCount() {
        return count($this->_data);
    }

    /**
     * Gets all the messages recorded locally by this instance
     * @param boolean $forceRefresh whether to force a refresh or not, IGNORED!
     * @return array the recorded messages
     */
    public function getData($forceRefresh = false) {
        return $this->_data;
    }

    /**
     * Returns whether there is an item at the specified offset.
     * This method is required by the interface ArrayAccess.
     * @param integer $offset the offset to check on
     * @return boolean
     */
    public function offsetExists($offset)
    {
        // Read the backing array directly, like every other accessor in this class.
        return isset($this->_data[$offset]);
    }

    /**
     * Returns the item at the specified offset.
     * This method is required by the interface ArrayAccess.
     * No exception is thrown here for a missing offset: the lookup is a plain
     * array read, so an undefined offset triggers PHP's usual notice/warning.
     * @param integer $offset the offset to retrieve item.
     * @return mixed the item at the offset
     */
    public function offsetGet($offset)
    {
        return $this->_data[$offset];
    }

    /**
     * Sets the item at the specified offset.
     * This method is required by the interface ArrayAccess.
     * @param integer $offset the offset to set item, or null to append (as in $channel[] = $item)
     * @param mixed $item the item value
     */
    public function offsetSet($offset,$item)
    {
        if ($offset === null) {
            // PHP passes null as the offset for the "$obj[] = $value" form;
            // append instead of writing to the "" key.
            $this->_data[] = $item;
        } else {
            $this->_data[$offset] = $item;
        }
    }

    /**
     * Unsets the item at the specified offset.
     * This method is required by the interface ArrayAccess.
     * @param integer $offset the offset to unset item
     */
    public function offsetUnset($offset)
    {
        unset($this->_data[$offset]);
    }
}