Skip to content

Commit

Permalink
Merge pull request #3052 from witekest/merged_protocols
Browse files Browse the repository at this point in the history
Add IPv4 and IPv6 handling in one file
  • Loading branch information
Vogtinator committed Feb 21, 2024
2 parents 9919e02 + 1e4d1e0 commit 84b76ec
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 65 deletions.
147 changes: 108 additions & 39 deletions metrics/access/aggregate.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/php
<?php

use InfluxDB\Point;
use InfluxDB\Database;
use InfluxDB2\Client;
use InfluxDB2\Point;

$CACHE_DIR = $_SERVER['HOME'] . '/.cache/openSUSE-release-tools/metrics-access';
const PROTOCOLS = ['ipv4', 'ipv6'];
const DOWNLOAD_OPENSUSE_ORG = 'https://download.opensuse.org/logs';
const PONTIFEX = 'http://pontifex.infra.opensuse.org/logs';
const BACKUP = 'http://backup.infra.opensuse.org';
const LANGLEY = 'http://langley.suse.de/pub/pontifex%s-opensuse.suse.de';
Expand All @@ -19,22 +20,24 @@
$begin->sub(date_interval_create_from_date_string('1 day'));
$source_map = [
'ipv4' => [
// the first item defines the starting date for aggregation
'2023-01-01' => false,
//'2014-04-14' => sprintf(LANGLEY, 2) . '/' . VHOST,
//'2017-12-04' => sprintf(LANGLEY, 3) . '/' . VHOST,
'2023-06-18' => BACKUP . '/' . VHOST,
$begin->format('Y-m-d') => PONTIFEX . '/' . VHOST,
'2023-11-13' => DOWNLOAD_OPENSUSE_ORG . '/' . VHOST,
'filename' => FILENAME,
],
'ipv6' => [
'2012-12-31' => false,
//'2017-12-04' => sprintf(LANGLEY, 3) . '/' . IPV6_PREFIX . VHOST,
'2023-06-18' => BACKUP . '/' . IPV6_PREFIX . VHOST,
$begin->format('Y-m-d') => PONTIFEX . '/' . IPV6_PREFIX . VHOST,
'2023-11-13' => DOWNLOAD_OPENSUSE_ORG . '/' . IPV6_PREFIX . VHOST,
'filename' => IPV6_PREFIX . FILENAME,
],
'ipv4+6' => [
'2023-11-13' => false,
$begin->format('Y-m-d') => DOWNLOAD_OPENSUSE_ORG . '/' . VHOST,
'filename' => FILENAME,
],
];
$end = new DateTime(key($source_map['ipv4'])); // decide about adding one day
$migration_date = new DateTime(key($source_map['ipv4+6']));
$period_reversed = date_period_reversed($end, '1 day', $begin);

error_log('begin: ' . $begin->format('Y-m-d'));
Expand All @@ -53,9 +56,11 @@ function cache_init()
foreach (PROTOCOLS as $protocol) {
mkdir("$CACHE_DIR/$protocol", 0755, true);
}
mkdir("$CACHE_DIR/ipv4+6", 0755, true);

// Avoid packaging mess while still automating, but not ideal.
passthru('cd ' . escapeshellarg($CACHE_DIR) . ' && composer require influxdb/influxdb-php ~1');
passthru('cd ' . escapeshellarg($CACHE_DIR) .
' && composer require influxdata/influxdb-client-php:~3.4 guzzlehttp/guzzle');
}

