幸运快三

grpc应用纪录(三)质朴异步办现实例

grpc应用纪录(三)质朴异步办现实例

编写异步服务和编写同步服务的基本流程都差不多,稍有点差异。

同步服务你只须要完成相关服务接口的完成便可,不须要治理太多器械。异步服务GRPC运转时会把读取到的客户端请求放入CompletionQueue中,须要手动从中取出,然落先行相关的处置赏罚赏罚,可以多线程也能够或许单线程。

1、编写proto文件,界说服务

幸运快三这里和中的一样,这里就不多说了。

2、编译proto文件,天生代码

这里也是和中的一样的。

3、编写服务端代码

幸运快三这里可以复用前面同步服务的代码,只须要做质朴的修改便可。

质朴说一下培植一个GRPC异步服务的要点:

  • 1、培植服务工具的时间要培植AsyncService,而不是Service
  • 2、至少须要添加一个grpc::ServerCompletionQueue用于异步义务操作。
  • 3、必须要经由历程AsyncService::RequestXXXX来注册XXXX接口的处置赏罚赏罚。
  • 4、一个客户端请求的处置赏罚赏罚可质朴的分为两个法式模范:1、构建前往给客户真个照顾数据;2、发送照顾数据给客户端。
  • 5、完成行列和注册请求处置赏罚赏罚都可以有多个,不用定非得是一个。

分享图片

async_service.cpp

下面代码质朴的培植了3个HandlerContext的结构体类型,用于生涯三个接口请求处置赏罚赏罚历程当中的数据,现实的请求处置赏罚赏罚照样和之前同步服务的一样,这里只是写成了Test1Test2Test3三个函数的形式。

// > g++ -o aservice async_service.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated

#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>

#include <memory>
#include <iostream>
#include <strstream>

struct HandlerContext {
  // 以后处置赏罚赏罚状态(处置赏罚赏罚分为两步:1处置赏罚赏罚请求构建照顾数据;2发送照顾)
  // 这里纪录一下完成到哪一步了,以便阻拦相关操作
  int                 status_; // (1构建照顾完成;2发送完成)
  // rpc的曲折文,允许经由历程它阻拦诸如延伸、身份验证,和把元数据发回客户端等。
  grpc::ServerContext ctx_;
};

struct HandlerTest1Context:public HandlerContext {
  // 用于吸收客户端发送的请求
  Simple::TestRequest req_;
  // 用于发送照顾给客户端
  Simple::TestNull    rep_;

  // 发送到客户真个措施工具
  grpc::ServerAsyncResponseWriter<Simple::TestNull> responder_;
  // 结构函数
  HandlerTest1Context()
    :responder_(&ctx_)
  {}
};

struct HandlerTest2Context:public HandlerContext  {
  // 用于吸收客户端发送的请求
  Simple::TestNull req_;
  // 用于发送照顾给客户端
  Simple::TestReply   rep_;

  // 发送到客户真个措施工具
  grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
  // 结构函数
  HandlerTest2Context()
    :responder_(&ctx_)
  {}
};

struct HandlerTest3Context:public HandlerContext {
  // 用于吸收客户端发送的请求
  Simple::TestRequest req_;
  // 用于发送照顾给客户端
  Simple::TestReply   rep_;

  // 发送到客户真个措施工具
  grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
  // 结构函数
  HandlerTest3Context()
    :responder_(&ctx_)
  {}
};


// Test1 完成都是差不都的,这里只是为了测试,就随便前往点数据了
grpc::Status Test1(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestNull*          response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();
  // grpc状态可以设置message,以是也能够或许用来前往一些信息
  return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext*       context,
                   const Simple::TestNull*    request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  response->set_tid(100);
  response->set_svrname("Simple Server");
  response->set_takeuptime(0.01);
  return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();

  response->set_tid(__LINE__);
  response->set_svrname(__FILE__);
  response->set_takeuptime(1.234);
  // grpc状态可以设置message
  return grpc::Status(grpc::StatusCode::OK,std::move(message));
}

