VC驿站

 找回密码
 加入驿站

QQ登录

只需一步,快速开始

搜索
查看: 1092|回复: 3

[交流] boost::asio 協程

[复制链接]
74_avatar_middle
在线会员 发表于 2017-5-30 10:50:10 | 显示全部楼层 |阅读模式
本來想詳細介紹下 boost::asio 但 asio 內部細節實在很多 很難 簡單講明白 故 就隨便 胡謅幾句
如果你對 asio完全不瞭解 或對協程  不清楚 不要看此文 此文僅僅對於 已經玩過 asio 但尚未玩過其協程 的人提供一個使用方式的示例

雖然 孤現在服務器都會優先用go 然世事總有很多無奈 最近的一個服務器 被迫不能用go 於是用了 boost::asio
作爲墮落的人類 一旦享受了 名利 就不會甘願平淡 故而 一旦使用了 go的 協程  就不會再忍受 asio 最初的 回調式 代碼

boost/asio/spawn.hpp 中的 boost::asio::spawn 函數 要求一個簽名爲 void(*)(boost::asio::yield_context) 的functor 將其作爲一個 協程運行
yield_context 由asio自動傳入 其中 記錄了上下文 信息用於 讓出cpu和恢復協程 運行

使用 asio 的 async_XXX 函數 現在都 可以接受一個 yield_context& 對象用於 異步執行同時 讓出cpu 當異步完成後 asio將 負責 恢復到當前位置 繼續執行

只需要知道上面兩點 就可以將 以往的 asio 回調式代碼 改爲 協程 模式 下面直接給出示例 (有問題 請在回覆中提出)
(如果使用 協程 你必須將 asio之外的 異步適配到能夠支持 asio 否則 依然無法避免 回調式 代碼
考慮到 文章長度 如何適配 異步代碼到 asio 將在另外 一篇文章中描述)

下面是一個 echo 的服務器代碼 示例
服務器 接到到 一個 以 \n 結尾的 消息後 將 消息 原樣傳回 之後 斷開連接
(如果要編譯通過 請確認編譯器 支持 c++11
代碼 在 Linux king-XXX 4.4.0-78-generic #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

Thread model: posix
gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4)

