Skip to content

Commit

Permalink
it works on my machine ¯\_(ツ)_/¯
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Apr 18, 2024
1 parent 182e22d commit 9e1e4a5
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 36 deletions.
12 changes: 3 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,20 @@ bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion = { path = "../arrow-datafusion/datafusion/core" }
datafusion-common = { path = "../arrow-datafusion/datafusion/common" }
datafusion-expr = { path = "../arrow-datafusion/datafusion/expr" }
datafusion-optimizer = { path = "../arrow-datafusion/datafusion/optimizer" }
datafusion-physical-expr = { path = "../arrow-datafusion/datafusion/physical-expr" }
datafusion-sql = { path = "../arrow-datafusion/datafusion/sql" }
datafusion-substrait = { path = "../arrow-datafusion/datafusion/substrait" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.12"
Expand Down
54 changes: 54 additions & 0 deletions feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# read line from log-1000.txt and POST it to http://localhost:4000/v1/influxdb/write?db=public&precision=ms
# POST data format: "many_logs,host=1 log=<FILE CONTENT> <INCREMENT ID>"

import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

batch_size = 3000
worker = 8

# Define the URL
url = "http://localhost:4000/v1/influxdb/write?db=public&precision=ms"


def send_data(start, data):
# Send the POST request
response = requests.post(url, data=data)
# Check the response
if response.status_code >= 300:
print(
f"Failed to send log line {start}: {response.status_code} {response.text}"
)


# Open the file
with open("target/log-1000.txt", "r") as file:
lines = file.readlines()

# Create a progress bar
with tqdm(
total=len(lines),
desc="Processing lines",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}",
) as pbar:
data = ""
with ThreadPoolExecutor(max_workers=worker) as executor:
for i, line in enumerate(lines):
# Prepare the POST data
content = line.strip()
content = content.replace('"', " ")
content = content.replace("'", " ")
content = content.replace("=", " ")
content = content.replace(".", " ")

data = data + f'many_logs,host=1 log="{content}" {i}\n'

if i % batch_size == 0:
executor.submit(send_data, i, data)
data = ""
# Update the progress bar
pbar.update(batch_size)

# close the executor
executor.shutdown(wait=True)
1 change: 1 addition & 0 deletions src/common/function/src/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
pub mod matches;
pub mod math;
pub mod numpy;
#[cfg(test)]
Expand Down
61 changes: 61 additions & 0 deletions src/common/function/src/scalars/matches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::fmt::Display;
use std::sync::Arc;

use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{BooleanVector, VectorRef};

use crate::function::{Function, FunctionContext};

const NAME: &str = "matches";

/// The function to find remainders
#[derive(Clone, Debug, Default)]
pub struct MatchesFunction;

impl Display for MatchesFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}

impl Function for MatchesFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}

fn signature(&self) -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
let num_rows = columns[1].len();
Ok(Arc::new(BooleanVector::from(vec![true; num_rows])))
}
}
2 changes: 2 additions & 0 deletions src/common/function/src/scalars/math.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use pow::PowFunction;
pub use rate::RateFunction;
use snafu::ResultExt;

use super::matches::MatchesFunction;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
use crate::scalars::math::modulo::ModuloFunction;
Expand All @@ -44,6 +45,7 @@ impl MathFunction {
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(MatchesFunction));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/common/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
common-error.workspace = true
common-function.workspace = true
common-macro.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datafusion.workspace = true
datatypes.workspace = true
promql.workspace = true
prost.workspace = true
session.workspace = true
snafu.workspace = true

[dependencies.substrait_proto]
Expand Down
18 changes: 18 additions & 0 deletions src/common/substrait/src/df_substrait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::udf::create_udf;
use common_function::state::FunctionState;
use datafusion::catalog::CatalogList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
Expand All @@ -24,6 +27,7 @@ use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use prost::Message;
use session::context::QueryContext;
use snafu::ResultExt;
use substrait_proto::proto::Plan;

Expand All @@ -50,6 +54,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
let state = SessionState::new_with_config_rt(state_config, Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let mut context = SessionContext::new_with_state(state);

let udf = create_udf(
Arc::new(MatchesFunction),
QueryContext::arc(),
Arc::new(FunctionState::default()),
);
context.register_udf(udf.into());
context.register_catalog_list(catalog_list);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
let df_plan = from_substrait_plan(&mut context, &plan)
Expand All @@ -65,6 +76,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
.with_serializer_registry(Arc::new(ExtensionSerializer));
let context = SessionContext::new_with_state(session_state);

let udf = create_udf(
Arc::new(MatchesFunction),
QueryContext::arc(),
Arc::new(FunctionState::default()),
);
context.register_udf(udf.into());

let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?;
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;

Expand Down
6 changes: 3 additions & 3 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ greptime-proto.workspace = true
mockall.workspace = true
pin-project.workspace = true
prost.workspace = true
regex.workspace = true
regex-automata.workspace = true
regex.workspace = true
snafu.workspace = true
tantivy = "0.22"
tantivy = { version = "0.22", features = ["zstd-compression"] }

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio.workspace = true
Loading

0 comments on commit 9e1e4a5

Please sign in to comment.