int main()
{
  // 服务构建器,用于构建同步或许异步服务
  grpc::ServerBuilder builder;
  // 添加监听的地址和端口,后一个参数用于设置认证要领,这里选择不认证
  builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
  // 培植一个异步服务工具
  Simple::Server::AsyncService service;
  // 注册服务
  builder.RegisterService(&service);

  // 添加一个完成行列,用于与 gRPC 运转时异步通讯
  std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();

  // 构建服务器
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  std::cout<<"Server Runing"<<std::endl;
  // 这里用一个map来纪录一下下面要阻拦处置赏罚赏罚的请求
  // 由于这里也是单线程的,以是不加锁了
  std::map<HandlerContext*,int> handlerMap; // value用于纪录是Test1照样2、3
  {
    // 先培植三个类型接口的请求处置赏罚赏罚曲折文工具
    HandlerTest1Context* htc1 = new HandlerTest1Context;
    htc1->status_ = 1; // 设置状态为1(由于只须要分辨能否曾经发送照顾完成)
    HandlerTest2Context* htc2 = new HandlerTest2Context;
    htc2->status_ = 1;
    HandlerTest3Context* htc3 = new HandlerTest3Context;
    htc3->status_ = 1;

    // 将三个曲折文工具存入map中
    handlerMap[htc1] = 1; // 值用于分辨是哪个类型
    handlerMap[htc2] = 2;
    handlerMap[htc3] = 3;

    // 进入下面去世循环前须要先注册一下请求
    service.RequestTest1(
        &htc1->ctx_         /*服务曲折文工具*/,
        &htc1->req_         /*用于吸收请求的工具*/,
        &htc1->responder_   /*异步写照顾工具*/,
        cq_ptr.get()        /*新的挪用应用的完成行列*/,
        cq_ptr.get()        /*告诉应用的完成行列*/,
        htc1                /*唯一标识tag*/);
    service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
    service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
  }
  // 异步服务这里不克不及应用 server.Wait() 来期待处置赏罚赏罚,由因此异步服务
  // 服务器会把到达的请求放入行列,须要自己从完成行列取出请求阻拦处置赏罚赏罚
  // 以是这里须要一个去世循环来取得请求并阻拦处置赏罚赏罚
  while(true){
    // 前面曾经注册了请求处置赏罚赏罚,这里壅闭从完成行列中取出一个请求阻拦处置赏罚赏罚
    HandlerContext* htc = NULL;
    bool ok = false; 
    GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
    GPR_ASSERT(ok);
    // 凭证tag断定是哪个请求
    // 由于前面注册请求处置赏罚赏罚的时间应用的就是工具地址
    // 以是这里直接从map外面取出来断定便可
    int type = handlerMap[htc];
    // 断定状态,看能否是曾经照顾发送了
    if(htc->status_ == 2) {
      // 从map中移除
      handlerMap.erase(htc);
      // 由于这里着实不是多态类,必须凭证类型操作
      switch(type) {
        case 1:
          {
            // 释下班具(这里未对这个工具阻拦复用)
            delete (HandlerTest1Context*)htc;
          }
          break;
        case 2:
          {
            delete (HandlerTest2Context*)htc;
          }
          break;
        case 3:
          {
            delete (HandlerTest3Context*)htc;
          }
          break;
      }
      continue; // 回到从完成行列取得下一个
    }

    // 凭证type阻拦照顾的处置赏罚赏罚
    switch(type) {
      case 1: /*Test1的处置赏罚赏罚*/
        {
          // 重新培植一个请求处置赏罚赏罚曲折文工具(以便不影响下一个请求的处置赏罚赏罚)
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1;    // 设置状态为1
          handlerMap[htc1] = 1; // 生涯到handlerMap中
          service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc1);
            
          HandlerTest1Context* h = (HandlerTest1Context*)htc;
          grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
      case 2: /*Test2的处置赏罚赏罚*/
        {
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;    // 设置状态为1
          handlerMap[htc2] = 2; // 生涯到handlerMap中
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc2);
            
          HandlerTest2Context* h = (HandlerTest2Context*)htc;
          grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
      case 3: /*Test3的处置赏罚赏罚*/
        {
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;    // 设置状态为1
          handlerMap[htc3] = 3; // 生涯到handlerMap中
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc3);
            
          HandlerTest3Context* h = (HandlerTest3Context*)htc;
          grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
    }
  }
  return 0;
}

