Browse Source

Implement subscriptions reusage

- Remove update signals from generated clients. It's required because
  of multiple subscriptions possible for single streaming method.
  If user whould like to subscribe multiple times, it's not possible
  to determine which subscription invoked signal.
- Add subscription reusage mechanism
- Implement tests for ducplicates
- Make QGrpcSubscription to serve multiple handlers
Alexey Edelev 5 years ago
parent
commit
b94e6708ed

+ 3 - 3
examples/addressbook/addressbookengine.cpp

@@ -56,9 +56,9 @@ AddressBookEngine::AddressBookEngine() : QObject()
     std::shared_ptr<QtProtobuf::QAbstractGrpcChannel> channel(new QtProtobuf::QGrpcHttp2Channel(QUrl("https://localhost:65001"), QtProtobuf::QGrpcSslCredentials(conf) |
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>("authorizedUser", QCryptographicHash::hash("test", QCryptographicHash::Md5).toHex())));
     m_client->attachChannel(channel);
-    m_client->subscribeContactsUpdates(ListFrame());
-    connect(m_client, &AddressBookClient::contactsUpdated, this, [this](const Contacts &contacts) {
-        m_contacts->reset(contacts.list());
+    auto subscription = m_client->subscribeContactsUpdates(ListFrame());
+    connect(subscription, &QtProtobuf::QGrpcSubscription::updated, this, [this, subscription]() {
+        m_contacts->reset(subscription->read<Contacts>().list());
     });
     m_client->subscribeCallStatusUpdates(qtprotobuf::examples::None(), QPointer<CallStatus>(&m_callStatus));
 }

+ 4 - 4
examples/simplechat/simplechatengine.cpp

@@ -78,19 +78,19 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
         qCritical() << "Subscription error, cancel";
         subscription->cancel();
     });
-    QObject::connect(m_client, &SimpleChatClient::messageListUpdated, this, [this, name](const qtprotobuf::examples::ChatMessages &messages) {
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::updated, this, [this, name, subscription]() {
         if (m_userName != name) {
             m_userName = name;
             userNameChanged();
             loggedIn();
         }
-        m_messages.reset(messages.messages());
+        m_messages.reset(subscription->read<qtprotobuf::examples::ChatMessages>().messages());
     });
 }
 
 void SimpleChatEngine::sendMessage(const QString &content)
 {
-    m_client->sendMessage(ChatMessage(QDateTime::currentMSecsSinceEpoch(), content.toUtf8(), ChatMessage::ContentType::Text));
+    m_client->sendMessage(ChatMessage(static_cast<quint64>(QDateTime::currentMSecsSinceEpoch()), content.toUtf8(), ChatMessage::ContentType::Text));
 }
 
 qtprotobuf::examples::ChatMessage::ContentType SimpleChatEngine::clipBoardContentType() const
@@ -146,7 +146,7 @@ void SimpleChatEngine::sendImageFromClipboard()
         return;
     }
 
-    m_client->sendMessage(ChatMessage(QDateTime::currentMSecsSinceEpoch(), imgData, qtprotobuf::examples::ChatMessage::ContentType::Image));
+    m_client->sendMessage(ChatMessage(static_cast<quint64>(QDateTime::currentMSecsSinceEpoch()), imgData, qtprotobuf::examples::ChatMessage::ContentType::Image));
 }
 
 QString SimpleChatEngine::getImageThumbnail(const QByteArray &data) const

+ 0 - 1
src/generator/clientgenerator.cpp

