Skip to content

Commit

Permalink
Merge pull request #221 from den818/CLIENT_WRITE_INFO
Browse files Browse the repository at this point in the history
Support client write info & progress written_rows/written_bytes
  • Loading branch information
Enmk authored Oct 18, 2022
2 parents 0ef73e2 + 85f8c35 commit 69325bd
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 2 deletions.
5 changes: 5 additions & 0 deletions clickhouse/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ const BlockInfo& Block::Info() const {
return info_;
}

/// Set block info
void Block::SetInfo(BlockInfo info) {
info_ = std::move(info);
}

/// Count of rows in the block.
size_t Block::GetRowCount() const {
return rows_;
Expand Down
3 changes: 3 additions & 0 deletions clickhouse/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class Block {

const BlockInfo& Info() const;

/// Set block info
void SetInfo(BlockInfo info);

/// Count of rows in the block.
size_t GetRowCount() const;

Expand Down
14 changes: 12 additions & 2 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420

#define REVISION DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
#define REVISION DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO

namespace clickhouse {

Expand Down Expand Up @@ -408,6 +409,15 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
return false;
}
}
if (REVISION >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) {
return false;
}
if (!WireFormat::ReadUInt64(*input_, &info.written_bytes)) {
return false;
}
}

if (events_) {
events_->OnProgress(info);
Expand Down Expand Up @@ -475,7 +485,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
return false;
}

// TODO use data
block->SetInfo(std::move(info));
}

uint64_t num_columns = 0;
Expand Down
2 changes: 2 additions & 0 deletions clickhouse/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct Progress {
uint64_t rows = 0;
uint64_t bytes = 0;
uint64_t total_rows = 0;
uint64_t written_rows = 0;
uint64_t written_bytes = 0;
};


Expand Down
29 changes: 29 additions & 0 deletions ut/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,35 @@ TEST_P(ClientCase, RoundtripArrayTString) {
EXPECT_TRUE(CompareRecursive(*array, *result_typed));
}

TEST_P(ClientCase, OnProgress) {
Block block;
createTableWithOneColumn<ColumnString>(block);

std::optional<Progress> received_progress;
Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" );
query.OnProgress([&](const Progress& progress) {
received_progress = progress;
});
client_->Execute(query);

ASSERT_TRUE(received_progress.has_value());

EXPECT_GE(received_progress->rows, 0u);
EXPECT_LE(received_progress->rows, 2u);

EXPECT_GE(received_progress->bytes, 0u);
EXPECT_LE(received_progress->bytes, 10000u);

EXPECT_GE(received_progress->total_rows, 0u);
EXPECT_LE(received_progress->total_rows, 2u);

EXPECT_GE(received_progress->written_rows, 0u);
EXPECT_LE(received_progress->written_rows, 2u);

EXPECT_GE(received_progress->written_bytes, 0u);
EXPECT_LE(received_progress->written_bytes, 10000u);
}

const auto LocalHostEndpoint = ClientOptions()
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))
Expand Down

0 comments on commit 69325bd

Please sign in to comment.