boost1.6.04 中測試通過
其他平臺 可能需要適當修改代碼
)
  1. #include <cstdint>

  2. #include <boost/asio.hpp>
  3. #include <boost/thread.hpp>
  4. #include <boost/smart_ptr.hpp>
  5. #include <boost/asio/spawn.hpp>
  6. #include <boost/bind.hpp>

  7. #define LISTEN_ADDRESS "0.0.0.0"
  8. #define LISTEN_PORT 8080
  9. #define BUFFER_SIZE 1024
  10. #define COMPARE_GET 50


  11. typedef boost::asio::io_service io_service_t;
  12. typedef boost::asio::ip::tcp::acceptor acceptor_t;
  13. typedef boost::asio::ip::tcp::socket socket_t;
  14. typedef boost::shared_ptr<socket_t> socket_spt;

  15. void coroutine_read(socket_spt sp,std::size_t* client,boost::asio::yield_context ctx);
  16. class server_t
  17. {
  18. public:
  19.     //listen
  20.     io_service_t _service;
  21.     acceptor_t _acceptor;
  22.     //read
  23.     io_service_t* _services = NULL;
  24.     std::size_t* _clients = NULL;
  25.     std::size_t _n = 0;
  26.     std::size_t _flag = COMPARE_GET;
  27.     std::size_t _pos = 0;

  28.     server_t()
  29.         :_acceptor(_service,boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(LISTEN_ADDRESS),LISTEN_PORT))
  30.     {
  31.         std::size_t n = boost::thread::hardware_concurrency();
  32.         _n = n;
  33.         _services = new io_service_t[n];
  34.         _clients = new std::size_t[n];
  35.         for(std::size_t i=0; i<n; ++i)
  36.         {
  37.             _clients[i] = 0;
  38.             boost::thread([services=_services,i=i]()
  39.             {
  40.                 boost::asio::deadline_timer timer(services[i],boost::posix_time::hours(24 * 365 * 10));
  41.                 timer.async_wait([](const boost::system::error_code& e) {});
  42.                 services[i].run();
  43.             });
  44.         }
  45.     }
  46.     ~server_t()
  47.     {
  48.         if(_services)
  49.         {
  50.             delete[] _services;
  51.         }
  52.         if(_clients)
  53.         {
  54.             delete[] _clients;
  55.         }
  56.     }
  57.     void run()
  58.     {
  59.         post_accept();
  60.         std::cout<<"work at "<<LISTEN_ADDRESS<<":"<<LISTEN_PORT<<std::endl;
  61.         _service.run();
  62.     }
  63.     void post_accept()
  64.     {
  65.         std::size_t i = get_service();

  66.         socket_spt sock = boost::make_shared<socket_t>(_services[i]);
  67.         _acceptor.async_accept(*sock,
  68.                                boost::bind(&server_t::post_accept_handler,
  69.                                            this,
  70.                                            boost::asio::placeholders::error,
  71.                                            sock,
  72.                                            &_clients[i])
  73.                               );
  74.     }
  75.     std::size_t get_service()
  76.     {
  77.         if(_flag)
  78.         {
  79.             --_flag;
  80.              return _pos;
  81.         }

  82.         std::size_t min = _clients[0];
  83.         _pos = 0;
  84.         for(std::size_t i=1; i<_n; ++i)
  85.         {
  86.             if(_clients[i] < min)
  87.             {
  88.                 min = _clients[i];
  89.                 _pos = i;
  90.             }
  91.         }
  92.         _flag = COMPARE_GET - 1;
  93.          return _pos;
  94.     }
  95.     void post_accept_handler(const boost::system::error_code& ec,socket_spt sock,std::size_t* client)
  96.     {
  97.         post_accept();
  98.         if(ec)
  99.         {
  100.             return;
  101.         }

  102.         io_service_t& service = sock->get_io_service();
  103.         service.post([client=client]()
  104.         {
  105.             ++(*client);
  106.         });

  107.         //爲 socket 啓動 通信 coroutine
  108.         boost::asio::spawn(service,boost::bind(coroutine_read,sock,client,_1));
  109.     }
  110. };

  111. int main()
  112. {
  113.     try
  114.     {
  115.         server_t s;
  116.         s.run();
  117.     }
  118.     catch(const std::bad_alloc& e)
  119.     {
  120.         std::cout<<e.what()<<std::endl;
  121.     }
  122.     catch(const boost::system::system_error& e)
  123.     {
  124.         std::cout<<e.what()<<std::endl;
  125.     }
  126.     return 0;
  127. }

  128. void coroutine_read(socket_spt sp,std::size_t* client,boost::asio::yield_context ctx)
  129. {
  130.     boost::system::error_code ec;
  131.     socket_t& s = *sp;
  132.     try
  133.     {
  134.         //讀取消息
  135.         std::uint8_t buffer[BUFFER_SIZE];
  136.         std::size_t pos = 0;
  137.         while(pos < BUFFER_SIZE)
  138.         {
  139.             //接收消息 yield
  140.             std::size_t n = s.async_read_some(boost::asio::buffer(buffer + pos,BUFFER_SIZE - pos),ctx);
  141.             pos += n;
  142.             if(buffer[pos-1] == '\n')
  143.             {
  144.                 //echo
  145.                 s.async_write_some(boost::asio::buffer(buffer,pos),ctx);
  146.                 break;
  147.             }
  148.         }

  149.         //退出 關閉 socket
  150.     }
  151.     catch(const boost::system::system_error& se)
  152.     {
  153.         std::cout<<se.what()<<"\n";
  154.     }

  155.     s.shutdown(socket_t::shutdown_both,ec);
  156.     s.close(ec);

  157.     --(*client);
  158. }
复制代码