@@ -85,7 +85,6 @@ void ClientGenerator::printClientMethodsDeclaration()
         getMethodParameters(method, parameters);
 
         if (method->server_streaming()) {
-            mPrinter->Print(parameters, Templates::ClientMethodSignalDeclarationTemplate);
             mPrinter->Print(parameters, Templates::ClientMethodServerStreamDeclarationTemplate);
             mPrinter->Print(parameters, Templates::ClientMethodServerStream2DeclarationTemplate);
         } else {

+ 2 - 2
src/generator/templates.cpp

@@ -269,11 +269,11 @@ const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf
 const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcSubscription *subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
 const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
                                                                     "{\n"
-                                                                    "    return subscribe(\"$method_name$\", $param_name$, &$classname$::$method_name$Updated);\n"
+                                                                    "    return subscribe(\"$method_name$\", $param_name$);\n"
                                                                     "}\n";
 const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
                                                                      "{\n"
-                                                                     "    return subscribe(\"$method_name$\", $param_name$, $return_name$, &$classname$::$method_name$Updated);\n"
+                                                                     "    return subscribe(\"$method_name$\", $param_name$, $return_name$);\n"
                                                                      "}\n";
 
 const char *Templates::ListSuffix = "Repeated";

+ 4 - 2
src/grpc/CMakeLists.txt

@@ -14,7 +14,8 @@ set(CMAKE_AUTORCC ON)
 include(${QTPROTOBUF_CMAKE_DIR}/Coverage.cmake)
 include(${QTPROTOBUF_CMAKE_DIR}/GenerateQtHeaders.cmake)
 
-file(GLOB SOURCES qgrpcasyncreply.cpp
+file(GLOB SOURCES qgrpcasyncoperationbase.cpp
+    qgrpcasyncreply.cpp
     qgrpcsubscription.cpp
     qgrpcstatus.cpp
     qabstractgrpcchannel.cpp
@@ -25,7 +26,8 @@ file(GLOB SOURCES qgrpcasyncreply.cpp
     qgrpcinsecurecredentials.cpp
     qgrpcuserpasswordcredentials.cpp)
 
-file(GLOB HEADERS qgrpcasyncreply.h
+file(GLOB HEADERS qgrpcasyncoperationbase_p.h
+    qgrpcasyncreply.h
     qgrpcsubscription.h
     qgrpcstatus.h
     qabstractgrpcchannel.h

+ 20 - 2
src/grpc/qabstractgrpcclient.cpp

@@ -41,6 +41,7 @@ public:
     std::shared_ptr<QAbstractGrpcChannel> channel;
     const QString service;
     std::shared_ptr<QAbstractProtobufSerializer> serializer;
+    std::vector<QGrpcSubscription *> activeSubscriptions;
 };
 }
 
@@ -99,11 +100,20 @@ QGrpcAsyncReply *QAbstractGrpcClient::call(const QString &method, const QByteArr
     return reply;
 }
 
-QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray&)> &handler)
+QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler)
 {
     QGrpcSubscription *subscription = nullptr;
     if (dPtr->channel) {
-        subscription = new QGrpcSubscription(dPtr->channel, method, arg, handler);
+        subscription = new QGrpcSubscription(dPtr->channel, method, arg, handler, this);
+
+        auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscription *activeSubscription) {
+           return *activeSubscription == *subscription;
+        });
+
+        if (it != std::end(dPtr->activeSubscriptions)) {
+            (*it)->addHandler(handler);
+            return *it; //If subscription already exists return it for handling
+        }
 
         connect(subscription, &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription error: " << status.message();
@@ -113,10 +123,18 @@ QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const Q
 
         connect(subscription, &QGrpcSubscription::finished, this, [this, subscription] {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription finished";
+            auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscription *activeSubscription) {
+               return *activeSubscription == *subscription;
+            });
+
+            if (it != std::end(dPtr->activeSubscriptions)) {
+                dPtr->activeSubscriptions.erase(it);
+            }
             subscription->deleteLater();
         });
 
         dPtr->channel->subscribe(subscription, dPtr->service, this);
+        dPtr->activeSubscriptions.push_back(subscription);
     } else {
         error({QGrpcStatus::Unknown, QLatin1String("No channel(s) attached.")});
     }

+ 16 - 17
src/grpc/qabstractgrpcclient.h

