Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/java-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
java_build:
name: Build
timeout-minutes: 60
runs-on: [self-hosted, it]
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
Expand All @@ -94,7 +94,7 @@ jobs:
name: Unit Tests
needs: [java_build]
timeout-minutes: 60
runs-on: [self-hosted, it]
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
Expand Down Expand Up @@ -126,15 +126,15 @@ jobs:
needs: [spotless_check, checkstyle_check, java_build, java_unit_tests]
timeout-minutes: 60
# Run on any runner that matches all the specified runs-on values.
runs-on: [self-hosted, it]
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
- name: Setup Environment
id: setup-env
uses: ./.github/actions/setup-env
- name: Run Integration Smoke Tests
run: |
run: |
./cicd/run-it-smoke-tests \
--modules-to-build="DEFAULT" \
--it-region="us-central1" \
Expand All @@ -156,15 +156,15 @@ jobs:
needs: [java_integration_smoke_tests_templates]
timeout-minutes: 240
# Run on any runner that matches all the specified runs-on values.
runs-on: [self-hosted, it]
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
- name: Setup Environment
id: setup-env
uses: ./.github/actions/setup-env
- name: Run Integration Tests
run: |
run: |
./cicd/run-it-tests \
--modules-to-build="DEFAULT" \
--it-region="us-central1" \
Expand All @@ -187,7 +187,7 @@ jobs:
needs: [spotless_check, checkstyle_check, java_build, java_unit_tests, java_integration_tests_templates]
timeout-minutes: 600
# Run on any runner that matches all the specified runs-on values.
runs-on: [self-hosted, perf]
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
Expand Down
46 changes: 46 additions & 0 deletions cloudbuild/cloudbuild.java.googlecloud.pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
steps:

# ---------
# Update the flex template
# ---------

- id: "update flex-template"
waitFor: ['-']
name: "gcr.io/cloud-builders/mvn"
args: [
"mvn clean package -PtemplatesRun",
"-DskipTests",
"-DprojectId=${_TARGET_PROJECT_ID}",
"-DbucketName=${_TEMPLATE_BUCKET}",
"-Dregion=us-central1",
"-DjobName=cloud-pubsub-to-gcs-text-flex-job",
"-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs",
"-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX},outputShardTemplate=${_OUTPUT_SHARD_TEMPLATE},numShards=${_NUM_SHARDS},windowDuration=${_WINDOW_DURATION},yearPattern=${_YEAR_PATTERN},monthPattern=${_MONTH_PATTERN},dayPattern=${_DAY_PATTERN},hourPattern=${_HOUR_PATTERN},minutePattern=${_MINUTE_PATTERN}",
"-f v2/googlecloud-to-googlecloud"
]

timeout: 1200s

substitutions:
### Required
_OUTPUT_DIRECTORY: gs://analytics_events_warehouse/errors_w_attrs

### Optional
_INPUT_TOPIC: dataflow-failure
_INPUT_SUBSCRIPTION: failed_dataflow_transformation_develop
_USER_TEMP_LOCATION: gs://dataflow-staging-us-central1-739390938599/tmp
_OUTPUT_FILENAME_PREFIX: error
_OUTPUT_FILENAME_SUFFIX: ""
_OUTPUT_SHARD_TEMPLATE: W-P-SS-of-NN
_NUM_SHARDS: "0"
_WINDOW_DURATION: 5m
_YEAR_PATTERN: YYYY
_MONTH_PATTERN: MM
_DAY_PATTERN: dd
_HOUR_PATTERN: HH
_MINUTE_PATTERN: mm

_TARGET_PROJECT_ID: onx-dw-raw
_TEMPLATE_BUCKET: gs://onx-flex-templates
options:
dynamic_substitutions: true
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
Expand Down Expand Up @@ -151,6 +157,18 @@ public interface Options
String getOutputFilenameSuffix();

void setOutputFilenameSuffix(String value);

@TemplateParameter.Text(
order = 7,
groupName = "Target",
optional = true,
description = "Optionally include attributes in pubsub pull",
helpText =
"If specified, pull the message and the attributes from the topic or subscription",
example = "True,False")
String getAttributeFlag();

void setAttributeFlag(String value);
}

/**
Expand All @@ -176,6 +194,8 @@ public static void main(String[] args) {
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag());

boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
Expand All @@ -193,11 +213,49 @@ public static PipelineResult run(Options options) {
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/

if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
if (pullAttributes) {
PCollection<PubsubMessage> messagesAttr =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
messages =
messagesAttr.apply(
"ExtractPayloadAndAttributesToString",
MapElements.into(TypeDescriptor.of(String.class))
.via(
(PubsubMessage message) -> {
// Get the message payload as a string
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
// Get the message attributes and convert them to a JSON-like string
// format
Map<String, String> attributes = message.getAttributeMap();
String attributesString =
attributes.entrySet().stream()
.map(
entry ->
"\""
+ entry.getKey()
+ "\": \""
+ entry.getValue()
+ "\"")
.collect(Collectors.joining(", ", "{", "}"));

// Return the concatenated string with both the payload and attributes
return "{ \"payload\": \""
+ payload
+ "\", \"attributes\": "
+ attributesString
+ " }";
}));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
}
} else {
messages =
pipeline.apply(
Expand Down
Loading