Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { apm } from '../apm';
import { Instance } from '../apm/instance';
import { elasticsearchSpan, redisSpan, sqliteSpan, Span } from '../apm/span';
import { Transaction } from '../apm/transaction';
import { generateShortId } from '../utils/generate_id';

const ENVIRONMENT = 'Synthtrace: service_map';

Expand Down Expand Up @@ -116,6 +117,7 @@ export interface ServiceMapOpts {
services: Array<string | { [serviceName: string]: AgentName }>;
definePaths: (services: Instance[]) => PathDef[];
environment?: string;
rootWithParent?: boolean;
}

export function serviceMap(options: ServiceMapOpts) {
Expand Down Expand Up @@ -145,7 +147,10 @@ export function serviceMap(options: ServiceMapOpts) {
.transaction({ transactionName, transactionType: 'request' })
.timestamp(timestamp)
.duration(1000)
.children(...getChildren(tracePath, firstTraceItem.serviceInstance, timestamp, index));
.children(...getChildren(tracePath, firstTraceItem.serviceInstance, timestamp, index))
.defaults({
'parent.id': options.rootWithParent ? generateShortId() : undefined,
});

if ('transaction' in traceDef && traceDef.transaction) {
return traceDef.transaction(transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,90 +178,86 @@ export async function fetchServicePathsFromTraceIds({
}

def processAndReturnEvent(def context, def eventId) {
def stack = new Stack();
def reprocessQueue = new LinkedList();

// Avoid reprocessing the same event
def pathStack = new Stack();
def visited = new HashSet();

stack.push(eventId);
def event = context.eventsById.get(eventId);

if (event == null) {
return null;
}

pathStack.push(eventId);

// build a stack with the path from the current event to the root
def parentId = event['parent.id'];
while (parentId != null && !parentId.equals(eventId)) {
def parent = context.eventsById.get(parentId);
if (parent == null || visited.contains(parentId)) {
break;
}

pathStack.push(parentId);
visited.add(parentId);
parentId = parent['parent.id'];
}

while (!stack.isEmpty()) {
def currentEventId = stack.pop();
def event = context.eventsById.get(currentEventId);
// pop the stack starting from the root to current event to build the path
while (!pathStack.isEmpty()) {
def currentEventId = pathStack.pop();
def currentEvent = context.eventsById.get(currentEventId);

def basePath = new ArrayList();

if (event == null || context.processedEvents.get(currentEventId) != null) {
if (currentEvent == null || context.processedEvents.get(currentEventId) != null) {
continue;
}
visited.add(currentEventId);

def service = new HashMap();
service['service.name'] = event['service.name'];
service['service.environment'] = event['service.environment'];
service['agent.name'] = event['agent.name'];

def basePath = new ArrayList();
def parentId = event['parent.id'];

if (parentId != null && !parentId.equals(currentEventId)) {
def parent = context.processedEvents.get(parentId);

if (parent == null) {

// Only adds the parentId to the stack if it hasn't been visited to prevent infinite loop scenarios
// if the parent is null, it means it hasn't been processed yet or it could also mean that the current event
// doesn't have a parent, in which case we should skip it
if (!visited.contains(parentId)) {
stack.push(parentId);
// Add currentEventId to be reprocessed once its parent is processed
reprocessQueue.add(currentEventId);
}


continue;
}
service['service.name'] = currentEvent['service.name'];
service['service.environment'] = currentEvent['service.environment'];
service['agent.name'] = currentEvent['agent.name'];

def currentParentId = currentEvent['parent.id'];
def parent = currentParentId != null ? context.processedEvents.get(currentParentId) : null;

if (parent != null) {
// copy the path from the parent
basePath.addAll(parent.path);
// flag parent path for removal, as it has children
context.locationsToRemove.add(parent.path);

// if the parent has 'span.destination.service.resource' set, and the service is different, we've discovered a service
if (parent['span.destination.service.resource'] != null
&& parent['span.destination.service.resource'] != ""
&& (parent['service.name'] != event['service.name']
|| parent['service.environment'] != event['service.environment'])
&& (parent['service.name'] != currentEvent['service.name']
|| parent['service.environment'] != currentEvent['service.environment'])
) {
def parentDestination = getDestination(parent);
context.externalToServiceMap.put(parentDestination, service);
}
}

def lastLocation = basePath.size() > 0 ? basePath[basePath.size() - 1] : null;
def currentLocation = service;

// only add the current location to the path if it's different from the last one
if (lastLocation == null || !lastLocation.equals(currentLocation)) {
basePath.add(currentLocation);
}

// if there is an outgoing span, create a new path
if (event['span.destination.service.resource'] != null
&& !event['span.destination.service.resource'].equals("")) {
if (currentEvent['span.destination.service.resource'] != null
&& !currentEvent['span.destination.service.resource'].equals("")) {

def outgoingLocation = getDestination(event);
def outgoingLocation = getDestination(currentEvent);
def outgoingPath = new ArrayList(basePath);
outgoingPath.add(outgoingLocation);
context.paths.add(outgoingPath);
}

event.path = basePath;
context.processedEvents[currentEventId] = event;

// reprocess events which were waiting for their parents to be processed
while (!reprocessQueue.isEmpty()) {
stack.push(reprocessQueue.remove());
}
currentEvent.path = basePath;
context.processedEvents[currentEventId] = currentEvent;
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { ApmSynthtraceEsClient } from '@kbn/apm-synthtrace';
import expect from 'expect';
import { serviceMap, timerange } from '@kbn/apm-synthtrace-client';
import { Readable } from 'node:stream';
import { APIReturnType } from '@kbn/apm-plugin/public/services/rest/create_call_apm_api';
import type { SupertestReturnType } from '../../../../../../apm_api_integration/common/apm_api_supertest';
import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context';

Expand Down Expand Up @@ -152,5 +153,84 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(response.body.elements.length).toBeGreaterThan(0);
});
});

describe('Root transaction with parent.id', () => {
let synthtraceEsClient: ApmSynthtraceEsClient;

before(async () => {
synthtraceEsClient = await synthtrace.createApmSynthtraceEsClient();

const events = timerange(start, end)
.interval('10s')
.rate(3)
.generator(
serviceMap({
services: [
{ 'frontend-rum': 'rum-js' },
{ 'frontend-node': 'nodejs' },
{ advertService: 'java' },
],
definePaths([rum, node, adv]) {
return [
[
[rum, 'fetchAd'],
[node, 'GET /nodejs/adTag'],
[adv, 'APIRestController#getAd'],
['elasticsearch', 'GET ad-*/_search'],
],
];
},
rootWithParent: true,
})
);

return synthtraceEsClient.index(Readable.from(Array.from(events)));
});

after(async () => {
await synthtraceEsClient.clean();
});

it('returns service map complete path', async () => {
const { body, status } = await apmApiClient.readUser({
endpoint: 'GET /internal/apm/service-map',
params: {
query: {
start: new Date(start).toISOString(),
end: new Date(end).toISOString(),
environment: 'ENVIRONMENT_ALL',
},
},
});

expect(status).toBe(200);

const { nodes, edges } = partitionElements(body.elements);

expect(getIds(nodes)).toEqual([
'>elasticsearch',
'advertService',
'frontend-node',
'frontend-rum',
]);
expect(getIds(edges)).toEqual([
'advertService~>elasticsearch',
'frontend-node~advertService',
'frontend-rum~frontend-node',
]);
});
});
});
}

type ConnectionElements = APIReturnType<'GET /internal/apm/service-map'>['elements'];

function partitionElements(elements: ConnectionElements) {
const edges = elements.filter(({ data }) => 'source' in data && 'target' in data);
const nodes = elements.filter((element) => !edges.includes(element));
return { edges, nodes };
}

function getIds(elements: ConnectionElements) {
return elements.map(({ data }) => data.id).sort();
}