Skip to content

Commit

Permalink
chore(core): fix some compilation warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Apr 15, 2022
1 parent b1b959a commit 5020fd2
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected static Flow createFlow(List<AbstractTrigger> triggers) {
}

protected static Flow createFlow(List<AbstractTrigger> triggers, List<TaskDefault> list) {
Flow.FlowBuilder flow = Flow.builder()
Flow.FlowBuilder<?, ?> flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.inputs(List.of(Input.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ static TopicsConfig topicsConfig(ApplicationContext applicationContext, String n
private List<TopicPartition> listTopicPartition() throws ExecutionException, InterruptedException {
return this.adminClient
.describeTopics(Collections.singleton(topicsConfig.getName()))
.all()
.allTopicNames()
.get()
.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.micronaut.inject.qualifiers.Qualifiers;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -122,9 +123,10 @@ public Runnable receive(Class<?> consumerGroup, Consumer<WorkerTask> consumer) {
});

// we commit first all offset before submit task to worker

kafkaProducer.sendOffsetsToTransaction(
KafkaConsumerService.maxOffsets(records),
kafkaConfigService.getConsumerGroupName(consumerGroup)
new ConsumerGroupMetadata(kafkaConfigService.getConsumerGroupName(consumerGroup))
);
kafkaProducer.commitTransaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public StreamsBuilder topology() {
}

@Getter
@Jacksonized
@AllArgsConstructor
public static class FlowWithPrevious {
private Flow flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class KafkaStreamsBuilder extends StreamsBuilder {
@Override
public synchronized Topology build() {
Properties properties = new Properties();
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

return super.build(properties);
}
Expand Down

0 comments on commit 5020fd2

Please sign in to comment.