Skip to content

Commit 65b8714

Browse files
Merge pull request #120 from basho/ts
Implement Riak TimeSeries support
2 parents 13654bd + b8c0394 commit 65b8714

28 files changed

+1524
-2
lines changed

examples/TimeSeries.php

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
<?php
2+
3+
require __DIR__ . '/../vendor/autoload.php';
4+
5+
use Basho\Riak;
6+
use Basho\Riak\Command;
7+
use Basho\Riak\Node;
8+
9+
$node = (new Node\Builder)
10+
->atHost('riak-test')
11+
->onPort(8087)
12+
->build();
13+
14+
$riak = new Riak([$node], [], new Riak\Api\Pb());
15+
16+
17+
# create table
18+
$table_definition = "
19+
CREATE TABLE %s (
20+
family varchar not null,
21+
series varchar not null,
22+
time timestamp not null,
23+
weather varchar not null,
24+
temperature double,
25+
PRIMARY KEY((family, series, quantum(time, 15, 'm')), family, series, time)
26+
)";
27+
28+
$command = (new Command\Builder\TimeSeries\Query($riak))
29+
->withQuery(sprintf($table_definition, "GeoCheckins"))
30+
->build();
31+
32+
if (!$response->isSuccess()) {
33+
echo $response->getMessage();
34+
exit;
35+
}
36+
37+
38+
# describe table
39+
$command = (new Command\Builder\TimeSeries\DescribeTable($riak))
40+
->withTable('GeoCheckins')
41+
->build();
42+
43+
if (!$response->isSuccess()) {
44+
echo $response->getMessage();
45+
exit;
46+
}
47+
48+
foreach ($response->getResults() as $column_index => $column_definition) {
49+
print_r([$column_index => $column_definition]);
50+
}
51+
52+
53+
# store a row
54+
$response = (new Command\Builder\TimeSeries\StoreRows($riak))
55+
->inTable('GeoCheckins')
56+
->withRow([
57+
(new Cell("family"))->setValue("family1"),
58+
(new Cell("series"))->setValue("series1"),
59+
(new Cell("time"))->setTimestampValue(1420113600),
60+
(new Cell("weather"))->setValue("hot"),
61+
(new Cell("temperature"))->setValue(23.5),
62+
])
63+
->build()
64+
->execute();
65+
66+
if (!$response->isSuccess()) {
67+
echo $response->getMessage();
68+
exit;
69+
}
70+
71+
72+
# store rows
73+
$response = (new Command\Builder\TimeSeries\StoreRows($riak))
74+
->inTable('GeoCheckins')
75+
->withRows([
76+
[
77+
(new Cell("family"))->setValue("family1"),
78+
(new Cell("series"))->setValue("series1"),
79+
(new Cell("time"))->setTimestampValue(1420115400),
80+
(new Cell("weather"))->setValue("hot"),
81+
(new Cell("temperature"))->setValue(22.4),
82+
],
83+
[
84+
(new Cell("family"))->setValue("family1"),
85+
(new Cell("series"))->setValue("series1"),
86+
(new Cell("time"))->setTimestampValue(1420117200),
87+
(new Cell("weather"))->setValue("warm"),
88+
(new Cell("temperature"))->setValue(20.5),
89+
],
90+
])
91+
->build()
92+
->execute();
93+
94+
if (!$response->isSuccess()) {
95+
echo $response->getMessage();
96+
exit;
97+
}
98+
99+
100+
# fetch a row
101+
/** @var Command\TimeSeries\Response $response */
102+
$response = (new Command\Builder\TimeSeries\FetchRow($riak))
103+
->atKey([
104+
(new Cell("family"))->setValue("family1"),
105+
(new Cell("series"))->setValue("series1"),
106+
(new Cell("time"))->setTimestampValue(1420113600),
107+
])
108+
->inTable('GeoCheckins')
109+
->build()
110+
->execute();
111+
112+
if (!$response->isSuccess()) {
113+
echo $response->getMessage();
114+
exit;
115+
}
116+
117+
# output row data
118+
foreach ($response->getRow() as $index => $column) {
119+
switch ($column->getType()) {
120+
case Riak\TimeSeries\Cell::INT_TYPE:
121+
printf("Column %d: %s is an integer equal to %d\n", $index, $column->getName(), $column->getValue());
122+
break;
123+
case Riak\TimeSeries\Cell::DOUBLE_TYPE:
124+
printf("Column %d: %s is a double equal to %d\n", $index, $column->getName(), $column->getValue());
125+
break;
126+
case Riak\TimeSeries\Cell::BOOL_TYPE:
127+
printf("Column %d: %s is a boolean equal to %s\n", $index, $column->getName(), $column->getValue());
128+
break;
129+
case Riak\TimeSeries\Cell::TIMESTAMP_TYPE:
130+
printf("Column %d: %s is a timestamp equal to %d\n", $index, $column->getName(), $column->getValue());
131+
break;
132+
default:
133+
printf("Column %d: %s is a string equal to %s\n", $index, $column->getName(), $column->getValue());
134+
break;
135+
}
136+
}
137+
138+
139+
# query for data
140+
$response = (new Command\Builder\TimeSeries\Query($riak))
141+
->withQuery("select * from GeoCheckins where family = 'family1' and series = 'myseries1' and (time > 1420113500 and time < 1420116000)")
142+
->build()
143+
->execute();
144+
145+
# output rows
146+
foreach ($response->getResults() as $row_index => $row) {
147+
foreach ($row as $column_index => $column) {
148+
switch ($column->getType()) {
149+
case Riak\TimeSeries\Cell::INT_TYPE:
150+
printf("Column %d: %s is an integer equal to %d\n", $index, $column->getName(), $column->getValue());
151+
break;
152+
case Riak\TimeSeries\Cell::DOUBLE_TYPE:
153+
printf("Column %d: %s is a double equal to %d\n", $index, $column->getName(), $column->getValue());
154+
break;
155+
case Riak\TimeSeries\Cell::BOOL_TYPE:
156+
printf("Column %d: %s is a boolean equal to %s\n", $index, $column->getName(), $column->getValue());
157+
break;
158+
case Riak\TimeSeries\Cell::TIMESTAMP_TYPE:
159+
printf("Column %d: %s is a timestamp equal to %d\n", $index, $column->getName(), $column->getValue());
160+
break;
161+
default:
162+
printf("Column %d: %s is a string equal to %s\n", $index, $column->getName(), $column->getValue());
163+
break;
164+
}
165+
}
166+
}
167+
168+
169+
# delete a row
170+
$response = (new Command\Builder\TimeSeries\DeleteRow($riak))
171+
->atKey([
172+
(new Cell("family"))->setValue("family1"),
173+
(new Cell("series"))->setValue("series1"),
174+
(new Cell("time"))->setTimestampValue(1420113600),
175+
])
176+
->inTable('GeoCheckins')
177+
->build()
178+
->execute();
179+
180+
if (!$response->isSuccess()) {
181+
echo $response->getMessage();
182+
exit;
183+
}