下面是測試時使用的一個 測試 客戶端的 go代碼
  1. package main

  2. import (
  3.         "flag"
  4.         "log"
  5.         "net"
  6.         "time"
  7. )

  8. const (
  9.         Addr       = "127.0.0.1:8080"
  10.         BufferSize = 1024
  11.         Timeout    = time.Second * 2

  12.         StatueOk        = 0
  13.         StatueTimeout   = 1
  14.         StatueNoConnect = 2
  15.         StatueError     = 3
  16. )

  17. func main() {
  18.         var addr string
  19.         flag.StringVar(&addr, "addr", Addr, "remote address")
  20.         flag.StringVar(&addr, "a", Addr, "remote address")
  21.         var h bool
  22.         flag.BoolVar(&h, "h", false, "show help")
  23.         flag.BoolVar(&h, "help", false, "show help")
  24.         var count int
  25.         flag.IntVar(&count, "count", 1024, "number for connect")
  26.         flag.IntVar(&count, "c", 1024, "number for connect")

  27.         flag.Parse()

  28.         if h {
  29.                 flag.PrintDefaults()
  30.                 return
  31.         }

  32.         now := time.Now()
  33.         ch := make(chan int)
  34.         for i := 0; i < count; i++ {
  35.                 go doConnect(ch)
  36.         }

  37.         sumOk := 0
  38.         sumTime := 0
  39.         sumConnect := 0
  40.         sumError := 0
  41.         for count > 0 {
  42.                 switch <-ch {
  43.                 case StatueOk:
  44.                         sumOk++
  45.                 case StatueTimeout:
  46.                         sumTime++
  47.                 case StatueNoConnect:
  48.                         sumConnect++
  49.                 case StatueError:
  50.                         sumError++
  51.                 }
  52.                 count--
  53.         }

  54.         log.Println("ok                :", sumOk)
  55.         //log.Println("timeout        :", sumTime)
  56.         log.Println("noconnect        :", sumConnect)
  57.         log.Println("error        :", sumError)
  58.         log.Println("time        :", time.Now().Sub(now))
  59. }
  60. func doConnect(ch chan int) {
  61.         /*chStatue := make(chan int, 2)
  62.         go func() {
  63.                 time.Sleep(Timeout)
  64.                 chStatue <- StatueTimeout
  65.         }()
  66.         go func() {
  67.                 chStatue <- goConnect()
  68.         }()
  69.         statue := <-chStatue
  70.         ch <- statue
  71.         */
  72.         ch <- goConnect()
  73. }
  74. func goConnect() int {
  75.         c, e := net.Dial("tcp", Addr)
  76.         if e != nil {

  77.                 return StatueNoConnect
  78.         }
  79.         defer c.Close()

  80.         str := "i'm king\n"
  81.         _, e = c.Write([]byte(str))
  82.         if e != nil {
  83.                 return StatueError
  84.         }

  85.         b := make([]byte, BufferSize)
  86.         pos := 0
  87.         for {
  88.                 n, e := c.Read(b[pos:])
  89.                 if e != nil {
  90.                         break
  91.                 }
  92.                 pos += n
  93.                 if b[pos-1] == '\n' {
  94.                         //check echo
  95.                         if string(b[:pos]) == str {
  96.                                 return StatueOk
  97.                         }
  98.                 }
  99.                 //large than max msg
  100.                 if pos == BufferSize {
  101.                         break
  102.                 }
  103.         }
  104.         return StatueError
  105. }
复制代码




上一篇:有深圳的做mfc的朋友来交流吗
下一篇:適配異步代碼到 asio 協程
51_avatar_middle
online_admins 发表于 2017-5-31 10:05:11 | 显示全部楼层
最近项目中用到了 libevent 这个开源跨平台的网络库,也还不错boost::asio 協程
74_avatar_middle
ico_lz  楼主| 发表于 2017-5-31 10:36:09 | 显示全部楼层
Syc 发表于 2017-5-31 10:05
最近项目中用到了 libevent 这个开源跨平台的网络库,也还不错

libevent 只聽說過是 c庫 沒用過 但 好像也是異步庫

高效的c/c++ 網絡庫都是 異步吧
這就是問題所在 異步編程很繁瑣 而且 容易出錯 所以孤才在玩go後 儘量不用c++寫代碼而用go

現在 孤知道的 c/c++ 網絡庫 能用協程的 只有asio 而且 asio的協程 和go一樣是可以工作在多線程上的 不像 一些語言所謂的協程 只能工作在單線程 上 完全不能利用 多cpu的優勢

