Recorder Snapshot #97

Open
chenhao94 opened this issue Nov 9, 2022 · 10 comments
Labels
enhancement New feature or request

Comments

@chenhao94
Collaborator

The recorder should generate snapshots periodically so that others can use them while it keeps recording new data. There should also be a rotation mechanism so that we only keep the last few snapshots.
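
As a rough illustration of the rotation part only, here is a minimal sketch. It assumes each snapshot is a subdirectory whose name sorts chronologically (for example a `YYYYmmdd_HHMMSS` timestamp). The helper name `PruneSnapshots` and the directory layout are hypothetical, not something the project defines.

```cpp
#include <algorithm>
#include <cstddef>
#include <filesystem>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical helper: keep only the `keep` newest snapshot directories under `root`.
// Assumes directory names sort chronologically (e.g. "20221109_120000").
// Returns the number of snapshots removed.
std::size_t PruneSnapshots(const fs::path& root, std::size_t keep) {
    std::vector<fs::path> snapshots;
    for (const auto& entry : fs::directory_iterator(root)) {
        // Skip regular files and symlinks (such as a "latest" link).
        if (entry.is_directory() && !entry.is_symlink()) {
            snapshots.push_back(entry.path());
        }
    }
    if (snapshots.size() <= keep) return 0;

    std::sort(snapshots.begin(), snapshots.end());  // Oldest first.
    const std::size_t excess = snapshots.size() - keep;
    for (std::size_t i = 0; i < excess; ++i) {
        fs::remove_all(snapshots[i]);
    }
    return excess;
}
```

Calling `PruneSnapshots("record_snapshots", 5)` after each new snapshot would keep at most the five newest directories.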

@chenhao94 chenhao94 added the enhancement New feature or request label Nov 9, 2022
@chenhao94
Collaborator Author

cc @xkszltl for visibility.

@YuzhouGuo
Contributor

I'm pasting the work I've done so far to make sure I'm still on the right track.
In trade.cc:

    TradingLauncher launcher(
        std::move(launcher_config),
        strategy,
        market_contract_configs,
        std::move(runner_config_file));

    // Names and intervals can be specified from the command line; please ignore the hardcoded values.
    std::filesystem::create_directory("record_snapshots");

    // Self-implemented class.
    Timer t;
    t.setInterval([&]() {
        // ctime() output contains spaces and colons and ends with '\n',
        // so build a path-safe timestamp with strftime() instead (needs <ctime>).
        const std::time_t now = std::time(nullptr);
        char stamp[32];
        std::strftime(stamp, sizeof(stamp), "%Y%m%d_%H%M%S", std::localtime(&now));
        std::filesystem::copy(
            FLAGS_record_dir,
            "record_snapshots/" + FLAGS_record_dir + "_" + stamp,
            std::filesystem::copy_options::recursive);
        // An if condition will be put here to remove the earliest snapshot once we hit a certain number of records.
    }, 5000);

    launcher.Start();
    launcher.Wait();

And for this Timer class, this is what I did using std::thread:

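void Timer::setInterval(auto function, int interval) {
    // `clear` is read by the worker thread and written by others, so it should be a std::atomic<bool>.
    this->clear = false;
    // Capture `this` explicitly: implicit capture of `this` through [=] is deprecated in C++20.
    std::thread t([this, function, interval]() {
        while (true) {
            if (this->clear) return;
            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
            if (this->clear) return;
            function();
        }
    });
    // The detached thread keeps using `this`, so the Timer must outlive it.
    t.detach();
}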

The output looks good: snapshots are generated as expected.

@xkszltl
Collaborator

xkszltl commented Nov 11, 2022

  • You cannot simply copy a directory or file, because that's not transactional. Instead you'll need to pause the ongoing dataflow so that nothing new can be written during your copy, do the copy, then resume it.
  • There should be hourly/daily/weekly/monthly snapshots; they're independent and can overlap.
  • There should be a "0" snapshot at the beginning of each interval, or the monthly code won't run at all within the first month and we'll have no idea whether it actually works.
  • Each interval should have a "latest" symlink, pointing to the last snapshot after it has been completely written. When rolling out an old snapshot, make sure the symlink has been moved away from it first.
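
The pause, copy, resume and "latest" steps above could be sketched roughly as follows. `TakeSnapshot` and its `pause`/`resume` callbacks are hypothetical placeholders for whatever mechanism the recorder ends up using. Replacing the symlink by rename assumes POSIX rename semantics.

```cpp
#include <filesystem>
#include <functional>
#include <string>

namespace fs = std::filesystem;

// Hypothetical sketch: copy `record_dir` into `interval_dir/name` while writers are
// paused, then repoint `interval_dir/latest` at the finished snapshot.
fs::path TakeSnapshot(const fs::path& record_dir, const fs::path& interval_dir,
                      const std::string& name,
                      const std::function<void()>& pause,
                      const std::function<void()>& resume) {
    fs::create_directories(interval_dir);
    const fs::path tmp = interval_dir / (name + ".tmp");
    const fs::path dst = interval_dir / name;

    pause();  // Nothing may be written while the copy is in progress.
    try {
        fs::copy(record_dir, tmp, fs::copy_options::recursive);
    } catch (...) {
        resume();  // Never leave the dataflow paused on failure.
        throw;
    }
    resume();

    // The snapshot appears under its final name only once it is complete.
    fs::rename(tmp, dst);

    // Create a new link and rename it over the old one, so that readers never
    // see a missing "latest" (relies on POSIX rename replacing the target).
    const fs::path latest_tmp = interval_dir / "latest.tmp";
    fs::remove(latest_tmp);
    fs::create_directory_symlink(name, latest_tmp);  // Relative target.
    fs::rename(latest_tmp, interval_dir / "latest");
    return dst;
}
```

Taking a snapshot named "0" right at startup for every interval would exercise this path for the monthly case without waiting a month. Old snapshots can be rolled out safely once "latest" points at the new one.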

@chenhao94
Collaborator Author

chenhao94 commented Nov 12, 2022

  • There should be hourly/daily/weekly/monthly snapshots, they're independent and can overlap.

That's pointless; the high-frequency snapshots will already contain the low-frequency ones.

@xkszltl
Collaborator

xkszltl commented Nov 13, 2022

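That's pointless; the high-frequency snapshots will already contain the low-frequency ones.

  1. High- and low-frequency snapshots should be rolled separately, which gives an effect similar to exponential rolling but is easy to compute. For example, keep at most 48 hourly snapshots and at most 7 or 30 daily ones.
  2. Overlap is allowed to make reading convenient; otherwise you would have to merge snapshots when fetching them.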
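
A possible way to express the separate per-interval limits is sketched below. The hourly and daily limits come from the comment above. The weekly and monthly values, and the names `IntervalPolicy`, `DefaultPolicies` and `SnapshotsToRoll`, are assumptions made only for illustration.

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical description of one snapshot interval.
struct IntervalPolicy {
    std::string name;
    std::chrono::hours period;
    std::size_t max_keep;
};

inline std::vector<IntervalPolicy> DefaultPolicies() {
    using std::chrono::hours;
    return {
        {"hourly", hours(1), 48},
        {"daily", hours(24), 7},
        {"weekly", hours(24 * 7), 4},     // Assumed limit, not from the thread.
        {"monthly", hours(24 * 30), 12},  // Assumed limit and 30-day approximation.
    };
}

// Given the snapshot names of ONE interval sorted oldest first, return the names
// that exceed that interval's limit. Each interval is rolled independently.
inline std::vector<std::string> SnapshotsToRoll(
    const std::vector<std::string>& sorted_oldest_first, const IntervalPolicy& policy) {
    if (sorted_oldest_first.size() <= policy.max_keep) return {};
    const auto excess =
        static_cast<std::ptrdiff_t>(sorted_oldest_first.size() - policy.max_keep);
    return std::vector<std::string>(sorted_oldest_first.begin(),
                                    sorted_oldest_first.begin() + excess);
}
```

Since every interval keeps its own full snapshots, a reader can pick any single snapshot without merging, at the cost of some duplicated storage.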

@YuzhouGuo
Contributor

Latest work: (logging messages only for local debugging)

  • Timer class moved under cris::core together with the MessageRecorder class for now.

In the MessageRecorder class, I added a mutex and a condition variable to coordinate with the directory copy and prevent information loss. This is what I'm trying to do:

template<CRMessageType message_t>
void MessageRecorder::RegisterChannel(const MessageRecorder::channel_subid_t subid, const std::string& alias) {
    auto* record_file = CreateFile(GetTypeName<message_t>(), subid, alias);

    this->Subscribe<message_t>(
        subid,
        [this, record_file, alias](const std::shared_ptr<message_t>& message) {
            std::unique_lock lock(recorder_mtx_);
            if (dataflow_paused) {
                printf("\nDataflow paused, waiting for the copy to finish\n");
                // The predicate reads a member, so it has to capture `this`.
                snapshot_cv_.wait(lock, [this] { return data_copied_; });
            }
            printf("\nBack to data writing\n");
            record_file->Write(MessageToStr(*message));
        },
        /* allow_concurrency = */ false);
}

Basically, I'm trying to capture a reference to the instance in the lambda so that the locking works, which leads to a problem. Is there any reason why we declared MessageRecorder(MessageRecorder&&) = default; to prevent reference capture?
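
One language-level detail that may be relevant here, offered as an observation and not as a statement about the project's intent: `std::mutex` and `std::condition_variable` can be neither copied nor moved, so a defaulted move constructor is defined as deleted once they are direct members. The sketch below uses made-up class names to show the effect and one common workaround.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>

// Hypothetical class with the synchronization primitives as direct members.
struct DirectMembers {
    DirectMembers() = default;
    DirectMembers(DirectMembers&&) = default;  // Defined as deleted: std::mutex cannot be moved.
    std::mutex mtx;
    std::condition_variable cv;
};

// Workaround sketch: keep the primitives behind a pointer so the owner stays movable.
struct SyncState {
    std::mutex mtx;
    std::condition_variable cv;
    bool paused = false;
};

struct IndirectMembers {
    IndirectMembers() = default;
    IndirectMembers(IndirectMembers&&) = default;
    std::unique_ptr<SyncState> sync = std::make_unique<SyncState>();
};

static_assert(!std::is_move_constructible_v<DirectMembers>);
static_assert(std::is_move_constructible_v<IndirectMembers>);
```

Even with the workaround, a lambda that captured `this` still points at the moved-from object after a move, so moving a recorder that already has live subscriptions stays unsafe either way.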

@chenhao94
Collaborator Author

@YuzhouGuo why are you putting your code here? A PR is the correct place for that; issues are for discussing ideas and solutions.

@YuzhouGuo
Contributor

OK I will start a PR then

@YuzhouGuo
Contributor

YuzhouGuo commented Nov 14, 2022

I started a new PR and I do have quite a lot of questions:

  • First of all, I am not sure it is a good idea to put a wait inside the Subscribe callback at all. What if messages arrive while the snapshot is being generated? Should we add some sort of buffer to store them? If not, where should we pause the dataflow?
  • The last test case of msg_record_test runs forever, so something in my mutex and cv implementation must never return. I spent quite some time on it today but couldn't find the cause; any help would be appreciated!
  • Is there any reason why we declared MessageRecorder(MessageRecorder&&) = default; to prevent reference capture?
  • I haven't entirely figured out the symlink part, but I will add it to the next draft.

Thank you so much.
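
To make the buffering question concrete, here is one possible shape. It is a sketch only: the class name `BufferedWriter` and the string-based sink are hypothetical. Messages that arrive while paused are queued and flushed in order on resume, so the callback never blocks for the duration of the copy.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical sketch: buffer incoming messages while a snapshot is being copied.
class BufferedWriter {
  public:
    explicit BufferedWriter(std::function<void(const std::string&)> sink)
        : sink_(std::move(sink)) {}

    // Called from the subscription callback; queues the message while paused.
    void Write(std::string msg) {
        std::lock_guard lock(mtx_);
        if (paused_) {
            pending_.push_back(std::move(msg));
        } else {
            sink_(msg);
        }
    }

    // Returns only after any in-flight Write() has finished, because both take mtx_.
    void Pause() {
        std::lock_guard lock(mtx_);
        paused_ = true;
    }

    // Flushes queued messages in arrival order, then lets writes through again.
    void Resume() {
        std::lock_guard lock(mtx_);
        for (const auto& msg : pending_) sink_(msg);
        pending_.clear();
        paused_ = false;
    }

  private:
    std::mutex mtx_;
    std::function<void(const std::string&)> sink_;
    std::deque<std::string> pending_;  // Unbounded here; a real version may need a cap.
    bool paused_ = false;
};
```

The trade-off is memory: a long copy under heavy traffic grows the queue, so a size limit or a faster snapshot method may be needed.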

@chenhao94
Collaborator Author

@YuzhouGuo If you want to discuss the PR, please do it in the PR. Thanks.

Move the discussion to #101 (comment)

3 participants