Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
let len = records.len();
debug!("Processing {} records", len);

self.build_event_from_chunk(&records)
.await?
.process()
.await?;
self.build_event_from_chunk(&records).await?.process()?;

debug!("Processed {} records", len);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Event {

// Events holds the schema related to a each event for a single log stream
impl Event {
pub async fn process(self) -> Result<(), EventError> {
pub fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
custom_partition_values: HashMap::new(),
stream_type: StreamType::Internal,
}
.process()
.await?;
.process()?;

Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ pub async fn push_logs(
custom_partition_values,
stream_type: StreamType::UserDefined,
}
.process()
.await?;
.process()?;
}
Ok(())
}
Expand Down
Loading