|
本來想詳細介紹下 boost::asio 但 asio 內部細節實在很多 很難 簡單講明白 故 就隨便 胡謅幾句
如果你對 asio完全不瞭解 或對協程 不清楚 不要看此文 此文僅僅對於 已經玩過 asio 但尚未玩過其協程 的人提供一個使用方式的示例
雖然 孤現在服務器都會優先用go 然世事總有很多無奈 最近的一個服務器 被迫不能用go 於是用了 boost::asio
作爲墮落的人類 一旦享受了 名利 就不會甘願平淡 故而 一旦使用了 go的 協程 就不會再忍受 asio 最初的 回調式 代碼
boost/asio/spawn.hpp 中的 boost::asio::spawn 函數 要求一個簽名爲 void(*)(boost::asio::yield_context) 的functor 將其作爲一個 協程運行
yield_context 由asio自動傳入 其中 記錄了上下文 信息用於 讓出cpu和恢復協程 運行
使用 asio 的 async_XXX 函數 現在都 可以接受一個 yield_context& 對象用於 異步執行同時 讓出cpu 當異步完成後 asio將 負責 恢復到當前位置 繼續執行
只需要知道上面兩點 就可以將 以往的 asio 回調式代碼 改爲 協程 模式 下面直接給出示例 (有問題 請在回覆中提出)
(如果使用 協程 你必須將 asio之外的 異步適配到能夠支持 asio 否則 依然無法避免 回調式 代碼
考慮到 文章長度 如何適配 異步代碼到 asio 將在另外 一篇文章中描述)
下面是一個 echo 的服務器代碼 示例
服務器 接到到 一個 以 \n 結尾的 消息後 將 消息 原樣傳回 之後 斷開連接
(如果要編譯通過 請確認編譯器 支持 c++11
代碼 在 Linux king-XXX 4.4.0-78-generic #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
Thread model: posix
gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4)
boost1.6.04 中測試通過
其他平臺 可能需要適當修改代碼
)
- #include <cstdint>
-
- #include <boost/asio.hpp>
- #include <boost/thread.hpp>
- #include <boost/smart_ptr.hpp>
- #include <boost/asio/spawn.hpp>
- #include <boost/bind.hpp>
-
- #define LISTEN_ADDRESS "0.0.0.0"
- #define LISTEN_PORT 8080
- #define BUFFER_SIZE 1024
- #define COMPARE_GET 50
-
-
- typedef boost::asio::io_service io_service_t;
- typedef boost::asio::ip::tcp::acceptor acceptor_t;
- typedef boost::asio::ip::tcp::socket socket_t;
- typedef boost::shared_ptr<socket_t> socket_spt;
-
- void coroutine_read(socket_spt sp,std::size_t* client,boost::asio::yield_context ctx);
- class server_t
- {
- public:
- //listen
- io_service_t _service;
- acceptor_t _acceptor;
- //read
- io_service_t* _services = NULL;
- std::size_t* _clients = NULL;
- std::size_t _n = 0;
- std::size_t _flag = COMPARE_GET;
- std::size_t _pos = 0;
-
- server_t()
- :_acceptor(_service,boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(LISTEN_ADDRESS),LISTEN_PORT))
- {
- std::size_t n = boost::thread::hardware_concurrency();
- _n = n;
- _services = new io_service_t[n];
- _clients = new std::size_t[n];
- for(std::size_t i=0; i<n; ++i)
- {
- _clients[i] = 0;
- boost::thread([services=_services,i=i]()
- {
- boost::asio::deadline_timer timer(services[i],boost::posix_time::hours(24 * 365 * 10));
- timer.async_wait([](const boost::system::error_code& e) {});
- services[i].run();
- });
- }
- }
- ~server_t()
- {
- if(_services)
- {
- delete[] _services;
- }
- if(_clients)
- {
- delete[] _clients;
- }
- }
- void run()
- {
- post_accept();
- std::cout<<"work at "<<LISTEN_ADDRESS<<":"<<LISTEN_PORT<<std::endl;
- _service.run();
- }
- void post_accept()
- {
- std::size_t i = get_service();
-
- socket_spt sock = boost::make_shared<socket_t>(_services[i]);
- _acceptor.async_accept(*sock,
- boost::bind(&server_t::post_accept_handler,
- this,
- boost::asio::placeholders::error,
- sock,
- &_clients[i])
- );
- }
- std::size_t get_service()
- {
- if(_flag)
- {
- --_flag;
- return _pos;
- }
-
- std::size_t min = _clients[0];
- _pos = 0;
- for(std::size_t i=1; i<_n; ++i)
- {
- if(_clients[i] < min)
- {
- min = _clients[i];
- _pos = i;
- }
- }
- _flag = COMPARE_GET - 1;
- return _pos;
- }
- void post_accept_handler(const boost::system::error_code& ec,socket_spt sock,std::size_t* client)
- {
- post_accept();
- if(ec)
- {
- return;
- }
-
- io_service_t& service = sock->get_io_service();
- service.post([client=client]()
- {
- ++(*client);
- });
-
- //爲 socket 啓動 通信 coroutine
- boost::asio::spawn(service,boost::bind(coroutine_read,sock,client,_1));
- }
- };
-
- int main()
- {
- try
- {
- server_t s;
- s.run();
- }
- catch(const std::bad_alloc& e)
- {
- std::cout<<e.what()<<std::endl;
- }
- catch(const boost::system::system_error& e)
- {
- std::cout<<e.what()<<std::endl;
- }
- return 0;
- }
-
- void coroutine_read(socket_spt sp,std::size_t* client,boost::asio::yield_context ctx)
- {
- boost::system::error_code ec;
- socket_t& s = *sp;
- try
- {
- //讀取消息
- std::uint8_t buffer[BUFFER_SIZE];
- std::size_t pos = 0;
- while(pos < BUFFER_SIZE)
- {
- //接收消息 yield
- std::size_t n = s.async_read_some(boost::asio::buffer(buffer + pos,BUFFER_SIZE - pos),ctx);
- pos += n;
- if(buffer[pos-1] == '\n')
- {
- //echo
- s.async_write_some(boost::asio::buffer(buffer,pos),ctx);
- break;
- }
- }
-
- //退出 關閉 socket
- }
- catch(const boost::system::system_error& se)
- {
- std::cout<<se.what()<<"\n";
- }
-
- s.shutdown(socket_t::shutdown_both,ec);
- s.close(ec);
-
- --(*client);
- }
复制代码
下面是測試時使用的一個 測試 客戶端的 go代碼
- package main
-
- import (
- "flag"
- "log"
- "net"
- "time"
- )
-
- const (
- Addr = "127.0.0.1:8080"
- BufferSize = 1024
- Timeout = time.Second * 2
-
- StatueOk = 0
- StatueTimeout = 1
- StatueNoConnect = 2
- StatueError = 3
- )
-
- func main() {
- var addr string
- flag.StringVar(&addr, "addr", Addr, "remote address")
- flag.StringVar(&addr, "a", Addr, "remote address")
- var h bool
- flag.BoolVar(&h, "h", false, "show help")
- flag.BoolVar(&h, "help", false, "show help")
- var count int
- flag.IntVar(&count, "count", 1024, "number for connect")
- flag.IntVar(&count, "c", 1024, "number for connect")
-
- flag.Parse()
-
- if h {
- flag.PrintDefaults()
- return
- }
-
- now := time.Now()
- ch := make(chan int)
- for i := 0; i < count; i++ {
- go doConnect(ch)
- }
-
- sumOk := 0
- sumTime := 0
- sumConnect := 0
- sumError := 0
- for count > 0 {
- switch <-ch {
- case StatueOk:
- sumOk++
- case StatueTimeout:
- sumTime++
- case StatueNoConnect:
- sumConnect++
- case StatueError:
- sumError++
- }
- count--
- }
-
- log.Println("ok :", sumOk)
- //log.Println("timeout :", sumTime)
- log.Println("noconnect :", sumConnect)
- log.Println("error :", sumError)
- log.Println("time :", time.Now().Sub(now))
- }
- func doConnect(ch chan int) {
- /*chStatue := make(chan int, 2)
- go func() {
- time.Sleep(Timeout)
- chStatue <- StatueTimeout
- }()
- go func() {
- chStatue <- goConnect()
- }()
- statue := <-chStatue
- ch <- statue
- */
- ch <- goConnect()
- }
- func goConnect() int {
- c, e := net.Dial("tcp", Addr)
- if e != nil {
-
- return StatueNoConnect
- }
- defer c.Close()
-
- str := "i'm king\n"
- _, e = c.Write([]byte(str))
- if e != nil {
- return StatueError
- }
-
- b := make([]byte, BufferSize)
- pos := 0
- for {
- n, e := c.Read(b[pos:])
- if e != nil {
- break
- }
- pos += n
- if b[pos-1] == '\n' {
- //check echo
- if string(b[:pos]) == str {
- return StatueOk
- }
- }
- //large than max msg
- if pos == BufferSize {
- break
- }
- }
- return StatueError
- }
复制代码 |
上一篇: 有深圳的做mfc的朋友来交流吗下一篇: 適配異步代碼到 asio 協程
|