Having a function such as the following
/**
 * Reactive function that maps a stream of strings to their upper-cased form.
 *
 * <p>A log line is written for every signal observed on the input, and each
 * element is upper-cased inside a {@code flatMap} after a 1 ms delay that is
 * scheduled on {@link #SCHEDULER}.
 */
static class FluxToFluxFunction implements Function<Flux<String>, Flux<String>> {

    private static final Logger log = LoggerFactory.getLogger(FluxToFluxFunction.class);

    /** Parallel scheduler, created with the name "sleuthFunction", used for the per-element delay. */
    static final Scheduler SCHEDULER = Schedulers.newParallel("sleuthFunction");

    /**
     * Builds the processing pipeline on top of {@code input}.
     *
     * @param input the incoming stream of strings
     * @return a stream carrying the upper-cased value of each input element
     */
    @Override
    public Flux<String> apply(Flux<String> input) {
        // Side-effect only: log once per signal seen on the source.
        Flux<String> observed = input.doOnEach(signal -> log.info("Got a message"));
        return observed.flatMap(FluxToFluxFunction::upperCaseAfterDelay);
    }

    /**
     * Waits 1 ms on {@link #SCHEDULER}, then logs {@code s} and returns it upper-cased.
     *
     * @param s the element to transform
     * @return a {@code Mono} that emits {@code s.toUpperCase()} once the delay elapses
     */
    private static Mono<String> upperCaseAfterDelay(String s) {
        Mono<Long> timer = Mono.delay(Duration.ofMillis(1), SCHEDULER);
        return timer.map(tick -> {
            log.info("Logging [{}] from flat map", s);
            return s.toUpperCase();
        });
    }
}
causes class-cast exceptions when tracing is added.