qgrpcchannel.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * MIT License
  3. *
  4. * Copyright (c) 2019 Giulio Girardi <giulio.girardi@protechgroup.it>
  5. *
  6. * This file is part of QtProtobuf project https://git.semlanik.org/semlanik/qtprotobuf
  7. *
  8. * Permission is hereby granted, free of charge, to any person obtaining a copy of this
  9. * software and associated documentation files (the "Software"), to deal in the Software
  10. * without restriction, including without limitation the rights to use, copy, modify,
  11. * merge, publish, distribute, sublicense, and/or sell copies of the Software, and
  12. * to permit persons to whom the Software is furnished to do so, subject to the following
  13. * conditions:
  14. *
  15. * The above copyright notice and this permission notice shall be included in all copies
  16. * or substantial portions of the Software.
  17. *
  18. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
  19. * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  20. * PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
  21. * FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  22. * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  23. * DEALINGS IN THE SOFTWARE.
  24. */
  25. #include "qgrpcchannel.h"
  26. #include "qgrpcchannel_p.h"
  27. #include <QEventLoop>
  28. #include <QThread>
  29. #include <memory>
  30. #include <thread>
  31. #include <unordered_map>
  32. #include <grpcpp/channel.h>
  33. #include <grpcpp/create_channel.h>
  34. #include <grpcpp/impl/codegen/byte_buffer.h>
  35. #include <grpcpp/impl/codegen/client_unary_call.h>
  36. #include <grpcpp/impl/codegen/rpc_method.h>
  37. #include <grpcpp/impl/codegen/slice.h>
  38. #include <grpcpp/impl/codegen/status.h>
  39. #include <grpcpp/impl/codegen/sync_stream_impl.h>
  40. #include <grpcpp/security/credentials.h>
  41. #include "qabstractgrpccredentials.h"
  42. #include "qgrpcasyncreply.h"
  43. #include "qgrpcstatus.h"
  44. #include "qgrpcsubscription.h"
  45. #include "qabstractgrpcclient.h"
  46. #include "qgrpccredentials.h"
  47. #include "qprotobufserializerregistry_p.h"
  48. #include "qtprotobuflogging.h"
  49. using namespace QtProtobuf;
  50. namespace QtProtobuf {
  51. static inline grpc::Status parseByteBuffer(const grpc::ByteBuffer &buffer, QByteArray &data)
  52. {
  53. std::vector<grpc::Slice> slices;
  54. auto status = buffer.Dump(&slices);
  55. if (!status.ok())
  56. return status;
  57. for (auto slice : slices) {
  58. data.append(QByteArray((const char *)slice.begin(), slice.size()));
  59. }
  60. return grpc::Status::OK;
  61. }
  62. static inline void parseQByteArray(const QByteArray &bytearray, grpc::ByteBuffer &buffer)
  63. {
  64. grpc::Slice slice(bytearray.data(), bytearray.size());
  65. grpc::ByteBuffer tmp(&slice, 1);
  66. buffer.Swap(&tmp);
  67. }
  68. QGrpcChannelSubscription::QGrpcChannelSubscription(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent) : QObject(parent)
  69. {
  70. grpc::ByteBuffer request;
  71. parseQByteArray(data, request);
  72. reader = grpc_impl::internal::ClientReaderFactory<grpc::ByteBuffer>::Create(channel,
  73. grpc::internal::RpcMethod(method.toStdString().c_str(), grpc::internal::RpcMethod::SERVER_STREAMING),
  74. &context, request);
  75. thread = QThread::create([this](){
  76. grpc::ByteBuffer response;
  77. QByteArray data;
  78. grpc::Status status;
  79. while (reader->Read(&response)) {
  80. status = parseByteBuffer(response, data);
  81. if (!status.ok()) {
  82. this->status = {
  83. (QGrpcStatus::StatusCode) status.error_code(),
  84. status.error_message().c_str()
  85. };
  86. return; // exit thread
  87. }
  88. emit this->dataReady(data);
  89. }
  90. status = reader->Finish();
  91. this->status = {
  92. (QGrpcStatus::StatusCode) status.error_code(),
  93. status.error_message().c_str()
  94. };
  95. return; // exit thread
  96. });
  97. connect(thread, &QThread::finished, this, &QGrpcChannelSubscription::finished);
  98. }
  99. void QGrpcChannelSubscription::start()
  100. {
  101. thread->start();
  102. }
  103. QGrpcChannelSubscription::~QGrpcChannelSubscription()
  104. {
  105. cancel();
  106. thread->wait();
  107. thread->deleteLater();
  108. if (reader != nullptr) {
  109. delete reader;
  110. }
  111. }
  112. void QGrpcChannelSubscription::cancel() {
  113. // TODO: check thread safety
  114. qProtoDebug() << "Subscription thread terminated";
  115. context.TryCancel();
  116. }
  117. QGrpcChannelCall::QGrpcChannelCall(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent) : QObject(parent) {
  118. grpc::ByteBuffer request;
  119. parseQByteArray(data, request);
  120. thread = QThread::create([this, request, channel, method](){
  121. grpc::ByteBuffer response;
  122. QByteArray data;
  123. grpc::Status status;
  124. status = grpc::internal::BlockingUnaryCall(channel,
  125. grpc::internal::RpcMethod(method.toStdString().c_str(), grpc::internal::RpcMethod::NORMAL_RPC),
  126. &context, request, &response
  127. );
  128. if (!status.ok()) {
  129. this->status = {
  130. static_cast<QGrpcStatus::StatusCode>(status.error_code()),
  131. status.error_message().c_str()
  132. };
  133. return; // exit thread
  134. }
  135. status = parseByteBuffer(response, this->response);
  136. this->status = {
  137. static_cast<QGrpcStatus::StatusCode>(status.error_code()),
  138. status.error_message().c_str()
  139. };
  140. });
  141. connect(thread, &QThread::finished, this, &QGrpcChannelCall::finished);
  142. }
  143. void QGrpcChannelCall::start()
  144. {
  145. thread->start();
  146. }
  147. QGrpcChannelCall::~QGrpcChannelCall()
  148. {
  149. cancel();
  150. thread->wait();
  151. thread->deleteLater();
  152. }
  153. void QGrpcChannelCall::cancel()
  154. {
  155. // TODO: check thread safety
  156. qProtoDebug() << "Call thread terminated";
  157. context.TryCancel();
  158. }
  159. QGrpcChannelPrivate::QGrpcChannelPrivate(const QUrl &url, std::shared_ptr<grpc::ChannelCredentials> credentials)
  160. {
  161. m_channel = grpc::CreateChannel(url.toString().toStdString(), credentials);
  162. }
  163. QGrpcChannelPrivate::~QGrpcChannelPrivate()
  164. {
  165. }
  166. void QGrpcChannelPrivate::call(const QString &method, const QString &service, const QByteArray &args, QGrpcAsyncReply *reply)
  167. {
  168. QString rpcName = QString("/%1/%2").arg(service).arg(method);
  169. std::shared_ptr<QGrpcChannelCall> call;
  170. std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
  171. std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
  172. call.reset(
  173. new QGrpcChannelCall(m_channel.get(), rpcName, args, reply),
  174. [](QGrpcChannelCall * c) { c->deleteLater(); }
  175. );
  176. *connection = QObject::connect(call.get(), &QGrpcChannelCall::finished, reply, [call, reply, connection, abortConnection](){
  177. if (call->status.code() == QGrpcStatus::Ok) {
  178. reply->setData(call->response);
  179. reply->finished();
  180. } else {
  181. reply->setData({});
  182. reply->error(call->status);
  183. }
  184. QObject::disconnect(*connection);
  185. QObject::disconnect(*abortConnection);
  186. });
  187. *abortConnection = QObject::connect(reply, &QGrpcAsyncReply::error, call.get(), [call, connection, abortConnection](const QGrpcStatus &status){
  188. if (status.code() == QGrpcStatus::Aborted) {
  189. QObject::disconnect(*connection);
  190. QObject::disconnect(*abortConnection);
  191. }
  192. });
  193. call->start();
  194. }
  195. QGrpcStatus QGrpcChannelPrivate::call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret)
  196. {
  197. QEventLoop loop;
  198. QString rpcName = QString("/%1/%2").arg(service).arg(method);
  199. QGrpcChannelCall call(m_channel.get(), rpcName, args);
  200. QObject::connect(&call, &QGrpcChannelCall::finished, &loop, &QEventLoop::quit);
  201. call.start();
  202. loop.exec();
  203. ret = call.response;
  204. return call.status;
  205. }
  206. void QGrpcChannelPrivate::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
  207. {
  208. assert(subscription != nullptr);
  209. QString rpcName = QString("/%1/%2").arg(service).arg(subscription->method());
  210. std::shared_ptr<QGrpcChannelSubscription> sub;
  211. std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
  212. std::shared_ptr<QMetaObject::Connection> readConnection(new QMetaObject::Connection);
  213. std::shared_ptr<QMetaObject::Connection> clientConnection(new QMetaObject::Connection);
  214. std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
  215. sub.reset(
  216. new QGrpcChannelSubscription(m_channel.get(), rpcName, subscription->arg(), subscription),
  217. [](QGrpcChannelSubscription * sub) { sub->deleteLater(); }
  218. );
  219. *readConnection = QObject::connect(sub.get(), &QGrpcChannelSubscription::dataReady, subscription, [subscription](const QByteArray &data) {
  220. subscription->handler(data);
  221. });
  222. *connection = QObject::connect(sub.get(), &QGrpcChannelSubscription::finished, subscription, [sub, subscription, readConnection, abortConnection, service, connection, clientConnection](){
  223. qProtoDebug() << "Subscription ended with server closing connection";
  224. QObject::disconnect(*connection);
  225. QObject::disconnect(*readConnection);
  226. QObject::disconnect(*abortConnection);
  227. QObject::disconnect(*clientConnection);
  228. if (sub->status.code() != QGrpcStatus::Ok)
  229. {
  230. subscription->error(sub->status);
  231. }
  232. });
  233. *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, sub.get(), [connection, abortConnection, readConnection, sub, clientConnection] {
  234. qProtoDebug() << "Subscription client was finished";
  235. QObject::disconnect(*connection);
  236. QObject::disconnect(*readConnection);
  237. QObject::disconnect(*abortConnection);
  238. QObject::disconnect(*clientConnection);
  239. sub->cancel();
  240. });
  241. *clientConnection = QObject::connect(client, &QAbstractGrpcClient::destroyed, sub.get(), [readConnection, connection, abortConnection, sub, clientConnection](){
  242. qProtoDebug() << "Grpc client was destroyed";
  243. QObject::disconnect(*connection);
  244. QObject::disconnect(*readConnection);
  245. QObject::disconnect(*abortConnection);
  246. QObject::disconnect(*clientConnection);
  247. sub->cancel();
  248. });
  249. sub->start();
  250. }
  251. QGrpcChannel::QGrpcChannel(const QUrl &url, std::shared_ptr<grpc::ChannelCredentials> credentials) : QAbstractGrpcChannel()
  252. , dPtr(std::make_unique<QGrpcChannelPrivate>(url, credentials))
  253. {
  254. }
  255. QGrpcChannel::~QGrpcChannel()
  256. {
  257. }
  258. QGrpcStatus QGrpcChannel::call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret)
  259. {
  260. return dPtr->call(method, service, args, ret);
  261. }
  262. void QGrpcChannel::call(const QString &method, const QString &service, const QByteArray &args, QGrpcAsyncReply *reply)
  263. {
  264. dPtr->call(method, service, args, reply);
  265. }
  266. void QGrpcChannel::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
  267. {
  268. dPtr->subscribe(subscription, service, client);
  269. }
  270. std::shared_ptr<QAbstractProtobufSerializer> QGrpcChannel::serializer() const
  271. {
  272. //TODO: make selection based on credentials or channel settings
  273. return QProtobufSerializerRegistry::instance().getSerializer("protobuf");
  274. }
  275. }