async_service2.cpp

下面虽然是应用到了grpc的异步服务机制,然则只是为了形貌清晰异步服务的培植历程,是一个单线程的粗陋完成。下面写一个应用的完成。

// > g++ -o aservice2 async_service2.cpp  simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated

// 线程池的代码可见 https://www.cnblogs.com/oloroso/p/5881863.html
#include "threadpool.h"
#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>

#include <memory>
#include <iostream>
#include <strstream>
#include <chrono>

struct HandlerContextBase {
  // 以后工具类型,用于一定是Test1/2/3哪个请求的
  int                 type_;
  // 以后处置赏罚赏罚状态(处置赏罚赏罚分为两步:1处置赏罚赏罚请求构建照顾数据;2发送照顾)
  // 这里纪录一下完成到哪一步了,以便阻拦相关操作
  int                 status_; // (1构建照顾完成;2发送完成)
  // rpc的曲折文,允许经由历程它阻拦诸如延伸、身份验证,和把元数据发回客户端等。
  grpc::ServerContext ctx_;
};

template<typename RequestType,typename ReplyType>
struct HandlerContext:public HandlerContextBase {
  // 用于吸收客户端发送的请求
  RequestType         req_;
  // 用于发送照顾给客户端
  ReplyType           rep_;
  // 发送到客户真个措施工具
  grpc::ServerAsyncResponseWriter<ReplyType> responder_;
  //================================================
  // 结构函数
  HandlerContext()
    :responder_(&ctx_)
  {}

};
typedef HandlerContext<Simple::TestRequest,Simple::TestNull>  HandlerTest1Context;
typedef HandlerContext<Simple::TestNull,Simple::TestReply>    HandlerTest2Context;
typedef HandlerContext<Simple::TestRequest,Simple::TestReply> HandlerTest3Context;

unsigned long get_tid()
{
  std::thread::id tid = std::this_thread::get_id();
  std::ostrstream os;
  os << tid;
  unsigned long tidx = std::stol(os.str());
  return tidx;
}

// Test1 完成都是差不都的,这里只是为了测试,就随便前往点数据了
grpc::Status Test1(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestNull*          response)
{
  printf("%s %d\n",__func__,__LINE__);
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  std::string message = os.str();
  // grpc状态可以设置message,以是也能够或许用来前往一些信息
  return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext*       context,
                   const Simple::TestNull*    request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  response->set_tid(100);
  response->set_svrname("Simple Server");
  response->set_takeuptime(0.01);
  return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext*       context,
                   const Simple::TestRequest* request,
                   Simple::TestReply*         response)
{
  printf("%s %d\n",__func__,__LINE__);
  int tid = get_tid();
  std::ostrstream os;
  os << "Client Name = " << request->name() << '\n';
  os << "Clinet ID   = " << request->id()   << '\n';
  os << "Clinet Value= " << request->value()<< '\n';
  os << "Server TID  = " << tid<<'\n';
  std::string message = os.str();
  
  // 休眠0.5秒,以便不雅不雅察异步推行的效果
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  response->set_tid(tid);
  response->set_svrname(__FILE__);
  response->set_takeuptime(1.234);
  // grpc状态可以设置message
  return grpc::Status(grpc::StatusCode::OK,std::move(message));
}