而且在上面孤的示例代碼中 asio 其實 只需要 coroutine_read 函數的二三十行代碼 就可以完成 一個高效 邏輯清晰 能使用多cpu的 echo 服務器
server_t 中的工作 只是爲了 讓 每個 線程 都使用 獨立的 io_service 以發揮 asio的所有潛能 但其實 80% 以上的程序 所有 線程 使用 同一個 io_service 也可以勝任(孤自己試過 所有線程使用 一個 io_service 性能基本和 go的服務器差不多)
74_avatar_middle
ico_lz  楼主| 发表于 2017-5-31 10:39:07 | 显示全部楼层
另外一個示例代碼 所有線程使用了同一個 io_service
(所以 代碼條例看起來更加清晰)
  1. #include <iostream>
  2. #include <cstdint>

  3. #include <boost/asio.hpp>
  4. #include <boost/thread.hpp>
  5. #include <boost/bind.hpp>
  6. #include <boost/asio/spawn.hpp>
  7. #include <boost/smart_ptr.hpp>

  8. #define LISTEN_ADDRESS "0.0.0.0"
  9. #define LISTEN_PORT 8080
  10. #define BUFFER_SIZE 1024

  11. typedef boost::asio::io_service io_service_t;
  12. typedef boost::asio::ip::tcp::acceptor acceptor_t;
  13. typedef boost::asio::ip::tcp::socket socket_t;
  14. typedef boost::shared_ptr<socket_t> socket_spt;

  15. void coroutine_accept(acceptor_t& acceptor,boost::asio::yield_context ctx);
  16. void coroutine_read(socket_spt sp,boost::asio::yield_context ctx);

  17. int main()
  18. {
  19.     try
  20.     {
  21.         //asio 服務
  22.         io_service_t service;

  23.         //接收器
  24.         acceptor_t acceptor(service,boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(LISTEN_ADDRESS),LISTEN_PORT));

  25.         std::cout<<"work at "<<LISTEN_ADDRESS<<":"<<LISTEN_PORT<<std::endl;

  26.         std::size_t n = boost::thread::hardware_concurrency();

  27.         for(std::size_t i=0; i<n; ++i)
  28.         {
  29.             //啓動 接收連接 coroutine
  30.             boost::asio::spawn(service,boost::bind(coroutine_accept,boost::ref(acceptor),_1));
  31.         }

  32.         boost::thread_group threads;
  33.         for(std::size_t i=0; i<n; ++i)
  34.         {
  35.             threads.create_thread([&service]()
  36.             {
  37.                 //運行asio服務
  38.                 service.run();
  39.             });
  40.         }
  41.         threads.join_all();

  42.     }
  43.     catch(const boost::system::system_error& se)
  44.     {
  45.         std::cout<<se.what()<<"\n";
  46.     }
  47.     return 0;
  48. }
  49. void coroutine_accept(acceptor_t& acceptor,boost::asio::yield_context ctx)
  50. {
  51.     while(true)
  52.     {
  53.         boost::system::error_code ec;
  54.         io_service_t& service = acceptor.get_io_service();
  55.         try
  56.         {
  57.             socket_spt sp = boost::make_shared<socket_t>(service);
  58.             acceptor.async_accept(*sp,ctx[ec]);
  59.             acceptor.get_io_service();
  60.             if(ec)
  61.             {
  62.                 //錯誤
  63.                 std::cout<<ec<<"\n";
  64.             }
  65.             else
  66.             {
  67.                 //爲 socket 啓動 通信 coroutine
  68.                 boost::asio::spawn(service,boost::bind(coroutine_read,sp,_1));
  69.             }

  70.         }
  71.         catch(const std::bad_alloc& e)
  72.         {
  73.             std::cout<<e.what()<<"\n";
  74.         }
  75.     }
  76. }
  77. void coroutine_read(socket_spt sp,boost::asio::yield_context ctx)
  78. {
  79.     boost::system::error_code ec;
  80.     socket_t& s = *sp;
  81.     try
  82.     {
  83.         //讀取消息
  84.         std::uint8_t buffer[BUFFER_SIZE];
  85.         std::size_t pos = 0;
  86.         while(pos < BUFFER_SIZE)
  87.         {
  88.             //接收消息 yield
  89.             std::size_t n = s.async_read_some(boost::asio::buffer(buffer + pos,BUFFER_SIZE - pos),ctx);
  90.             pos += n;
  91.             if(buffer[pos-1] == '\n')
  92.             {
  93.                 //echo
  94.                 s.async_write_some(boost::asio::buffer(buffer,pos),ctx);
  95.                 break;
  96.             }
  97.         }

  98.         //退出 關閉 socket
  99.     }
  100.     catch(const boost::system::system_error& se)
  101.     {
  102.         std::cout<<se.what()<<"\n";
  103.     }

  104.     s.shutdown(socket_t::shutdown_both,ec);
  105.     s.close(ec);
  106. }
复制代码
您需要登录后才可以回帖 登录 | 加入驿站 qq_login

本版积分规则

关闭

站长提醒上一条 /2 下一条

QQ|小黑屋|手机版|VC驿站 ( 辽ICP备09019393号tongdun|网站地图wx_jqr

GMT+8, 2019-9-17 11:15

Powered by Discuz! X3.4

© 2009-2019 cctry.com

快速回复 返回顶部 返回列表