Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 23 additions & 26 deletions datafusion/physical-expr/src/regex_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ fn _regexp_replace_static_pattern<T: OffsetSizeTrait>(

let string_array = downcast_string_array_arg!(args[0], "string", T);
let pattern = fetch_string_arg!(args[1], "pattern", T, _regexp_replace_early_abort);
let replacement_array = downcast_string_array_arg!(args[2], "replacement", T);
let replacement =
fetch_string_arg!(args[2], "replacement", T, _regexp_replace_early_abort);
let flags = match args.len() {
3 => None,
4 => Some(fetch_string_arg!(args[3], "flags", T, _regexp_replace_early_abort)),
Expand All @@ -227,33 +228,29 @@ fn _regexp_replace_static_pattern<T: OffsetSizeTrait>(
}
};

// Embed the flag (if it exists) into the pattern
let (pattern, replace_all) = match flags {
Some("g") => (pattern.to_string(), true),
// Embed the flag (if it exists) into the pattern. Limit will determine
// whether this is a global match (as in replace all) or just a single
// replace operation.
let (pattern, limit) = match flags {
Some("g") => (pattern.to_string(), 0),
Some(flags) => (
format!("(?{}){}", flags.to_string().replace('g', ""), pattern),
flags.contains('g'),
!flags.contains('g') as usize,
),
None => (pattern.to_string(), false),
None => (pattern.to_string(), 1),
};

let re = Regex::new(&pattern)
.map_err(|err| DataFusionError::Execution(err.to_string()))?;

// Replaces the posix groups in the replacement string
// with rust ones.
let replacement = regex_replace_posix_groups(replacement);

let result = string_array
.iter()
.zip(replacement_array.iter())
.map(|(string, replacement)| match (string, replacement) {
(Some(string), Some(replacement)) => {
let replacement = regex_replace_posix_groups(replacement);

if replace_all {
Some(re.replace_all(string, replacement.as_str()))
} else {
Some(re.replace(string, replacement.as_str()))
}
}
_ => None,
.map(|string| {
string.map(|string| re.replacen(string, limit, replacement.as_str()))
})
.collect::<GenericStringArray<T>>();
Ok(Arc::new(result) as ArrayRef)
Expand Down Expand Up @@ -284,21 +281,21 @@ pub fn specialize_regexp_replace<T: OffsetSizeTrait>(
is_flags_scalar,
) {
// This represents a very hot path for the case where the there is
// a single pattern that is being matched against. This is extremely
// important to specialize on since it removes the overhead of DF's
// in-house regex pattern cache (since there will be at most a single
// pattern).
// a single pattern that is being matched against and a single replacement.
// This is extremely important to specialize on since it removes the overhead
// of DF's in-house regex pattern cache (since there will be at most a single
// pattern) and the pre-processing of the same replacement pattern at each
// query.
//
// The flags needs to be a scalar as well since each pattern is actually
// constructed with the flags embedded into the pattern itself. This means
// even if the pattern itself is scalar, if the flags are an array then
// we will create many regexes and it is best to use the implementation
// that caches it. If there are no flags, we can simply ignore it here,
// and let the specialized function handle it.
(_, true, _, true) => {
// We still don't know the scalarity of source/replacement, so we
// need the adapter even if it will do some extra work for the pattern
// and the flags.
(_, true, true, true) => {
// We still don't know the scalarity of source, so we need the adapter
// even if it will do some extra work for the pattern and the flags.
//
// TODO: maybe we need a way of telling the adapter on which arguments
// it can skip filling (so that we won't create N - 1 redundant cols).
Expand Down