Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added UI Output section #506

Open
wants to merge 1 commit into
base: feature/add_output_buffer_rt_control
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ func GetOutputJSONInfo(id string) ([]byte, error) {
}

// GetDeviceStats returns a map with the basic info of each device.
func GetOutputStats() map[string]*output.InfluxStats {
outstats := make(map[string]*output.InfluxStats)
func GetOutputStats() map[string]*output.SinkDB {
out := make(map[string]*output.SinkDB)
mutex.RLock()
defer mutex.RUnlock()
for k, v := range outputdb {
outstats[k] = v.GetBasicStats()
out[k] = v.GetSinkDBBasicStats()
}
mutex.RUnlock()
return outstats
return out
}

// GetDeviceStats returns a map with the basic info of each device.
Expand Down
86 changes: 60 additions & 26 deletions pkg/agent/output/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type SinkDB struct {
smutex sync.Mutex
statsData sync.RWMutex

dummy bool
iChan chan *client.BatchPoints
client client.Client
OutInfo string
PingTime time.Duration
Node *bus.Node `json:"-"`
Active bool
dummy bool
iChan chan *client.BatchPoints
client client.Client
OutInfo string
PingTime time.Duration
Node *bus.Node `json:"-"`
Active bool
EnqueueOnWriteError bool
}

// DummyDB a BD struct needed if no database configured
Expand Down Expand Up @@ -102,6 +103,13 @@ func (db *SinkDB) GetBasicStats() *InfluxStats {
return db.Stats
}

// GetBasicStats get basic info for this device
func (db *SinkDB) GetSinkDBBasicStats() *SinkDB {
db.statsData.RLock()
defer db.statsData.RUnlock()
return db
}

// GetBasicStats get basic info for this device
func (db *SinkDB) getBasicStats() *InfluxStats {
stat := db.stats.ThSafeCopy()
Expand All @@ -110,6 +118,8 @@ func (db *SinkDB) getBasicStats() *InfluxStats {

// GetResetStats return outdb stats and reset its counters
func (db *SinkDB) GetResetStats() *InfluxStats {
db.statsData.RLock()
defer db.statsData.RUnlock()
if db.dummy == true {
log.Debug("Reseting Influxstats for DUMMY DB ")
return &InfluxStats{}
Expand Down Expand Up @@ -365,29 +375,45 @@ func (db *SinkDB) sendBatchPoint(data *client.BatchPoints, enqueueonerror bool)
nf += len(fields)
}
}
// keep trying until we get it (don't drop the data)
startSend := time.Now()
err := db.client.Write(*data)
elapsedSend := time.Since(startSend)

bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize)
log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent)
if err != nil {
db.stats.WriteErrUpdate(elapsedSend, bufferPercent)
log.Errorf("ERROR on Write batchPoint in DB %s (%d points) | elapsed : %s | Error: %s ", db.cfg.ID, np, elapsedSend.String(), err)
// If the queue is not full we will resend after a while
// Check if the output is active
if !db.Active {
// Enqueue on error is active, try to store data on buffer
if enqueueonerror {
log.Debug("queing data again...")
if len(db.iChan) < db.cfg.BufferSize {
db.iChan <- data
bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize)
log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent)
db.stats.WriteErrUpdate(0, bufferPercent)
time.Sleep(time.Duration(db.cfg.TimeWriteRetry) * time.Second)
}
} else {
// Drop data if enqueue on error is false and output is not active
log.Infof("Skipped point on output %s, output is not active and enqueue on error is false", db.cfg.ID)
}
} else {
log.Debugf("OK on Write batchPoint in DB %s (%d points) | elapsed : %s ", db.cfg.ID, np, elapsedSend.String())
db.stats.WriteOkUpdate(int64(np), int64(nf), elapsedSend, bufferPercent)
}
// keep trying until we get it (don't drop the data)
startSend := time.Now()
err := db.client.Write(*data)
elapsedSend := time.Since(startSend)

bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize)
log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent)
if err != nil {
db.stats.WriteErrUpdate(elapsedSend, bufferPercent)
log.Errorf("ERROR on Write batchPoint in DB %s (%d points) | elapsed : %s | Error: %s ", db.cfg.ID, np, elapsedSend.String(), err)
// If the queue is not full we will resend after a while
if enqueueonerror {
log.Debug("queing data again...")
if len(db.iChan) < db.cfg.BufferSize {
db.iChan <- data
time.Sleep(time.Duration(db.cfg.TimeWriteRetry) * time.Second)
}
}
} else {
log.Debugf("OK on Write batchPoint in DB %s (%d points) | elapsed : %s ", db.cfg.ID, np, elapsedSend.String())
db.stats.WriteOkUpdate(int64(np), int64(nf), elapsedSend, bufferPercent)
}
}
}

