Skip to content

Commit

Permalink
Market his plugin: precalculate ticker data. bitshares#509
Browse files Browse the repository at this point in the history
  • Loading branch information
abitmore committed Nov 30, 2017
1 parent ef374cd commit ed11ede
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ namespace graphene { namespace account_history {
// time.
//
#ifndef ACCOUNT_HISTORY_SPACE_ID
#define ACCOUNT_HISTORY_SPACE_ID 5
#define ACCOUNT_HISTORY_SPACE_ID 4
#endif

enum account_history_object_type
{
key_account_object_type = 0,
bucket_object_type = 1 ///< used in market_history_plugin
key_account_object_type = 0
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <graphene/chain/database.hpp>

#include <fc/thread/future.hpp>
#include <fc/uint128.hpp>

#include <boost/multi_index/composite_key.hpp>

Expand All @@ -43,10 +44,18 @@ using namespace chain;
// various template automagic depends on them being known at compile
// time.
//
#ifndef ACCOUNT_HISTORY_SPACE_ID
#define ACCOUNT_HISTORY_SPACE_ID 5
#ifndef MARKET_HISTORY_SPACE_ID
#define MARKET_HISTORY_SPACE_ID 5
#endif

enum market_history_object_type
{
order_history_object_type = 0,
bucket_object_type = 1,
market_ticker_object_type = 2,
market_ticker_meta_object_type = 3
};

struct bucket_key
{
bucket_key( asset_id_type a, asset_id_type b, uint32_t s, fc::time_point_sec o )
Expand All @@ -70,8 +79,8 @@ struct bucket_key

struct bucket_object : public abstract_object<bucket_object>
{
static const uint8_t space_id = ACCOUNT_HISTORY_SPACE_ID;
static const uint8_t type_id = 1; // market_history_plugin type, referenced from account_history_plugin.hpp
static const uint8_t space_id = MARKET_HISTORY_SPACE_ID;
static const uint8_t type_id = bucket_object_type;

price high()const { return asset( high_base, key.base ) / asset( high_quote, key.quote ); }
price low()const { return asset( low_base, key.base ) / asset( low_quote, key.quote ); }
Expand Down Expand Up @@ -103,9 +112,12 @@ struct history_key {
};
struct order_history_object : public abstract_object<order_history_object>
{
history_key key;
fc::time_point_sec time;
fill_order_operation op;
static const uint8_t space_id = MARKET_HISTORY_SPACE_ID;
static const uint8_t type_id = order_history_object_type;

history_key key;
fc::time_point_sec time;
fill_order_operation op;
};
struct order_history_object_key_base_extractor
{
Expand All @@ -123,6 +135,30 @@ struct order_history_object_key_sequence_extractor
result_type operator()(const order_history_object& o)const { return o.key.sequence; }
};

struct market_ticker_object : public abstract_object<market_ticker_object>
{
static const uint8_t space_id = MARKET_HISTORY_SPACE_ID;
static const uint8_t type_id = market_ticker_object_type;

asset_id_type base;
asset_id_type quote;
share_type last_day_base;
share_type last_day_quote;
share_type latest_base;
share_type latest_quote;
fc::uint128 base_volume;
fc::uint128 quote_volume;
};

struct market_ticker_meta_object : public abstract_object<market_ticker_meta_object>
{
static const uint8_t space_id = MARKET_HISTORY_SPACE_ID;
static const uint8_t type_id = market_ticker_meta_object_type;

object_id_type rolling_min_order_his_id;
bool skip_min_order_his_id = false;
};

struct by_key;
typedef multi_index_container<
bucket_object,
Expand Down Expand Up @@ -157,9 +193,25 @@ typedef multi_index_container<
>
> order_history_multi_index_type;

struct by_market;
typedef multi_index_container<
market_ticker_object,
indexed_by<
ordered_unique< tag<by_id>, member< object, object_id_type, &object::id > >,
ordered_unique<
tag<by_market>,
composite_key<
market_ticker_object,
member<market_ticker_object, asset_id_type, &market_ticker_object::base>,
member<market_ticker_object, asset_id_type, &market_ticker_object::quote>
>
>
>
> market_ticker_object_multi_index_type;

typedef generic_index<bucket_object, bucket_object_multi_index_type> bucket_index;
typedef generic_index<order_history_object, order_history_multi_index_type> history_index;
typedef generic_index<market_ticker_object, market_ticker_object_multi_index_type> market_ticker_index;


namespace detail
Expand Down Expand Up @@ -201,11 +253,17 @@ class market_history_plugin : public graphene::app::plugin
FC_REFLECT( graphene::market_history::history_key, (base)(quote)(sequence) )
FC_REFLECT_DERIVED( graphene::market_history::order_history_object, (graphene::db::object), (key)(time)(op) )
FC_REFLECT( graphene::market_history::bucket_key, (base)(quote)(seconds)(open) )
FC_REFLECT_DERIVED( graphene::market_history::bucket_object, (graphene::db::object),
FC_REFLECT_DERIVED( graphene::market_history::bucket_object, (graphene::db::object),
(key)
(high_base)(high_quote)
(low_base)(low_quote)
(open_base)(open_quote)
(close_base)(close_quote)
(base_volume)(quote_volume) )

FC_REFLECT_DERIVED( graphene::market_history::market_ticker_object, (graphene::db::object),
(base)(quote)
(last_day_base)(last_day_quote)
(latest_base)(latest_quote)
(base_volume)(quote_volume) )
FC_REFLECT_DERIVED( graphene::market_history::market_ticker_meta_object, (graphene::db::object),
(rolling_min_order_his_id)(skip_min_order_his_id) )
147 changes: 131 additions & 16 deletions libraries/plugins/market_history/market_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ class market_history_plugin_impl
uint32_t _maximum_history_per_bucket_size = 1000;
uint32_t _max_order_his_records_per_market = 1000;
uint32_t _max_order_his_seconds_per_market = 259200;

const market_ticker_meta_object* _meta = nullptr;
};


struct operation_process_fill_order
{
market_history_plugin& _plugin;
fc::time_point_sec _now;
market_history_plugin& _plugin;
fc::time_point_sec _now;
const market_ticker_meta_object*& _meta;

operation_process_fill_order( market_history_plugin& mhp, fc::time_point_sec n )
:_plugin(mhp),_now(n) {}
operation_process_fill_order( market_history_plugin& mhp, fc::time_point_sec n, const market_ticker_meta_object*& meta )
:_plugin(mhp),_now(n),_meta(meta) {}

typedef void result_type;

Expand All @@ -84,9 +87,9 @@ struct operation_process_fill_order
{
//ilog( "processing ${o}", ("o",o) );
auto& db = _plugin.database();
const auto& bucket_idx = db.get_index_type<bucket_index>();
const auto& history_idx = db.get_index_type<history_index>().indices().get<by_key>();
const auto& his_time_idx = db.get_index_type<history_index>().indices().get<by_market_time>();
const auto& order_his_idx = db.get_index_type<history_index>().indices();
const auto& history_idx = order_his_idx.get<by_key>();
const auto& his_time_idx = order_his_idx.get<by_market_time>();

// To save new filled order data
history_key hkey;
Expand All @@ -103,12 +106,25 @@ struct operation_process_fill_order
else
hkey.sequence = 0;

db.create<order_history_object>( [&]( order_history_object& ho ) {
const auto& new_order_his_obj = db.create<order_history_object>( [&]( order_history_object& ho ) {
ho.key = hkey;
ho.time = _now;
ho.op = o;
});

// save a reference to market ticker meta object
if( _meta == nullptr )
{
const auto& meta_idx = db.get_index_type<simple_index<market_ticker_meta_object>>();
if( meta_idx.size() == 0 )
_meta = &db.create<market_ticker_meta_object>( [&]( market_ticker_meta_object& mtm ) {
mtm.rolling_min_order_his_id = new_order_his_obj.id;
mtm.skip_min_order_his_id = false;
});
else
_meta = &( *meta_idx.begin() );
}

// To remove old filled order data
const auto max_records = _plugin.max_order_his_records_per_market();
hkey.sequence += max_records;
Expand Down Expand Up @@ -143,16 +159,10 @@ struct operation_process_fill_order
}
}

// To update buckets data, only update for maker orders
// To update ticker data and buckets data, only update for maker orders
if( !o.is_maker )
return;

const auto max_history = _plugin.max_history();
if( max_history == 0 ) return;

const auto& buckets = _plugin.tracked_buckets();
if( buckets.size() == 0 ) return;

bucket_key key;
key.base = o.pays.asset_id;
key.quote = o.receives.asset_id;
Expand All @@ -169,6 +179,40 @@ struct operation_process_fill_order
if( fill_price.base.asset_id > fill_price.quote.asset_id )
fill_price = ~fill_price;

// To update ticker data
const auto& ticker_idx = db.get_index_type<market_ticker_index>().indices().get<by_market>();
auto ticker_itr = ticker_idx.find( std::make_tuple( key.base, key.quote ) );
if( ticker_itr == ticker_idx.end() )
{
db.create<market_ticker_object>( [&]( market_ticker_object& mt ) {
mt.base = key.base;
mt.quote = key.quote;
mt.last_day_base = 0;
mt.last_day_quote = 0;
mt.latest_base = fill_price.base.amount;
mt.latest_quote = fill_price.quote.amount;
mt.base_volume = trade_price.base.amount.value;
mt.quote_volume = trade_price.quote.amount.value;
});
}
else
{
db.modify( *ticker_itr, [&]( market_ticker_object& mt ) {
mt.latest_base = fill_price.base.amount;
mt.latest_quote = fill_price.quote.amount;
mt.base_volume += trade_price.base.amount.value; // ignore overflow
mt.quote_volume += trade_price.quote.amount.value; // ignore overflow
});
}

// To update buckets data
const auto max_history = _plugin.max_history();
if( max_history == 0 ) return;

const auto& buckets = _plugin.tracked_buckets();
if( buckets.size() == 0 ) return;

const auto& bucket_idx = db.get_index_type<bucket_index>();
for( auto bucket : buckets )
{
auto bucket_num = _now.sec_since_epoch() / bucket;
Expand Down Expand Up @@ -262,10 +306,79 @@ void market_history_plugin_impl::update_market_histories( const signed_block& b
{
try
{
o_op->op.visit( operation_process_fill_order( _self, b.timestamp ) );
o_op->op.visit( operation_process_fill_order( _self, b.timestamp, _meta ) );
} FC_CAPTURE_AND_LOG( (o_op) )
}
}
// roll out expired data from ticker
if( _meta != nullptr )
{
time_point_sec last_day = b.timestamp - 86400;
object_id_type last_min_his_id = _meta->rolling_min_order_his_id;
bool skip = _meta->skip_min_order_his_id;

const auto& ticker_idx = db.get_index_type<market_ticker_index>().indices().get<by_market>();
const auto& history_idx = db.get_index_type<history_index>().indices().get<by_id>();
auto history_itr = history_idx.lower_bound( _meta->rolling_min_order_his_id );
while( history_itr != history_idx.end() && history_itr->time < last_day )
{
const fill_order_operation& o = history_itr->op;
if( skip && history_itr->id == _meta->rolling_min_order_his_id )
skip = false;
else if( o.is_maker )
{
bucket_key key;
key.base = o.pays.asset_id;
key.quote = o.receives.asset_id;

price trade_price = o.pays / o.receives;

if( key.base > key.quote )
{
std::swap( key.base, key.quote );
trade_price = ~trade_price;
}

price fill_price = o.fill_price;
if( fill_price.base.asset_id > fill_price.quote.asset_id )
fill_price = ~fill_price;

auto ticker_itr = ticker_idx.find( std::make_tuple( key.base, key.quote ) );
if( ticker_itr != ticker_idx.end() ) // should always be true
{
db.modify( *ticker_itr, [&]( market_ticker_object& mt ) {
mt.last_day_base = fill_price.base.amount;
mt.last_day_quote = fill_price.quote.amount;
mt.base_volume -= trade_price.base.amount.value; // ignore underflow
mt.quote_volume -= trade_price.quote.amount.value; // ignore underflow
});
}
}
last_min_his_id = history_itr->id;
++history_itr;
}
// update meta
if( history_itr != history_idx.end() ) // if still has some data rolling
{
if( history_itr->id != _meta->rolling_min_order_his_id ) // if rolled out some
{
db.modify( *_meta, [&]( market_ticker_meta_object& mtm ) {
mtm.rolling_min_order_his_id = history_itr->id;
mtm.skip_min_order_his_id = false;
});
}
}
else // if all data are rolled out
{
if( last_min_his_id != _meta->rolling_min_order_his_id ) // if rolled out some
{
db.modify( *_meta, [&]( market_ticker_meta_object& mtm ) {
mtm.rolling_min_order_his_id = last_min_his_id;
mtm.skip_min_order_his_id = true;
});
}
}
}
}

} // end namespace detail
Expand Down Expand Up @@ -312,6 +425,8 @@ void market_history_plugin::plugin_initialize(const boost::program_options::vari
database().applied_block.connect( [&]( const signed_block& b){ my->update_market_histories(b); } );
database().add_index< primary_index< bucket_index > >();
database().add_index< primary_index< history_index > >();
database().add_index< primary_index< market_ticker_index > >();
database().add_index< primary_index< simple_index< market_ticker_meta_object > > >();

if( options.count( "bucket-size" ) )
{
Expand Down

0 comments on commit ed11ede

Please sign in to comment.