diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.auth.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.auth.md new file mode 100644 index 0000000000000..0eca831a9f8c7 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.auth.md @@ -0,0 +1,15 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [auth](./kibana-plugin-server.httpserversetup.auth.md) + +## HttpServerSetup.auth property + +Signature: + +```typescript +auth: { + get: AuthStateStorage['get']; + isAuthenticated: AuthStateStorage['isAuthenticated']; + getAuthHeaders: AuthHeadersStorage['get']; + }; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.basepath.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.basepath.md new file mode 100644 index 0000000000000..0354636d12d53 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.basepath.md @@ -0,0 +1,16 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [basePath](./kibana-plugin-server.httpserversetup.basepath.md) + +## HttpServerSetup.basePath property + +Signature: + +```typescript +basePath: { + get: (request: KibanaRequest | Request) => string; + set: (request: KibanaRequest | Request, basePath: string) => void; + prepend: (url: string) => string; + remove: (url: string) => string; + }; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.istlsenabled.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.istlsenabled.md new file mode 100644 index 0000000000000..729dce6d46071 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.istlsenabled.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > 
[kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [isTlsEnabled](./kibana-plugin-server.httpserversetup.istlsenabled.md) + +## HttpServerSetup.isTlsEnabled property + +Signature: + +```typescript +isTlsEnabled: boolean; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.md new file mode 100644 index 0000000000000..3efc18145600e --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.md @@ -0,0 +1,25 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) + +## HttpServerSetup interface + +Signature: + +```typescript +export interface HttpServerSetup +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [auth](./kibana-plugin-server.httpserversetup.auth.md) | {
get: AuthStateStorage['get'];
isAuthenticated: AuthStateStorage['isAuthenticated'];
getAuthHeaders: AuthHeadersStorage['get'];
} | | +| [basePath](./kibana-plugin-server.httpserversetup.basepath.md) | {
get: (request: KibanaRequest | Request) => string;
set: (request: KibanaRequest | Request, basePath: string) => void;
prepend: (url: string) => string;
remove: (url: string) => string;
} | | +| [isTlsEnabled](./kibana-plugin-server.httpserversetup.istlsenabled.md) | boolean | | +| [registerAuth](./kibana-plugin-server.httpserversetup.registerauth.md) | <T>(handler: AuthenticationHandler, cookieOptions: SessionStorageCookieOptions<T>) => Promise<{
sessionStorageFactory: SessionStorageFactory<T>;
}> | To define custom authentication and/or authorization mechanism for incoming requests. A handler should return a state to associate with the incoming request. The state can be retrieved later via http.auth.get(..) Only one AuthenticationHandler can be registered. | +| [registerOnPostAuth](./kibana-plugin-server.httpserversetup.registeronpostauth.md) | (handler: OnPostAuthHandler) => void | To define custom logic to perform for incoming requests. Runs the handler after Auth hook did make sure a user has access to the requested resource. The auth state is available at stage via http.auth.get(..) Can register any number of registerOnPreAuth, which are called in sequence (from the first registered to the last). | +| [registerOnPreAuth](./kibana-plugin-server.httpserversetup.registeronpreauth.md) | (handler: OnPreAuthHandler) => void | To define custom logic to perform for incoming requests. Runs the handler before Auth hook performs a check that user has access to requested resources, so it's the only place when you can forward a request to another URL right on the server. Can register any number of registerOnPostAuth, which are called in sequence (from the first registered to the last). 
| +| [registerRouter](./kibana-plugin-server.httpserversetup.registerrouter.md) | (router: Router) => void | | +| [server](./kibana-plugin-server.httpserversetup.server.md) | Server | | + diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.registerauth.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.registerauth.md new file mode 100644 index 0000000000000..7183d11c168ce --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.registerauth.md @@ -0,0 +1,15 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [registerAuth](./kibana-plugin-server.httpserversetup.registerauth.md) + +## HttpServerSetup.registerAuth property + +To define custom authentication and/or authorization mechanism for incoming requests. A handler should return a state to associate with the incoming request. The state can be retrieved later via http.auth.get(..) Only one AuthenticationHandler can be registered. + +Signature: + +```typescript +registerAuth: (handler: AuthenticationHandler, cookieOptions: SessionStorageCookieOptions) => Promise<{ + sessionStorageFactory: SessionStorageFactory; + }>; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpostauth.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpostauth.md new file mode 100644 index 0000000000000..ec01851f732c0 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpostauth.md @@ -0,0 +1,13 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [registerOnPostAuth](./kibana-plugin-server.httpserversetup.registeronpostauth.md) + +## HttpServerSetup.registerOnPostAuth property + +To define custom logic to perform for incoming requests. 
Runs the handler after Auth hook did make sure a user has access to the requested resource. The auth state is available at stage via http.auth.get(..) Can register any number of registerOnPreAuth, which are called in sequence (from the first registered to the last). + +Signature: + +```typescript +registerOnPostAuth: (handler: OnPostAuthHandler) => void; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpreauth.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpreauth.md new file mode 100644 index 0000000000000..ae7e1e08e3283 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.registeronpreauth.md @@ -0,0 +1,13 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [registerOnPreAuth](./kibana-plugin-server.httpserversetup.registeronpreauth.md) + +## HttpServerSetup.registerOnPreAuth property + +To define custom logic to perform for incoming requests. Runs the handler before Auth hook performs a check that user has access to requested resources, so it's the only place when you can forward a request to another URL right on the server. Can register any number of registerOnPostAuth, which are called in sequence (from the first registered to the last). 
+ +Signature: + +```typescript +registerOnPreAuth: (handler: OnPreAuthHandler) => void; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.registerrouter.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.registerrouter.md new file mode 100644 index 0000000000000..2faba49a940e7 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.registerrouter.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [registerRouter](./kibana-plugin-server.httpserversetup.registerrouter.md) + +## HttpServerSetup.registerRouter property + +Signature: + +```typescript +registerRouter: (router: Router) => void; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpserversetup.server.md b/docs/development/core/server/kibana-plugin-server.httpserversetup.server.md new file mode 100644 index 0000000000000..a137eba7c8a5a --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.httpserversetup.server.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) > [server](./kibana-plugin-server.httpserversetup.server.md) + +## HttpServerSetup.server property + +Signature: + +```typescript +server: Server; +``` diff --git a/docs/development/core/server/kibana-plugin-server.httpservicesetup.createnewserver.md b/docs/development/core/server/kibana-plugin-server.httpservicesetup.createnewserver.md index e41684ea2b784..9a31cdedaa8d8 100644 --- a/docs/development/core/server/kibana-plugin-server.httpservicesetup.createnewserver.md +++ b/docs/development/core/server/kibana-plugin-server.httpservicesetup.createnewserver.md @@ -7,5 +7,5 @@ Signature: ```typescript -createNewServer: (cfg: Partial) => Promise; +createNewServer: (port: number, ssl: SslConfigType) => Promise; ``` diff --git 
a/docs/development/core/server/kibana-plugin-server.httpservicesetup.md b/docs/development/core/server/kibana-plugin-server.httpservicesetup.md index ec4a2537b8404..3883e6c87e14d 100644 --- a/docs/development/core/server/kibana-plugin-server.httpservicesetup.md +++ b/docs/development/core/server/kibana-plugin-server.httpservicesetup.md @@ -15,5 +15,5 @@ export interface HttpServiceSetup extends HttpServerSetup | Property | Type | Description | | --- | --- | --- | -| [createNewServer](./kibana-plugin-server.httpservicesetup.createnewserver.md) | (cfg: Partial<HttpConfig>) => Promise<HttpServerSetup> | | +| [createNewServer](./kibana-plugin-server.httpservicesetup.createnewserver.md) | (port: number, ssl: SslConfigType) => Promise<HttpServerSetup> | | diff --git a/docs/development/core/server/kibana-plugin-server.md b/docs/development/core/server/kibana-plugin-server.md index ab79f2b382909..d309a2863adf7 100644 --- a/docs/development/core/server/kibana-plugin-server.md +++ b/docs/development/core/server/kibana-plugin-server.md @@ -34,6 +34,7 @@ The plugin integrates with the core system via lifecycle events: `setup` | [ElasticsearchError](./kibana-plugin-server.elasticsearcherror.md) | | | [ElasticsearchServiceSetup](./kibana-plugin-server.elasticsearchservicesetup.md) | | | [FakeRequest](./kibana-plugin-server.fakerequest.md) | Fake request object created manually by Kibana plugins. 
| +| [HttpServerSetup](./kibana-plugin-server.httpserversetup.md) | | | [HttpServiceSetup](./kibana-plugin-server.httpservicesetup.md) | | | [HttpServiceStart](./kibana-plugin-server.httpservicestart.md) | | | [InternalCoreStart](./kibana-plugin-server.internalcorestart.md) | | diff --git a/package.json b/package.json index 4a2ee4150d8d5..2d7ad578bcab8 100644 --- a/package.json +++ b/package.json @@ -124,6 +124,7 @@ "@types/lodash.clonedeep": "^4.5.4", "@types/react-grid-layout": "^0.16.7", "@types/recompose": "^0.30.5", + "@types/wreck": "^14.0.0", "JSONStream": "1.3.5", "abortcontroller-polyfill": "^1.1.9", "angular": "1.6.9", @@ -300,6 +301,7 @@ "@types/graphql": "^0.13.1", "@types/hapi": "^17.0.18", "@types/hapi-auth-cookie": "^9.1.0", + "@types/hapi__wreck": "^15.0.0", "@types/has-ansi": "^3.0.0", "@types/hoek": "^4.1.3", "@types/humps": "^1.1.2", @@ -425,6 +427,7 @@ "regenerate": "^1.4.0", "sass-lint": "^1.12.1", "selenium-webdriver": "^4.0.0-alpha.4", + "selfsigned": "^1.10.4", "simple-git": "1.116.0", "sinon": "^7.2.2", "strip-ansi": "^3.0.1", diff --git a/renovate.json5 b/renovate.json5 index ef38292d024e1..493b7077b56f2 100644 --- a/renovate.json5 +++ b/renovate.json5 @@ -248,6 +248,14 @@ '@types/lodash.clonedeep', ], }, + { + groupSlug: 'wreck', + groupName: 'wreck related packages', + packageNames: [ + 'wreck', + '@types/wreck', + ], + }, { groupSlug: 'bluebird', groupName: 'bluebird related packages', diff --git a/src/core/server/elasticsearch/elasticsearch_service.mock.ts b/src/core/server/elasticsearch/elasticsearch_service.mock.ts index dd35a4c3f5489..77c972f24d787 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.mock.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.mock.ts @@ -34,15 +34,15 @@ const createClusterClientMock = (): jest.Mocked> close: jest.fn(), }); -const createSetupContractMock = () => { - const setupContract: jest.Mocked = { +const createSetupContractMock = (clients = { adminClient: {}, dataClient: {} }) 
=> { + const setupContract: ElasticsearchServiceSetup = { legacy: { config$: new BehaviorSubject({} as ElasticsearchConfig), }, - createClient: jest.fn().mockImplementation(createClusterClientMock), - adminClient$: new BehaviorSubject((createClusterClientMock() as unknown) as ClusterClient), - dataClient$: new BehaviorSubject((createClusterClientMock() as unknown) as ClusterClient), + createClient: jest.fn(), + adminClient$: new BehaviorSubject(clients.adminClient as ClusterClient), + dataClient$: new BehaviorSubject(clients.dataClient as ClusterClient), }; return setupContract; }; diff --git a/src/core/server/http/http_service.test.ts b/src/core/server/http/http_service.test.ts index f003ba1314434..a06ca8d41fd49 100644 --- a/src/core/server/http/http_service.test.ts +++ b/src/core/server/http/http_service.test.ts @@ -19,6 +19,11 @@ import { mockHttpServer } from './http_service.test.mocks'; +import crypto from 'crypto'; +import { tmpdir } from 'os'; +import { writeFileSync, unlinkSync } from 'fs'; +import { join } from 'path'; +import selfsigned from 'selfsigned'; import { noop } from 'lodash'; import { BehaviorSubject } from 'rxjs'; import { HttpService, Router } from '.'; @@ -28,6 +33,9 @@ import { Config, ConfigService, Env, ObjectToConfigAdapter } from '../config'; import { loggingServiceMock } from '../logging/logging_service.mock'; import { getEnvOptions } from '../config/__mocks__/env'; +// `crypto` type definitions doesn't currently include `crypto.constants`, see +// https://github.com/DefinitelyTyped/DefinitelyTyped/blob/fa5baf1733f49cf26228a4e509914572c1b74adf/types/node/v6/index.d.ts#L3412 +const cryptoConstants = (crypto as any).constants; const logger = loggingServiceMock.create(); const env = Env.createDefault(getEnvOptions()); @@ -135,6 +143,13 @@ test('spins up notReady server until started if configured with `autoListen:true // this is an integration test! 
test('creates and sets up second http server', async () => { + const tmp = tmpdir(); + const keyFile = join(tmp, 'tmp-key'); + const certFile = join(tmp, 'tmp-cert'); + const attrs = [{ name: 'commonName', value: 'kibana.dev' }]; + const pems = selfsigned.generate(attrs, { days: 365 }); + writeFileSync(keyFile, pems.private, 'utf8'); + writeFileSync(certFile, pems.cert, 'utf8'); const configService = createConfigService({ host: 'localhost', port: 1234, @@ -145,32 +160,41 @@ test('creates and sets up second http server', async () => { const service = new HttpService({ configService, env, logger }); const serverSetup = await service.setup(); - const cfg = { port: 2345 }; - await serverSetup.createNewServer(cfg); + const port = 2345; + const ssl = { + enabled: true, + redirectHttpFromPort: port, + certificate: certFile, + key: keyFile, + certificateAuthorities: undefined, + cipherSuites: cryptoConstants.defaultCoreCipherList.split(':'), + keyPassphrase: undefined, + supportedProtocols: ['TLSv1.1', 'TLSv1.2'], + requestCert: true, + }; + + await serverSetup.createNewServer(port, ssl); const server = await service.start(); expect(server.isListening()).toBeTruthy(); - expect(server.isListening(cfg.port)).toBeTruthy(); + expect(server.isListening(port)).toBeTruthy(); try { - await serverSetup.createNewServer(cfg); + await serverSetup.createNewServer(port, ssl); } catch (err) { expect(err.message).toBe('port 2345 is already in use'); } try { - await serverSetup.createNewServer({ port: 1234 }); + await serverSetup.createNewServer(1234, ssl); } catch (err) { expect(err.message).toBe('port 1234 is already in use'); } - try { - await serverSetup.createNewServer({ host: 'example.org' }); - } catch (err) { - expect(err.message).toBe('port must be defined'); - } await service.stop(); expect(server.isListening()).toBeFalsy(); - expect(server.isListening(cfg.port)).toBeFalsy(); + expect(server.isListening(port)).toBeFalsy(); + unlinkSync(certFile); + unlinkSync(keyFile); }); 
test('logs error if already set up', async () => { diff --git a/src/core/server/http/http_service.ts b/src/core/server/http/http_service.ts index b06c690cf2621..12d05b14bad93 100644 --- a/src/core/server/http/http_service.ts +++ b/src/core/server/http/http_service.ts @@ -25,13 +25,14 @@ import { LoggerFactory } from '../logging'; import { CoreService } from '../../types'; import { Logger } from '../logging'; import { CoreContext } from '../core_context'; -import { HttpConfig, HttpConfigType, config as httpConfig } from './http_config'; +import { HttpConfig, HttpConfigType } from './http_config'; import { HttpServer, HttpServerSetup } from './http_server'; import { HttpsRedirectServer } from './https_redirect_server'; +import { SslConfig, SslConfigType } from './ssl_config'; /** @public */ export interface HttpServiceSetup extends HttpServerSetup { - createNewServer: (cfg: Partial) => Promise; + createNewServer: (port: number, ssl: SslConfigType) => Promise; } /** @public */ export interface HttpServiceStart { @@ -129,8 +130,7 @@ export class HttpService implements CoreService) { - const { port } = cfg; + private async createServer(port: number, ssl: SslConfigType) { const config = await this.config$.pipe(first()).toPromise(); if (!port) { @@ -141,10 +141,10 @@ export class HttpService implements CoreService = { + port, + ssl: new SslConfig(ssl), + }; const baseConfig = await this.config$.pipe(first()).toPromise(); const finalConfig = { ...baseConfig, ...cfg }; diff --git a/src/core/server/http/selfsigned.d.ts b/src/core/server/http/selfsigned.d.ts new file mode 100644 index 0000000000000..16ba14a459084 --- /dev/null +++ b/src/core/server/http/selfsigned.d.ts @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. 
licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +declare module 'selfsigned' { + export function generate( + attrs: any[], + options: { [key: string]: any }, + done?: (err: Error, certs: any) => void + ): any; +} diff --git a/src/core/server/http/ssl_config.ts b/src/core/server/http/ssl_config.ts index c32b94cf26def..825d5b26156f8 100644 --- a/src/core/server/http/ssl_config.ts +++ b/src/core/server/http/ssl_config.ts @@ -60,7 +60,7 @@ export const sslSchema = schema.object( } ); -type SslConfigType = TypeOf; +export type SslConfigType = TypeOf; export class SslConfig { public enabled: boolean; diff --git a/src/core/server/index.ts b/src/core/server/index.ts index 4582f1362922f..b897b0b1d402a 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -76,6 +76,7 @@ export { SessionStorageFactory, SessionStorage, } from './http'; +export { HttpServerSetup } from './http/http_server'; export { Logger, LoggerFactory, LogMeta, LogRecord, LogLevel } from './logging'; export { diff --git a/src/core/server/server.api.md b/src/core/server/server.api.md index a6fbfebf9d947..10a8928821d13 100644 --- a/src/core/server/server.api.md +++ b/src/core/server/server.api.md @@ -5,7 +5,6 @@ ```ts import Boom from 'boom'; -import { ByteSizeValue } from '@kbn/config-schema'; import { CallCluster } from 'src/legacy/core_plugins/elasticsearch'; import { ConfigOptions } from 'elasticsearch'; import { Duration } from 'moment'; @@ -165,14 +164,43 
@@ export type GetAuthHeaders = (request: KibanaRequest | Request) => AuthHeaders | // @public (undocumented) export type Headers = Record; -// Warning: (ae-forgotten-export) The symbol "HttpServerSetup" needs to be exported by the entry point index.d.ts +// Warning: (ae-missing-release-tag) "HttpServerSetup" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) // +// @public (undocumented) +export interface HttpServerSetup { + // (undocumented) + auth: { + get: AuthStateStorage['get']; + isAuthenticated: AuthStateStorage['isAuthenticated']; + getAuthHeaders: AuthHeadersStorage['get']; + }; + // (undocumented) + basePath: { + get: (request: KibanaRequest | Request) => string; + set: (request: KibanaRequest | Request, basePath: string) => void; + prepend: (url: string) => string; + remove: (url: string) => string; + }; + // (undocumented) + isTlsEnabled: boolean; + // Warning: (ae-forgotten-export) The symbol "SessionStorageCookieOptions" needs to be exported by the entry point index.d.ts + registerAuth: (handler: AuthenticationHandler, cookieOptions: SessionStorageCookieOptions) => Promise<{ + sessionStorageFactory: SessionStorageFactory; + }>; + registerOnPostAuth: (handler: OnPostAuthHandler) => void; + registerOnPreAuth: (handler: OnPreAuthHandler) => void; + // (undocumented) + registerRouter: (router: Router) => void; + // (undocumented) + server: Server; +} + // @public (undocumented) export interface HttpServiceSetup extends HttpServerSetup { - // Warning: (ae-forgotten-export) The symbol "HttpConfig" needs to be exported by the entry point index.d.ts + // Warning: (ae-forgotten-export) The symbol "SslConfigType" needs to be exported by the entry point index.d.ts // // (undocumented) - createNewServer: (cfg: Partial) => Promise; + createNewServer: (port: number, ssl: SslConfigType) => Promise; } // @public (undocumented) @@ -690,6 +718,8 @@ export interface SessionStorageFactory { // Warnings were encountered 
during analysis: // +// src/core/server/http/http_server.ts:74:5 - (ae-forgotten-export) The symbol "AuthStateStorage" needs to be exported by the entry point index.d.ts +// src/core/server/http/http_server.ts:76:5 - (ae-forgotten-export) The symbol "AuthHeadersStorage" needs to be exported by the entry point index.d.ts // src/core/server/plugins/plugin_context.ts:34:10 - (ae-forgotten-export) The symbol "EnvironmentMode" needs to be exported by the entry point index.d.ts // src/core/server/plugins/plugins_service.ts:37:5 - (ae-forgotten-export) The symbol "DiscoveredPluginInternal" needs to be exported by the entry point index.d.ts diff --git a/x-pack/dev-tools/jest/create_jest_config.js b/x-pack/dev-tools/jest/create_jest_config.js index fa8cae2b6b86e..b4078bfebfb08 100644 --- a/x-pack/dev-tools/jest/create_jest_config.js +++ b/x-pack/dev-tools/jest/create_jest_config.js @@ -12,8 +12,7 @@ export function createJestConfig({ rootDir: xPackKibanaDirectory, roots: [ '/plugins', - '/legacy/plugins', - '/legacy/server', + '/server', ], moduleFileExtensions: [ 'js', @@ -23,11 +22,8 @@ export function createJestConfig({ ], moduleNameMapper: { '^ui/(.*)': `${kibanaDirectory}/src/legacy/ui/public/$1`, - 'uiExports/(.*)': `${kibanaDirectory}/src/dev/jest/mocks/file_mock.js`, '^src/core/(.*)': `${kibanaDirectory}/src/core/$1`, - '^plugins/watcher/models/(.*)': `${xPackKibanaDirectory}/legacy/plugins/watcher/public/models/$1`, - '^plugins/([^\/.]*)(.*)': `${kibanaDirectory}/src/legacy/core_plugins/$1/public$2`, - '^legacy/plugins/xpack_main/(.*);': `${xPackKibanaDirectory}/legacy/plugins/xpack_main/public/$1`, + '^plugins/xpack_main/(.*);': `${xPackKibanaDirectory}/plugins/xpack_main/public/$1`, '\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$': `${kibanaDirectory}/src/dev/jest/mocks/file_mock.js`, '\\.(css|less|scss)$': `${kibanaDirectory}/src/dev/jest/mocks/style_mock.js`, @@ -38,9 +34,6 @@ export function createJestConfig({ 
`/dev-tools/jest/setup/polyfills.js`, `/dev-tools/jest/setup/enzyme.js`, ], - setupFilesAfterEnv: [ - `${kibanaDirectory}/src/dev/jest/setup/mocks.js`, - ], testMatch: [ '**/*.test.{js,ts,tsx}' ], diff --git a/x-pack/plugins/proxy/kibana.json b/x-pack/plugins/proxy/kibana.json new file mode 100644 index 0000000000000..facc815fb4834 --- /dev/null +++ b/x-pack/plugins/proxy/kibana.json @@ -0,0 +1,7 @@ +{ + "id": "proxy", + "server": true, + "ui": false, + "configPath": ["xpack", "proxy"], + "version": "kibana" +} diff --git a/x-pack/plugins/proxy/server/cluster_doc.test.mock.ts b/x-pack/plugins/proxy/server/cluster_doc.test.mock.ts new file mode 100644 index 0000000000000..095ec6772013a --- /dev/null +++ b/x-pack/plugins/proxy/server/cluster_doc.test.mock.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export const mockClusterDocClient = jest.fn(); + +jest.mock('./cluster_doc', () => { + const realClusterDocClient = jest.requireActual('./cluster_doc'); + + return { + ...realClusterDocClient, + ClusterDocClient: mockClusterDocClient, + }; +}); diff --git a/x-pack/plugins/proxy/server/cluster_doc.test.ts b/x-pack/plugins/proxy/server/cluster_doc.test.ts new file mode 100644 index 0000000000000..9d1197b2ff71b --- /dev/null +++ b/x-pack/plugins/proxy/server/cluster_doc.test.ts @@ -0,0 +1,276 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +import Boom from 'boom'; +import { Observable, BehaviorSubject } from 'rxjs'; + +import { ClusterDocClient, RouteState, RoutingNode } from './cluster_doc'; +import { + Config, + ConfigService, + Env, + ObjectToConfigAdapter, +} from '../../../../src/core/server/config'; +import { loggingServiceMock } from '../../../../src/core/server/logging/logging_service.mock'; +import { elasticsearchServiceMock } from '../../../../src/core/server/elasticsearch/elasticsearch_service.mock'; +import { getEnvOptions } from '../../../../src/core/server/config/__mocks__/env'; +import { ProxyConfig, ProxyPluginType } from './proxy'; +import { + unassignResource, + updateHeartbeat, + removeHeartbeat, + cullDeadResources, + cullDeadNodes, +} from './painless_queries'; + +const logger = loggingServiceMock.create(); +const env = Env.createDefault(getEnvOptions()); + +const createConfigService = (value: Partial = {}) => { + const conf = Object.assign( + { + updateInterval: 0, + port: 0, + maxRetry: 0, + cert: '', + key: '', + ca: '', + }, + value + ); + const cs = new ConfigService( + new BehaviorSubject( + new ObjectToConfigAdapter({ + xpack: { + proxy: conf, + }, + }) + ), + env, + logger + ); + cs.setSchema('xpack.proxy', ProxyConfig.schema); + return cs; +}; + +function configService(value: Partial) { + return { + create: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + createIfExists: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + }; +} + +beforeEach(() => { + jest.useFakeTimers(); +}); + +afterEach(() => { + jest.clearAllMocks(); +}); + +test('initial run of main loop works', async () => { + const esClients = { + adminClient: {}, + dataClient: { + callAsInternalUser: jest.fn, any>(async function() { + return { _source: {} }; + }), + }, + }; + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + }); + const clusterDoc = new ClusterDocClient({ 
config, env, logger }); + + try { + await clusterDoc.setup(elasticClient.dataClient$); + await clusterDoc.start(); + await clusterDoc.stop(); + } catch (err) { + expect(err).toBeFalsy(); + } + + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(esClients.dataClient.callAsInternalUser.mock.calls[0][0]).toBe('update'); + expect(esClients.dataClient.callAsInternalUser.mock.calls[1][0]).toBe('get'); + expect(esClients.dataClient.callAsInternalUser.mock.calls[2][0]).toBe('update'); + // 2 calls per "loop", 1 call on "stop" + expect(esClients.dataClient.callAsInternalUser).toHaveBeenCalledTimes(3); + const update = esClients.dataClient.callAsInternalUser.mock.calls[0][1].body.script; + const remove = esClients.dataClient.callAsInternalUser.mock.calls[2][1].body.script; + expect(update).toBe(updateHeartbeat); + expect(remove).toBe(removeHeartbeat); +}); + +test('removes stale nodes, keeps good nodes', async () => { + const nodeName = 'd4fa4018-8510-420c-aa99-d6d722792b3c'; + const nodeName2 = '073fb287-161c-49f3-976d-1e507575e354'; + const mockHeartbeatReply: { _source: { [key: string]: number } } = { + _source: { + [nodeName2]: 1, // this node will be culled + [nodeName]: 2, + }, + }; + + const mockResourceReply: { _source: { [key: string]: RoutingNode } } = { + _source: { + 'git@github.com:elastic/kibana': { + state: RouteState.Started, + node: nodeName2, + type: 'code', + }, + 'git@github.com:elastic/elasticsearch': { + state: RouteState.Started, + node: nodeName, + type: 'code', + }, + }, + }; + + // yay lets implement what es would do with these scripts... + const esClients = { + adminClient: {}, + dataClient: { + callAsInternalUser: jest.fn, any>(async (method, params) => { + if (params.id === 'proxy-heartbeat-list') { + if (method === 'get') { + // don't forget js is pass-by-reference! 
we need a _new_ object here + return JSON.parse(JSON.stringify(mockHeartbeatReply)); + } else { + if (params.body.script === updateHeartbeat) { + mockHeartbeatReply._source[params.body.params.resource]++; + } else if (params.body.script === cullDeadNodes) { + const nodes = params.body.params.nodeList; + for (const [key, val] of Object.entries(mockHeartbeatReply._source)) { + if (val === nodes[key]) { + delete mockHeartbeatReply._source[key]; + } + } + } + } + } else { + if (method === 'update') { + if (params.body.script) { + if (params.body.script === unassignResource) { + const key = Object.entries(mockResourceReply._source).find( + entry => entry[1].node === params.body.params.resource + ); + if (Array.isArray(key)) { + delete mockResourceReply._source[key[0]]; + } + } else if (params.body.script === cullDeadResources) { + const nodeList = params.body.params.nodes; + for (const [key, val] of Object.entries(mockResourceReply._source)) { + if (!nodeList.includes(val.node)) { + if (val.state === RouteState.Closing) { + delete mockResourceReply._source[key]; + } else { + val.state = RouteState.Closing; + } + } else if (val.state === RouteState.Closing) { + val.state = RouteState.Started; + } + } + } + } else { + Object.assign(mockResourceReply._source, params.body); + } + } else { + return JSON.parse(JSON.stringify(mockResourceReply)); + } + } + }), + }, + }; + + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + }); + const clusterDoc = new ClusterDocClient({ config, env, logger }); + clusterDoc.nodeName = nodeName; + + try { + await clusterDoc.setup(elasticClient.dataClient$); + } catch (err) { + expect(err).toBeFalsy(); + } + + // misses a single update, remove node and set resource to closing + await clusterDoc.updateHeartbeat(); + (clusterDoc as any).nodeCache = await clusterDoc.getHeartbeats(); + await clusterDoc.updateHeartbeat(); + await (clusterDoc as any).cullDeadNodes(); 
+ expect(esClients.dataClient.callAsInternalUser).toHaveBeenCalledTimes(6); + expect(Object.keys(mockHeartbeatReply._source).length).toBe(1); + expect(Object.keys(mockResourceReply._source).length).toBe(2); + expect(mockResourceReply._source['git@github.com:elastic/kibana'].state).toBe(RouteState.Closing); + + // updates, restores to routing doc and resource is started + await clusterDoc.updateHeartbeat(); + mockHeartbeatReply._source[nodeName2] = 3; + (clusterDoc as any).nodeCache = await clusterDoc.getHeartbeats(); + await clusterDoc.updateHeartbeat(); + mockHeartbeatReply._source[nodeName2] = 4; + await (clusterDoc as any).cullDeadNodes(); + expect(Object.keys(mockHeartbeatReply._source).length).toBe(2); + expect(Object.keys(mockResourceReply._source).length).toBe(2); + expect(mockResourceReply._source['git@github.com:elastic/kibana'].state).toBe(RouteState.Started); + + // misses two updates, remove resource + await clusterDoc.updateHeartbeat(); + (clusterDoc as any).nodeCache = await clusterDoc.getHeartbeats(); + await clusterDoc.updateHeartbeat(); + mockHeartbeatReply._source[nodeName2] = (clusterDoc as any).nodeCache[nodeName2]; + mockResourceReply._source['git@github.com:elastic/kibana'].state = RouteState.Closing; + await (clusterDoc as any).cullDeadNodes(); + expect(Object.keys(mockResourceReply._source).length).toBe(1); + expect(mockResourceReply._source['git@github.com:elastic/kibana']).toBeFalsy(); + + // add a new resource + await clusterDoc.assignResource('git@github.com:elastic/beats', 'code', RouteState.Started); + expect(Object.keys(mockResourceReply._source).length).toBe(2); + + // add existing resource + try { + await clusterDoc.assignResource('git@github.com:elastic/beats', 'code', RouteState.Started); + } catch (err) { + expect(err.message).toBe( + `git@github.com:elastic/beats already exists on ${clusterDoc.nodeName}` + ); + } +}); + +test('it continues on errors', async () => { + const esClients = { + adminClient: {}, + dataClient: { + 
callAsInternalUser: jest.fn, any>(async method => { + if (method === 'get') { + return { _source: {} }; + } + const err = Boom.boomify(new Error('foo'), { statusCode: 409 }); + throw err; + }), + }, + }; + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + }); + const clusterDoc = new ClusterDocClient({ config, env, logger }); + + try { + await clusterDoc.setup(elasticClient.dataClient$); + await clusterDoc.start(); + } catch (err) { + expect(err).toBeFalsy(); + } + + expect(setTimeout).toHaveBeenCalledTimes(1); +}); diff --git a/x-pack/plugins/proxy/server/cluster_doc.ts b/x-pack/plugins/proxy/server/cluster_doc.ts new file mode 100644 index 0000000000000..989177ff13726 --- /dev/null +++ b/x-pack/plugins/proxy/server/cluster_doc.ts @@ -0,0 +1,323 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Boom from 'boom'; +import { v4 } from 'uuid'; +import { Observable, pairs } from 'rxjs'; +import { first } from 'rxjs/operators'; + +import { PluginInitializerContext, Logger, ClusterClient } from '../../../../src/core/server'; + +import { ProxyPluginType } from './proxy'; +import { + unassignResource, + updateHeartbeat, + removeHeartbeat, + cullDeadResources, + cullDeadNodes, +} from './painless_queries'; + +interface LivenessNode { + lastUpdate: number; +} + +export interface NodeList { + [key: string]: LivenessNode; +} + +export function randomInt(min: number, max: number) { + return Math.floor(Math.random() * (min - max) + min); +} + +export enum RouteState { + Initializing, + Started, + Closed, + Closing, +} + +export interface RoutingNode { + type: string; // what are all the types this can be? 
+ node: string; + state: RouteState; +} + +export interface RoutingTable { + [key: string]: RoutingNode; +} + +export class ClusterDocClient { + public nodeName: string; + private elasticsearch$?: Observable; + private updateInterval: number = 15 * 1000; + private updateTimer: null | NodeJS.Timer = null; + private maxRetry: number = 0; + private runCull: boolean = false; + private nodeCache: NodeList = {}; + + private readonly minUpdateShuffle = 0; + private readonly maxUpdateShuffle = 1000; + private readonly proxyIndex = '.kibana'; + private readonly routingDoc = 'proxy-resource-list'; + private readonly heartbeatDoc = 'proxy-heartbeat-list'; + private readonly docType = '_doc'; + private readonly log: Logger; + private readonly config$: Observable; + + constructor(initializerContext: PluginInitializerContext) { + this.nodeName = v4(); + this.config$ = initializerContext.config.create(); + this.log = initializerContext.logger.get('proxy'); + } + + public async setup(esClient: Observable) { + this.elasticsearch$ = esClient; + const config = await this.config$.pipe(first()).toPromise(); + this.setConfig(config); + } + + public async start() { + return await this.mainLoop(); + } + + public async stop() { + if (this.updateTimer) { + clearTimeout(this.updateTimer); + } + await this.removeHeartbeat(); + } + + public async getNodeForResource(resource: string) { + const table = await this.getRoutingData(); + return table[resource]; + } + + public async getRoutingData(): Promise { + const client = await this.getESClient(); + const params = { + index: this.proxyIndex, + type: this.docType, + id: this.routingDoc, + source: true, + }; + const res = await client.callAsInternalUser('get', params); + const table = res._source as RoutingTable; + return table; + } + + public async getRoutingTable(): Promise> { + const table = await this.getRoutingData(); + return pairs(table); + } + + public async assignResource(resource: string, type: string, state: RouteState, node?: string) { 
+ const table = await this.getRoutingData(); + if (table[resource]) { + throw new Error(`${resource} already exists on ${table[resource].node}`); + } + const body = { + [resource]: { + type, + state, + node: node || this.nodeName, + }, + }; + + const client = await this.getESClient(); + const params = { + index: this.proxyIndex, + type: this.docType, + id: this.routingDoc, + body, + retryOnConflict: this.maxRetry, + }; + await client.callAsInternalUser('update', params); + } + + public async unassignResource(resource: string) { + const client = await this.getESClient(); + const body = { + script: unassignResource, + params: { + resource, + }, + }; + const params = { + index: this.proxyIndex, + type: this.docType, + id: this.routingDoc, + body, + retryOnConflict: this.maxRetry, + }; + await client.callAsInternalUser('update', params); + } + + private async getESClient(): Promise { + if (!this.elasticsearch$) { + const err = Boom.boomify(new Error('You must call setup first'), { statusCode: 412 }); + throw err; + } + const client = await this.elasticsearch$.pipe(first()).toPromise(); + return client; + } + + private setConfig(config: ProxyPluginType) { + const update = config.updateInterval + randomInt(this.minUpdateShuffle, this.maxUpdateShuffle); + this.updateInterval = update; + this.maxRetry = config.maxRetry; + } + + private setTimer() { + if (this.updateTimer) return; + this.log.debug('Set timer to updateNodeMap'); + this.updateTimer = global.setTimeout(() => { + this.log.debug('Updating heartbeat'); + this.mainLoop(); + }, this.updateInterval); + } + + public async getHeartbeats() { + const client = await this.getESClient(); + const params = { + id: this.heartbeatDoc, + type: this.docType, + index: this.proxyIndex, + _source: true, + }; + const reply = await client.callAsInternalUser('get', params); + return reply._source as NodeList; + } + + public async removeHeartbeat() { + const client = await this.getESClient(); + const body = { + script: removeHeartbeat, 
+ params: { + resource: this.nodeName, + }, + }; + const params = { + id: this.heartbeatDoc, + type: this.docType, + index: this.proxyIndex, + body, + retryOnConflict: this.maxRetry, + }; + await client.callAsInternalUser('update', params); + } + + /** + * Node heartbeats are monotonically increasing integers + * @param remove [boolean] should this node be deleted + */ + public async updateHeartbeat(remove: boolean = false) { + const client = await this.getESClient(); + const body = { + script: updateHeartbeat, + params: { + resource: this.nodeName, + }, + }; + const params = { + id: this.heartbeatDoc, + type: this.docType, + index: this.proxyIndex, + body, + retryOnConflict: this.maxRetry, + }; + await client.callAsInternalUser('update', params); + } + + /** + * remove resources that no longer have a live node. since each node runs + * this in the same way, we ignore conflicts here -- one of them will win + * eventually, and update the doc correctly + * @param nodes + */ + private async cullDeadResources(nodes: string[]) { + const client = await this.getESClient(); + const body = { + script: cullDeadResources, + params: { + nodes, + routeInitializing: RouteState.Initializing, + routeStarted: RouteState.Started, + routeClosing: RouteState.Closing, + routeClosed: RouteState.Closed, + }, + }; + + const params = { + index: this.proxyIndex, + type: this.docType, + id: this.routingDoc, + body, + }; + await client.callAsInternalUser('update', params); + } + + /** + * remove nodes that are past their timeout. 
since each node runs this in the + * same way, we ignore conflicts here -- one of them will win eventually + */ + private async cullDeadNodes() { + const client = await this.getESClient(); + const body = { + script: cullDeadNodes, + params: { + nodeList: this.nodeCache, + }, + }; + const params = { + id: this.heartbeatDoc, + type: this.docType, + index: this.proxyIndex, + body, + }; + await client.callAsInternalUser('update', params); + const nodes = await this.getHeartbeats(); + await this.cullDeadResources(Object.keys(nodes)); + } + + /** + * The logic for this loop works as such: + * Since a node has to miss _two_ heartbeat updates in a row, the logic for how + * this works is controlled by a flag. + * + * It will always update the current node's heartbeat, and then on even loop + * runs (total count of loop is even) it'll cache the current node list (as it + * appears after the heartbeat is updated), and then flip the cull flag + * + * On odd loops, it'll then remove dead nodes which can be determined by looking + * at which nodes haven't updated since we cached the node list. Once they've + * missed this single check-in, the resource that node represents is moved into + * "closing". 
If on the next odd loop, that resource is still not in the heartbeat + * document, we remove the resource from the list + * + * Even loops: update heartbeat, cache current node list, flip cull flag to true + * Odd loops: update heartbeat, cull nodes, cull resources, flip cull flag to false + * + */ + private async mainLoop() { + this.updateTimer = null; + try { + await this.updateHeartbeat(); + // we only want to run the cull every other pass so we'll fiddle with + // a boolean like a real programmer + if (this.runCull) { + await this.cullDeadNodes(); + this.runCull = false; + } else { + this.nodeCache = await this.getHeartbeats(); + this.runCull = true; + } + } catch (err) { + this.log.warn('Unable to update heartbeat', err); + } finally { + this.setTimer(); + } + } +} diff --git a/x-pack/plugins/proxy/server/cluster_doc_original.test.ts b/x-pack/plugins/proxy/server/cluster_doc_original.test.ts new file mode 100644 index 0000000000000..9edf9de5b9eab --- /dev/null +++ b/x-pack/plugins/proxy/server/cluster_doc_original.test.ts @@ -0,0 +1,201 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +import { Observable, BehaviorSubject } from 'rxjs'; + +import { RouteState } from './cluster_doc'; +import { ClusterDocClient } from './cluster_doc_original'; +import { + Config, + ConfigService, + Env, + ObjectToConfigAdapter, +} from '../../../../src/core/server/config'; +import { loggingServiceMock } from '../../../../src/core/server/logging/logging_service.mock'; +import { elasticsearchServiceMock } from '../../../../src/core/server/elasticsearch/elasticsearch_service.mock'; +import { getEnvOptions } from '../../../../src/core/server/config/__mocks__/env'; +import { ProxyConfig, ProxyPluginType } from './proxy'; + +const logger = loggingServiceMock.create(); +const env = Env.createDefault(getEnvOptions()); + +const createConfigService = (value: Partial = {}) => { + const conf = Object.assign( + { + updateInterval: 0, + timeoutThreshold: 0, + port: 0, + maxRetry: 0, + cert: '', + key: '', + ca: '', + }, + value + ); + const cs = new ConfigService( + new BehaviorSubject( + new ObjectToConfigAdapter({ + xpack: { + proxy: conf, + }, + }) + ), + env, + logger + ); + cs.setSchema('xpack.proxy', ProxyConfig.schema); + return cs; +}; + +function configService(value: Partial) { + return { + create: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + createIfExists: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + }; +} + +beforeEach(() => { + jest.useFakeTimers(); +}); + +afterEach(() => { + jest.clearAllMocks(); +}); + +test('initial run of main loop works', async () => { + const esClients = { + adminClient: {}, + dataClient: { + callAsInternalUser: jest.fn, any>(async () => ({ _source: {} })), + }, + }; + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + timeoutThreshold: 100, + }); + const clusterDoc = new ClusterDocClient({ config, env, logger }); + + try { + await clusterDoc.setup(elasticClient.dataClient$); + await 
clusterDoc.start(); + await clusterDoc.stop(); + } catch (err) { + expect(err).toBeFalsy(); + } + + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(esClients.dataClient.callAsInternalUser.mock.calls[0][0]).toBe('get'); + expect(esClients.dataClient.callAsInternalUser.mock.calls[1][0]).toBe('index'); + const nodeList = esClients.dataClient.callAsInternalUser.mock.calls[1][1].body; + const nodeKeys = Object.keys(nodeList.nodes); + expect(nodeList.routing_table).toMatchObject({}); + expect(nodeKeys.length).toBe(1); + expect(nodeList.nodes[nodeKeys[0]].lastUpdate).not.toBe(0); + expect(nodeList.nodes[nodeKeys[0]].lastUpdate).toBeLessThan(new Date().getTime()); +}); + +test('removes stale nodes, keeps good nodes', async () => { + const mockESReply = { + _source: { + nodes: { + '073fb287-161c-49f3-976d-1e507575e354': { + lastUpdate: 100, + }, + 'd4fa4018-8510-420c-aa99-d6d722792b3c': { + lastUpdate: new Date().getTime(), + }, + }, + routing_table: { + resource1: { + type: 'code', + node: '073fb287-161c-49f3-976d-1e507575e354', + state: RouteState.Started, + }, + resource2: { + type: 'code', + node: 'd4fa4018-8510-420c-aa99-d6d722792b3c', + state: RouteState.Started, + }, + }, + }, + }; + + const esClients = { + adminClient: {}, + dataClient: { + callAsInternalUser: jest.fn, any>(async () => mockESReply), + }, + }; + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + timeoutThreshold: 100, + }); + const clusterDoc = new ClusterDocClient({ config, env, logger }); + + try { + await clusterDoc.setup(elasticClient.dataClient$); + await clusterDoc.start(); + } catch (err) { + expect(err).toBeFalsy(); + } + + const nodeList = esClients.dataClient.callAsInternalUser.mock.calls[1][1].body; + const nodeKeys = Object.keys(nodeList.nodes); + + expect(nodeList.routing_table).toEqual({ + resource2: mockESReply._source.routing_table.resource2, + }); + expect(nodeKeys.length).toBe(2); + 
expect(nodeKeys.includes('073fb287-161c-49f3-976d-1e507575e354')).toBeFalsy(); + expect(nodeKeys.includes('d4fa4018-8510-420c-aa99-d6d722792b3c')).toBeTruthy(); + expect(nodeKeys.includes(clusterDoc.nodeName)).toBeTruthy(); + expect(nodeList.nodes[nodeKeys[0]].lastUpdate).not.toBe(100); + expect(nodeList.nodes[nodeKeys[0]].lastUpdate).toBeLessThan(new Date().getTime()); + + await clusterDoc.stop(); +}); + +test('assign and unassign resource', async () => { + const esClients = { + adminClient: {}, + dataClient: { + callAsInternalUser: jest.fn, any>(async () => ({ _source: {} })), + }, + }; + const elasticClient = elasticsearchServiceMock.createSetupContract(esClients); + const config = configService({ + updateInterval: 100, + timeoutThreshold: 100, + }); + const clusterDoc = new ClusterDocClient({ config, env, logger }); + + try { + await clusterDoc.setup(elasticClient.dataClient$); + await clusterDoc.start(); + } catch (err) { + expect(err).toBeFalsy(); + } + + await clusterDoc.assignResource('/foo/bar', 'code', RouteState.Started); + const nodeList = esClients.dataClient.callAsInternalUser.mock.calls[3][1]; + const expected = { + '/foo/bar': { + type: 'code', + state: 1, + node: clusterDoc.nodeName, + }, + }; + expect(nodeList.body.routing_table).toEqual(expected); + + await clusterDoc.unassignResource('/foo/bar'); + const nodeList2 = esClients.dataClient.callAsInternalUser.mock.calls[5][1]; + expect(nodeList2.body.routing_table).toEqual({}); + + await clusterDoc.stop(); +}); diff --git a/x-pack/plugins/proxy/server/cluster_doc_original.ts b/x-pack/plugins/proxy/server/cluster_doc_original.ts new file mode 100644 index 0000000000000..5597bbbdc3a59 --- /dev/null +++ b/x-pack/plugins/proxy/server/cluster_doc_original.ts @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Boom from 'boom'; +import { v4 } from 'uuid'; +import { Observable, Subscription, pairs } from 'rxjs'; +import { first } from 'rxjs/operators'; + +import { PluginInitializerContext, Logger, ClusterClient } from 'src/core/server'; + +import { ProxyPluginType } from './proxy'; +import { RouteState, RoutingNode, RoutingTable, NodeList, randomInt } from './cluster_doc'; + +interface ClusterDoc { + nodes: NodeList; + routing_table: RoutingTable; +} + +export class ClusterDocClient { + public nodeName: string; + private readonly updateFloor = 30 * 1000; // 30 seconds is the fastest you can update + private routingTable: RoutingTable = {}; + private elasticsearch?: Observable; + private updateInterval? = this.updateFloor; + private timeoutThreshold = 15 * 1000; + private timer: null | number = null; + private configSubscription?: Subscription; + private seq_no = 0; + private primary_term = 0; + + private readonly minUpdateShuffle = 0; + private readonly maxUpdateShuffle = 1000; + private readonly proxyIndex = '.kibana'; + private readonly proxyDoc = 'proxy-resource-list'; + private readonly log: Logger; + private readonly config$: Observable; + + constructor(initializerContext: PluginInitializerContext) { + this.nodeName = v4(); + this.config$ = initializerContext.config.create(); + this.log = initializerContext.logger.get('proxy'); + } + + public async setup(esClient: Observable) { + this.elasticsearch = esClient; + this.configSubscription = this.config$.subscribe(config => { + this.setConfig(config); + }); + const config = await this.config$.pipe(first()).toPromise(); + this.setConfig(config); + } + + public async start() { + await this.mainLoop(); + } + + public async stop() { + // stop http service + if (this.timer) { + clearTimeout(this.timer); + } + + const nodes = await this.getNodeList(); + delete nodes[this.nodeName]; + await 
this.updateNodeList(nodes); + + if (this.configSubscription === undefined) { + return; + } + + this.configSubscription.unsubscribe(); + this.configSubscription = undefined; + } + + public async getRoutingTable(): Promise> { + return Promise.resolve(pairs(this.routingTable)); + } + + public async getNodeForResource(resource: string) { + return Promise.resolve(this.routingTable[resource]); + } + + public async assignResource(resource: string, type: string, state: RouteState, node?: string) { + // getting the node list will refresh the internal routing table to match + // whatever the es doc contains, so this needs to be done before we assign + // the new node to the routing table, or we lose the update + const nodes = await this.getNodeList(); + if (this.routingTable[resource]) { + throw new Error(`${resource} already exists on ${this.routingTable[resource].node}`); + } + const data = { + type, + state, + node: node || this.nodeName, + }; + this.routingTable[resource] = data; + const currentTime = new Date().getTime(); + await this.updateNodeList(this.updateLocalNode(nodes, currentTime)); + } + + public async unassignResource(resource: string) { + const nodes = await this.getNodeList(); + delete this.routingTable[resource]; + const currentTime = new Date().getTime(); + await this.updateNodeList(this.updateLocalNode(nodes, currentTime)); + } + + private setConfig(config: ProxyPluginType) { + let update = randomInt(this.minUpdateShuffle, this.maxUpdateShuffle); + if (config.updateInterval < this.updateFloor) { + update += this.updateFloor; + } else { + update += config.updateInterval; + } + + let timeout = config.timeoutThreshold || this.timeoutThreshold; + if (timeout < update) { + timeout = update + randomInt(this.minUpdateShuffle, this.maxUpdateShuffle); + } + this.updateInterval = update; + this.timeoutThreshold = timeout; + } + + private setTimer() { + if (this.timer) return; + this.log.debug('Set timer to updateNodeMap'); + this.timer = setTimeout(async () => { 
this.log.debug('Updating node map');
+      await this.mainLoop();
+    }, this.updateInterval);
+  }
+
+  private updateRoutingTable(routingTable: RoutingTable): void {
+    // merge the incoming (authoritative) table, then drop any local routes
+    // that the shared document no longer contains
+    const currentRoutes = [...Object.keys(this.routingTable)];
+    for (const [key, node] of Object.entries(routingTable)) {
+      this.routingTable[key] = node;
+      const idx = currentRoutes.indexOf(key);
+      // BUGFIX: was `if (idx)`, which skipped index 0 and treated the
+      // "not found" result (-1) as a valid index to splice
+      if (idx !== -1) currentRoutes.splice(idx, 1);
+    }
+
+    for (const key of currentRoutes.values()) {
+      delete this.routingTable[key];
+    }
+  }
+
+  private async getNodeList(): Promise {
+    if (!this.elasticsearch) {
+      const err = Boom.boomify(new Error('You must call setup first'), { statusCode: 412 });
+      throw err;
+    }
+    const client = await this.elasticsearch.pipe(first()).toPromise();
+    const params = {
+      id: this.proxyDoc,
+      index: this.proxyIndex,
+      _source: true,
+    };
+    const reply = await client.callAsInternalUser('get', params);
+    // remember seq_no/primary_term so the next write can be made conditional
+    this.seq_no = reply._seq_no;
+    this.primary_term = reply._primary_term;
+    const data: ClusterDoc = reply._source;
+    this.updateRoutingTable(data.routing_table || {});
+    const nodes: NodeList = data.nodes || {};
+    return nodes;
+  }
+
+  private async updateNodeList(nodes: NodeList): Promise {
+    if (!this.elasticsearch) {
+      const err = Boom.boomify(new Error('You must call setup first'), { statusCode: 412 });
+      throw err;
+    }
+    const doc = {
+      nodes,
+      routing_table: this.routingTable,
+    };
+    const client = await this.elasticsearch.pipe(first()).toPromise();
+    const params = {
+      id: this.proxyDoc,
+      index: this.proxyIndex,
+      // fail with a 409 if someone else wrote the doc since we read it
+      if_seq_no: this.seq_no,
+      if_primary_term: this.primary_term,
+      body: doc,
+    };
+    await client.callAsInternalUser('index', params);
+  }
+
+  private updateLocalNode(nodes: NodeList, finishTime: number): NodeList {
+    nodes[this.nodeName] = {
+      lastUpdate: finishTime,
+    };
+    return nodes;
+  }
+
+  private removeNode(node: string) {
+    for (const [resource, data] of Object.entries(this.routingTable)) {
+      if (data.node === node) {
+        delete this.routingTable[resource];
+      }
+    }
+  }
+
+  private async mainLoop(): Promise {
+    const nodes = await this.getNodeList();
+    const finishTime = new Date().getTime();
+
+    // cull nodes that have not heartbeat within the timeout threshold
+    for (const [key, node] of Object.entries(nodes)) {
+      // BUGFIX: the null check must happen before node.lastUpdate is read,
+      // otherwise a missing entry throws instead of being culled
+      if (!node) {
+        this.log.warn(`Node ${key} has no heartbeat entry and has been dropped`);
+        this.removeNode(key);
+        delete nodes[key];
+        continue;
+      }
+      const timeout = finishTime - node.lastUpdate;
+      if (timeout > this.timeoutThreshold) {
+        this.log.warn(`Node ${key} has not updated in ${timeout}ms and has been dropped`);
+        this.removeNode(key);
+        delete nodes[key];
+      }
+    }
+
+    try {
+      await this.updateNodeList(this.updateLocalNode(nodes, finishTime));
+    } catch (err) {
+      // on conflict, skip until next loop
+      if (err.statusCode !== 409) {
+        throw err;
+      }
+    }
+    this.setTimer();
+  }
+}
diff --git a/x-pack/plugins/proxy/server/fs.mock.ts b/x-pack/plugins/proxy/server/fs.mock.ts
new file mode 100644
index 0000000000000..797f6bb63e984
--- /dev/null
+++ b/x-pack/plugins/proxy/server/fs.mock.ts
@@ -0,0 +1,8 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+export const mockReadFile = jest.fn();
+jest.mock('fs', () => ({ readFile: mockReadFile }));
diff --git a/x-pack/plugins/proxy/server/index.ts b/x-pack/plugins/proxy/server/index.ts
new file mode 100644
index 0000000000000..a2e3ca87c6b3b
--- /dev/null
+++ b/x-pack/plugins/proxy/server/index.ts
@@ -0,0 +1,16 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import { PluginInitializer } from 'src/core/server';
+import { ProxyConfig, ProxyService, ProxyServiceSetup, ProxyServiceStart } from './proxy';
+export { ProxyPluginType } from './proxy';
+export { RouteState } from './cluster_doc';
+
+export const config = ProxyConfig;
+export const plugin: PluginInitializer<
+  ProxyServiceSetup | undefined,
+  ProxyServiceStart | undefined
+> = initializerContext => new ProxyService(initializerContext);
diff --git a/x-pack/plugins/proxy/server/painless_queries.ts b/x-pack/plugins/proxy/server/painless_queries.ts
new file mode 100644
index 0000000000000..0951d3d168349
--- /dev/null
+++ b/x-pack/plugins/proxy/server/painless_queries.ts
@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+// BUGFIX: was ctx['source']; every other script here (and the ES update API)
+// addresses the document via ctx['_source']
+export const unassignResource = `
+if (ctx['_source'].containsKey(resource)) {
+  ctx['_source'].remove(resource);
+}
+`;
+
+export const updateHeartbeat = `
+ctx['_source'].put(resource, ctx['_source'].getOrDefault(resource, 0) + 1);
+`;
+
+export const removeHeartbeat = `
+if (ctx['_source'].containsKey(resource)) {
+  ctx['_source'].remove(resource);
+}
+`;
+
+export const cullDeadResources = `
+for (node in ctx['_source'].entrySet()) {
+  data = node.getValue();
+  key = node.getKey();
+  if (!nodes.contains(data.get('node'))) {
+    if (data.get('state') == routeClosing) {
+      ctx['_source'].remove(key);
+    } else {
+      data.put('state', routeClosing)
+      ctx['_source'].put(key, data)
+    }
+  } else {
+    if (data.get('state') == routeClosing) {
+      data.put('state', routeStarted)
+      ctx['_source'].put(key, data)
+    }
+  }
+}
+`;
+
+// NOTE(review): 'key' is read (nodeList[key]) before it is assigned below —
+// confirm this script's intent against its caller before relying on it
+export const cullDeadNodes = `
+for (node in ctx['_source'].entrySet()) {
+  if (node.getValue() == nodeList[key]) {
+    key = node.getKey();
+    ctx['_source'].remove(key)
+  }
+}
+`;
diff --git 
a/x-pack/plugins/proxy/server/proxy.test.ts b/x-pack/plugins/proxy/server/proxy.test.ts new file mode 100644 index 0000000000000..f1fc6bc370363 --- /dev/null +++ b/x-pack/plugins/proxy/server/proxy.test.ts @@ -0,0 +1,268 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { Observable, BehaviorSubject } from 'rxjs'; +import { noop } from 'lodash'; + +import { + Config, + ConfigService, + Env, + ObjectToConfigAdapter, +} from '../../../../src/core/server/config'; +import { httpServiceMock } from '../../../../src/core/server/http/http_service.mock'; +import { loggingServiceMock } from '../../../../src/core/server/logging/logging_service.mock'; +import { getEnvOptions } from '../../../../src/core/server/config/__mocks__/env'; +import { elasticsearchServiceMock } from '../../../../src/core/server/elasticsearch/elasticsearch_service.mock'; +import { httpServerMock } from '../../../../src/core/server/http/http_server.mocks'; +import { KibanaRequest } from '../../../../src/core/server/http/router/request'; + +const mockWreck = { + request: jest.fn(), +}; + +jest.mock('wreck', () => ({ + defaults: () => mockWreck, + read: jest.fn(), +})); + +import { mockReadFile } from './fs.mock'; +import { mockClusterDocClient } from './cluster_doc.test.mock'; +import { ProxyService, ProxyConfig, ProxyPluginType } from './proxy'; +import { RouteState } from './cluster_doc'; + +const logger = loggingServiceMock.create(); +const env = Env.createDefault(getEnvOptions()); + +const createConfigService = (value: Partial = {}) => { + const conf = Object.assign( + { + updateInterval: 0, + port: 0, + maxRetry: 1, + cert: '', + key: '', + ca: '', + }, + value + ); + const cs = new ConfigService( + new BehaviorSubject( + new ObjectToConfigAdapter({ + xpack: { + proxy: conf, + }, + }) + ), + env, 
+ logger + ); + cs.setSchema('xpack.proxy', ProxyConfig.schema); + return cs; +}; + +function configService(value: Partial) { + return { + create: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + createIfExists: () => + createConfigService(value).atPath('xpack.proxy') as Observable, + }; +} + +beforeEach(() => {}); + +afterEach(() => { + jest.clearAllMocks(); +}); + +test('no ssl, no server', async () => { + const clusterDocClient = { + setup: jest.fn(), + start: jest.fn(), + stop: noop, + }; + + mockClusterDocClient.mockImplementation(() => clusterDocClient); + const elasticClient = elasticsearchServiceMock.createSetupContract(); + const httpService = httpServiceMock.createSetupContract(); + + const core = { + elasticsearch: elasticClient, + http: httpService, + }; + + mockReadFile.mockImplementation((x, cb) => cb(new Error('foo'))); + + const proxy = new ProxyService({ config: configService({}), env, logger }); + try { + await proxy.setup(core, {}); + } catch (err) { + expect(err.message).toBe('You must provide valid paths for cert, key and ca'); + } + + await proxy.stop(); +}); + +test('creates and sets up proxy server', async () => { + const clusterDocClient = { + setup: jest.fn(), + start: jest.fn(), + stop: noop, + }; + + mockClusterDocClient.mockImplementation(() => clusterDocClient); + const elasticClient = elasticsearchServiceMock.createSetupContract(); + const httpService = httpServiceMock.createSetupContract(); + + const core = { + elasticsearch: elasticClient, + http: httpService, + }; + + mockReadFile.mockImplementation((x, cb) => cb(null, Buffer.from('foo'))); + + const proxy = new ProxyService({ config: configService({}), env, logger }); + await proxy.setup(core, {}); + + expect(clusterDocClient.setup.mock.calls.length).toBe(1); + expect(httpService.createNewServer.mock.calls.length).toBe(1); + expect(mockReadFile.mock.calls.length).toBe(3); + const passedConfig = httpService.createNewServer.mock.calls[0][1]; + 
expect(passedConfig).toBeTruthy(); + expect(passedConfig.certificate).toBe('foo'); + + const proxyStart = await proxy.start(); + + expect(clusterDocClient.start.mock.calls.length).toBe(1); + expect(proxyStart).toBeTruthy(); + + await proxy.stop(); +}); + +test('handles allocate and unallocate', async () => { + const clusterDocClient = { + setup: jest.fn(), + start: jest.fn(), + assignResource: jest.fn(), + unassignResource: jest.fn(), + stop: noop, + }; + + mockClusterDocClient.mockImplementation(() => clusterDocClient); + const elasticClient = elasticsearchServiceMock.createSetupContract(); + const httpService = httpServiceMock.createSetupContract(); + + const core = { + elasticsearch: elasticClient, + http: httpService, + }; + + mockReadFile.mockImplementation((x, cb) => cb(null, Buffer.from('foo'))); + + const proxy = new ProxyService({ config: configService({}), env, logger }); + await proxy.setup(core, {}); + const proxyStart = await proxy.start(); + expect(proxyStart).toBeTruthy(); + await proxyStart!.assignResource('/foo/bar', 'code', RouteState.Started, proxy.nodeName); + await proxyStart!.unassignResource('/foo/bar'); + expect(clusterDocClient.assignResource.mock.calls.length).toBe(1); + expect(clusterDocClient.unassignResource.mock.calls.length).toBe(1); + + await proxy.stop(); +}); + +test('proxy resource', async () => { + const clusterDocClient = { + setup: jest.fn(), + start: jest.fn(), + assignResource: jest.fn(), + unassignResource: jest.fn(), + getNodeForResource: jest.fn(() => ({ + type: 'code', + state: RouteState.Started, + node: 'beep', + })), + stop: noop, + }; + + mockClusterDocClient.mockImplementation(() => clusterDocClient); + const elasticClient = elasticsearchServiceMock.createSetupContract(); + const httpService = httpServiceMock.createSetupContract(); + + const core = { + elasticsearch: elasticClient, + http: httpService, + }; + + mockReadFile.mockImplementation((x, cb) => cb(null, Buffer.from('foo'))); + + const proxy = new 
ProxyService({ config: configService({}), env, logger }); + await proxy.setup(core, {}); + const proxyStart = await proxy.start(); + expect(proxyStart).toBeTruthy(); + await proxyStart!.assignResource('/foo/bar', 'code', RouteState.Started, proxy.nodeName); + const agent = proxyStart!.proxyResource('/foo/bar'); + const r = httpServerMock.createRawRequest({ + headers: {}, + url: new URL('https://beep/foo/bar'), + path: '/foo/bar', + method: 'get', + }); + const req = KibanaRequest.from(r); + + // everything is ok + await agent(req); + expect(clusterDocClient.getNodeForResource.mock.calls.length).toBe(1); + expect(mockWreck.request.mock.calls.length).toBe(1); + + await proxyStart!.proxyRequest(req, '/foo/bar'); + expect(clusterDocClient.getNodeForResource.mock.calls.length).toBe(2); + expect(mockWreck.request.mock.calls.length).toBe(2); + + // node does not exist + clusterDocClient.getNodeForResource = jest.fn().mockReturnValue(undefined); + + try { + await agent(req); + } catch (err) { + // this gets reset when we return a new value ¯\_(ツ)_/¯ + expect(clusterDocClient.getNodeForResource.mock.calls.length).toBe(1); + expect(err.message).toBe('No node was found for resource /foo/bar'); + } + + // node is closed + clusterDocClient.getNodeForResource = jest.fn().mockReturnValue({ + type: 'code', + state: RouteState.Closed, + node: 'beep', + }); + + try { + await agent(req); + } catch (err) { + expect(clusterDocClient.getNodeForResource.mock.calls.length).toBe(1); + expect(err.message).toBe('No node was found for resource /foo/bar'); + } + + // wreck error + mockWreck.request = jest.fn(async () => { + throw new Error('robots'); + }); + + // node is closed + clusterDocClient.getNodeForResource = jest.fn().mockReturnValue({ + type: 'code', + state: RouteState.Started, + node: 'beep', + }); + + try { + await agent(req); + } catch (err) { + expect(clusterDocClient.getNodeForResource.mock.calls.length).toBe(1); + expect(err.message).toBe('Unable to complete request to beep 
for /foo/bar because robots'); + } +}); diff --git a/x-pack/plugins/proxy/server/proxy.ts b/x-pack/plugins/proxy/server/proxy.ts new file mode 100644 index 0000000000000..35a4af4168812 --- /dev/null +++ b/x-pack/plugins/proxy/server/proxy.ts @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { Agent as HTTPSAgent } from 'https'; +import { Agent as HTTPAgent } from 'http'; +import { URL } from 'url'; +import { promisify } from 'util'; +import { readFile } from 'fs'; +import crypto from 'crypto'; + +// `crypto` type definitions doesn't currently include `crypto.constants`, see +// https://github.com/DefinitelyTyped/DefinitelyTyped/blob/fa5baf1733f49cf26228a4e509914572c1b74adf/types/node/v6/index.d.ts#L3412 +const cryptoConstants = (crypto as any).constants; + +const readFileAsync = promisify(readFile); + +import { Observable } from 'rxjs'; +import { first } from 'rxjs/operators'; +import Wreck from 'wreck'; +import { schema, TypeOf } from '@kbn/config-schema'; + +import { + Plugin, + PluginInitializerContext, + Logger, + CoreSetup, + KibanaRequest, + HttpServiceSetup, +} from '../../../../src/core/server'; + +import { RouteState, RoutingNode } from './cluster_doc'; +import { ClusterDocClient } from './cluster_doc'; + +// When we upgrade to typescript 3.5 we can remove this +type Omit = Pick>; + +export interface ProxyServiceSetup { + httpSetup: Omit; +} + +export interface ProxyServiceStart { + assignResource: ( + resource: string, + type: string, + state: RouteState, + node?: string + ) => Promise; + unassignResource: (resource: string) => Promise; + proxyResource: (resource: string) => (req: KibanaRequest) => Promise; + proxyRequest: (req: KibanaRequest, resource: string) => Promise; + getAllocation: () => Promise>; +} + +export const 
ProxyConfig = { + schema: schema.object({ + updateInterval: schema.number({ defaultValue: 1500 }), + timeoutThreshold: schema.maybe(schema.number({ defaultValue: 1500 })), + port: schema.number({ defaultValue: 5602 }), + maxRetry: schema.number({ defaultValue: 0 }), + cert: schema.maybe(schema.string()), + key: schema.maybe(schema.string()), + ca: schema.maybe(schema.string()), + cipherSuites: schema.arrayOf(schema.string(), { + defaultValue: cryptoConstants.defaultCoreCipherList.split(':'), + }), + supportedProtocols: schema.arrayOf( + schema.oneOf([schema.literal('TLSv1'), schema.literal('TLSv1.1'), schema.literal('TLSv1.2')]), + { defaultValue: ['TLSv1.1', 'TLSv1.2'], minSize: 1 } + ), + }), +}; + +export type ProxyPluginType = TypeOf; + +export class ProxyService + implements Plugin { + public nodeName: string; + private clusterDocClient: ClusterDocClient; + private port = 0; + + private httpsAgent: HTTPSAgent = new HTTPSAgent({ keepAlive: true }); + private httpAgent: HTTPAgent = new HTTPAgent({ keepAlive: true }); + private allowUnauthAgent: HTTPAgent = new HTTPAgent({ keepAlive: true }); + private wreck: typeof Wreck = Wreck; + private disabled: boolean = false; // if a user doesn't provide SSL configuration, we need to skip running the proxy + private readonly log: Logger; + private readonly config$: Observable; + + constructor(initializerContext: PluginInitializerContext) { + this.config$ = initializerContext.config.create(); + this.log = initializerContext.logger.get('proxy'); + this.clusterDocClient = new ClusterDocClient(initializerContext); + this.nodeName = this.clusterDocClient.nodeName; + } + + public async setup(core: CoreSetup, plugins: {}) { + await this.clusterDocClient.setup(core.elasticsearch.dataClient$); + const config = await this.config$.pipe(first()).toPromise(); + this.setConfig(config); + + try { + const ssl = await this.configureSSL(config); + this.wreck = Wreck.defaults({ + agent: { + https: this.httpsAgent, + http: this.httpAgent, + 
httpsAllowUnauthorized: this.allowUnauthAgent,
+        },
+      });
+
+      const httpSetup = await core.http.createNewServer(config.port, ssl);
+
+      const setup: ProxyServiceSetup = {
+        httpSetup,
+      };
+
+      return setup;
+    } catch (err) {
+      // without valid SSL material the proxy cannot run; mark it disabled so
+      // start() becomes a no-op instead of crashing the server
+      this.disabled = true;
+      return;
+    }
+  }
+
+  // Reads cert/key (and optionally ca) from disk and builds both the outgoing
+  // https agent and the ssl config for the proxy's own server.
+  private async configureSSL(config: ProxyPluginType) {
+    let tlsCert;
+    let tlsKey;
+    let tlsCa;
+
+    if (!config.cert || !config.key) {
+      this.log.warn(
+        'Unable to read SSL certificate information: configuration was missing cert or key'
+      );
+      throw new Error('You must provide valid paths for cert, key and ca');
+    }
+
+    const tasks = [readFileAsync(config.cert), readFileAsync(config.key)];
+
+    // the ca is optional; only read it when configured
+    if (config.ca) {
+      tasks.push(readFileAsync(config.ca));
+    }
+    try {
+      [tlsCert, tlsKey, tlsCa] = await Promise.all(tasks);
+    } catch (err) {
+      this.log.warn('Unable to read SSL certificate information', err);
+      throw new Error('You must provide valid paths for cert, key and ca');
+    }
+
+    this.httpsAgent = new HTTPSAgent({
+      keepAlive: true,
+      cert: tlsCert,
+      key: tlsKey,
+      ca: tlsCa,
+    });
+
+    const ssl = {
+      enabled: true,
+      redirectHttpFromPort: this.port,
+      certificate: tlsCert.toString(),
+      key: tlsKey.toString(),
+      // BUGFIX: tlsCa is undefined when no ca is configured; calling
+      // tlsCa.toString() unconditionally threw a TypeError
+      certificateAuthorities: tlsCa ? [tlsCa.toString()] : undefined,
+      cipherSuites: config.cipherSuites,
+      keyPassphrase: undefined,
+      supportedProtocols: config.supportedProtocols,
+      requestCert: true,
+    };
+    return ssl;
+  }
+
+  private setConfig(config: ProxyPluginType) {
+    this.port = config.port;
+  }
+
+  public async start() {
+    if (this.disabled) return;
+    await this.clusterDocClient.start();
+    const start: ProxyServiceStart = {
+      assignResource: this.assignResource.bind(this),
+      unassignResource: this.unassignResource.bind(this),
+      proxyResource: this.proxyResource.bind(this),
+      proxyRequest: this.proxyRequest.bind(this),
+      getAllocation: this.getAllocation.bind(this),
+    };
+    return start;
+  }
+
+  public async stop() {
+    await this.clusterDocClient.stop();
+  }
+
+  public async assignResource(
+    resource: string,
+ type: string, + state: RouteState, + node?: string + ): Promise { + await this.clusterDocClient.assignResource(resource, type, state, node); + } + + public async unassignResource(resource: string) { + await this.clusterDocClient.unassignResource(resource); + } + + public proxyResource(resource: string): (req: KibanaRequest) => Promise { + return (req: KibanaRequest) => { + return this.proxyRequest(req, resource); + }; + } + + // @TODO update to allow passing of request parametsrs + public async proxyRequest(req: KibanaRequest, resource?: string, retryCount = 0): Promise { + const method = req.route.method; + const url = new URL(req.url.toString()); + const headers = req.headers; + const body = req.body; + resource = resource || url.pathname; + const node = await this.clusterDocClient.getNodeForResource(resource); + + if (!node || node.state === RouteState.Closed) { + const msg = `No node was found for resource ${resource}`; + this.log.debug(msg); + throw new Error(msg); + } + + if (node.state === RouteState.Initializing) { + throw new Error(`Node ${node.node} is initializing`); + } + if (node.state === RouteState.Closing) { + throw new Error(`Node ${node.node} is closing.`); + } + + url.hostname = node.node; + try { + const opts = { + headers, + payload: body, + }; + const res = await this.wreck.request(method, url.toString(), opts); + const data = Wreck.read(res, {}); + return data; + } catch (err) { + const msg = `Unable to complete request to ${node.node} for ${resource} because ${err.message}`; + this.log.warn(msg); + throw new Error(msg); + } + } + + public async getAllocation() { + return await this.clusterDocClient.getRoutingTable(); + } +} diff --git a/x-pack/plugins/proxy/syncing.md b/x-pack/plugins/proxy/syncing.md new file mode 100644 index 0000000000000..37fdd8b12ce08 --- /dev/null +++ b/x-pack/plugins/proxy/syncing.md @@ -0,0 +1,169 @@ +# Client Syncing Strategy + +A proposal for dealing with syncing conflicts on the proxy project. 
+
+- Version: Draft
+- Date: 19/06/2019
+- Authors
+  - Todd Kennedy
+
+## Abstract
+
+This document is meant to describe a strategy for conflict-free updating of a shared document stored in elasticsearch, allowing for eventual consistency of the shared document across a number of client nodes. Rather than trying to cache the document locally and dealing with concerns over editing a stale local document state (and the conflicts that will arise from that), we should leverage some of the facilities built into elasticsearch.
+
+## Introduction
+
+The editing of shared documents across multiple sites in real time is a well-explored part of computer science which has resulted in the establishment of two main protocols for syncing: conflict-free replicated data types (CRDTs) and operational transform (OT). These two protocols are designed to handle real-time editing of shared documents and require either peer-to-peer connections or a centralized server to coordinate the changes.
+
+Given the time constraints presented, these protocols are out of reach for this implementation, so we need to design another method to ensure that these clients are able to stay in synchronization with one another and update the shared document in a conflict-free manner.
+
+In general the problem exists because each client needs to grab the latest version of the document from the server, update it locally, and then put it back to the server. If the document has been changed in the meantime, though, this results in an error on the out-of-sync client.
+
+
                                                                               
+                                                                               
+                           ┌─────────────────┐              ┌─────────────────┐
+                           │     Clients     │              │     Server      │
+                           └────────┬────────┘              └────────────┬────┘
+                                    │                                    │     
+                                    │client 1 -> get doc                 │     
+                                    ├───────────────────────────────────▶│     
+                                    │                                    │     
+                                    │      client 1 <- send doc version 1│     
+            ┌───────────────────────┼────────────────────────────────────│     
+            ▼                       │                                    │     
+ ┌────────────────────┐             │ client 2 -> get doc                │     
+ │client 1            │             ├───────────────────────────────────▶│     
+ │update internal doc │             │                                    │     
+ │version 1+X         │             │      client 2 <- send doc version 1│     
+ └────────────────────┘┌────────────┼────────────────────────────────────│     
+            │          ▼            │                                    │     
+            ├────────────────────┐  │                                    │     
+            │client 2            │  │client 2 -> update doc version 1+   │     
+            ├────────────────────┼──▶───────────────────────────────────▶│     
+            │version 1+Y         │  │                                    │     
+            ├────────────────────┘  │  client 2 <- accept doc version 1+Y│     
+            │                       ◀────────────────────────────────────│     
+            │                       │                                    │     
+            │                       │                                    │     
+            │                       │client 1 -> update doc version 1+X  │     
+            └───────────────────────▶───────────────────────────────────▶│     
+                                    │                                    │     
+                                    │            client 1 <- 409 conflict│     
+                                    ◀────────────────────────────────────│     
+                                    │                                    │     
+                                    │client 1 -> get doc                 │     
+                                    ├───────────────────────────────────▶│     
+                                    │                                    │     
+                                    │    client 1 <- send doc version 1+Y│     
+            ┌───────────────────────┼────────────────────────────────────│     
+            ▼                       │                                    │     
+ ┌────────────────────┐             │ client 2 -> get doc                │     
+ │client 1            │             ├───────────────────────────────────▶│     
+ │UPDATE CONFLICT!    │             │                                    │     
+ │                    │             │    client 2 <- send doc version 1+Y│     
+ └────────────────────┘ ┌───────────┼────────────────────────────────────│     
+                        ▼           │                                    │     
+             ┌────────────────────┐ │                                    │     
+             │client 2            │ │                                    │     
+             │NOOP                │ │                                    │     
+             │version 1+Y         │ │                                    │     
+             └────────────────────┘ │                                    │     
+                                    │                                    │     
+                                    │                                    │     
+                                    │                                    │     
+                                    │                                    │     
+                                    │                                    │     
+                                    ▼                                    ▼     
+
+
+We could figure out a merge algorithm (first in wins would be appropriate for this use-case), mark the failed client as being out of sync, and require it to attempt to update again. Due to the variable number of clients and the time that processing can take, this scenario could lead us to a dead client (one that is unable to respond because it is never up-to-date), which would also have information locally that it is unable to share.
+
+By dropping the real-time requirement of this, we _should_ be able to approximate a consensus on shared state. Alternatively, we can implement an on-demand retrieval of cluster routes.
+
+## Real time document retrieval
+
+One of the properties of how this proxy is designed is that there are two separate documents that make up the information we need: the heartbeat (or liveness) document that explains which nodes exist and when they last checked in, and the cluster routing doc that explains which of these nodes is responsible for a given resource.
+
+If we split this combined document into two separate ones, we can see that the routing document receives far fewer updates, especially with regards to concurrency. If we don't store this document locally, we can utilize the elasticsearch partial update API to add new routing table entries without ever having to know what the entire document looks like, with deletes occurring via `painless` script queries. If there happens to be a conflict, we can ask the client to retry the query, as the lower frequency of updates should help ensure that the change is able to be applied quickly.
+
+The heartbeat doc will receive frequent updates as the nodes are required to note that they are available to the system. In this case, the document will be stored locally, but we will still use partial updates to note that a node is available by updating its unique time. 
To handle the culling of nodes that have timed out, each node will run a `painless` script query asking elasticsearch to remove entries that haven't been updated since the timeout threshold. This query is bound to conflict, but since it's just dropping nodes that have timed out, we only need one of these updates to succeed at a given time.
+
+Upon proxy request, the routing table will be retrieved from elasticsearch and compared with the liveness table. If the node that is responsible for a given resource is still alive, the request will be made; otherwise an error will be returned.
+
+The heartbeat doc consists of key/value pairs, where the key is the name of the node, and the value is a monotonically increasing integer. Every time the node checks in, it'll increase this integer by one. On every second heartbeat loop, we will check to see what has been updated. If a node hasn't updated its integer in two cycles, we will move it to a new `Closing` state. If, on the next pass, it is still closing, then we will drop that node and its resources. If it has updated again, we will move the resources back into the `Started` state.
+
+### Issues
+
+- Using `painless` makes this system hard to test; there is no REPL nor syntax-checker available, so testing can only be accomplished via integration tests
+- There still exists potential for `conflict` on updates; we are no longer maintaining a local document state so we have removed a major source of it, but two nodes attempting to update at the same time will still generate a conflict. Ideally the conflict is largely irrelevant since the potential exists when running generic updates to cull dead nodes
+- The complexity of this system to detect and remove dead nodes is much higher than the original system. 
+- Additional latency is involved since the routing document must be retrieved upon each request + +## Sync Order Traversal + +_note: this solution has been deprecated in favor of real time document retrieval_ + +A proposed solution here is to only allow one client to sync at a time, and to run the synchronization list in order, not allowing a resource to be made externally available until all the clients have seen a resource once. + +The clients will sync in the order of the oldest synced client going first, with alphabetical order of nodes breaking ties. + +
                                                                               
+    ┌───doc version 0──┐       ┌───doc version 1──┐      ┌───doc version 2──┐  
+    │    RESOURCES     │       │    RESOURCES     │      │    RESOURCES     │  
+    │******************│       │******************│      │******************│  
+    │                  │       │Resource A        │      │Resource A        │  
+    │                  │       │- node a          │      │- node a          │  
+    │                  │       │- added version 1 │      │- added version 1 │  
+    │                  │       │- closed          │      │- closed          │  
+    │                  │       │                  │      │                  │  
+    │      NODES       │       │      NODES       │      │      NODES       │  
+    │******************│       │******************│      │******************│  
+    │    node a        │       │    node b        │      │    node c        │  
+    │  - version 0     │       │  - version 0     │      │  - version 0     │  
+    │    node b        │       │    node c        │      │    node a        │  
+    │  - version 0     │       │  - version 0     │      │  - version 1     │  
+    │    node c        │       │    node a        │      │    node b        │  
+    │  - version 0     │       │  - version 1     │      │  - version 2     │  
+    │                  │       │                  │      │                  │  
+    └──────────────────┘       └──────────────────┘      └──────────────────┘  
+                                                                               
+                                                                               
+    ┌───doc version 3──┐                                                       
+    │    RESOURCES     │                                                       
+    │******************│                                                       
+    │Resource A        │                                                       
+    │- node a          │                                                       
+    │- added version 1 │                                                       
+    │- open            │                                                       
+    │                  │                                                       
+    │      NODES       │                                                       
+    │******************│                                                       
+    │    node a        │                                                       
+    │  - version 1     │                                                       
+    │    node b        │                                                       
+    │  - version 2     │                                                       
+    │    node c        │                                                       
+    │  - version 3     │                                                       
+    │                  │                                                       
+    └──────────────────┘                                                       
+  
+ +In this example, `Node A` adds `Resource A` and generates version 1. It then moves itself to the end of the sync order. `Node B` has no updates to the resources, but updates that it now knows about `version 1` (and moves itself), then `Node C` updates to say that it has no new resources, but it now is on `version 3` (and moves itself, making `Node A` the oldest and top-most client). Since the oldest client version is >= the version the resource was added in, all of the proxies are able to route data to this resource. + +Until this happens, clients will not be able to route to these resources to prevent clients from having inconsistent results where some nodes are able to proxy resources but some nodes are not able to. + +Conflicts are handled by a `first-in wins` strategy -- if a later node also attempts to mark itself as being responsible for a resource, it will lose, and update its routing table to point to the existing node. + +### Initialization handshake + +When the system starts for the first time, all the nodes in the system will attempt to connect and initialize the system by updating the document adding themselves. This will either result in a successful update, or a conflict. Upon conflict, the node will stop trying to update the document, and then follow the adding a new node strategy. + +### Adding a new node + +When a new node wants to connect, it will get the document from the server and send a message to the oldest node on the list asking it to include it in the document, and then iterate on pulling down the document. When the node appears in the document it means that the node is now ready to be used. + +### Issues + +- The amount of time it takes for a resource to be made available is the `updateInterval * n` where `n` is the number of clients. 
We can make the `updateInterval` smaller and allow clients to check more frequently, but with large numbers of nodes in a cluster this will cause delay before making a resource available for routing purposes. +- Processing the document from elasticsearch will take longer as we will have to check the validity of resources against the lowest document version available. +- This system relies on properly synchronized clocks across the nodes. diff --git a/x-pack/tsconfig.json b/x-pack/tsconfig.json index 11983b9db9ccd..60730fb70a82a 100644 --- a/x-pack/tsconfig.json +++ b/x-pack/tsconfig.json @@ -8,36 +8,17 @@ "plugins/**/*", "test_utils/**/*" ], - "exclude": [ - "test/**/*", - "legacy/plugins/siem/cypress/**/*", - "**/typespec_tests.ts" - ], + "exclude": ["test/**/*", "legacy/plugins/siem/cypress/**/*", "**/typespec_tests.ts"], "compilerOptions": { "outDir": ".", "paths": { - "ui/*": [ - "src/legacy/ui/public/*" - ], - "plugins/xpack_main/*": [ - "x-pack/legacy/plugins/xpack_main/public/*" - ], - "plugins/security/*": [ - "x-pack/legacy/plugins/security/public/*" - ], - "plugins/spaces/*": [ - "x-pack/legacy/plugins/spaces/public/*" - ], - "test_utils/*": [ - "x-pack/test_utils/*" - ], - "monitoring/common/*": [ - "x-pack/monitoring/common/*" - ] + "ui/*": ["src/legacy/ui/public/*"], + "plugins/xpack_main/*": ["x-pack/legacy/plugins/xpack_main/public/*"], + "plugins/security/*": ["x-pack/legacy/plugins/security/public/*"], + "plugins/spaces/*": ["x-pack/legacy/plugins/spaces/public/*"], + "test_utils/*": ["x-pack/test_utils/*"], + "monitoring/common/*": ["x-pack/monitoring/common/*"] }, - "types": [ - "node", - "jest" - ] + "types": ["node", "jest"] } } diff --git a/yarn.lock b/yarn.lock index 59ce1d873be34..c0309d68ba950 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3830,6 +3830,19 @@ "@types/podium" "*" "@types/shot" "*" +"@types/hapi__boom@*": + version "7.4.0" + resolved 
"https://registry.yarnpkg.com/@types/hapi__boom/-/hapi__boom-7.4.0.tgz#81d58cffb67c54896614560c007a5015a79c30ba" + integrity sha512-i4nerd/aJd5zW8OPHxgeuDYiuiuHxHczkqIaALhINUqBCavRgbiwjfndpgrarB43jcb/2LxzrohMOH7akyt3og== + +"@types/hapi__wreck@^15.0.0": + version "15.0.0" + resolved "https://registry.yarnpkg.com/@types/hapi__wreck/-/hapi__wreck-15.0.0.tgz#73f8fe182799d6e5f6b8f67c09935125605a5d91" + integrity sha512-6vTM0cDzKWp0N/qurjgpfCbFOVrnxGpD02rJvmXfMnLLPA5SU45ou039GqV+gFdSmvt73sPoUCbbkhW4ZhrsIA== + dependencies: + "@types/hapi__boom" "*" + "@types/node" "*" + "@types/has-ansi@^3.0.0": version "3.0.0" resolved "https://registry.yarnpkg.com/@types/has-ansi/-/has-ansi-3.0.0.tgz#636403dc4e0b2649421c4158e5c404416f3f0330" @@ -4564,6 +4577,15 @@ resolved "https://registry.yarnpkg.com/@types/wrap-ansi/-/wrap-ansi-2.0.14.tgz#5afbdd8374de9ff8ad752cb03ab9f225f7c2ee24" integrity sha1-Wvvdg3Ten/itdSywOrnyJffC7iQ= +"@types/wreck@^14.0.0": + version "14.0.0" + resolved "https://registry.yarnpkg.com/@types/wreck/-/wreck-14.0.0.tgz#8bf39fd789d32af63176bee5eb5705108cd8be5e" + integrity sha512-cxA8o5fGbXg2e/UUoA7z8lMaEw4CwxvJW1Fv+E8xP/n2MOzRdeQUX+e7aUxgr1ganECiIQXUka0OwwKR2ixo1w== + dependencies: + "@types/boom" "*" + "@types/events" "*" + "@types/node" "*" + "@types/write-pkg@^3.1.0": version "3.1.0" resolved "https://registry.yarnpkg.com/@types/write-pkg/-/write-pkg-3.1.0.tgz#f58767f4fb9a6a3ad8e95d3e9cd1f2d026ceab26"