Skip to content

Commit 04283b9

Browse files
committed
some modifications
1 parent 9e5f899 commit 04283b9

File tree

5 files changed

+51
-55
lines changed

5 files changed

+51
-55
lines changed

CMakeLists.txt

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
cmake_minimum_required(VERSION 3.15)
22

3+
project(example)
4+
35
set(CMAKE_CXX_STANDARD 17)
46
set(CMAKE_CXX_EXTENSIONS FALSE)
57

@@ -32,4 +34,4 @@ foreach(mainfile IN LISTS mains)
3234
add_executable(${mainname} ${mainfile})
3335
target_include_directories(${mainname} PUBLIC ${DEPS_DEPLOY_DIR}/spdlog-src/include/)
3436
target_link_libraries(spdlog)
35-
endforeach()
37+
endforeach()

README.md

+26-15
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,10 @@ fantasy::Reactor reactor;
1111
reactor.run();
1212

1313
// It will run on the reactor thread, do not block the current thread
14-
reactor.callLater([&] {
15-
printf("task");
16-
});
14+
reactor.callLater([&] { spdlog::info("task"); });
1715

1816
// It will run on the reactor thread, block the current thread
19-
reactor.callNow([&] {
20-
printf("task");
21-
});
17+
reactor.callNow([&] { spdlog::info("task"); });
2218