phpunit.xml

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<testsuite name="Functional Tests">
99
<directory>tests/functional</directory>
1010
<exclude>tests/functional/SecurityFeaturesTest.php</exclude>
11+
<exclude>tests/functional/TimeSeriesOperationsTest.php</exclude>
1112
</testsuite>
1213
<testsuite name="Scenario Tests">
1314
<directory>tests/scenario</directory>
@@ -18,4 +19,4 @@
1819
<directory>./src</directory>
1920
</whitelist>
2021
</filter>
21-
</phpunit>
22+
</phpunit>

src/Riak.php

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
*/
3333
class Riak
3434
{
35+
const VERSION = "2.0.3";
36+
3537
/**
3638
* Riak server ring
3739
*

src/Riak/Api/Http.php

+36
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Basho\Riak\Node;
1414
use Basho\Riak\Object;
1515
use Basho\Riak\Search\Doc;
16+
use Basho\Riak\TimeSeries\Cell;
1617

1718
/**
1819
* Handles communications between end user app & Riak via Riak HTTP API using cURL
@@ -31,6 +32,7 @@ class Http extends Api implements ApiInterface
3132
const CONTENT_TYPE_JSON = 'application/json';
3233
const CONTENT_TYPE_XML = 'application/xml';
3334

35+
const TS_API_PREFIX = '/ts/v1';
3436

3537
/**
3638
* Request body to be sent
@@ -251,6 +253,20 @@ protected function buildPath()
251253
case 'Basho\Riak\Command\Object\FetchPreflist':
252254
$this->path = sprintf('/types/%s/buckets/%s/keys/%s/preflist', $bucket->getType(), $bucket->getName(), $key);
253255
break;
256+
case 'Basho\Riak\Command\TimeSeries\Fetch':
257+
case 'Basho\Riak\Command\TimeSeries\Delete':
258+
/** @var $command Command\TimeSeries\Fetch */
259+
$command = $this->command;
260+
$this->path = sprintf('%s/tables/%s/keys/%s', static::TS_API_PREFIX, $command->getTable(), implode("/", $command->getData()));
261+
break;
262+
case 'Basho\Riak\Command\TimeSeries\Store':
263+
/** @var $command Command\TimeSeries\Store */
264+
$command = $this->command;
265+
$this->path = sprintf('%s/tables/%s/keys', static::TS_API_PREFIX, $command->getTable());
266+
break;
267+
case 'Basho\Riak\Command\TimeSeries\Query\Fetch':
268+
$this->path = sprintf('%s/query', static::TS_API_PREFIX);
269+
break;
254270
default:
255271
$this->path = '';
256272
}
@@ -752,6 +768,26 @@ protected function parseResponse()
752768
case 'Basho\Riak\Command\Stats':
753769
$response = new Command\Stats\Response($this->success, $this->statusCode, $this->error, json_decode($body, true));
754770
break;
771+
case 'Basho\Riak\Command\TimeSeries\Fetch':
772+
$row = in_array($this->statusCode, ['200','201','204']) ? json_decode($body, true) : [];
773+
$response = new Command\TimeSeries\Response($this->success, $this->statusCode, $this->error, [$row]);
774+
break;
775+
case 'Basho\Riak\Command\TimeSeries\Query\Fetch':
776+
$results = in_array($this->statusCode, ['200','204']) ? json_decode($body) : [];
777+
$rows = [];
778+
if (isset($results->rows)) {
779+
foreach ($results->rows as $row) {
780+
$cells = [];
781+
foreach ($results->columns as $index => $column) {
782+
$cells[$column] = $row[$index];
783+
}
784+
$rows[] = $cells;
785+
}
786+
}
787+
$response = new Command\TimeSeries\Query\Response($this->success, $this->statusCode, $this->error, $rows);
788+
break;
789+
case 'Basho\Riak\Command\TimeSeries\Store':
790+
case 'Basho\Riak\Command\TimeSeries\Delete':
755791
case 'Basho\Riak\Command\Object\Delete':
756792
case 'Basho\Riak\Command\Bucket\Delete':
757793
case 'Basho\Riak\Command\Search\Index\Delete':