func (db *SinkDB) resetBuffer(length int) {
Expand All @@ -410,14 +436,14 @@ func (db *SinkDB) startSenderGo(r int, wg *sync.WaitGroup) {
defer wg.Done()

time.Sleep(5)
EnqueueOnWriteError := db.cfg.EnqueueOnWriteError
db.EnqueueOnWriteError = db.cfg.EnqueueOnWriteError

log.Infof("beginning Influx Sender thread: [%s]", db.cfg.ID)
for {
select {
case data := <-db.iChan:
if !db.Active {
log.Warn("skip send: Output not active")
db.sendBatchPoint(data, db.EnqueueOnWriteError)
continue
}
if data == nil {
Expand All @@ -429,18 +455,26 @@ func (db *SinkDB) startSenderGo(r int, wg *sync.WaitGroup) {
continue
}

db.sendBatchPoint(data, EnqueueOnWriteError)
db.sendBatchPoint(data, db.EnqueueOnWriteError)
case val := <-db.Node.Read:
log.Infof("Received Message...%s: %+v", val.Type, val.Data)
switch val.Type {
case "resetbuffer":
db.statsData.Lock()
db.resetBuffer(db.cfg.BufferSize)
db.statsData.Unlock()
case "flushbuffer":
db.statsData.Lock()
db.flushBuffer()
db.statsData.Unlock()
case "setactive":
db.statsData.Lock()
db.Active = val.Data.(bool)
db.statsData.Unlock()
case "enqueue_policy_change":
EnqueueOnWriteError = val.Data.(bool)
db.statsData.Lock()
db.EnqueueOnWriteError = val.Data.(bool)
db.statsData.Unlock()
case "exit":
log.Infof("invoked Force Exit")
//need to flush all data
Expand Down
8 changes: 3 additions & 5 deletions pkg/webui/apirt-output.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ func NewAPIRtOutput(m *macaron.Macaron) error {
func RTOutputBufferAction(ctx *Context) {
id := ctx.Params(":id")
action := ctx.Params(":action")
log.Infof("activating runtime on device %s", id)
log.Infof("apply action: %s on output %s runtime", action, id)
out, err := agent.GetOutput(id)
if err != nil {
ctx.JSON(404, err.Error())
return
}
log.Infof("activating runtime on device %s", id)
out.Action(action)
ctx.JSON(200, "OK")
}
Expand All @@ -49,14 +48,13 @@ func RTGetOutputInfo(ctx *Context) {
ctx.JSON(404, err.Error())
return
}

log.Infof("get runtime data from id %s", id)
ctx.RawAsJSON(200, json)

//get only one device info
} else {
devstats := agent.GetOutputStats()
ctx.JSON(200, &devstats)
outputs := agent.GetOutputStats()
ctx.JSON(200, &outputs)
}
return
}
10 changes: 7 additions & 3 deletions src/common/elapsedseconds.pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import {Pipe, PipeTransform} from '@angular/core';
@Pipe({ name: 'elapsedseconds' })
export class ElapsedSecondsPipe implements PipeTransform {
transform(value: any, ...args: string[]): string {
if (typeof args === 'undefined' || args.length !== 1) {
if (typeof args === 'undefined' || args.length < 1) {
throw new Error('ElapsedSecondsPipe: missing required decimals');
}

return this.toSeconds(value, args[0] ,0);
return this.toSeconds(value, args[0], args[1], 0);
}
//from kbn.js in grafana project
toFixed(value , decimals ) {
Expand Down Expand Up @@ -44,9 +44,13 @@ export class ElapsedSecondsPipe implements PipeTransform {
}
}

toSeconds(size , decimals, scaledDecimals) {
toSeconds(size , decimals, units = "seconds", scaledDecimals) {
if (size === null) { return ""; }

if (units == "ns") {
size = size * 1.e-9
}

// Less than 1 µs, devide in ns
if (Math.abs(size) < 0.000001) {
return this.toFixedScaled(size * 1.e9, decimals, scaledDecimals - decimals, -9, " ns");
Expand Down
8 changes: 8 additions & 0 deletions src/common/ng-table/components/table/ng-table.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export class NgTableComponent {
@Input() public showCustom: boolean = false;
@Input() public showConnection: boolean = false;
@Input() public showStatus: boolean = false;
@Input() public showOutput: boolean = false;
@Input() public editMode: boolean = false;
@Input() public exportType: string;
@Input() public extraActions: Array<any>;
Expand Down Expand Up @@ -112,6 +113,13 @@ export class NgTableComponent {
let test = new ElapsedSecondsPipe().transform(html,'3');
html = test.toString();
}
if (transform === "elapsednanoseconds") {
let test = new ElapsedSecondsPipe().transform(html,'3','ns');
html = test.toString();
}
if (transform === "percent") {
html = html.toString() + " %"
}
if (typeof html === 'object') {
var test: any = '<ul class="list-unstyled">';
for (var item of html) {
Expand Down
15 changes: 11 additions & 4 deletions src/common/ng-table/components/table/ng-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
</th>
</ng-container>
</ng-container>
<th *ngFor="let column of columns" [ngTableSorting]="config" [column]="column" (sortChanged)="onChangeTable($event)" ngClass="{{column.className || ''}}" style="vertical-align: middle; text-align: center; width:auto !important; white-space:nowrap" container=body [tooltip]="column.tooltipInfo ? tooltipValues : ( column.tooltip ? column.name : '')">
<th *ngFor="let column of columns" [ngTableSorting]="config" [column]="column" (sortChanged)="onChangeTable($event)" ngClass="{{column.className || ''}}" style="vertical-align: middle; text-align: center; width:auto !important; white-space:nowrap" container=body [tooltip]="column.tooltipInfo ? tooltipValues : ( column.tooltip ? column.tooltip : column.name)">
<ng-template #tooltipValues>
<ng-container>
<h6> {{column.title}} </h6>
Expand Down Expand Up @@ -52,7 +52,7 @@ <h6> {{column.title}} </h6>
</thead>
<tbody>
<tr *ngIf="showFilterRow ">
<td *ngIf="(tableRole === 'fulledit' || tableRole === 'viewdelete' || tableRole === 'view') && showStatus === false" style="width:100px ">
<td *ngIf="(tableRole === 'fulledit' || tableRole === 'viewdelete' || tableRole === 'view') && showStatus === false && showOutput === false" style="width:100px ">
</td>
<td *ngIf="tableRole === 'fulledit' || tableRole === 'viewdelete' || tableRole === 'view'" style="width:100px ">
</td>
Expand All @@ -65,7 +65,7 @@ <h6> {{column.title}} </h6>
<td *ngIf="editMode==true ">
<i [ngClass]="checkItems(row.ID) ? [ 'glyphicon glyphicon-unchecked text-danger'] : [ 'glyphicon glyphicon-check text-success'] " (click)="selectItem(row) "></i>
</td>
<td *ngIf="(tableRole === 'fulledit' || tableRole === 'viewdelete' || tableRole === 'view') && showStatus === false">
<td *ngIf="(tableRole === 'fulledit' || tableRole === 'viewdelete' || tableRole === 'view') && showStatus === false && showOutput === false">
<i *ngFor="let action of roleActions" [ngClass]="action.icon" [tooltip]="action.tooltip" (click)="customClick(action.name, row)" style="padding-left:5px">
</i>
</td>
Expand All @@ -84,6 +84,13 @@ <h6> {{column.title}} </h6>
</span>
<label style="display: inline; margin-right: 2px " container=body [tooltip]="row.DeviceConnected ? 'Connected' : 'Not connected' " [ngClass]="row.DeviceConnected ? 'glyphicon glyphicon-globe label label-success' :
'glyphicon glyphicon-warning-sign label label-danger' "></label>
</td>
<td *ngIf="showOutput==true " style="min-width: 50px ">
<span style="padding-left: 12px ">
<label style="display: inline; margin-right: 2px " container=body [tooltip]="row.Active ? 'Active' : 'Not active' " [ngClass]="row.Active ? 'glyphicon glyphicon-play label label-success' :
'glyphicon glyphicon-pause label label-danger' "></label>
</span>

</td>
<ng-container *ngIf="checkRows ">
<td style="width: 80px; text-align: center; padding-top: 15px; " [ngClass]="row.valid===true? [ 'bg-success'] : [ 'bg-warning'] ">
Expand Down Expand Up @@ -120,7 +127,7 @@ <h6> Metric:{{column.name}}</h6>
<label *ngIf="i.type=='boolean-label'"role="button" [tooltip]= "i.tooltip" container=body (click)="extraActionClick(row,i.action,i.property) " [ngClass]="row[i.property] ? [ 'label label-success'] : [ 'label label-danger'] " [innerHtml]="row[i.property] ? i['enabled'] : i['disabled'] "></label>
<button *ngIf="i.type=='boolean'" [tooltip]= "i.tooltip" container=body (click)="extraActionClick(row,i.action,i.property) " [ngClass]="row[i.property] ? [ 'btn btn-success'] : [ 'btn btn-danger'] " [innerHtml]="row[i.property] ? i['enabled'] : i['disabled'] "></button>
<label *ngIf="i.type=='button-label'" role="button" [tooltip]= "i.tooltip" container=body class="label label-primary " (click)="extraActionClick(row,i.action)" [innerHtml]="i['text']"></label>
<button *ngIf="i.type=='button'" [tooltip]= "i.tooltip" container=body class="btn btn-primary " (click)="extraActionClick(row,i.action)" [innerHtml]="i['text']"></button>
<button *ngIf="i.type=='button'" [tooltip]= "i.tooltip" container=body class="btn btn-{{i.color? i.color : 'primary'}}" (click)="extraActionClick(row,i.action)" [innerHtml]="i['text']"></button>
</ng-container>
</td>
</ng-template>
Expand Down
7 changes: 5 additions & 2 deletions src/home/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@
<ng-template ngSwitchCase="snmpdevice">
<snmpdevs></snmpdevs>
</ng-template>
<ng-template ngSwitchCase="runtime">
<runtime #RuntimeComponent></runtime>
<ng-template ngSwitchCase="runtime-device">
<runtime-device #RuntimeDeviceComponent></runtime-device>
</ng-template>
<ng-template ngSwitchCase="runtime-output">
<runtime-output #RuntimeOutputComponent></runtime-output>
</ng-template>
<ng-template ngSwitchCase="varcatalog">
<varcatalog></varcatalog>
</ng-template>
Expand Down
7 changes: 4 additions & 3 deletions src/home/home.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class Home {
@ViewChild('importFileModal') public importFileModal : ImportFileModal;
@ViewChild('exportBulkFileModal') public exportBulkFileModal : ExportFileModal;
@ViewChild('aboutModal') public aboutModal : AboutModal;
@ViewChild('RuntimeComponent') public rt : any;
@ViewChild('RuntimeDeviceComponent') public rt : any;
nativeWindow: any
response: string;
api: string;
Expand All @@ -44,7 +44,8 @@ export class Home {
];

runtimeItems : Array<any> = [
{'title': 'Device status', 'selector' : 'runtime'},
{'title': 'Device status', 'selector' : 'runtime-device'},
{'title': 'Output status', 'selector' : 'runtime-output'},
];

mode : boolean = false;
Expand All @@ -56,7 +57,7 @@ export class Home {
constructor(private winRef: WindowRef,public router: Router, public httpAPI: HttpService, private _blocker: BlockUIService, public homeService: HomeService) {
this.nativeWindow = winRef.nativeWindow;
this.getFooterInfo();
this.item_type= "runtime";
this.item_type= "runtime-device";
}

link(url: string) {
Expand Down
6 changes: 4 additions & 2 deletions src/main.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import { InfluxMeasCfgComponent } from './influxmeas/influxmeascfg.component';
import { MeasGroupCfgComponent } from './measgroup/measgroupcfg.component';
import { MeasFilterCfgComponent } from './measfilter/measfiltercfg.component';
import { InfluxServerCfgComponent } from './influxserver/influxservercfg.component';
import { RuntimeComponent } from './runtime/runtime.component';
import { RuntimeDeviceComponent } from './runtime_device/runtime_device.component';
import { RuntimeOutputComponent } from './runtime_output/runtime_output.component';
import { CustomFilterCfgComponent } from './customfilter/customfiltercfg.component';
import { BlockUIService } from './common/blockui/blockui-service';

Expand Down Expand Up @@ -83,7 +84,8 @@ import { SpinnerComponent } from './common/spinner';
CustomFilterCfgComponent,
VarCatalogCfgComponent,
TableListComponent,
RuntimeComponent,
RuntimeDeviceComponent,
RuntimeOutputComponent,
GenericModal,
AboutModal,
ExportFileModal,
Expand Down
Loading