Merge pull request #478 from abitmore/472-market-his-size
Market history plugin improvements
oxarbitrage authored Nov 20, 2017
2 parents a9a55d7 + cecc926 commit e8da8e0
Showing 3 changed files with 133 additions and 68 deletions.
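
In brief: get_trade_history now seeks directly into a new by_market_time composite-key index instead of scanning a market's whole history from the smallest sequence number and skipping entries; per-market order history is pruned both by record count and by age, each configurable through a new plugin option (max-order-his-records-per-market, max-order-his-seconds-per-market), keeping whichever limit retains more data; order history is recorded even when no buckets are tracked; bucket cutoffs are computed in whole-bucket units; and bucket volumes saturate at the 64-bit maximum on overflow instead of throwing.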
16 changes: 2 additions & 14 deletions libraries/app/database_api.cpp
@@ -1348,11 +1348,6 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
auto quote_id = assets[1]->id;

if( base_id > quote_id ) std::swap( base_id, quote_id );
const auto& history_idx = _db.get_index_type<graphene::market_history::history_index>().indices().get<by_key>();
history_key hkey;
hkey.base = base_id;
hkey.quote = quote_id;
hkey.sequence = std::numeric_limits<int64_t>::min();

auto asset_to_real = [&]( const asset& a, int p ) { return double( a.amount.value ) / pow( 10, p ); };
auto price_to_real = [&]( const price& p )
@@ -1367,13 +1362,12 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
start = fc::time_point_sec( fc::time_point::now() );

uint32_t count = 0;
uint32_t skipped = 0;
auto itr = history_idx.lower_bound( hkey );
const auto& history_idx = _db.get_index_type<graphene::market_history::history_index>().indices().get<by_market_time>();
auto itr = history_idx.lower_bound( std::make_tuple( base_id, quote_id, start ) );
vector<market_trade> result;

while( itr != history_idx.end() && count < limit && !( itr->key.base != base_id || itr->key.quote != quote_id || itr->time < stop ) )
{
if( itr->time < start )
{
market_trade trade;

@@ -1418,12 +1412,6 @@ vector<market_trade> database_api_impl::get_trade_history( const string& base,
result.push_back( trade );
++count;
}
else // should skip
{
// TODO refuse to execute if need to skip too many entries
// ++skipped;
// FC_ASSERT( skipped <= 200 );
}

++itr;
}
42 changes: 39 additions & 3 deletions libraries/plugins/market_history/include/graphene/market_history/market_history_plugin.hpp
@@ -28,6 +28,8 @@

#include <fc/thread/future.hpp>

#include <boost/multi_index/composite_key.hpp>

namespace graphene { namespace market_history {
using namespace chain;

@@ -105,21 +107,53 @@ struct order_history_object : public abstract_object<order_history_object>
fc::time_point_sec time;
fill_order_operation op;
};
struct order_history_object_key_base_extractor
{
typedef asset_id_type result_type;
result_type operator()(const order_history_object& o)const { return o.key.base; }
};
struct order_history_object_key_quote_extractor
{
typedef asset_id_type result_type;
result_type operator()(const order_history_object& o)const { return o.key.quote; }
};
struct order_history_object_key_sequence_extractor
{
typedef int64_t result_type;
result_type operator()(const order_history_object& o)const { return o.key.sequence; }
};

struct by_key;
typedef multi_index_container<
bucket_object,
indexed_by<
hashed_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< bucket_object, bucket_key, &bucket_object::key > >
>
> bucket_object_multi_index_type;

struct by_market_time;
typedef multi_index_container<
order_history_object,
indexed_by<
hashed_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< order_history_object, history_key, &order_history_object::key > >
ordered_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique< tag<by_key>, member< order_history_object, history_key, &order_history_object::key > >,
ordered_unique<
tag<by_market_time>,
composite_key<
order_history_object,
order_history_object_key_base_extractor,
order_history_object_key_quote_extractor,
member<order_history_object, time_point_sec, &order_history_object::time>,
order_history_object_key_sequence_extractor
>,
composite_key_compare<
std::less< asset_id_type >,
std::less< asset_id_type >,
std::greater< time_point_sec >,
std::less< int64_t >
>
>
>
> order_history_multi_index_type;
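
For reference, a minimal standalone sketch of the indexing technique added above, with toy types rather than graphene's: a composite_key whose time component sorts descending via composite_key_compare, so that lower_bound over a partial key (base, quote, t) lands on the newest record at or before t for that market. This is how the rewritten get_trade_history seeks (see the database_api.cpp hunk above).

// Toy demonstration of composite_key + composite_key_compare with a
// descending time component; types here are illustrative, not graphene's.
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <cstdint>
#include <functional>
#include <iostream>
#include <tuple>

struct trade
{
   uint32_t base;     // stand-in for asset_id_type
   uint32_t quote;
   int64_t  time;     // stand-in for time_point_sec
   int64_t  sequence; // newer trades get smaller sequence numbers
};

namespace bmi = boost::multi_index;

typedef bmi::multi_index_container<
   trade,
   bmi::indexed_by<
      bmi::ordered_unique<
         bmi::composite_key<
            trade,
            bmi::member<trade, uint32_t, &trade::base>,
            bmi::member<trade, uint32_t, &trade::quote>,
            bmi::member<trade, int64_t, &trade::time>,
            bmi::member<trade, int64_t, &trade::sequence>
         >,
         bmi::composite_key_compare<
            std::less<uint32_t>,
            std::less<uint32_t>,
            std::greater<int64_t>, // newest first within a market
            std::less<int64_t>
         >
      >
   >
> trade_set;

int main()
{
   trade_set trades;
   trades.insert( trade{ 1, 2, 100, -1 } );
   trades.insert( trade{ 1, 2, 200, -2 } );
   trades.insert( trade{ 1, 2, 300, -3 } );

   // Seek the newest trade in market (1,2) at or before t = 250.
   // With std::greater on time, lower_bound finds the first time <= 250.
   auto itr = trades.lower_bound( std::make_tuple( 1u, 2u, int64_t( 250 ) ) );
   if( itr != trades.end() && itr->base == 1 && itr->quote == 2 )
      std::cout << "found trade at t=" << itr->time << "\n"; // prints t=200
   return 0;
}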

@@ -154,6 +188,8 @@ class market_history_plugin : public graphene::app::plugin

uint32_t max_history()const;
const flat_set<uint32_t>& tracked_buckets()const;
uint32_t max_order_his_records_per_market()const;
uint32_t max_order_his_seconds_per_market()const;

private:
friend class detail::market_history_plugin_impl;
143 changes: 92 additions & 51 deletions libraries/plugins/market_history/market_history_plugin.cpp
@@ -61,6 +61,8 @@ class market_history_plugin_impl
market_history_plugin& _self;
flat_set<uint32_t> _tracked_buckets;
uint32_t _maximum_history_per_bucket_size = 1000;
uint32_t _max_order_his_records_per_market = 1000;
uint32_t _max_order_his_seconds_per_market = 259200;
};


@@ -81,13 +83,12 @@ struct operation_process_fill_order
void operator()( const fill_order_operation& o )const
{
//ilog( "processing ${o}", ("o",o) );
const auto& buckets = _plugin.tracked_buckets();
auto& db = _plugin.database();
const auto& bucket_idx = db.get_index_type<bucket_index>();
const auto& history_idx = db.get_index_type<history_index>().indices().get<by_key>();
const auto& his_time_idx = db.get_index_type<history_index>().indices().get<by_market_time>();

auto time = db.head_block_time();

// To save new filled order data
history_key hkey;
hkey.base = o.pays.asset_id;
hkey.quote = o.receives.asset_id;
@@ -104,39 +105,54 @@

db.create<order_history_object>( [&]( order_history_object& ho ) {
ho.key = hkey;
ho.time = time;
ho.time = _now;
ho.op = o;
});

/*
hkey.sequence += 200;
// To remove old filled order data
const auto max_records = _plugin.max_order_his_records_per_market();
hkey.sequence += max_records;
itr = history_idx.lower_bound( hkey );
while( itr != history_idx.end() )
if( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
{
if( itr->key.base == hkey.base && itr->key.quote == hkey.quote )
const auto max_seconds = _plugin.max_order_his_seconds_per_market();
fc::time_point_sec min_time;
if( min_time + max_seconds < _now )
min_time = _now - max_seconds;
auto time_itr = his_time_idx.lower_bound( std::make_tuple( hkey.base, hkey.quote, min_time ) );
if( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
{
db.remove( *itr );
itr = history_idx.lower_bound( hkey );
if( itr->key.sequence >= time_itr->key.sequence )
{
while( itr != history_idx.end() && itr->key.base == hkey.base && itr->key.quote == hkey.quote )
{
auto old_itr = itr;
++itr;
db.remove( *old_itr );
}
}
else
{
while( time_itr != his_time_idx.end() && time_itr->key.base == hkey.base && time_itr->key.quote == hkey.quote )
{
auto old_itr = time_itr;
++time_itr;
db.remove( *old_itr );
}
}
}
else break;
}
*/

/* Note: below is not true, because global settlement creates only one fill_order_op.
* for every matched order there are two fill order operations created, one for
* each side. We can filter the duplicates by only considering the fill operations where
* the base > quote
*/
/*
if( o.pays.asset_id > o.receives.asset_id )
{
//ilog( " skipping because base > quote" );
return;
}
*/
// To update buckets data, only update for maker orders
if( !o.is_maker )
return;

const auto max_history = _plugin.max_history();
if( max_history == 0 ) return;

const auto& buckets = _plugin.tracked_buckets();
if( buckets.size() == 0 ) return;

bucket_key key;
key.base = o.pays.asset_id;
key.quote = o.receives.asset_id;
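
A note on the pruning block above: both limits are applied, and whichever retains more data wins. Assuming, as in the plugin's key-assignment logic elsewhere (not shown in this diff), that newer fills receive smaller sequence numbers, deletion starts at the older, larger-sequence of the two candidate cutoffs. A sketch of just that decision, with illustrative names:

#include <algorithm>
#include <cstdint>

// first_del_by_count: sequence of the first entry past the record limit
// first_del_by_age:   sequence of the first entry past the age limit
// Newer fills have smaller sequence numbers, so the larger sequence is the
// older entry; starting deletion there removes fewer entries, i.e. keeps
// whichever limit preserves more data.
inline int64_t prune_start_sequence( int64_t first_del_by_count,
                                     int64_t first_del_by_age )
{
   return std::max( first_del_by_count, first_del_by_age );
}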
@@ -153,23 +169,25 @@ struct operation_process_fill_order
if( fill_price.base.asset_id > fill_price.quote.asset_id )
fill_price = ~fill_price;

auto max_history = _plugin.max_history();
for( auto bucket : buckets )
{
auto cutoff = (fc::time_point() + fc::seconds( bucket * max_history));
auto bucket_num = _now.sec_since_epoch() / bucket;
fc::time_point_sec cutoff;
if( bucket_num > max_history )
cutoff = cutoff + ( bucket * ( bucket_num - max_history ) );

key.seconds = bucket;
key.open = fc::time_point() + fc::seconds((_now.sec_since_epoch() / key.seconds) * key.seconds);
key.open = fc::time_point_sec() + ( bucket_num * bucket );

const auto& by_key_idx = bucket_idx.indices().get<by_key>();
auto itr = by_key_idx.find( key );
if( itr == by_key_idx.end() )
auto bucket_itr = by_key_idx.find( key );
if( bucket_itr == by_key_idx.end() )
{ // create new bucket
/* const auto& obj = */
db.create<bucket_object>( [&]( bucket_object& b ){
b.key = key;
b.quote_volume += trade_price.quote.amount;
b.base_volume += trade_price.base.amount;
b.base_volume = trade_price.base.amount;
b.quote_volume = trade_price.quote.amount;
b.open_base = fill_price.base.amount;
b.open_quote = fill_price.quote.amount;
b.close_base = fill_price.base.amount;
@@ -183,10 +201,18 @@
}
else
{ // update existing bucket
//wlog( " before updating bucket ${b}", ("b",*itr) );
db.modify( *itr, [&]( bucket_object& b ){
b.base_volume += trade_price.base.amount;
b.quote_volume += trade_price.quote.amount;
//wlog( " before updating bucket ${b}", ("b",*bucket_itr) );
db.modify( *bucket_itr, [&]( bucket_object& b ){
try {
b.base_volume += trade_price.base.amount;
} catch( const fc::overflow_exception& ) {
b.base_volume = std::numeric_limits<int64_t>::max();
}
try {
b.quote_volume += trade_price.quote.amount;
} catch( const fc::overflow_exception& ) {
b.quote_volume = std::numeric_limits<int64_t>::max();
}
b.close_base = fill_price.base.amount;
b.close_quote = fill_price.quote.amount;
if( b.high() < fill_price )
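
The volume update above clamps base_volume and quote_volume at the 64-bit maximum when share_type's checked arithmetic throws fc::overflow_exception. The same saturating-add behavior in plain C++, sketched with the GCC/Clang __builtin_add_overflow builtin (not how fc implements it):

#include <cstdint>
#include <limits>

// Saturating add for non-negative volumes: on signed overflow, clamp to the
// maximum representable value instead of wrapping or throwing.
inline int64_t saturating_add( int64_t a, int64_t b )
{
   int64_t sum;
   if( __builtin_add_overflow( a, b, &sum ) )
      return std::numeric_limits<int64_t>::max();
   return sum;
}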
@@ -200,24 +226,23 @@
b.low_quote = b.close_quote;
}
});
//wlog( " after bucket bucket ${b}", ("b",*itr) );
//wlog( " after bucket bucket ${b}", ("b",*bucket_itr) );
}

if( max_history != 0 )
{
key.open = fc::time_point_sec();
auto itr = by_key_idx.lower_bound( key );
bucket_itr = by_key_idx.lower_bound( key );

while( itr != by_key_idx.end() &&
itr->key.base == key.base &&
itr->key.quote == key.quote &&
itr->key.seconds == bucket &&
itr->key.open < cutoff )
while( bucket_itr != by_key_idx.end() &&
bucket_itr->key.base == key.base &&
bucket_itr->key.quote == key.quote &&
bucket_itr->key.seconds == bucket &&
bucket_itr->key.open < cutoff )
{
// elog( " removing old bucket ${b}", ("b", *itr) );
auto old_itr = itr;
++itr;
db.remove( *old_itr );
// elog( " removing old bucket ${b}", ("b", *bucket_itr) );
auto old_bucket_itr = bucket_itr;
++bucket_itr;
db.remove( *old_bucket_itr );
}
}
}
Expand All @@ -229,9 +254,6 @@ market_history_plugin_impl::~market_history_plugin_impl()

void market_history_plugin_impl::update_market_histories( const signed_block& b )
{
if( _maximum_history_per_bucket_size == 0 ) return;
if( _tracked_buckets.size() == 0 ) return;

graphene::chain::database& db = database();
const vector<optional< operation_history_object > >& hist = db.get_applied_operations();
for( const optional< operation_history_object >& o_op : hist )
@@ -275,8 +297,12 @@ void market_history_plugin::plugin_set_program_options(
cli.add_options()
("bucket-size", boost::program_options::value<string>()->default_value("[60,300,900,1800,3600,14400,86400]"),
"Track market history by grouping orders into buckets of equal size measured in seconds specified as a JSON array of numbers")
("history-per-size", boost::program_options::value<uint32_t>()->default_value(1000),
("history-per-size", boost::program_options::value<uint32_t>()->default_value(1000),
"How far back in time to track history for each bucket size, measured in the number of buckets (default: 1000)")
("max-order-his-records-per-market", boost::program_options::value<uint32_t>()->default_value(1000),
"Will only store this amount of matched orders for each market in order history for querying, or those meet the other option, which has more data (default: 1000)")
("max-order-his-seconds-per-market", boost::program_options::value<uint32_t>()->default_value(259200),
"Will only store matched orders in last X seconds for each market in order history for querying, or those meet the other option, which has more data (default: 259200 (3 days))")
;
cfg.add(cli);
}
@@ -291,9 +317,14 @@ void market_history_plugin::plugin_initialize(const boost::program_options::vari
{
const std::string& buckets = options["bucket-size"].as<string>();
my->_tracked_buckets = fc::json::from_string(buckets).as<flat_set<uint32_t>>();
my->_tracked_buckets.erase( 0 );
}
if( options.count( "history-per-size" ) )
my->_maximum_history_per_bucket_size = options["history-per-size"].as<uint32_t>();
if( options.count( "max-order-his-records-per-market" ) )
my->_max_order_his_records_per_market = options["max-order-his-records-per-market"].as<uint32_t>();
if( options.count( "max-order-his-seconds-per-market" ) )
my->_max_order_his_seconds_per_market = options["max-order-his-seconds-per-market"].as<uint32_t>();
} FC_CAPTURE_AND_RETHROW() }
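
For reference, a self-contained sketch of how the two new options parse with boost::program_options alone; the option names and defaults are taken from this diff, while the main scaffolding is illustrative:

#include <boost/program_options.hpp>
#include <cstdint>
#include <iostream>

int main( int argc, char** argv )
{
   namespace po = boost::program_options;
   po::options_description opts( "market history" );
   opts.add_options()
      ( "max-order-his-records-per-market", po::value<uint32_t>()->default_value( 1000 ) )
      ( "max-order-his-seconds-per-market", po::value<uint32_t>()->default_value( 259200 ) );

   po::variables_map vm;
   po::store( po::parse_command_line( argc, argv, opts ), vm );
   po::notify( vm );

   std::cout << "records: " << vm["max-order-his-records-per-market"].as<uint32_t>() << "\n"
             << "seconds: " << vm["max-order-his-seconds-per-market"].as<uint32_t>() << "\n";
   return 0;
}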

void market_history_plugin::plugin_startup()
Expand All @@ -310,4 +341,14 @@ uint32_t market_history_plugin::max_history()const
return my->_maximum_history_per_bucket_size;
}

uint32_t market_history_plugin::max_order_his_records_per_market()const
{
return my->_max_order_his_records_per_market;
}

uint32_t market_history_plugin::max_order_his_seconds_per_market()const
{
return my->_max_order_his_seconds_per_market;
}

} }