@@ -43,8 +43,16 @@
 namespace QtProtobuf {
 
 class QGrpcAsyncReply;
+class QGrpcSubscription;
+class QGrpcAsyncOperationBase;
 class QAbstractGrpcChannel;
 class QAbstractGrpcClientPrivate;
+
+/*!
+ * \private
+ */
+using SubscriptionHandler = std::function<void(const QByteArray&)>;
+
 /*!
  * \ingroup QtGrpc
  * \brief The QAbstractGrpcClient class is bridge between gRPC clients and channels. QAbstractGrpcClient provides set of
@@ -119,15 +127,9 @@ protected:
      * \param[out] signal Callback with return-message as input parameter that will be called each time message
      *             update recevied from server-stream
      */
-    template<typename A, typename R, typename C,
-             typename std::enable_if_t<std::is_base_of<QAbstractGrpcClient, C>::value, int> = 0>
-    QGrpcSubscription *subscribe(const QString &method, const A &arg, void(C::*signal)(const R &)) {
-        return subscribe(method, arg.serialize(serializer()), [this, signal](const QByteArray &data) {
-            R ret;
-            tryDeserialize(ret, data);
-            C *client = static_cast<C *>(this);
-            (client->*signal)(ret);
-        });
+    template<typename A>
+    QGrpcSubscription *subscribe(const QString &method, const A &arg) {
+        return subscribe(method, arg.serialize(serializer()));
     }
 
     /*!
@@ -140,9 +142,8 @@ protected:
      * \note If \p ret is used as property-fiels in other object, property NOTIFY signal won't be called in case of
      *       updated message recevied from server-stream
      */
-    template<typename A, typename R, typename C,
-             typename std::enable_if_t<std::is_base_of<QAbstractGrpcClient, C>::value, int> = 0>
-    QGrpcSubscription *subscribe(const QString &method, const A &arg, const QPointer<R> &ret, void(C::*signal)(const R &)) {
+    template<typename A, typename R>
+    QGrpcSubscription *subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
         if (ret.isNull()) {
             static const QString nullPointerError("Unable to subscribe method: %1. Pointer to return data is null");
             error({QGrpcStatus::InvalidArgument, nullPointerError.arg(method)});
@@ -150,11 +151,9 @@ protected:
             return nullptr;
         }
 
-        return subscribe(method, arg.serialize(serializer()), [ret, signal, this](const QByteArray &data) {
+        return subscribe(method, arg.serialize(serializer()), [ret, this](const QByteArray &data) {
             if (!ret.isNull()) {
                 tryDeserialize(*ret, data);
-                C *client = static_cast<C *>(this);
-                (client->*signal)(*ret);
             } else {
                 static const QLatin1String nullPointerError("Pointer to return data is null while subscription update received");
                 error({QGrpcStatus::InvalidArgument, nullPointerError});
@@ -171,7 +170,7 @@ protected:
 
     QAbstractProtobufSerializer *serializer() const;
 
-    friend class QGrpcAsyncReply;
+    friend class QGrpcAsyncOperationBase;
 private:
     /*!
      * \private
@@ -186,7 +185,7 @@ private:
     /*!
      * \private
      */
-    QGrpcSubscription *subscribe(const QString &method, const QByteArray &arg, const std::function<void(const QByteArray &)> &handler);
+    QGrpcSubscription *subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler = {});
 
     /*!
      * \private

+ 38 - 0
src/grpc/qgrpcasyncoperationbase.cpp

@@ -0,0 +1,38 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 Alexey Edelev <semlanik@gmail.com>
+ *
+ * This file is part of QtProtobuf project https://git.semlanik.org/semlanik/qtprotobuf
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this
+ * software and associated documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the rights to use, copy, modify,
+ * merge, publish, distribute, sublicense, and/or sell copies of the Software, and
+ * to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies
+ * or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+ * PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
+ * FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include "qgrpcasyncoperationbase_p.h"
+
+#include <qtprotobuflogging.h>
+
+using namespace QtProtobuf;
+
+QGrpcAsyncOperationBase::~QGrpcAsyncOperationBase()
+{
+    qProtoDebug() << "Trying ~QGrpcAsyncOperationBase" << this;
+    QMutexLocker locker(&m_asyncLock);
+    qProtoDebug() << "~QGrpcAsyncOperationBase" << this;
+    (void)locker;
+}

+ 110 - 0
src/grpc/qgrpcasyncoperationbase_p.h

@@ -0,0 +1,110 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 Alexey Edelev <semlanik@gmail.com>
+ *
+ * This file is part of QtProtobuf project https://git.semlanik.org/semlanik/qtprotobuf
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this
+ * software and associated documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the rights to use, copy, modify,
+ * merge, publish, distribute, sublicense, and/or sell copies of the Software, and
+ * to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies
+ * or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+ * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+ * PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
+ * FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#pragma once
+
+#include <QObject>
+#include <QMutex>
+
+#include <functional>
+#include <memory>
+
+#include "qabstractgrpcchannel.h"
+#include "qabstractgrpcclient.h"
+
+#include "qtgrpcglobal.h"
+
+namespace QtProtobuf {
+
+class Q_GRPC_EXPORT QGrpcAsyncOperationBase : public QObject
+{
+    Q_OBJECT
+public:
+    /*!
+     * \brief Reads message from raw byte array stored in QGrpcAsyncReply
+     * \return Copy of deserialized message or non-initialized message in case of exceptional situation
+     */
+    template <typename T>
+    T read() {
+        QMutexLocker locker(&m_asyncLock);
+        T value;
+        try {
+            value.deserialize(static_cast<QAbstractGrpcClient*>(parent())->serializer(), m_data);
+        } catch (std::invalid_argument &) {
+            static const QLatin1String invalidArgumentErrorMessage("Response deserialization failed invalid field found");
+            error({QGrpcStatus::InvalidArgument, invalidArgumentErrorMessage});
+        } catch (std::out_of_range &) {
+            static const QLatin1String outOfRangeErrorMessage("Invalid size of received buffer");
+            error({QGrpcStatus::OutOfRange, outOfRangeErrorMessage});
+        } catch (...) {
+            error({QGrpcStatus::Internal, QLatin1String("Unknown exception caught during deserialization")});
+        }
+        return value;
+    }
+
+    /*!
+     * \brief Interface for implementation of QAbstractGrpcChannel. Should be used to write raw data from channel to
+     *        reply
+     * \param data Raw data received from channel
+     */
+    void setData(const QByteArray &data)
+    {
+        QMutexLocker locker(&m_asyncLock);
+        m_data = data;
+    }
+
+signals:
+    /*!
+     * \brief The signal is emitted when reply is ready for read. Usualy called by channel when all chunks of data
+     *        recevied
+     */
+    void finished();
+
+    /*!
+     * \brief The signal is emitted when error happend in channel or during serialization
+     * \param code gRPC channel QGrpcStatus::StatusCode
+     * \param errorMessage Description of error occured
+     */
+    void error(const QGrpcStatus &status);
+
+protected:
+    //! \private
+    QGrpcAsyncOperationBase(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QObject(parent)
+      , m_channel(channel) {}
+    //! \private
+    virtual ~QGrpcAsyncOperationBase();
+
+    std::shared_ptr<QAbstractGrpcChannel> m_channel;
+private:
+    QGrpcAsyncOperationBase();
+    Q_DISABLE_COPY_MOVE(QGrpcAsyncOperationBase)
+
+    friend class QAbstractGrpcClient;
+
+    QByteArray m_data;
+    QMutex m_asyncLock;
+};
+
+}

+ 0 - 9
src/grpc/qgrpcasyncreply.cpp

@@ -24,14 +24,5 @@
  */
 
 #include "qgrpcasyncreply.h"
-#include <qtprotobuflogging.h>
 
 using namespace QtProtobuf;
-
-QGrpcAsyncReply::~QGrpcAsyncReply()
-{
-    qProtoDebug() << "Trying ~QGrpcAsyncReply" << this;
-    QMutexLocker locker(&m_asyncLock);
-    qProtoDebug() << "~QGrpcAsyncReply" << this;
-    (void)locker;
-}

+ 5 - 56
src/grpc/qgrpcasyncreply.h

@@ -31,6 +31,7 @@
 
 #include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
+#include "qgrpcasyncoperationbase_p.h"
 
 #include "qtgrpcglobal.h"
 
@@ -42,43 +43,10 @@ namespace QtProtobuf {
  *        created it. QGrpcAsyncReply coul be used by QAbstractGrpcChannel implementations to control call work flow and
  *        abort calls if possible in case if QGrpcAsyncReply::abort method called by library user.
  */
-class Q_GRPC_EXPORT QGrpcAsyncReply final : public QObject
+class Q_GRPC_EXPORT QGrpcAsyncReply final : public QGrpcAsyncOperationBase
 {
     Q_OBJECT
 public:
-    /*!
-     * \brief Reads message from raw byte array stored in QGrpcAsyncReply
-     * \return Copy of deserialized message or non-initialized message in case of exceptional situation
-     */
-    template <typename T>
-    T read() {
-        QMutexLocker locker(&m_asyncLock);
-        T value;
-        try {
-            value.deserialize(static_cast<QAbstractGrpcClient*>(parent())->serializer(), m_data);
-        } catch (std::invalid_argument &) {
-            static const QLatin1String invalidArgumentErrorMessage("Response deserialization failed invalid field found");
-            error({QGrpcStatus::InvalidArgument, invalidArgumentErrorMessage});
-        } catch (std::out_of_range &) {
-            static const QLatin1String outOfRangeErrorMessage("Invalid size of received buffer");
-            error({QGrpcStatus::OutOfRange, outOfRangeErrorMessage});
-        } catch (...) {
-            error({QGrpcStatus::Internal, QLatin1String("Unknown exception caught during deserialization")});
-        }
-        return value;
-    }
-
-    /*!
-     * \brief Interface for implementation of QAbstractGrpcChannel. Should be used to write raw data from channel to
-     *        reply
-     * \param data Raw data received from channel
-     */
-    void setData(const QByteArray &data)
-    {
-        QMutexLocker locker(&m_asyncLock);
-        m_data = data;
-    }
-
     /*!
      * \brief Aborts this reply and try to abort call in channel
      */
@@ -108,36 +76,17 @@ public:
         QObject::connect(this, &QGrpcAsyncReply::finished, receiver, finishCallback, type);
     }
 
-signals:
-    /*!
-     * \brief The signal is emitted when reply is ready for read. Usualy called by channel when all chunks of data
-     *        recevied
-     */
-    void finished();
-
-    /*!
-     * \brief The signal is emitted when error happend in channel or during serialization
-     * \param code gRPC channel QGrpcStatus::StatusCode
-     * \param errorMessage Description of error occured
-     */
-    void error(const QGrpcStatus &status);
-
 protected:
     //! \private
-    QGrpcAsyncReply(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent = nullptr) : QObject(parent)
-    , m_channel(channel) {}
+    QGrpcAsyncReply(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+    {}
     //! \private
-    ~QGrpcAsyncReply();
+    ~QGrpcAsyncReply() = default;
 
 private:
     QGrpcAsyncReply();
     Q_DISABLE_COPY_MOVE(QGrpcAsyncReply)
 
     friend class QAbstractGrpcClient;
-
-    std::shared_ptr<QAbstractGrpcChannel> m_channel;
-    QByteArray m_data;
-
-    QMutex m_asyncLock;
 };
 }

+ 10 - 3
src/grpc/qgrpcsubscription.cpp

@@ -30,11 +30,18 @@
 using namespace QtProtobuf;
 
 QGrpcSubscription::QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                                     const QByteArray &arg, const std::function<void(const QByteArray&)> &handler) : QObject()
-  , m_channel(channel)
+                                     const QByteArray &arg, const SubscriptionHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
   , m_method(method)
   , m_arg(arg)
-  , m_handler(handler)
 {
+    if (handler) {
+        m_handlers.push_back(handler);
+    }
+}
 
+void QGrpcSubscription::addHandler(const SubscriptionHandler &handler)
+{
+    if (handler) {
+        m_handlers.push_back(handler);
+    }
 }

+ 21 - 18
src/grpc/qgrpcsubscription.h

@@ -31,17 +31,18 @@
 
 #include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
+#include "qgrpcasyncoperationbase_p.h"
 
 #include "qtgrpcglobal.h"
 
 namespace QtProtobuf {
 
-class Q_GRPC_EXPORT QGrpcSubscription final : public QObject
+class QAbstractGrpcClient;
+
+class Q_GRPC_EXPORT QGrpcSubscription final : public QGrpcAsyncOperationBase
 {
     Q_OBJECT
 public:
-    virtual ~QGrpcSubscription() = default;
-
     /*!
      * \brief Cancels this subscription and try to abort call in channel
      */
@@ -69,34 +70,36 @@ public:
      * \private
      * \brief Invokes handler method assigned to this subscription
      */
-    void handler(const QByteArray& data) const {
-        m_handler(data);
+    void handler(const QByteArray& data) {
+        setData(data);
+        updated();
+        for (auto handler : m_handlers) {
+            handler(data);
+        }
+    }
+
+    bool operator ==(const QGrpcSubscription &other) const {
+        return other.method() == this->method() &&
+                other.arg() == this->arg();
     }
+
 signals:
     /*!
      * \brief The signal is emitted when subscription is finished by user
      */
-    void finished();
-
-    /*!
-     * \brief The signal is emitted when error happend in channel or during serialization
-     *        \note QtGrpc automaically re-tries to restore subscription in case of any channel
-     *        or serialization error.
-     * \param code gRPC channel QGrpcStatus::StatusCode
-     * \param errorMessage Description of error occured
-     */
-    void error(const QGrpcStatus &status);
+    void updated();
 
 protected:
     QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                      const QByteArray &arg, const std::function<void(const QByteArray&)> &handler);
+                      const QByteArray &arg, const SubscriptionHandler &handler, QAbstractGrpcClient *parent);
+    virtual ~QGrpcSubscription() = default;
 
+    void addHandler(const SubscriptionHandler &handler);
 private:
     friend class QAbstractGrpcClient;
-    std::shared_ptr<QAbstractGrpcChannel> m_channel;
     QString m_method;
     QByteArray m_arg;
-    std::function<void(const QByteArray&)> m_handler;
+    std::vector<SubscriptionHandler> m_handlers;
 };
 
 }

+ 92 - 6
tests/test_grpc/clienttest.cpp

@@ -195,7 +195,10 @@ TEST_F(ClientTest, StringEchoStreamTest)
     QEventLoop waiter;
 
     int i = 0;
-    QObject::connect(&testClient, &TestServiceClient::testMethodServerStreamUpdated, &m_app, [&result, &i, &waiter](const SimpleStringMessage &ret) {
+    auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
+        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
@@ -205,7 +208,6 @@ TEST_F(ClientTest, StringEchoStreamTest)
         }
     });
 
-    testClient.subscribeTestMethodServerStreamUpdates(request);
 
     QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
     waiter.exec();
@@ -226,7 +228,8 @@ TEST_F(ClientTest, StringEchoStreamAbortTest)
 
     int i = 0;
     QtProtobuf::QGrpcSubscription *subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(&testClient, &TestServiceClient::testMethodServerStreamUpdated, &m_app, [&result, &i, &waiter, subscription](const SimpleStringMessage &ret) {
+    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
+        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
@@ -271,7 +274,8 @@ TEST_F(ClientTest, StringEchoStreamAbortByTimerTest)
         isError = true;
     });
 
-    QObject::connect(&testClient, &TestServiceClient::testMethodServerStreamUpdated, &m_app, [&result, &i](const SimpleStringMessage &ret) {
+    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
+        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
@@ -326,12 +330,14 @@ TEST_F(ClientTest, HugeBlobEchoStreamTest)
     QByteArray dataHash = QCryptographicHash::hash(request.testBytes(), QCryptographicHash::Sha256);
     QEventLoop waiter;
 
-    QObject::connect(&testClient, &TestServiceClient::testMethodBlobServerStreamUpdated, &m_app, [&result, &waiter](const BlobMessage &ret) {
+    auto subscription = testClient.subscribeTestMethodBlobServerStreamUpdates(request);
+
+    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &waiter, subscription]() {
+        BlobMessage ret = subscription->read<BlobMessage>();
         result.setTestBytes(ret.testBytes());
         waiter.quit();
     });
 
-    testClient.subscribeTestMethodBlobServerStreamUpdates(request);
 
     QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
     waiter.exec();
@@ -520,3 +526,83 @@ TEST_F(ClientTest, AsyncReplySubscribeTest)
     callTimeout.stop();
     ASSERT_STREQ(result.testFieldString().toStdString().c_str(), request.testFieldString().toStdString().c_str());
 }
+
+TEST_F(ClientTest, MultipleSubscriptionsTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials()));
+    SimpleStringMessage result;
+    SimpleStringMessage request;
+    QEventLoop waiter;
+    request.setTestFieldString("Stream");
+
+    auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    auto subscriptionNext = testClient.subscribeTestMethodServerStreamUpdates(request);
+
+    ASSERT_EQ(subscription, subscriptionNext);
+
+    int i = 0;
+    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
+        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+        ++i;
+
+        result.setTestFieldString(result.testFieldString() + ret.testFieldString());
+    });
+
+    QTimer::singleShot(10000, &waiter, &QEventLoop::quit);
+    waiter.exec();
+
+    ASSERT_EQ(i, 4);
+    ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream1Stream2Stream3Stream4");
+}
+
+TEST_F(ClientTest, MultipleSubscriptionsCancelTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials()));
+    SimpleStringMessage result;
+    SimpleStringMessage request;
+    request.setTestFieldString("Stream");
+
+    auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    auto subscriptionNext = testClient.subscribeTestMethodServerStreamUpdates(request);
+
+    ASSERT_EQ(subscription, subscriptionNext);
+
+    bool isFinished = false;
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished](){
+        isFinished = true;
+    });
+
+    bool isFinishedNext = false;
+    QObject::connect(subscriptionNext, &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext](){
+        isFinishedNext = true;
+    });
+
+    subscriptionNext->cancel();
+
+    ASSERT_TRUE(isFinished);
+    ASSERT_TRUE(isFinishedNext);
+
+    subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    ASSERT_NE(subscription, subscriptionNext);
+
+    subscriptionNext = testClient.subscribeTestMethodServerStreamUpdates(request);
+
+    ASSERT_EQ(subscription, subscriptionNext);
+
+    isFinished = false;
+    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished](){
+        isFinished = true;
+    });
+
+    isFinishedNext = false;
+    QObject::connect(subscriptionNext, &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext](){
+        isFinishedNext = true;
+    });
+
+    subscription->cancel();
+
+    ASSERT_TRUE(isFinished);
+    ASSERT_TRUE(isFinishedNext);
+}