require "$CACHE_DIR/vendor/autoload.php";
Expand All @@ -68,17 +73,18 @@ function ingest_all($period_reversed, $source_map)
$found = [];
// Walk backwards until found in cache.
foreach ($period_reversed as $date) {
$date_string = $date->format('Y-m-d');
$date_string = print_date($date);
$protocols_on_day = get_protocols($date);

foreach (PROTOCOLS as $protocol) {
foreach ($protocols_on_day as $protocol) {
if (!empty($found[$protocol])) continue;
if (isset($source_map[$protocol][$date_string]))
$source[$protocol] = $source_map[$protocol][$date_string];

// Skip date+protocol if no source is available.
if (empty($source[$protocol])) continue;

$cache_file = "$CACHE_DIR/$protocol/$date_string.json";
$cache_file = get_cache_file($protocol, $date);
if (file_exists($cache_file)) {
error_log("[$date_string] [$protocol] found");
$found[$protocol] = true;
Expand All @@ -88,7 +94,8 @@ function ingest_all($period_reversed, $source_map)
}
}

if (count($found) == count(PROTOCOLS)) {
// Stop when all cache files were found
if (count($found) == count($protocols_on_day)) {
error_log('ingest initialization complete');
break;
}
Expand All @@ -98,6 +105,47 @@ function ingest_all($period_reversed, $source_map)
subprocess_wait(1, 1);
}

// Format a DateTime as the canonical YYYY-MM-DD string used throughout
// this script for source-map keys and cache file names.
function print_date($date)
{
    return date_format($date, 'Y-m-d');
}

// Logs before migration date have been kept in separate files for IPv4 and IPv6 addresses
function has_separate_protocol_logs($date)
{
    global $migration_date;
    // On or before the cut-over day the per-protocol files are still in use.
    return $date <= $migration_date;
}

// Resolve the JSON cache path for a protocol on a given day. Days after the
// migration date share one merged "ipv4+6" directory regardless of protocol.
function get_cache_file($protocol, $date)
{
    global $CACHE_DIR;
    $subdir = has_separate_protocol_logs($date) ? $protocol : 'ipv4+6';
    return "$CACHE_DIR/$subdir/" . print_date($date) . '.json';
}

// List the cache files relevant for a date — one per protocol that has its
// own log file on that day (a single merged file after the migration date).
function get_cache_files($date)
{
    return array_map(
        function ($protocol) use ($date) {
            return get_cache_file($protocol, $date);
        },
        get_protocols($date)
    );
}

// Protocols that have distinct log files on the given date: the real
// protocol list before the migration, a single merged pseudo-protocol after.
function get_protocols($date)
{
    return has_separate_protocol_logs($date) ? PROTOCOLS : ['ipv4+6'];
}

function ingest($date, $source, $filename, $destination)
{
$url = implode('/', [
Expand All @@ -107,7 +155,7 @@ function ingest($date, $source, $filename, $destination)
sprintf($filename, $date->format('Ymd')),
]);
$command = implode(' ', [
'curl -s',
'curl -s --digest --netrc',
escapeshellarg($url),
'| xzcat',
'| ' . __DIR__ . '/ingest.php',
Expand Down Expand Up @@ -159,32 +207,46 @@ function aggregate_all($period)
$merged_protocol = [];
$date_previous = null;
foreach ($period as $date) {
$date_string = $date->format('Y-m-d');
$date_string = print_date($date);

$data = null;
foreach (PROTOCOLS as $protocol) {
$cache_file = "$CACHE_DIR/$protocol/$date_string.json";
$cache_file = get_cache_file($protocol, $date);
if (!file_exists($cache_file) or !filesize($cache_file)) continue;

error_log("[$date_string] [$protocol] load cache");
error_log("[$date_string]" . (has_separate_protocol_logs($date) ? " [$protocol]" : "") . " load cache");
$data_new = json_decode(file_get_contents($cache_file), true);
if (!$data_new) {
error_log('ERROR: failed to load ' . $cache_file);
unlink($cache_file); // Trigger it to be re-ingested next run.
exit(1);
}

if (isset($data_new[$protocol])) {
// new cache files have 'ipv4' and 'ipv6' array keys
$data_protocol = $data_new[$protocol];
// we don't want to count 'total_invalid' and 'bytes' twice
if ($data) {
$data_protocol['total_invalid'] = 0;
$data_protocol['bytes'] = 0;
} else {
$data_protocol['total_invalid'] = $data_new['total_invalid'];
$data_protocol['bytes'] = $data_new['bytes'];
}
}
else
$data_protocol = $data_new;
if (!isset($merged_protocol[$protocol])) $merged_protocol[$protocol] = [];
$data_new['days'] = 1;
normalize($data_new);
aggregate($intervals, $merged_protocol[$protocol], $date, $date_previous, $data_new,
$data_protocol['days'] = 1;
normalize($data_protocol);
aggregate($intervals, $merged_protocol[$protocol], $date, $date_previous, $data_protocol,
['protocol' => $protocol], 'protocol');

if ($data) {
merge($data, $data_new);
merge($data, $data_protocol);
$data['days'] = 1;
} else {
$data = $data_new;
$data = $data_protocol;
}
}

Expand Down Expand Up @@ -213,12 +275,12 @@ function aggregate_all($period)
function aggregate($intervals, &$merged, $date, $date_previous, $data, $tags = [], $prefix = 'access')
{
foreach ($intervals as $interval => $format) {
if ($interval == 'FQ') {
if ($interval === 'FQ') {
$value = format_FQ($date);
if (isset($date_previous))
$value_previous = format_FQ($date_previous);
}
elseif ($interval == 'FY') {
elseif ($interval === 'FY') {
$value = format_FY($date);
if (isset($date_previous))
$value_previous = format_FY($date_previous);
Expand All @@ -231,7 +293,7 @@ function aggregate($intervals, &$merged, $date, $date_previous, $data, $tags = [
if (!isset($merged[$interval]) || $value != $merged[$interval]['value']) {
if (!empty($merged[$interval]['data'])) {
$summary = summarize($merged[$interval]['data']);
if ($prefix == 'protocol') {
if ($prefix === 'protocol') {
$summary = ['-' => $summary['-']];
}
$flavors = [];
Expand All @@ -248,7 +310,7 @@ function aggregate($intervals, &$merged, $date, $date_previous, $data, $tags = [
$count += write_flavors($interval, $date_previous, $flavors);
}

if ($prefix == 'access') {
if ($prefix === 'access') {
$summary = summarize_product_plus_key($merged[$interval]['data']['total_image_product']);
$count += write_summary_product_plus_key($interval, $date_previous, $summary, 'image');
}
Expand Down Expand Up @@ -313,7 +375,7 @@ function merge(&$data1, $data2)
if (empty($data1['total_product'][$product]))
$data1['total_product'][$product] = 0;

$data1['total_product'][$product] += $data2['total_product'][$product];
$data1['total_product'][$product] += $total;
}

merge_unique_products($data1['unique_product'], $data2['unique_product']);
Expand Down Expand Up @@ -451,8 +513,7 @@ function write_summary($interval, DateTime $value, $summary, $tags = [], $prefix
$measurement = $prefix . '_' . $interval;
$points = [];
foreach ($summary as $product => $fields) {
$points[] = new Point($measurement, null,
['product' => $product] + $tags, $fields, $value->getTimestamp());
$points[] = new Point($measurement, ['product' => $product] + $tags, $fields, $value->getTimestamp());
}
write($points);
return count($points);
Expand All @@ -465,7 +526,8 @@ function write_flavors($interval, DateTime $value, $flavors)
foreach ($flavors as $product => $unique_flavors) {
foreach($unique_flavors as $flavor => $unique_count) {
$tags = ['product' => $product, 'flavor' => $flavor];
$points[] = new Point($measurement, $unique_count, $tags, [], $value->getTimestamp());
$fields = ['value' => $unique_count];
$points[] = new Point($measurement, $tags, $fields, $value->getTimestamp());
}
}
write($points);
Expand All @@ -478,7 +540,7 @@ function write_summary_product_plus_key($interval, DateTime $date, $summary, $pr
$points = [];
foreach ($summary as $product => $pairs) {
foreach ($pairs as $key => $value) {
$points[] = new Point($measurement, null,
$points[] = new Point($measurement,
['product' => $product, 'key' => $key], ['value' => $value], $date->getTimestamp());
}
}
Expand All @@ -488,13 +550,20 @@ function write_summary_product_plus_key($interval, DateTime $date, $summary, $pr

// Write a batch of InfluxDB2\Point objects to the local InfluxDB instance.
// The client and its write API are created lazily on the first call and
// cached in function-static variables for the lifetime of the process.
function write($points)
{
    static $client;
    static $writeApi;

    if (!$client) {
        // NOTE(review): empty token, org "-" and bucket "osrt_access/autogen"
        // look like the InfluxDB 1.x compatibility convention
        // (database/retention-policy as the bucket name) — confirm against
        // the server configuration before changing.
        $client = new Client([
            "url" => "http://localhost:8086",
            "token" => "",
            "bucket" => "osrt_access/autogen",
            "org" => "-",
            // Timestamps passed to Point are Unix seconds, so write with
            // second precision.
            "precision" => InfluxDB2\Model\WritePrecision::S
        ]);
        $writeApi = $client->createWriteApi();
    }

    // NOTE(review): treating any non-null return from WriteApi::write() as a
    // failure — presumably the client returns null on success; verify, since
    // influxdb-client-php is typically expected to throw on write errors,
    // which this check would not catch.
    if (!is_null($writeApi->write($points)))
        die('failed to write points');
}
Loading

0 comments on commit 84b76ec

Please sign in to comment.