Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market history plugin improvements #478

Merged
merged 5 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions libraries/app/database_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,11 +1348,6 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
auto quote_id = assets[1]->id;

if( base_id > quote_id ) std::swap( base_id, quote_id );
const auto& history_idx = _db.get_index_type<graphene::market_history::history_index>().indices().get<by_key>();
history_key hkey;
hkey.base = base_id;
hkey.quote = quote_id;
hkey.sequence = std::numeric_limits<int64_t>::min();

auto asset_to_real = [&]( const asset& a, int p ) { return double( a.amount.value ) / pow( 10, p ); };
auto price_to_real = [&]( const price& p )
Expand All @@ -1367,13 +1362,12 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
start = fc::time_point_sec( fc::time_point::now() );

uint32_t count = 0;
uint32_t skipped = 0;
auto itr = history_idx.lower_bound( hkey );
const auto& history_idx = _db.get_index_type<graphene::market_history::history_index>().indices().get<by_market_time>();
auto itr = history_idx.lower_bound( std::make_tuple( base_id, quote_id, start ) );
vector<market_trade> result;

while( itr != history_idx.end() && count < limit && !( itr->key.base != base_id || itr->key.quote != quote_id || itr->time < stop ) )
{
if( itr->time < start )
{
market_trade trade;

Expand Down Expand Up @@ -1418,12 +1412,6 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
result.push_back( trade );
++count;
}
else // should skip
{
// TODO refuse to execute if need to skip too many entries
// ++skipped;
// FC_ASSERT( skipped <= 200 );
}

++itr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include <fc/thread/future.hpp>

#include <boost/multi_index/composite_key.hpp>

namespace graphene { namespace market_history {
using namespace chain;

Expand Down Expand Up @@ -105,21 +107,53 @@ struct order_history_object : public abstract_object<order_history_object>
fc::time_point_sec time;
fill_order_operation op;
};
struct order_history_object_key_base_extractor
{
typedef asset_id_type result_type;
result_type operator()(const order_history_object& o)const { return o.key.base; }
};
struct order_history_object_key_quote_extractor
{
typedef asset_id_type result_type;
result_type operator()(const order_history_object& o)const { return o.key.quote; }
};
struct order_history_object_key_sequence_extractor
{
typedef int64_t result_type;
result_type operator()(const order_history_object& o)const { return o.key.sequence; }
};

struct by_key;
typedef multi_index_container<
bucket_object,
indexed_by<
hashed_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< bucket_object, bucket_key, &bucket_object::key > >
>
> bucket_object_multi_index_type;

struct by_market_time;
typedef multi_index_container<
order_history_object,
indexed_by<
hashed_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< order_history_object, history_key, &order_history_object::key > >
ordered_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< order_history_object, history_key, &order_history_object::key > >,
ordered_unique<
tag<by_market_time>,
composite_key<
order_history_object,
order_history_object_key_base_extractor,
order_history_object_key_quote_extractor,
member<order_history_object, time_point_sec, &order_history_object::time>,
order_history_object_key_sequence_extractor
>,
composite_key_compare<
std::less< asset_id_type >,
std::less< asset_id_type >,
std::greater< time_point_sec >,
std::less< int64_t >
>
>
>
> order_history_multi_index_type;

Expand Down Expand Up @@ -154,6 +188,8 @@ class market_history_plugin : public graphene::app::plugin

uint32_t max_history()const;
const flat_set<uint32_t>& tracked_buckets()const;
uint32_t max_order_his_records_per_market()const;
uint32_t max_order_his_seconds_per_market()const;

private:
friend class detail::market_history_plugin_impl;
Expand Down
143 changes: 92 additions & 51 deletions libraries/plugins/market_history/market_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class market_history_plugin_impl
market_history_plugin& _self;
flat_set<uint32_t> _tracked_buckets;
uint32_t _maximum_history_per_bucket_size = 1000;
uint32_t _max_order_his_records_per_market = 1000;
uint32_t _max_order_his_seconds_per_market = 259200;
};


Expand All @@ -81,13 +83,12 @@ struct operation_process_fill_order
void operator()( const fill_order_operation& o )const
{
//ilog( "processing ${o}", ("o",o) );
const auto& buckets = _plugin.tracked_buckets();
auto& db = _plugin.database();
const auto& bucket_idx = db.get_index_type<bucket_index>();
const auto& history_idx = db.get_index_type<history_index>().indices().get<by_key>();
const auto& his_time_idx = db.get_index_type<history_index>().indices().get<by_market_time>();

auto time = db.head_block_time();

// To save new filled order data
history_key hkey;
hkey.base = o.pays.asset_id;
hkey.quote = o.receives.asset_id;
Expand All @@ -104,39 +105,54 @@ struct operation_process_fill_order

db.create<order_history_object>( [&]( order_history_object& ho ) {
ho.key = hkey;
ho.time = time;
ho.time = _now;
ho.op = o;
});

/*
hkey.sequence += 200;
// To remove old filled order data
const auto max_records = _plugin.max_order_his_records_per_market();
hkey.sequence += max_records;
itr = history_idx.lower_bound( hkey );
while( itr != history_idx.end() )
if( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
{
if( itr->key.base == hkey.base && itr->key.quote == hkey.quote )
const auto max_seconds = _plugin.max_order_his_seconds_per_market();
fc::time_point_sec min_time;
if( min_time + max_seconds < _now )
min_time = _now - max_seconds;
auto time_itr = his_time_idx.lower_bound( std::make_tuple( hkey.base, hkey.quote, min_time ) );
if( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
{
db.remove( *itr );
itr = history_idx.lower_bound( hkey );
if( itr->key.sequence >= time_itr->key.sequence )
{
while( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
{
auto old_itr = itr;
++itr;
db.remove( *old_itr );
}
}
else
{
while( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
{
auto old_itr = time_itr;
++time_itr;
db.remove( *old_itr );
}
}
}
else break;
}
*/

/* Note: below is not true, because global settlement creates only one fill_order_op.
* for every matched order there are two fill order operations created, one for
* each side. We can filter the duplicates by only considering the fill operations where
* the base > quote
*/
/*
if( o.pays.asset_id > o.receives.asset_id )
{
//ilog( " skipping because base > quote" );
return;
}
*/
// To update buckets data, only update for maker orders
if( !o.is_maker )
return;

const auto max_history = _plugin.max_history();
if( max_history == 0 ) return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this check ? i started to test the options with a setup as:

programs/witness_node/witness_node --data-dir data/my-blockprod --rpc-endpoint "127.0.0.1:8090" --max-ops-per-account 1000 --partial-operations true --max-order-his-seconds-per-market 259200 --max-order-his-records-per-market 0 --history-per-size 0

to later realize that i had to change history-per-size to at least 1 to make it work.

Copy link
Member Author

@abitmore abitmore Nov 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two sets of data being stored in the market history plugin: the detailed order matching history, and the statistics (the data in buckets). history-per-size == 0 means to not store statistics, but it should not prevent order matching history from being stored. The original code skips statistics data storing if history-per-size is set to 0, but it also skips order matching history storing, which I think is a bug. Now the new options controls how order matching history being stored.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, got it. thanks for explaining it.


const auto& buckets = _plugin.tracked_buckets();
if( buckets.size() == 0 ) return;

bucket_key key;
key.base = o.pays.asset_id;
key.quote = o.receives.asset_id;
Expand All @@ -153,23 +169,25 @@ struct operation_process_fill_order
if( fill_price.base.asset_id > fill_price.quote.asset_id )
fill_price = ~fill_price;

auto max_history = _plugin.max_history();
for( auto bucket : buckets )
{
auto cutoff = (fc::time_point() + fc::seconds( bucket * max_history));
auto bucket_num = _now.sec_since_epoch() / bucket;
fc::time_point_sec cutoff;
if( bucket_num > max_history )
cutoff = cutoff + ( bucket * ( bucket_num - max_history ) );

key.seconds = bucket;
key.open = fc::time_point() + fc::seconds((_now.sec_since_epoch() / key.seconds) * key.seconds);
key.open = fc::time_point_sec() + ( bucket_num * bucket );

const auto& by_key_idx = bucket_idx.indices().get<by_key>();
auto itr = by_key_idx.find( key );
if( itr == by_key_idx.end() )
auto bucket_itr = by_key_idx.find( key );
if( bucket_itr == by_key_idx.end() )
{ // create new bucket
/* const auto& obj = */
db.create<bucket_object>( [&]( bucket_object& b ){
b.key = key;
b.quote_volume += trade_price.quote.amount;
b.base_volume += trade_price.base.amount;
b.base_volume = trade_price.base.amount;
b.quote_volume = trade_price.quote.amount;
b.open_base = fill_price.base.amount;
b.open_quote = fill_price.quote.amount;
b.close_base = fill_price.base.amount;
Expand All @@ -183,10 +201,18 @@ struct operation_process_fill_order
}
else
{ // update existing bucket
//wlog( " before updating bucket ${b}", ("b",*itr) );
db.modify( *itr, [&]( bucket_object& b ){
b.base_volume += trade_price.base.amount;
b.quote_volume += trade_price.quote.amount;
//wlog( " before updating bucket ${b}", ("b",*bucket_itr) );
db.modify( *bucket_itr, [&]( bucket_object& b ){
try {
b.base_volume += trade_price.base.amount;
} catch( fc::overflow_exception ) {
b.base_volume = std::numeric_limits<int64_t>::max();
}
try {
b.quote_volume += trade_price.quote.amount;
} catch( fc::overflow_exception ) {
b.quote_volume = std::numeric_limits<int64_t>::max();
}
b.close_base = fill_price.base.amount;
b.close_quote = fill_price.quote.amount;
if( b.high() < fill_price )
Expand All @@ -200,24 +226,23 @@ struct operation_process_fill_order
b.low_quote = b.close_quote;
}
});
//wlog( " after bucket bucket ${b}", ("b",*itr) );
//wlog( " after bucket bucket ${b}", ("b",*bucket_itr) );
}

if( max_history != 0 )
{
key.open = fc::time_point_sec();
auto itr = by_key_idx.lower_bound( key );
bucket_itr = by_key_idx.lower_bound( key );

while( itr != by_key_idx.end() &&
itr->key.base == key.base &&
itr->key.quote == key.quote &&
itr->key.seconds == bucket &&
itr->key.open < cutoff )
while( bucket_itr != by_key_idx.end() &&
bucket_itr->key.base == key.base &&
bucket_itr->key.quote == key.quote &&
bucket_itr->key.seconds == bucket &&
bucket_itr->key.open < cutoff )
{
// elog( " removing old bucket ${b}", ("b", *itr) );
auto old_itr = itr;
++itr;
db.remove( *old_itr );
// elog( " removing old bucket ${b}", ("b", *bucket_itr) );
auto old_bucket_itr = bucket_itr;
++bucket_itr;
db.remove( *old_bucket_itr );
}
}
}
Expand All @@ -229,9 +254,6 @@ market_history_plugin_impl::~market_history_plugin_impl()

void market_history_plugin_impl::update_market_histories( const signed_block& b )
{
if( _maximum_history_per_bucket_size == 0 ) return;
if( _tracked_buckets.size() == 0 ) return;

graphene::chain::database& db = database();
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
for( const optional< operation_history_object >& o_op : hist )
Expand Down Expand Up @@ -275,8 +297,12 @@ void market_history_plugin::plugin_set_program_options(
cli.add_options()
("bucket-size", boost::program_options::value<string>()->default_value("[60,300,900,1800,3600,14400,86400]"),
"Track market history by grouping orders into buckets of equal size measured in seconds specified as a JSON array of numbers")
("history-per-size", boost::program_options::value<uint32_t>()->default_value(1000),
("history-per-size", boost::program_options::value<uint32_t>()->default_value(1000),
"How far back in time to track history for each bucket size, measured in the number of buckets (default: 1000)")
("max-order-his-records-per-market", boost::program_options::value<uint32_t>()->default_value(1000),
"Will only store this amount of matched orders for each market in order history for querying, or those meet the other option, which has more data (default: 1000)")
("max-order-his-seconds-per-market", boost::program_options::value<uint32_t>()->default_value(259200),
"Will only store matched orders in last X seconds for each market in order history for querying, or those meet the other option, which has more data (default: 259200 (3 days))")
;
cfg.add(cli);
}
Expand All @@ -291,9 +317,14 @@ void market_history_plugin::plugin_initialize(const boost::program_options::vari
{
const std::string& buckets = options["bucket-size"].as<string>();
my->_tracked_buckets = fc::json::from_string(buckets).as<flat_set<uint32_t>>();
my->_tracked_buckets.erase( 0 );
}
if( options.count( "history-per-size" ) )
my->_maximum_history_per_bucket_size = options["history-per-size"].as<uint32_t>();
if( options.count( "max-order-his-records-per-market" ) )
my->_max_order_his_records_per_market = options["max-order-his-records-per-market"].as<uint32_t>();
if( options.count( "max-order-his-seconds-per-market" ) )
my->_max_order_his_seconds_per_market = options["max-order-his-seconds-per-market"].as<uint32_t>();
} FC_CAPTURE_AND_RETHROW() }

void market_history_plugin::plugin_startup()
Expand All @@ -310,4 +341,14 @@ uint32_t market_history_plugin::max_history()const
return my->_maximum_history_per_bucket_size;
}

uint32_t market_history_plugin::max_order_his_records_per_market()const
{
return my->_max_order_his_records_per_market;
}

uint32_t market_history_plugin::max_order_his_seconds_per_market()const
{
return my->_max_order_his_seconds_per_market;
}

} }