Storage engines
WT1 features pluggable storage engines, allowing you to select an implementation suited to your requirements.
Storage engines are configured in the config.properties file. The Java class to use is specified in the storage.class parameter. Engine-specific configuration goes in storage.params.PARAMNAME parameters; the available PARAMNAMEs depend on the selected storage engine.
The file config-sample.properties contains a sample configuration for each of the storage engines except Flume. The file config-flume.properties contains a sample configuration for the Flume storage engine.
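As an illustrative skeleton (the class and parameter names below are placeholders, not real values; see the per-engine sections for the actual ones), a storage configuration therefore takes the following general shape:
# Placeholder class name: use one of the storage classes documented below
storage.class=com.example.SomeStorageProcessor
# Engine-specific parameters, each prefixed with storage.params.
storage.params.someParam=some-value
storage.params.anotherParam=another-value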
With the local filesystem storage engine, the events are reported in flat CSV files on the local filesystem of the servlet container. The files are compressed using GZip.
The backend server creates a new file when the previous one reaches a configurable size (before compression) or when a configurable time interval has elapsed and an event is received.
The files have a naming schema like ROOT_DIR/YYYY/MM/DD/HH/MM-SS.log.gz when closed. Files being written have a temporary name starting with an underscore (_) character, so they can be ignored by processes reading the output directories. These temporary files are not removed on abnormal termination, and any partially written data they contain can be retrieved manually if needed.
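For example (purely illustrative, assuming rootDir is set to /data/tracker and a file timestamped 2014-03-05 09:41:27), the closed file would appear as:
/data/tracker/2014/03/05/09/41-27.log.gz
while the file still being written carries a temporary name starting with an underscore until it is closed.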
The local filesystem storage cannot be used with the GAE application implementation.
The class for this storage is com.dataiku.wt1.storage.FSStorageProcessor
Parameter name | Description |
---|---|
rootDir | Absolute path to the folder where the logs must be stored. |
newFileTriggerSize | (Optional) Size in bytes which, when reached by the log data (before compression), triggers the creation of a new file. Default: 1 MB |
newFileTriggerInterval | (Optional) Interval in seconds which, when elapsed, triggers the creation of a new file on the next event received. If this parameter is zero, interval-based file creation is disabled. Default: 0 |
storage.class=com.dataiku.wt1.storage.FSStorageProcessor
storage.params.rootDir=/data/tracker/
# Create a new file after 100MB of tracking data or at least once a day
storage.params.newFileTriggerSize=100000000
storage.params.newFileTriggerInterval=86400
With the Google Cloud Storage (GCS) storage engine, the events are reported in flat CSV files on the Google Cloud Storage service. The files are compressed using GZip.
CAUTION: This storage engine can ONLY be used with the GAE backend server implementation.
The backend server creates a new file when the previous one reaches a configurable size (before compression) or when a configurable time interval has elapsed and an event is received.
The files have a naming schema like BUCKET/YYYY/MM/DD/HH/MM-SS-BACKEND_ID.log.gz
The GCS-for-GAE processor does not require any authentication configuration: authentication is handled by the GAE infrastructure, which automatically authenticates to GCS.
Please read the Google documentation on configuring your GAE project to authenticate to GCS: Google Storage Prerequisites (Steps 2 to 5).
The class for this storage is `com.dataiku.wt1.storage.GCSGAEStorageProcessor`
Parameter name | Description |
---|---|
bucketName | The GCS bucket name where the logs must be stored. |
newFileTriggerInterval | (Optional) Interval in seconds which, when elapsed, triggers the creation of a new file on the next event received. If this parameter is zero, interval-based file creation is disabled. Default: 0 |
newFileTriggerSize | (Optional) Size in bytes which, when reached by the log data (before compression), triggers the creation of a new file. Default: 1 MB |
storage.class=com.dataiku.wt1.storage.GCSGAEStorageProcessor
storage.params.bucketName=my-gcs-bucket
# Create a new file after 100MB of tracking data or at least once a day
storage.params.newFileTriggerSize=100000000
storage.params.newFileTriggerInterval=86400
With the Amazon S3 storage engine, the events are reported in flat CSV files on the Amazon S3 service. The files are compressed using GZip.
To avoid data download costs, we recommend using this storage engine if you plan to use the AWS infrastructure to perform analytics (for example, with Elastic MapReduce).
The backend server creates a new file when the previous one reaches a configurable size (before compression) or when a configurable time interval has elapsed and an event is received.
The files have a naming schema like BUCKET/YYYY/MM/DD/HH/MM-SS[-INSTANCE_ID].log.gz
The storage engine uses the regular AWS Access Key / Secret Key pair. Your access and secret keys are available in the "Security Credentials" page of your AWS console.
We strongly recommend that you create a dedicated IAM role, with only write access on the S3 bucket that you select for the tracking logs.
For more information, please see the AWS IAM documentation.
The class for this storage is com.dataiku.wt1.storage.S3StorageProcessor
Parameter name | Description |
---|---|
bucketName | The S3 bucket name where the logs must be stored. |
accessKey | Your AWS access key. |
secretKey | Your AWS secret key. |
newFileTriggerSize | (Optional) Size in bytes which, when reached by the log data (before compression), triggers the creation of a new file. Default: 1 MB |
newFileTriggerInterval | (Optional) Interval in seconds which, when elapsed, triggers the creation of a new file on the next event received. If this parameter is zero, interval-based file creation is disabled. Default: 0 |
tmpFolder | (Optional) Folder on the local server where temporary files will be stored before upload to S3. The folder should have enough space to hold one full file before its upload. If omitted, the system's temporary storage will be used. |
instanceId | (Optional) If multiple WT1 instances are configured to write to the same S3 bucket, use this parameter to have each instance write to its own file names. |
storage.class=com.dataiku.wt1.storage.S3StorageProcessor
storage.params.bucketName=my-s3-bucket
storage.params.accessKey=AKIAD4VMZYLLDPZF6CNE
storage.params.secretKey=/Z/81CIOdjk123/hFPJzuhEUIdfjkWO3z
# Create a new file after 100MB of tracking data or at least once a day
storage.params.newFileTriggerSize=100000000
storage.params.newFileTriggerInterval=86400
With the Flume storage engine, tracker events are delivered directly to a Flume-NG agent, using an embedded Flume RPC client.
See the Flume documentation for information about setting up Flume. Apache Flume versions 1.4 and above are supported.
The backend server buffers events in memory until a configurable maximum number of events is reached, or a configurable time interval has elapsed. The buffered events are then sent to the Flume agent in a single transaction.
The client libraries for the Flume agent should be available in the Java class path of the web tracker servlet (technically, the flume-ng-sdk JAR file and required dependencies). Refer to the documentation for your Flume distribution and servlet engine for this.
In an installation where the Flume agent is deployed and managed with Cloudera Manager using parcels (see details here), one would typically do this by linking or copying the required Flume JAR files from /opt/cloudera/parcels/CDH/lib/flume-ng/lib to the WEB-INF/lib subdirectory of the WT1 servlet webapp.
At the time of writing, if using Flume under Cloudera Distribution for Hadoop (CDH) version 4.4.0, the following set of JAR files is necessary:
avro-1.4.0-cdh4.4.0.jar
avro-ipc-1.4.0-cdh4.4.0.jar
commons-logging-1.4.0-cdh4.4.0.jar
flume-ng-sdk-1.4.0-cdh4.4.0.jar
jetty-1.4.0-cdh4.4.0.jar
jetty-util-1.4.0-cdh4.4.0.jar
libthrift-1.4.0-cdh4.4.0.jar
netty-1.4.0-cdh4.4.0.jar
paranamer-1.4.0-cdh4.4.0.jar
slf4j-api-1.4.0-cdh4.4.0.jar
slf4j-log4j12-1.4.0-cdh4.4.0.jar
snappy-java-1.4.0-cdh4.4.0.jar
velocity-1.4.0-cdh4.4.0.jar
This list is of course subject to change with further releases of CDH or the WT1 tracker.
The class for this storage is com.dataiku.wt1.storage.FlumeSourceProcessor
Parameter name | Description |
---|---|
flume.FLUME_PARAMETER | A configuration parameter to be passed to the embedded Flume RPC client. See the Flume Developer Guide for reference on the configuration parameters supported by the Flume RPC client. At least the flume.hosts and flume.hosts.HOSTID parameters should be given to specify the host and port of the Flume agent to which events should be sent. See below or the config-flume.properties file for examples. |
maxBufferSize | (Optional) Maximum number of events which will be buffered in memory in the backend server. When this limit is reached, buffered events will be sent to the Flume agent. If this operation fails (because the Flume agent cannot be reached, for example), further events received on this web tracker will be discarded. Default: 1000. Note that buffered events are normally flushed when their number exceeds the batch-size configuration parameter of the embedded Flume RPC client (typical default value: 100). |
flushInterval | (Optional) Interval in seconds which, when elapsed, triggers a flush of the buffered events on the next event received. If this parameter is zero, interval-based event flushing is disabled. Default: 0 |
reconnectDelay | (Optional) Minimum delay in seconds before reconnection to the Flume agent is attempted after an event delivery error has occurred. Default: 60 |
timestampHeader | (Optional) When set to true, Flume events are tagged with a timestamp header recording the time at which the event was received on the backend server. This timestamp may then be used by downstream Flume agents and sinks to properly process and store the event. Default: false |
hostHeader | (Optional) When present, this parameter value is added to Flume events as a host header. This header may then be used by downstream Flume agents and sinks to properly process and store the event. |
storage.class=com.dataiku.wt1.storage.FlumeSourceProcessor
storage.params.flume.hosts=h1
storage.params.flume.hosts.h1=myflumehost.mydomain.org:41414
# Send events to Flume agent every 20 events or 10 seconds
storage.params.flume.batch-size=20
storage.params.flushInterval=10
storage.class=com.dataiku.wt1.storage.FlumeSourceProcessor
# Configure two redundant Flume agents as backends
storage.params.flume.client.type=org.apache.flume.api.FailoverRpcClient
storage.params.flume.hosts=f1 f2
storage.params.flume.hosts.f1=flumehost1.mydomain.org:41414
storage.params.flume.hosts.f2=flumehost2.mydomain.org:41414
storage.params.flume.connect-timeout=10s
storage.params.flume.request-timeout=10s
storage.params.reconnectDelay=30
# Flush events at most every 10 seconds
storage.params.flushInterval=10
# Add Flume headers to identify source host and timestamp of event arrival
storage.params.timestampHeader=true
storage.params.hostHeader=tracker1
The WT1 servlet embeds a Flume RPC client, which uses the Avro RPC protocol by default, and should be connected to a Flume agent through an Avro source as documented here.
A typical Flume configuration snippet for instantiating this source on TCP port 41414 on a Flume agent named a1 would be as follows:
# Configure the Avro source
a1.sources = r1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
# Configure the Flume channel
a1.channels = c1
[... Configuration of the c1 channel ...]
# Bind the source to the channel
a1.sources.r1.channels = c1
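For completeness, here is a minimal sketch of a possible configuration for the c1 channel (illustrative only: a simple memory channel, whose type and capacities should be chosen according to your durability and throughput requirements):
# Illustrative memory channel for agent a1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
The transactionCapacity should be at least as large as the batch size used by the embedded RPC client, so that a full batch of events can be committed in a single channel transaction.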