src/Riak/Command/Builder.php

+3
Original file line numberDiff line numberDiff line change
@@ -127,5 +127,8 @@ protected function required($objectName)
127127
if (is_object($value) && $value instanceof $class === false) {
128128
throw new Builder\Exception("Expected instance of {$class}, received instance of " . get_class($value));
129129
}
130+
if (is_array($value) && count($value) == 0) {
131+
throw new Builder\Exception("Expected non-empty array value for {$objectName}");
132+
}
130133
}
131134
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace Basho\Riak\Command\Builder\TimeSeries;
4+
5+
use Basho\Riak;
6+
use Basho\Riak\Command;
7+
8+
/**
9+
* @author Christopher Mancini <cmancini at basho d0t com>
10+
*/
11+
class DeleteRow extends Command\Builder implements Command\BuilderInterface
12+
{
13+
use TableTrait;
14+
use KeyTrait;
15+
16+
/**
17+
* {@inheritdoc}
18+
*
19+
* @return Command\TimeSeries\Store
20+
*/
21+
public function build()
22+
{
23+
$this->validate();
24+
25+
return new Command\TimeSeries\Delete($this);
26+
}
27+
28+
/**
29+
* {@inheritdoc}
30+
*/
31+
public function validate()
32+
{
33+
$this->required('Key');
34+
$this->required('Table');
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Basho\Riak\Command\Builder\TimeSeries;
4+
5+
use Basho\Riak;
6+
use Basho\Riak\Command;
7+
8+
/**
9+
* @author Christopher Mancini <cmancini at basho d0t com>
10+
*/
11+
class DescribeTable extends Command\Builder\TimeSeries\Query implements Command\BuilderInterface
12+
{
13+
/**
14+
* Which table do you want to describe?
15+
*
16+
* @param $table
17+
*/
18+
public function withTable($table)
19+
{
20+
if ($table) {
21+
$this->query = "DESCRIBE {$table}";
22+
}
23+
24+
return $this;
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace Basho\Riak\Command\Builder\TimeSeries;
4+
5+
use Basho\Riak;
6+
use Basho\Riak\Command;
7+
8+
/**
9+
* @author Christopher Mancini <cmancini at basho d0t com>
10+
*/
11+
class FetchRow extends Command\Builder implements Command\BuilderInterface
12+
{
13+
use TableTrait;
14+
use KeyTrait;
15+
16+
/**
17+
* {@inheritdoc}
18+
*
19+
* @return Command\TimeSeries\Store
20+
*/
21+
public function build()
22+
{
23+
$this->validate();
24+
25+
return new Command\TimeSeries\Fetch($this);
26+
}
27+
28+
/**
29+
* {@inheritdoc}
30+
*/
31+
public function validate()
32+
{
33+
$this->required('Key');
34+
$this->required('Table');
35+
}
36+
}

0 commit comments

Comments
 (0)