2319
```
2420
@@ -28,24 +24,20 @@ fantasy::Reactor reactor;
2824
reactor.run();
2925
3026
// It will run in one second
31-
reactor.callAt(std::chrono::system_clock::now() + std::chrono::seconds(1), [] {
32-
printf("callAt");
33-
});
27+
reactor.callAt(std::chrono::system_clock::now() + std::chrono::seconds(1), [] { spdlog::info("callAt"); });
3428
3529
// It will run in five second
36-
reactor.callAfter(std::chrono::seconds(5), [] {
37-
printf("callAfter");
38-
});
30+
reactor.callAfter(std::chrono::seconds(5), [] { spdlog::info("callAfter"); });
3931
4032
// Run every three seconds
4133
reactor.callEvery(std::chrono::seconds(3), [] {
42-
printf("callEvery");
34+
spdlog::info("callEvery");
4335
return fantasy::Reactor::CallStatus::Ok;
4436
});
4537
4638
// Run every day 05:30:00
4739
auto id = reactor.callEveryDay(fantasy::Time{5, 30, 0, 0}, [] {
48-
printf("callEveryDay");
40+
spdlog::info("callEveryDay");
4941
return fantasy::Reactor::CallStatus::Ok;
5042
});
5143
@@ -57,7 +49,23 @@ reactor.cancel(id);
5749
```c++
5850
fantasy::Reactor reactor;
5951
reactor.run();
60-
...
52+
std::string recv_buffer;
53+
int servfd;
54+
struct sockaddr_in servaddr, cliaddr;
55+
if ((servfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
56+
spdlog::info("create socket error!");
57+
exit(1);
58+
}
59+
bzero(&servaddr, sizeof(servaddr));
60+
servaddr.sin_family = AF_INET;
61+
servaddr.sin_port = htons(SERVER_PORT);
62+
servaddr.sin_addr.s_addr = htons(INADDR_ANY);
63+
int opt = 1;
64+
setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
65+
if (bind(servfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
66+
spdlog::info("bind to port {} failure!", SERVER_PORT);
67+
exit(1);
68+
}
6169
if (listen(servfd, LENGTH_OF_LISTEN_QUEUE) < 0) {
6270
spdlog::info("call listen failure!");
6371
exit(1);
@@ -88,9 +96,12 @@ reactor.callOnRead(servfd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Cha
8896
return fantasy::Reactor::CallStatus::Ok;
8997
});
9098
reactor.callOnWrite(clifd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Channel>& channel_ptr) {
99+
if (recv_buffer.empty())
100+
return fantasy::Reactor::CallStatus::Ok;
91101
spdlog::info("callOnWrite");
92102
char buffer[BUFFER_SIZE] = {};
93103
memcpy(buffer, recv_buffer.c_str(), recv_buffer.size());
104+
recv_buffer.clear();
94105
spdlog::info("buffer: {}", buffer);
95106
auto n = write(fd, buffer, strlen(buffer));
96107
if (n < 0) {

example/echo_server.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,12 @@ int main() {
6161
return fantasy::Reactor::CallStatus::Ok;
6262
});
6363
reactor.callOnWrite(clifd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Channel>& channel_ptr) {
64+
if (recv_buffer.empty())
65+
return fantasy::Reactor::CallStatus::Ok;
6466
spdlog::info("callOnWrite");
6567
char buffer[BUFFER_SIZE] = {};
6668
memcpy(buffer, recv_buffer.c_str(), recv_buffer.size());
69+
recv_buffer.clear();
6770
spdlog::info("buffer: {}", buffer);
6871
auto n = write(fd, buffer, strlen(buffer));
6972
if (n < 0) {

example/test_reactor.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ int main() {
4747
std::this_thread::sleep_for(std::chrono::seconds(3));
4848
});
4949
spdlog::info("end callNow");
50-
while (true)
51-
std::this_thread::sleep_for(std::chrono::seconds(1));
50+
while (true) {
51+
std::this_thread::sleep_for(std::chrono::seconds(2));
52+
}
5253
return 0;
5354
}

reactor.hpp

+16-37
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ class Reactor : public NonCopyable {
551551
}
552552

553553
void cancel(const TimerId& timer_id) {
554-
m_reactor.callNow([=] {
554+
m_reactor.callNow([&] {
555555
auto i = std::find_if(m_timed_callbacks.begin(), m_timed_callbacks.end(), [&](auto& callback) {
556556
return callback.second->id() == timer_id;
557557
});
@@ -584,7 +584,6 @@ class Reactor : public NonCopyable {
584584
});
585585
if (m_thread.joinable())
586586
m_thread.join();
587-
m_wakeup_channel_ptr->disableAll();
588587
}
589588

590589
auto size() const {
@@ -662,19 +661,19 @@ class Reactor : public NonCopyable {
662661
return p.second->fd() == fd;
663662
});
664663
if (it != m_work_channels.end())
665-
return std::pair(it->first, it->second);
664+
return it;
666665
auto channel_ptr = std::make_shared<Channel>(*this, fd);
667666
CallId call_id;
668-
m_work_channels.emplace(call_id, channel_ptr);
669-
return std::pair(std::move(call_id), std::move(channel_ptr));
667+
auto ret = m_work_channels.emplace(std::move(call_id), std::move(channel_ptr));
668+
return ret.first;
670669
}
671670

672671
public:
673672
CallId callOnRead(int fd, IoCall&& io_call, bool is_enable_reading = true) {
674673
return callNow([&] {
675-
auto [call_id, channel_ptr] = get(fd);
674+
auto& [call_id, channel_ptr] = *get(fd);
676675
std::weak_ptr<Channel> weak_channel_ptr = channel_ptr;
677-
channel_ptr->setReadCallback([this, weak_channel_ptr = std::move(weak_channel_ptr), fd, call_id, io_call = std::move(io_call)] {
676+
channel_ptr->setReadCallback([this, weak_channel_ptr = std::move(weak_channel_ptr), call_id, fd, io_call = std::move(io_call)] {
678677
auto call_status = io_call(fd, weak_channel_ptr);
679678
if (call_status == fantasy::Reactor::CallStatus::Remove)
680679
cancel(call_id);
@@ -689,27 +688,9 @@ class Reactor : public NonCopyable {
689688
return callOnRead(fd, IoCall(call));
690689
}
691690

692-
void callOnEnableReading(const CallId& call_id) {
693-
callNow([&] {
694-
auto it = m_work_channels.find(call_id);
695-
if (it == m_work_channels.end())
696-
return;
697-
it->second->enableReading();
698-
});
699-
}
700-
701-
void callOnEnableWriting(const CallId& call_id) {
702-
callNow([&] {
703-
auto it = m_work_channels.find(call_id);
704-
if (it == m_work_channels.end())
705-
return;
706-
it->second->enableWriting();
707-
});
708-
}
709-
710691
CallId callOnWrite(int fd, IoCall&& io_call, bool is_enable_writing = false) {
711692
return callNow([&] {
712-
auto [call_id, channel_ptr] = get(fd);
693+
auto& [call_id, channel_ptr] = *get(fd);
713694
std::weak_ptr<Channel> weak_channel_ptr = channel_ptr;
714695
channel_ptr->setWriteCallback([this, weak_channel_ptr = std::move(weak_channel_ptr), fd, call_id, io_call = std::move(io_call)] {
715696
auto call_status = io_call(fd, weak_channel_ptr);
@@ -728,7 +709,7 @@ class Reactor : public NonCopyable {
728709

729710
CallId callOnClose(int fd, IoCall&& io_call) {
730711
return callNow([&] {
731-
auto [call_id, channel_ptr] = get(fd);
712+
auto& [call_id, channel_ptr] = *get(fd);
732713
std::weak_ptr<Channel> weak_channel_ptr = channel_ptr;
733714
channel_ptr->setCloseCallback([this, weak_channel_ptr = std::move(weak_channel_ptr), fd, call_id, io_call = std::move(io_call)] {
734715
auto call_status = io_call(fd, weak_channel_ptr);
@@ -745,7 +726,7 @@ class Reactor : public NonCopyable {
745726

746727
CallId callOnError(int fd, IoCall&& io_call) {
747728
return callNow([&] {
748-
auto [call_id, channel_ptr] = get(fd);
729+
auto& [call_id, channel_ptr] = *get(fd);
749730
std::weak_ptr<Channel> weak_channel_ptr = channel_ptr;
750731
channel_ptr->setErrorCallback([this, weak_channel_ptr = std::move(weak_channel_ptr), fd, call_id, io_call = std::move(io_call)] {
751732
auto call_status = io_call(fd, weak_channel_ptr);
@@ -763,7 +744,7 @@ class Reactor : public NonCopyable {
763744
template <typename T>
764745
void cancel(const T& id) {
765746
if constexpr (std::is_same_v<T, CallId>) {
766-
callNow([this, id] {
747+
callNow([&] {
767748
if (auto it = m_work_channels.find(id); it != m_work_channels.end()) {
768749
it->second->disableAll();
769750
m_release_channel.emplace_back(std::move(it->second));
@@ -776,12 +757,10 @@ class Reactor : public NonCopyable {
776757
}
777758

778759
void run() {
779-
m_timer_queue_ptr = std::make_unique<TimerManager>(*this);
780-
m_wakeup_channel_ptr = std::make_unique<Channel>(*this, m_wakeup_fd_ptr[0]);
781-
m_wakeup_channel_ptr->setReadCallback(std::bind(&Reactor::wakeupRead, this));
782-
m_wakeup_channel_ptr->enableReading();
783760
m_thread = std::thread([&] {
784761
m_thread_id = std::this_thread::get_id();
762+
m_timer_queue_ptr = std::make_unique<TimerManager>(*this);
763+
auto call_id = callOnRead(m_wakeup_fd_ptr[0], [&](int fd, const std::weak_ptr<Channel>&) { wakeupRead(fd); return CallStatus::Ok; });
785764
try {
786765
for (;;) {
787766
auto active_channels = m_epoller_ptr->poll(-1);
@@ -793,6 +772,7 @@ class Reactor : public NonCopyable {
793772
}
794773
} catch (const Shutdown&) {
795774
std::lock_guard<std::mutex> lk(m_mtx);
775+
cancel(call_id);
796776
for (auto& func : m_pending_functors)
797777
func();
798778
}
@@ -815,9 +795,9 @@ class Reactor : public NonCopyable {
815795
m_epoller_ptr->updateChannel(channel);
816796
}
817797

818-
void wakeupRead() {
798+
void wakeupRead(const int& fd) {
819799
char c;
820-
if (read(m_wakeup_fd_ptr[0], &c, sizeof(c)) == -1 && m_error_callback)
800+
if (read(fd, &c, sizeof(c)) == -1 && m_error_callback)
821801
m_error_callback(__FILE__, __LINE__, errno);
822802
std::unique_lock<std::mutex> lk(m_mtx);
823803
auto pending_functors = std::move(m_pending_functors);
@@ -828,7 +808,6 @@ class Reactor : public NonCopyable {
828808

829809
std::unique_ptr<Epoll> m_epoller_ptr;
830810
std::unique_ptr<int[], std::function<void(int*)>> m_wakeup_fd_ptr;
831-
std::unique_ptr<Channel> m_wakeup_channel_ptr;
832811
std::thread::id m_thread_id;
833812
std::thread m_thread;
834813
std::vector<Functor> m_pending_functors;
@@ -846,4 +825,4 @@ class Reactor : public NonCopyable {
846825

847826
} // namespace fantasy
848827

849-
#endif // _REACTOR_HPP_INCLUD_ED
828+
#endif // _REACTOR_HPP_INCLUD_ED

0 commit comments

Comments
 (0)