Caching query result if I use spark-clickhouse-connector #375

Open
hipp0gryph opened this issue Dec 19, 2024 · 0 comments

Hello! I use the driver in PySpark on a cluster with the jars clickhouse-jdbc-0.4.5.jar and clickhouse-spark-runtime-3.3_2.12-0.7.1.jar.

I have a table with the ReplicatedReplacingMergeTree engine:

ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/visits_v2', '{replica}', ver)
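For context, with a ReplacingMergeTree engine ClickHouse keeps, for each deduplication key, only the row with the largest `ver`, and `FINAL` forces that deduplication at read time. A minimal plain-Python sketch of that rule (the `visit_id` key name is illustrative, not from the table above):

```python
# Model of ReplacingMergeTree deduplication with a version column:
# for each key, keep the row with the highest `ver` value --
# roughly what `SELECT ... FINAL` returns at query time.
def deduplicate(rows, key="visit_id"):
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row["ver"] > latest[k]["ver"]:
            latest[k] = row
    return list(latest.values())

rows = [
    {"visit_id": 1, "isChecked": 0, "ver": 1},
    {"visit_id": 1, "isChecked": 1, "ver": 2},  # newer version wins
    {"visit_id": 2, "isChecked": 0, "ver": 1},
]
final = deduplicate(rows)  # one row per visit_id, highest ver kept
```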

I tried reading data from the table, and on the first iteration everything works. On the second iteration, `visits` contains the same dataframe as after the first iteration's select, as if ClickHouse were caching the query result. If I make a small change to the query (a new WHERE condition, a bigger LIMIT, etc.), it works fine on the first try, but on the second run I again get the same dataframe.

The Spark code looks like this:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("ClickHouse Example") \
    .config("spark.clickhouse.write.repartitionByPartition", "false") \
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") \
    .config("spark.sql.catalog.clickhouse.host", config["CLICKHOUSE"]["host"]) \
    .config("spark.sql.catalog.clickhouse.protocol", "http") \
    .config("spark.sql.catalog.clickhouse.http_port", config["CLICKHOUSE"]["port"]) \
    .config("spark.sql.catalog.clickhouse.user", config["CLICKHOUSE"]["user"]) \
    .config("spark.sql.catalog.clickhouse.password", config["CLICKHOUSE"]["password"]) \
    .config("spark.sql.catalog.clickhouse.database", config["CLICKHOUSE"]["table_visits"].split(".")[0]) \
    .getOrCreate()
spark.sql("use clickhouse;")
spark.catalog.clearCache()
visits = spark.sql(f'SELECT * FROM {config["CLICKHOUSE"]["table_visits"]} final '
                       f'where isChecked = 0 limit {config["CLICKHOUSE"]["limit"]}') \
        .repartition(100) \
        .withColumn("isChecked", F.lit(1)) \
        .withColumn("ver", F.col("ver") + 1)
visits.show()
visits.createOrReplaceTempView("checked_visits")
spark.sql(f'INSERT INTO {config["CLICKHOUSE"]["table_visits"]} SELECT * from checked_visits;')
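A plain-Python model of the cycle the snippet above performs (read unchecked rows via `FINAL`, mark them, bump `ver`, insert back), with illustrative names. After the insert, a fresh deduplicated read should no longer return the same rows, which is the behavior I expect but do not see:

```python
# Model of the read-mark-insert cycle against a ReplacingMergeTree-style
# table: a fresh FINAL read after inserting bumped versions should find
# no unchecked rows left.
def dedup(rows, key="visit_id"):
    latest = {}
    for r in rows:
        if r[key] not in latest or r["ver"] > latest[r[key]]["ver"]:
            latest[r[key]] = r
    return list(latest.values())

table = [{"visit_id": i, "isChecked": 0, "ver": 1} for i in range(3)]

# First iteration: select unchecked rows, mark them, bump ver, insert back.
unchecked = [r for r in dedup(table) if r["isChecked"] == 0]
table += [{**r, "isChecked": 1, "ver": r["ver"] + 1} for r in unchecked]

# Second iteration: a non-cached FINAL read now finds nothing unchecked.
second = [r for r in dedup(table) if r["isChecked"] == 0]
```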

How can I fix that? If I send the same query to ClickHouse with another tool, I get the correct result. I could not find a caching-related parameter in the documentation. Thank you in advance!