int main()
{
  // 服务构建器,用于构建同步或许异步服务
  grpc::ServerBuilder builder;
  // 添加监听的地址和端口,后一个参数用于设置认证要领,这里选择不认证
  builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
  // 培植一个异步服务工具
  Simple::Server::AsyncService service;
  // 注册服务
  builder.RegisterService(&service);

  // 添加一个完成行列,用于与 gRPC 运转时异步通讯
  std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();

  // 构建服务器
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  std::cout<<"Server Runing"<<std::endl;
  // 下面可以有几个使命线程就先注册几个,也能够或许仅注册一个(至少一个)
  /*for(int i=0;i<4;++i)*/ {
    // 先培植三个类型接口的请求处置赏罚赏罚曲折文工具
    HandlerTest1Context* htc1 = new HandlerTest1Context;
    htc1->status_ = 1; // 设置状态为1(由于只须要分辨能否曾经发送照顾完成)
    htc1->type_   = 1; // 设置类型为1
    HandlerTest2Context* htc2 = new HandlerTest2Context;
    htc2->status_ = 1;
    htc2->type_   = 2;
    HandlerTest3Context* htc3 = new HandlerTest3Context;
    htc3->status_ = 1;
    htc3->type_   = 3;

    // 进入下面去世循环前须要先注册一下请求
    service.RequestTest1(
        &htc1->ctx_         /*服务曲折文工具*/,
        &htc1->req_         /*用于吸收请求的工具*/,
        &htc1->responder_   /*异步写照顾工具*/,
        cq_ptr.get()        /*新的挪用应用的完成行列*/,
        cq_ptr.get()        /*告诉应用的完成行列*/,
        htc1                /*唯一标识tag*/);
    service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
    service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
  }

  // 培植线程池,应用4个使命线程,用于构建请求的照顾
  ThreadPool pool(4);

  // 异步服务这里不克不及应用 server->Wait() 来期待处置赏罚赏罚,由因此异步服务
  // 服务器会把到达的请求放入行列,须要自己从完成行列取出请求阻拦处置赏罚赏罚
  // 以是这里须要一个去世循环来取得请求并阻拦处置赏罚赏罚
  while(true){
    // 前面曾经注册了请求处置赏罚赏罚,这里壅闭从完成行列中取出一个请求阻拦处置赏罚赏罚
    HandlerContextBase* htc = NULL;
    bool ok = false; 
    GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
    GPR_ASSERT(ok);
    // 凭证tag断定是哪个请求
    // 由于前面注册请求处置赏罚赏罚的时间应用的就是工具地址
    // 以是这里直接从map外面取出来断定便可
    int type = htc->type_;
    // 断定状态,看能否是曾经照顾发送了
    if(htc->status_ == 2) {
      // 由于这里着实不是多态类,必须凭证类型操作
      switch(type) {
        case 1:
          {
            // 释下班具(这里未对这个工具阻拦复用)
            delete (HandlerTest1Context*)htc;
          }
          break;
        case 2:
          {
            delete (HandlerTest2Context*)htc;
          }
          break;
        case 3:
          {
            delete (HandlerTest3Context*)htc;
          }
          break;
      }
      continue; // 回到从完成行列取得下一个
    }
    
    // 重新培植一个请求处置赏罚赏罚曲折文工具(以便能够吸收下一个请求阻拦处置赏罚赏罚)
    switch(type) {
      case 1:
        {
          HandlerTest1Context* htc1 = new HandlerTest1Context;
          htc1->status_ = 1;    // 设置状态为1
          htc1->type_   = 1;    // 设置类型为1
          service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc1);
        }
        break;
      case 2:
        {
          HandlerTest2Context* htc2 = new HandlerTest2Context;
          htc2->status_ = 1;    // 设置状态为1
          htc2->type_   = 1;    // 设置类型为2
          service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc2);
        }
        break;
      case 3:
        {
          HandlerTest3Context* htc3 = new HandlerTest3Context;
          htc3->status_ = 1;    // 设置状态为1
          htc3->type_   = 3;    // 设置类型为3
          service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
                               cq_ptr.get(),cq_ptr.get(),htc3);
        }
        break;
    }

    pool.enqueue([type,htc](){
    // 凭证type阻拦照顾的处置赏罚赏罚
    switch(type) {
      case 1: /*Test1的处置赏罚赏罚*/
        {
          HandlerTest1Context* h = (HandlerTest1Context*)htc;
          grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
      case 2: /*Test2的处置赏罚赏罚*/
        {
          HandlerTest2Context* h = (HandlerTest2Context*)htc;
          grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
      case 3: /*Test3的处置赏罚赏罚*/
        {
          HandlerTest3Context* h = (HandlerTest3Context*)htc;
          grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
          // 设置状态为发送照顾
          h->status_ = 2;
          // 挪用responder_阻拦照顾发送(异步)
          h->responder_.Finish(h->rep_/*发送的照顾*/,status/*状态码*/,htc/*请求处置赏罚赏罚的唯一tag*/);
        }
        break;
    }
  });
  }
  return 0;
}
相关文章
相关标签/搜索