浏览代码

Add subscription cancelation

- Introduce QGrpcSubscription to track and handle gRPC client
  subscriptions
- Add subscription cancelation functionality
- Inprove Async replies handling
Alexey Edelev 5 年之前
父节点
当前提交
6007ecc669

+ 5 - 1
examples/simplechat/simplechatengine.cpp

@@ -73,7 +73,11 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>(name, QCryptographicHash::hash(password.toUtf8(), QCryptographicHash::Md5).toHex())));
 
     m_client->attachChannel(channel);
-    m_client->subscribeMessageListUpdates(None());
+    QtProtobuf::QGrpcSubscription *subscription = m_client->subscribeMessageListUpdates(None());
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::error, this, [subscription] {
+        qCritical() << "Subscription error, cancel";
+        subscription->cancel();
+    });
     QObject::connect(m_client, &SimpleChatClient::messageListUpdated, this, [this, name](const qtprotobuf::examples::ChatMessages &messages) {
         if (m_userName != name) {
             m_userName = name;

+ 1 - 0
src/generator/clientgenerator.cpp

@@ -70,6 +70,7 @@ void ClientGenerator::printClientIncludes()
     std::unordered_set<std::string> includeSet;
     includeSet.insert("QAbstractGrpcClient");
     includeSet.insert("QGrpcAsyncReply");
+    includeSet.insert("QGrpcSubscription");
     for (auto type : includeSet) {
         mPrinter->Print({{"include", type}}, Templates::ExternalIncludeTemplate);
     }

+ 1 - 0
src/generator/singlefilegenerator.cpp

@@ -228,6 +228,7 @@ bool SingleFileGenerator::GenerateServices(const ::google::protobuf::FileDescrip
 
     externalIncludes.insert("QAbstractGrpcClient");
     externalIncludes.insert("QGrpcAsyncReply");
+    externalIncludes.insert("QGrpcSubscription");
 
     if (file->message_type_count() > 0) {
         internalIncludes.insert(outFileBasename + Templates::ProtoFileSuffix);

+ 7 - 8
src/generator/templates.cpp

@@ -136,7 +136,7 @@ const char *Templates::GetterPrivateMessageDefinitionTemplate = "$type$ *$classn
                                         "    return m_$property_name$.get();\n"
                                         "}\n\n";
 
-const char *Templates::GetterMessageDeclarationTemplate = "const $type$ &$property_name$() const;";
+const char *Templates::GetterMessageDeclarationTemplate = "const $type$ &$property_name$() const;\n";
 const char *Templates::GetterMessageDefinitionTemplate = "const $type$ &$classname$::$property_name$() const {\n"
                                         "    return *m_$property_name$;\n"
                                         "}\n\n";
@@ -265,16 +265,15 @@ const char *Templates::QmlRegisterTypeUncreatableTemplate = "qmlRegisterUncreata
 
 
 const char *Templates::ClientMethodSignalDeclarationTemplate = "Q_SIGNAL void $method_name$Updated(const $return_type$ &);\n";
-const char *Templates::ClientMethodServerStreamDeclarationTemplate = "void subscribe$method_name_upper$Updates(const $param_type$ &$param_name$);\n";
-const char *Templates::ClientMethodServerStream2DeclarationTemplate = "void subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
-const char *Templates::ClientMethodServerStreamDefinitionTemplate = "void $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
+const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf::QGrpcSubscription *subscribe$method_name_upper$Updates(const $param_type$ &$param_name$);\n";
+const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcSubscription *subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
+const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
                                                                     "{\n"
-                                                                    "    subscribe(\"$method_name$\", $param_name$, &$classname$::$method_name$Updated);\n"
+                                                                    "    return subscribe(\"$method_name$\", $param_name$, &$classname$::$method_name$Updated);\n"
                                                                     "}\n";
-const char *Templates::ClientMethodServerStream2DefinitionTemplate = "void $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
+const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
                                                                      "{\n"
-                                                                     "    subscribe(\"$method_name$\", $param_name$, $return_name$);\n"
-                                                                     "    subscribe(\"$method_name$\", $param_name$, &$classname$::$method_name$Updated);\n"
+                                                                     "    return subscribe(\"$method_name$\", $param_name$, $return_name$, &$classname$::$method_name$Updated);\n"
                                                                      "}\n";
 
 const char *Templates::ListSuffix = "Repeated";

+ 2 - 0
src/grpc/CMakeLists.txt

@@ -15,6 +15,7 @@ include(${QTPROTOBUF_CMAKE_DIR}/Coverage.cmake)
 include(${QTPROTOBUF_CMAKE_DIR}/GenerateQtHeaders.cmake)
 
 file(GLOB SOURCES qgrpcasyncreply.cpp
+    qgrpcsubscription.cpp
     qgrpcstatus.cpp
     qabstractgrpcchannel.cpp
     qgrpchttp2channel.cpp
@@ -25,6 +26,7 @@ file(GLOB SOURCES qgrpcasyncreply.cpp
     qgrpcuserpasswordcredentials.cpp)
 
 file(GLOB HEADERS qgrpcasyncreply.h
+    qgrpcsubscription.h
     qgrpcstatus.h
     qabstractgrpcchannel.h
     qgrpchttp2channel.h

+ 18 - 0
src/grpc/qabstractgrpcchannel.cpp

@@ -24,3 +24,21 @@
  */
 
 #include "qabstractgrpcchannel.h"
+
+#include "qgrpcasyncreply.h"
+#include "qgrpcsubscription.h"
+
+using namespace QtProtobuf;
+
+void QAbstractGrpcChannel::abort(QGrpcAsyncReply *reply)
+{
+    assert(reply != nullptr);
+    reply->setData({});
+    reply->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
+}
+
+void QAbstractGrpcChannel::cancel(QGrpcSubscription *subscription)
+{
+    assert(subscription != nullptr);
+    subscription->finished();
+}

+ 11 - 7
src/grpc/qabstractgrpcchannel.h

@@ -37,6 +37,7 @@
 namespace QtProtobuf {
 
 class QGrpcAsyncReply;
+class QGrpcSubscription;
 class QAbstractGrpcClient;
 class QAbstractProtobufSerializer;
 /*!
@@ -78,7 +79,7 @@ public:
      * \param[in] args serialized argument message
      * \param[in] handler callback that will be called when message recevied from the server-stream
      */
-    virtual void subscribe(const QString &method, const QString &service, const QByteArray &args, QAbstractGrpcClient *client, const std::function<void(const QByteArray &)> &handler) = 0;
+    virtual void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) = 0;
 
     virtual std::shared_ptr<QAbstractProtobufSerializer> serializer() const = 0;
 protected:
@@ -86,16 +87,19 @@ protected:
     virtual ~QAbstractGrpcChannel() = default;
 
     /*!
-     * \brief aborts async call for given \p reply
-     *        \note by default abort is explicitly not supported by QAbstractGrpcChannel and throws assert when called
+     * \brief Aborts async call for given \p reply
      * \param[in] reply returned by asynchronous QAbstractGrpcChannel::call() method
      */
-    virtual void abort(QGrpcAsyncReply *reply) {
-        Q_UNUSED(reply)
-        assert("Abort is not supported by used channel");
-    }
+    virtual void abort(QGrpcAsyncReply *reply);
+
+    /*!
+     * \brief Cancels \p subscription
+     * \param[in] subscription returned by QAbstractGrpcChannel::subscribe() method
+     */
+    virtual void cancel(QGrpcSubscription *subscription);
 
     friend class QGrpcAsyncReply;
+    friend class QGrpcSubscription;
 private:
     Q_DISABLE_COPY(QAbstractGrpcChannel)
 };

+ 19 - 3
src/grpc/qabstractgrpcclient.cpp

@@ -26,6 +26,7 @@
 #include "qabstractgrpcclient.h"
 
 #include "qgrpcasyncreply.h"
+#include "qgrpcsubscription.h"
 #include "qprotobufserializerregistry_p.h"
 
 #include <QTimer>
@@ -86,7 +87,7 @@ QGrpcAsyncReply *QAbstractGrpcClient::call(const QString &method, const QByteArr
             reply->deleteLater();
         });
 
-        connect(reply, &QGrpcAsyncReply::finished, this, [reply]() {
+        connect(reply, &QGrpcAsyncReply::finished, this, [reply] {
             reply->deleteLater();
         });
 
@@ -98,13 +99,28 @@ QGrpcAsyncReply *QAbstractGrpcClient::call(const QString &method, const QByteArr
     return reply;
 }
 
-void QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray&)> &handler)
+QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray&)> &handler)
 {
+    QGrpcSubscription *subscription = nullptr;
     if (dPtr->channel) {
-        dPtr->channel->subscribe(method, dPtr->service, arg, this, handler);
+        subscription = new QGrpcSubscription(dPtr->channel, method, arg, handler);
+
+        connect(subscription, &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
+            qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription error: " << status.message();
+            error(status);
+            dPtr->channel->subscribe(subscription, dPtr->service, this);
+        });
+
+        connect(subscription, &QGrpcSubscription::finished, this, [this, subscription] {
+            qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription finished";
+            subscription->deleteLater();
+        });
+
+        dPtr->channel->subscribe(subscription, dPtr->service, this);
     } else {
         error({QGrpcStatus::Unknown, QLatin1String("No channel(s) attached.")});
     }
+    return subscription;
 }
 
 QAbstractProtobufSerializer *QAbstractGrpcClient::serializer() const

+ 17 - 7
src/grpc/qabstractgrpcclient.h

@@ -30,6 +30,7 @@
 #include <type_traits>
 
 #include <QObject>
+#include <QPointer>
 #include <QByteArray>
 
 #include <qtprotobuflogging.h>
@@ -120,8 +121,8 @@ protected:
      */
     template<typename A, typename R, typename C,
              typename std::enable_if_t<std::is_base_of<QAbstractGrpcClient, C>::value, int> = 0>
-    void subscribe(const QString &method, const A &arg, void(C::*signal)(const R &)) {
-        subscribe(method, arg.serialize(serializer()), [this, signal](const QByteArray &data) {
+    QGrpcSubscription *subscribe(const QString &method, const A &arg, void(C::*signal)(const R &)) {
+        return subscribe(method, arg.serialize(serializer()), [this, signal](const QByteArray &data) {
             R ret;
             tryDeserialize(ret, data);
             C *client = static_cast<C *>(this);
@@ -139,18 +140,21 @@ protected:
      * \note If \p ret is used as property-fiels in other object, property NOTIFY signal won't be called in case of
      *       updated message recevied from server-stream
      */
-    template<typename A, typename R>
-    void subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
+    template<typename A, typename R, typename C,
+             typename std::enable_if_t<std::is_base_of<QAbstractGrpcClient, C>::value, int> = 0>
+    QGrpcSubscription *subscribe(const QString &method, const A &arg, const QPointer<R> &ret, void(C::*signal)(const R &)) {
         if (ret.isNull()) {
             static const QString nullPointerError("Unable to subscribe method: %1. Pointer to return data is null");
             error({QGrpcStatus::InvalidArgument, nullPointerError.arg(method)});
             qProtoCritical() << nullPointerError.arg(method);
-            return;
+            return nullptr;
         }
 
-        subscribe(method, arg.serialize(serializer()), [ret, this](const QByteArray &data) {
+        return subscribe(method, arg.serialize(serializer()), [ret, signal, this](const QByteArray &data) {
             if (!ret.isNull()) {
                 tryDeserialize(*ret, data);
+                C *client = static_cast<C *>(this);
+                (client->*signal)(*ret);
             } else {
                 static const QLatin1String nullPointerError("Pointer to return data is null while subscription update received");
                 error({QGrpcStatus::InvalidArgument, nullPointerError});
@@ -159,6 +163,12 @@ protected:
         });
     }
 
+    /*!
+     * \brief Canceles all subscription for specified \p method
+     * \param[in] method Name of method subscription for to be canceled
+     */
+    void cancel(const QString &method);
+
     QAbstractProtobufSerializer *serializer() const;
 
     friend class QGrpcAsyncReply;
@@ -176,7 +186,7 @@ private:
     /*!
      * \private
      */
-    void subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray &)> &handler);
+    QGrpcSubscription *subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray &)> &handler);
 
     /*!
      * \private

+ 0 - 1
src/grpc/qgrpcasyncreply.h

@@ -26,7 +26,6 @@
 #pragma once //QGrpcAsyncReply
 
 #include <functional>
-#include <QPointer>
 #include <QMutex>
 #include <memory>
 

+ 68 - 26
src/grpc/qgrpchttp2channel.cpp

@@ -32,10 +32,12 @@
 #include <QEventLoop>
 #include <QTimer>
 #include <QtEndian>
+#include <QMetaObject>
 
 #include <unordered_map>
 
 #include "qgrpcasyncreply.h"
+#include "qgrpcsubscription.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpccredentials.h"
 #include "qprotobufserializerregistry_p.h"
@@ -223,13 +225,22 @@ QGrpcStatus QGrpcHttp2Channel::call(const QString &method, const QString &servic
     return {grpcStatus, QString::fromUtf8(networkReply->rawHeader(GrpcStatusMessage))};
 }
 
-void QGrpcHttp2Channel::call(const QString &method, const QString &service, const QByteArray &args, QtProtobuf::QGrpcAsyncReply *reply)
+void QGrpcHttp2Channel::call(const QString &method, const QString &service, const QByteArray &args, QGrpcAsyncReply *reply)
 {
+    assert(reply != nullptr);
     QNetworkReply *networkReply = dPtr->post(method, service, args);
 
-    auto connection = QObject::connect(networkReply, &QNetworkReply::finished, reply, [reply, networkReply]() {
+    std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
+    std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
+    *connection = QObject::connect(networkReply, &QNetworkReply::finished, reply, [reply, networkReply, connection, abortConnection]() {
         QGrpcStatus::StatusCode grpcStatus = QGrpcStatus::StatusCode::Unknown;
         QByteArray data = QGrpcHttp2ChannelPrivate::processReply(networkReply, grpcStatus);
+        if (*connection) {
+            QObject::disconnect(*connection);
+        }
+        if (*abortConnection) {
+            QObject::disconnect(*abortConnection);
+        }
 
         qProtoDebug() << "RECV: " << data;
         if (QGrpcStatus::StatusCode::Ok == grpcStatus) {
@@ -241,17 +252,28 @@ void QGrpcHttp2Channel::call(const QString &method, const QString &service, cons
         }
     });
 
-    QObject::connect(reply, &QGrpcAsyncReply::error, networkReply, [networkReply, connection]() {
-        QObject::disconnect(connection);
-        QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+    *abortConnection = QObject::connect(reply, &QGrpcAsyncReply::error, networkReply, [networkReply, connection, abortConnection] (const QGrpcStatus &status) {
+        if (status.code() == QGrpcStatus::Aborted) {
+            if (*connection) {
+                QObject::disconnect(*connection);
+            }
+            if (*abortConnection) {
+                QObject::disconnect(*abortConnection);
+            }
+            QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+        }
     });
 }
 
-void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service, const QByteArray &args, QAbstractGrpcClient *client, const std::function<void (const QByteArray &)> &handler)
+void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
 {
-    QNetworkReply *networkReply = dPtr->post(method, service, args, true);
+    assert(subscription != nullptr);
+    QNetworkReply *networkReply = dPtr->post(subscription->method(), service, subscription->arg(), true);
 
-    auto connection = QObject::connect(networkReply, &QNetworkReply::readyRead, &(dPtr->lambdaContext), [networkReply, handler, this]() {
+    std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
+    std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
+    std::shared_ptr<QMetaObject::Connection> readConnection(new QMetaObject::Connection);
+    *readConnection = QObject::connect(networkReply, &QNetworkReply::readyRead, subscription, [networkReply, subscription, this]() {
         auto replyIt = dPtr->activeStreamReplies.find(networkReply);
 
         QByteArray data = networkReply->readAll();
@@ -263,7 +285,7 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
             qProtoDebug() << "First chunk received: " << data.size() << " expectedDataSize: " << expectedDataSize;
 
             if (expectedDataSize == 0) {
-                handler(QByteArray());
+                subscription->handler(QByteArray());
                 return;
             }
 
@@ -275,9 +297,9 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
         dataContainer.container.append(data);
 
         qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
-        while (dataContainer.container.size() >= dataContainer.expectedSize) {
+        while (dataContainer.container.size() >= dataContainer.expectedSize && !networkReply->isFinished()) {
             qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
-            handler(dataContainer.container.mid(GrpcMessageSizeHeaderSize, dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
+            subscription->handler(dataContainer.container.mid(GrpcMessageSizeHeaderSize, dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
             dataContainer.container.remove(0, dataContainer.expectedSize);
             if (dataContainer.container.size() > GrpcMessageSizeHeaderSize) {
                 dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(dataContainer.container);
@@ -286,45 +308,65 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
             }
         }
 
-        if (dataContainer.container.size() < GrpcMessageSizeHeaderSize) {
+        if (dataContainer.container.size() < GrpcMessageSizeHeaderSize || networkReply->isFinished()) {
             dPtr->activeStreamReplies.erase(replyIt);
         }
     });
 
-    QObject::connect(client, &QAbstractGrpcClient::destroyed, networkReply, [networkReply, connection, this]() {
+    QObject::connect(client, &QAbstractGrpcClient::destroyed, networkReply, [networkReply, connection, abortConnection, readConnection, this]() {
+        if (*readConnection) {
+            QObject::disconnect(*readConnection);
+        }
+        if (*abortConnection) {
+            QObject::disconnect(*abortConnection);
+        }
+        if (*connection) {
+            QObject::disconnect(*connection);
+        }
         dPtr->activeStreamReplies.erase(networkReply);
-        QObject::disconnect(connection);
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
     });
 
-    QObject::connect(networkReply, &QNetworkReply::finished, &(dPtr->lambdaContext), [method, service, args, client, handler, networkReply, connection, this]() {
+    *connection = QObject::connect(networkReply, &QNetworkReply::finished, subscription, [subscription, service, networkReply, abortConnection, readConnection, client, this]() {
         QString errorString = networkReply->errorString();
         QNetworkReply::NetworkError networkError = networkReply->error();
+        if (*readConnection) {
+            QObject::disconnect(*readConnection);
+        }
+        if (*abortConnection) {
+            QObject::disconnect(*abortConnection);
+        }
 
         dPtr->activeStreamReplies.erase(networkReply);
-        QObject::disconnect(connection);
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
 
-        qProtoWarning() << method << "call" << service << "subscription finished: " << errorString;
+        qProtoWarning() << subscription->method() << "call" << service << "subscription finished: " << errorString;
         switch(networkError) {
         case QNetworkReply::RemoteHostClosedError:
-            subscribe(method, service, args, client, handler);
+            qProtoDebug() << "Remote server closed connection. Reconnect silently";
+            subscribe(subscription, service, client);
             break;
         case QNetworkReply::NoError:
+            //Reply closed without error
             break;
         default:
-            client->error(QGrpcStatus{StatusCodeMap.at(networkError), QString("%1 call %2 subscription failed: %3").arg(service).arg(method).arg(errorString)});
-            subscribe(method, service, args, client, handler);
+            subscription->error(QGrpcStatus{StatusCodeMap.at(networkError), QString("%1 call %2 subscription failed: %3").arg(service).arg(subscription->method()).arg(errorString)});
             break;
         }
     });
-}
 
-void QGrpcHttp2Channel::abort(QGrpcAsyncReply *reply)
-{
-    assert(reply != nullptr);
-    reply->setData({});
-    reply->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
+    *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, networkReply, [networkReply, connection, abortConnection, readConnection] {
+        if (*connection) {
+            QObject::disconnect(*connection);
+        }
+        if (*readConnection) {
+            QObject::disconnect(*readConnection);
+        }
+        if (*abortConnection) {
+            QObject::disconnect(*abortConnection);
+        }
+        QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+    });
 }
 
 std::shared_ptr<QAbstractProtobufSerializer> QGrpcHttp2Channel::serializer() const

+ 1 - 5
src/grpc/qgrpchttp2channel.h

@@ -46,12 +46,8 @@ public:
 
     QGrpcStatus call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret) override;
     void call(const QString &method, const QString &service, const QByteArray &args, QtProtobuf::QGrpcAsyncReply *reply) override;
-    void subscribe(const QString &method, const QString &service, const QByteArray &args, QAbstractGrpcClient *client, const std::function<void (const QByteArray &)> &handler) override;
+    void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) override;
     std::shared_ptr<QAbstractProtobufSerializer> serializer() const override;
-
-protected:
-    void abort(QGrpcAsyncReply *reply) override;
-
 private:
     Q_DISABLE_COPY_MOVE(QGrpcHttp2Channel)
 

+ 40 - 0
src/grpc/qgrpcsubscription.cpp

@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 Alexey Edelev <semlanik@gmail.com>
+ *
+ * This file is part of QtProtobuf project https://git.semlanik.org/semlanik/qtprotobuf
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this
+ * software and associated documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the rights to use, copy, modify,
+ * merge, publish, distribute, sublicense, and/or sell copies of the Software, and
+ * to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies
+ * or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+ * PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
+ * FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include "qgrpcsubscription.h"
+
+#include <qtprotobuflogging.h>
+
+using namespace QtProtobuf;
+
+QGrpcSubscription::QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
+                                     const QByteArray &arg, const std::function<void(const QByteArray&)> &handler) : QObject()
+  , m_channel(channel)
+  , m_method(method)
+  , m_arg(arg)
+  , m_handler(handler)
+{
+
+}

+ 102 - 0
src/grpc/qgrpcsubscription.h

@@ -0,0 +1,102 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 Alexey Edelev <semlanik@gmail.com>
+ *
+ * This file is part of QtProtobuf project https://git.semlanik.org/semlanik/qtprotobuf
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this
+ * software and associated documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the rights to use, copy, modify,
+ * merge, publish, distribute, sublicense, and/or sell copies of the Software, and
+ * to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies
+ * or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+ * PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
+ * FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#pragma once //QGrpcSubscription
+
+#include <functional>
+#include <QMutex>
+#include <memory>
+
+#include "qabstractgrpcchannel.h"
+#include "qabstractgrpcclient.h"
+
+#include "qtgrpcglobal.h"
+
+namespace QtProtobuf {
+
+class Q_GRPC_EXPORT QGrpcSubscription final : public QObject
+{
+    Q_OBJECT
+public:
+    virtual ~QGrpcSubscription() = default;
+
+    /*!
+     * \brief Cancels this subscription and try to abort call in channel
+     */
+    void cancel() {
+        m_channel->cancel(this);
+    }
+
+    /*!
+     * \private
+     * \brief Returns method for this subscription
+     */
+    QString method() const {
+        return m_method;
+    }
+
+    /*!
+     * \private
+     * \brief Returns serialized arguments for this subscription
+     */
+    QByteArray arg() const {
+        return m_arg;
+    }
+
+    /*!
+     * \private
+     * \brief Invokes handler method assigned to this subscription
+     */
+    void handler(const QByteArray& data) const {
+        m_handler(data);
+    }
+signals:
+    /*!
+     * \brief The signal is emitted when subscription is finished by user
+     */
+    void finished();
+
+    /*!
+     * \brief The signal is emitted when error happend in channel or during serialization
+     *        \note QtGrpc automaically re-tries to restore subscription in case of any channel
+     *        or serialization error.
+     * \param code gRPC channel QGrpcStatus::StatusCode
+     * \param errorMessage Description of error occured
+     */
+    void error(const QGrpcStatus &status);
+
+protected:
+    QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
+                      const QByteArray &arg, const std::function<void(const QByteArray&)> &handler);
+
+private:
+    friend class QAbstractGrpcClient;
+    std::shared_ptr<QAbstractGrpcChannel> m_channel;
+    QString m_method;
+    QByteArray m_arg;
+    std::function<void(const QByteArray&)> m_handler;
+};
+
+}

+ 0 - 8
src/protobuf/qprotobufserializerregistry.cpp

@@ -30,14 +30,6 @@
 #include <QString>
 #include <QHash>
 
-namespace std {
-  template<> struct hash<QString> {
-    std::size_t operator()(const QString &s) const {
-      return std::hash<std::string>()(s.toStdString());
-    }
-  };
-}
-
 namespace QtProtobuf {
 class QProtobufSerializerRegistryPrivate {
 public:

+ 8 - 0
src/protobuf/qtprotobuftypes.h

@@ -311,4 +311,12 @@ struct is_signed<QtProtobuf::sfixed32> : public is_signed<decltype(QtProtobuf::s
 //! \private
 template<>
 struct is_signed<QtProtobuf::sfixed64> : public is_signed<decltype(QtProtobuf::sfixed64::_t)> {};
+
+#if QT_VERSION < QT_VERSION_CHECK(5, 14, 0)
+template<> struct hash<QString> {
+    std::size_t operator()(const QString &s) const {
+        return std::hash<std::string>()(s.toStdString());
+    }
+};
+#endif
 }

+ 72 - 0
tests/test_grpc/clienttest.cpp

@@ -214,6 +214,78 @@ TEST_F(ClientTest, StringEchoStreamTest)
     ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream1Stream2Stream3Stream4");
 }
 
+TEST_F(ClientTest, StringEchoStreamAbortTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials()));
+    SimpleStringMessage result;
+    SimpleStringMessage request;
+    request.setTestFieldString("Stream");
+
+    QEventLoop waiter;
+
+    int i = 0;
+    QtProtobuf::QGrpcSubscription *subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    QObject::connect(&testClient, &TestServiceClient::testMethodServerStreamUpdated, &m_app, [&result, &i, &waiter, subscription](const SimpleStringMessage &ret) {
+        ++i;
+
+        result.setTestFieldString(result.testFieldString() + ret.testFieldString());
+
+        if (i == 3) {
+            subscription->cancel();
+            QTimer::singleShot(4000, &waiter, &QEventLoop::quit);
+        }
+    });
+
+    QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
+    waiter.exec();
+
+    ASSERT_EQ(i, 3);
+    ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream1Stream2Stream3");
+}
+
+TEST_F(ClientTest, StringEchoStreamAbortByTimerTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials()));
+    SimpleStringMessage result;
+    SimpleStringMessage request;
+    request.setTestFieldString("Stream");
+
+    QEventLoop waiter;
+
+
+    int i = 0;
+    QtProtobuf::QGrpcSubscription *subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    QTimer::singleShot(3500, subscription, [subscription](){
+        subscription->cancel();
+    });
+
+    bool isFinished = false;
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished](){
+        isFinished = true;
+    });
+
+    bool isError = false;
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::error, [&isError](){
+        isError = true;
+    });
+
+    QObject::connect(&testClient, &TestServiceClient::testMethodServerStreamUpdated, &m_app, [&result, &i](const SimpleStringMessage &ret) {
+        ++i;
+
+        result.setTestFieldString(result.testFieldString() + ret.testFieldString());
+    });
+
+    QTimer::singleShot(5000, &waiter, &QEventLoop::quit);
+    waiter.exec();
+
+    ASSERT_EQ(i, 3);
+    ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream1Stream2Stream3");
+    ASSERT_TRUE(isFinished);
+    ASSERT_TRUE(!isError);
+}
+
 TEST_F(ClientTest, StringEchoStreamTestRetUpdates)
 {
     TestServiceClient testClient;