Kaynağa Gözat

Rename QGrpcSubscription to QGrpcStream

- Replace 'subscription' term with the 'stream' across the project
- Rename the 'updated' signal of QGrpcSubscription to
  'messageReceived'
- Remove 'Updates' suffix from the generated stream subscribe method
  names.
- Update QML gRPC tests. Use unique message identifiers.
Alexey Edelev 3 yıl önce
ebeveyn
işleme
c618516960

+ 4 - 4
examples/addressbook/addressbookengine.cpp

@@ -56,11 +56,11 @@ AddressBookEngine::AddressBookEngine() : QObject()
     std::shared_ptr<QtProtobuf::QAbstractGrpcChannel> channel(new QtProtobuf::QGrpcHttp2Channel(QUrl("https://localhost:65001"), QtProtobuf::QGrpcSslCredentials(conf) |
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>("authorizedUser", QCryptographicHash::hash("test", QCryptographicHash::Md5).toHex())));
     m_client->attachChannel(channel);
-    auto subscription = m_client->subscribeContactsUpdates(ListFrame());
-    connect(subscription.get(), &QtProtobuf::QGrpcSubscription::updated, this, [this, subscription]() {
-        m_contacts->reset(subscription->read<Contacts>().list());
+    auto stream = m_client->subscribeContacts(ListFrame());
+    connect(stream.get(), &QtProtobuf::QGrpcStream::messageReceived, this, [this, stream]() {
+        m_contacts->reset(stream->read<Contacts>().list());
     });
-    m_client->subscribeCallStatusUpdates(qtprotobuf::examples::None(), QPointer<CallStatus>(&m_callStatus));
+    m_client->subscribeCallStatus(qtprotobuf::examples::None(), QPointer<CallStatus>(&m_callStatus));
 }
 
 void AddressBookEngine::addContact(qtprotobuf::examples::Contact *contact)

+ 6 - 6
examples/simplechat/simplechatengine.cpp

@@ -73,18 +73,18 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>(name, QCryptographicHash::hash(password.toUtf8(), QCryptographicHash::Md5).toHex())));
 
     m_client->attachChannel(channel);
-    QtProtobuf::QGrpcSubscriptionShared subscription = m_client->subscribeMessageListUpdates(None());
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::error, this, [subscription] {
-        qCritical() << "Subscription error, cancel";
-        subscription->cancel();
+    QtProtobuf::QGrpcStreamShared stream = m_client->subscribeMessageList(None());
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::error, this, [stream] {
+        qCritical() << "Stream error, cancel";
+        stream->cancel();
     });
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::updated, this, [this, name, subscription]() {
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::messageReceived, this, [this, name, stream]() {
         if (m_userName != name) {
             m_userName = name;
             userNameChanged();
             loggedIn();
         }
-        m_messages.reset(subscription->read<qtprotobuf::examples::ChatMessages>().messages());
+        m_messages.reset(stream->read<qtprotobuf::examples::ChatMessages>().messages());
     });
 }
 

+ 1 - 1
src/generator/clientdeclarationprinter.cpp

@@ -66,7 +66,7 @@ void ClientDeclarationPrinter::printClientIncludes()
     std::unordered_set<std::string> includeSet;
     includeSet.insert("QAbstractGrpcClient");
     includeSet.insert("QGrpcAsyncReply");
-    includeSet.insert("QGrpcSubscription");
+    includeSet.insert("QGrpcStream");
     for (auto type : includeSet) {
         mPrinter->Print({{"include", type}}, Templates::ExternalIncludeTemplate);
     }

+ 1 - 1
src/generator/singlefilegenerator.cpp

@@ -211,7 +211,7 @@ bool SingleFileGenerator::GenerateServices(const ::google::protobuf::FileDescrip
 
     externalIncludes.insert("QAbstractGrpcClient");
     externalIncludes.insert("QGrpcAsyncReply");
-    externalIncludes.insert("QGrpcSubscription");
+    externalIncludes.insert("QGrpcStream");
 
     if (file->message_type_count() > 0) {
         internalIncludes.insert(basename + Templates::ProtoFileSuffix);

+ 6 - 6
src/generator/templates.cpp

@@ -378,19 +378,19 @@ const char *Templates::QmlRegisterEnumTypeTemplate = "qmlRegisterUncreatableType
 
 
 const char *Templates::ClientMethodSignalDeclarationTemplate = "Q_SIGNAL void $method_name$Updated(const $return_type$ &);\n";
-const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf::QGrpcSubscriptionShared subscribe$method_name_upper$Updates(const $param_type$ &$param_name$);\n";
-const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcSubscriptionShared subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
-const char *Templates::ClientMethodServerStreamQmlDeclarationTemplate = "Q_INVOKABLE QtProtobuf::QGrpcSubscriptionShared qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$);\n";
+const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf::QGrpcStreamShared subscribe$method_name_upper$(const $param_type$ &$param_name$);\n";
+const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcStreamShared subscribe$method_name_upper$(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
+const char *Templates::ClientMethodServerStreamQmlDeclarationTemplate = "Q_INVOKABLE QtProtobuf::QGrpcStreamShared qmlSubscribe$method_name_upper$_p($param_type$ *$param_name$, $return_type$ *$return_name$);\n";
 
-const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
+const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcStreamShared $classname$::subscribe$method_name_upper$(const $param_type$ &$param_name$)\n"
                                                                     "{\n"
                                                                     "    return subscribe(\"$method_name$\", $param_name$);\n"
                                                                     "}\n";
-const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
+const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcStreamShared $classname$::subscribe$method_name_upper$(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
                                                                      "{\n"
                                                                      "    return subscribe(\"$method_name$\", $param_name$, $return_name$);\n"
                                                                      "}\n";
-const char *Templates::ClientMethodServerStreamQmlDefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$)\n"
+const char *Templates::ClientMethodServerStreamQmlDefinitionTemplate = "QtProtobuf::QGrpcStreamShared $classname$::qmlSubscribe$method_name_upper$_p($param_type$ *$param_name$, $return_type$ *$return_name$)\n"
                                                                        "{\n"
                                                                        "    return subscribe(\"$method_name$\", *$param_name$, QPointer<$return_type$>($return_name$));\n"
                                                                        "}\n";

+ 2 - 2
src/grpc/CMakeLists.txt

@@ -2,7 +2,7 @@ qt_protobuf_internal_add_library(Grpc
     SOURCES
         qgrpcasyncoperationbase.cpp
         qgrpcasyncreply.cpp
-        qgrpcsubscription.cpp
+        qgrpcstream.cpp
         qgrpcstatus.cpp
         qabstractgrpcchannel.cpp
         qgrpchttp2channel.cpp
@@ -14,7 +14,7 @@ qt_protobuf_internal_add_library(Grpc
     PUBLIC_HEADER
         qgrpcasyncoperationbase_p.h
         qgrpcasyncreply.h
-        qgrpcsubscription.h
+        qgrpcstream.h
         qgrpcstatus.h
         qabstractgrpcchannel.h
         qgrpchttp2channel.h

+ 4 - 4
src/grpc/qabstractgrpcchannel.cpp

@@ -26,7 +26,7 @@
 #include "qabstractgrpcchannel.h"
 
 #include "qgrpcasyncreply.h"
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 #include <QThread>
 
 namespace QtProtobuf {
@@ -52,10 +52,10 @@ void QAbstractGrpcChannel::abort(QGrpcAsyncReply *reply)
     reply->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
 }
 
-void QAbstractGrpcChannel::cancel(QGrpcSubscription *subscription)
+void QAbstractGrpcChannel::cancel(QGrpcStream *stream)
 {
-    assert(subscription != nullptr);
-    subscription->finished();
+    assert(stream != nullptr);
+    stream->finished();
 }
 
 const QThread *QAbstractGrpcChannel::thread() const

+ 6 - 6
src/grpc/qabstractgrpcchannel.h

@@ -39,7 +39,7 @@ class QThread;
 namespace QtProtobuf {
 
 class QGrpcAsyncReply;
-class QGrpcSubscription;
+class QGrpcStream;
 class QAbstractGrpcClient;
 class QAbstractProtobufSerializer;
 struct QAbstractGrpcChannelPrivate;
@@ -85,7 +85,7 @@ public:
      * \param[in] args serialized argument message
      * \param[in] handler callback that will be called when message recevied from the server-stream
      */
-    virtual void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) = 0;
+    virtual void subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client) = 0;
 
     virtual std::shared_ptr<QAbstractProtobufSerializer> serializer() const = 0;
 
@@ -106,13 +106,13 @@ protected:
 
     /*!
      * \private
-     * \brief Cancels \p subscription
-     * \param[in] subscription returned by QAbstractGrpcChannel::subscribe() method
+     * \brief Cancels \p stream
+     * \param[in] stream returned by QAbstractGrpcChannel::subscribe() method
      */
-    virtual void cancel(QGrpcSubscription *subscription);
+    virtual void cancel(QGrpcStream *stream);
 
     friend class QGrpcAsyncReply;
-    friend class QGrpcSubscription;
+    friend class QGrpcStream;
 private:
     Q_DISABLE_COPY(QAbstractGrpcChannel)
     std::unique_ptr<QAbstractGrpcChannelPrivate> dPtr;

+ 30 - 30
src/grpc/qabstractgrpcclient.cpp

@@ -26,7 +26,7 @@
 #include "qabstractgrpcclient.h"
 
 #include "qgrpcasyncreply.h"
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 #include "qprotobufserializerregistry_p.h"
 
 #include <QTimer>
@@ -44,7 +44,7 @@ public:
     std::shared_ptr<QAbstractGrpcChannel> channel;
     const QString service;
     std::shared_ptr<QAbstractProtobufSerializer> serializer;
-    std::vector<QGrpcSubscriptionShared> activeSubscriptions;
+    std::vector<QGrpcStreamShared> activeStreams;
 };
 }
 
@@ -128,64 +128,64 @@ QGrpcAsyncReplyShared QAbstractGrpcClient::call(const QString &method, const QBy
     return reply;
 }
 
-QGrpcSubscriptionShared QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler)
+QGrpcStreamShared QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::StreamHandler &handler)
 {
-    QGrpcSubscriptionShared subscription;
+    QGrpcStreamShared stream;
 
     if (thread() != QThread::currentThread()) {
-        QMetaObject::invokeMethod(this, [&]()->QGrpcSubscriptionShared {
-                                      qProtoDebug() << "Subscription: " << dPtr->service << method << " called from different thread";
+        QMetaObject::invokeMethod(this, [&]()->QGrpcStreamShared {
+                                      qProtoDebug() << "Stream: " << dPtr->service << method << " called from different thread";
                                       return subscribe(method, arg, handler);
-                                  }, Qt::BlockingQueuedConnection, &subscription);
+                                  }, Qt::BlockingQueuedConnection, &stream);
     } else if (dPtr->channel) {
-        subscription.reset(new QGrpcSubscription(dPtr->channel, method, arg, handler, this), [](QGrpcSubscription *subscription) { subscription->deleteLater(); });
+        stream.reset(new QGrpcStream(dPtr->channel, method, arg, handler, this), [](QGrpcStream *stream) { stream->deleteLater(); });
 
-        auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](const QGrpcSubscriptionShared &activeSubscription) {
-           return *activeSubscription == *subscription;
+        auto it = std::find_if(std::begin(dPtr->activeStreams), std::end(dPtr->activeStreams), [stream](const QGrpcStreamShared &activeStream) {
+           return *activeStream == *stream;
         });
 
-        if (it != std::end(dPtr->activeSubscriptions)) {
+        if (it != std::end(dPtr->activeStreams)) {
             (*it)->addHandler(handler);
-            return *it; //If subscription already exists return it for handling
+            return *it; //If stream already exists return it for handling
         }
 
         auto errorConnection = std::make_shared<QMetaObject::Connection>();
-        *errorConnection = connect(subscription.get(), &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
-            qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription error: " << status.message();
+        *errorConnection = connect(stream.get(), &QGrpcStream::error, this, [this, stream](const QGrpcStatus &status) {
+            qProtoWarning() << stream->method() << "call" << dPtr->service << "stream error: " << status.message();
             error(status);
-            std::weak_ptr<QGrpcSubscription> weakSubscription = subscription;
+            std::weak_ptr<QGrpcStream> weakStream = stream;
             //TODO: Make timeout configurable from channel settings
-            QTimer::singleShot(1000, this, [this, weakSubscription, method = subscription->method()] {
-                auto subscription = weakSubscription.lock();
-                if (subscription) {
-                    dPtr->channel->subscribe(subscription.get(), dPtr->service, this);
+            QTimer::singleShot(1000, this, [this, weakStream, method = stream->method()] {
+                auto stream = weakStream.lock();
+                if (stream) {
+                    dPtr->channel->subscribe(stream.get(), dPtr->service, this);
                 } else {
-                    qProtoDebug() << "Subscription for " << dPtr->service << "method" << method << " will not be restored by timeout.";
+                    qProtoDebug() << "Stream for " << dPtr->service << "method" << method << " will not be restored by timeout.";
                 }
             });
         });
 
         auto finishedConnection = std::make_shared<QMetaObject::Connection>();
-        *finishedConnection = connect(subscription.get(), &QGrpcSubscription::finished, this, [this, subscription, errorConnection, finishedConnection]() mutable {
-            qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription finished";
-            auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscriptionShared activeSubscription) {
-               return *activeSubscription == *subscription;
+        *finishedConnection = connect(stream.get(), &QGrpcStream::finished, this, [this, stream, errorConnection, finishedConnection]() mutable {
+            qProtoWarning() << stream->method() << "call" << dPtr->service << "stream finished";
+            auto it = std::find_if(std::begin(dPtr->activeStreams), std::end(dPtr->activeStreams), [stream](QGrpcStreamShared activeStream) {
+               return *activeStream == *stream;
             });
 
-            if (it != std::end(dPtr->activeSubscriptions)) {
-                dPtr->activeSubscriptions.erase(it);
+            if (it != std::end(dPtr->activeStreams)) {
+                dPtr->activeStreams.erase(it);
             }
             QObject::disconnect(*errorConnection);
             QObject::disconnect(*finishedConnection);
-            subscription.reset();
+            stream.reset();
         });
 
-        dPtr->channel->subscribe(subscription.get(), dPtr->service, this);
-        dPtr->activeSubscriptions.push_back(subscription);
+        dPtr->channel->subscribe(stream.get(), dPtr->service, this);
+        dPtr->activeStreams.push_back(stream);
     } else {
         error({QGrpcStatus::Unknown, QLatin1String("No channel(s) attached.")});
     }
-    return subscription;
+    return stream;
 }
 
 QAbstractProtobufSerializer *QAbstractGrpcClient::serializer() const

+ 8 - 8
src/grpc/qabstractgrpcclient.h

@@ -47,7 +47,7 @@
 namespace QtProtobuf {
 
 class QGrpcAsyncReply;
-class QGrpcSubscription;
+class QGrpcStream;
 class QGrpcAsyncOperationBase;
 class QAbstractGrpcChannel;
 class QAbstractGrpcClientPrivate;
@@ -55,7 +55,7 @@ class QAbstractGrpcClientPrivate;
 /*!
  * \private
  */
-using SubscriptionHandler = std::function<void(const QByteArray&)>;
+using StreamHandler = std::function<void(const QByteArray&)>;
 
 /*!
  * \ingroup QtGrpc
@@ -135,7 +135,7 @@ protected:
      *             update recevied from server-stream
      */
     template<typename A>
-    QGrpcSubscriptionShared subscribe(const QString &method, const A &arg) {
+    QGrpcStreamShared subscribe(const QString &method, const A &arg) {
         return subscribe(method, arg.serialize(serializer()));
     }
 
@@ -150,7 +150,7 @@ protected:
      *       updated message recevied from server-stream
      */
     template<typename A, typename R>
-    QGrpcSubscriptionShared subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
+    QGrpcStreamShared subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
         if (ret.isNull()) {
             static const QString nullPointerError("Unable to subscribe method: %1. Pointer to return data is null");
             error({QGrpcStatus::InvalidArgument, nullPointerError.arg(method)});
@@ -162,7 +162,7 @@ protected:
             if (!ret.isNull()) {
                 tryDeserialize(*ret, data);
             } else {
-                static const QLatin1String nullPointerError("Pointer to return data is null while subscription update received");
+                static const QLatin1String nullPointerError("Pointer to return data is null while stream update received");
                 error({QGrpcStatus::InvalidArgument, nullPointerError});
                 qProtoCritical() << nullPointerError;
             }
@@ -170,8 +170,8 @@ protected:
     }
 
     /*!
-     * \brief Canceles all subscriptions for specified \p method
-     * \param[in] method Name of method subscription for to be canceled
+     * \brief Canceles all streams for specified \p method
+     * \param[in] method Name of method stream for to be canceled
      */
     void cancel(const QString &method);
 
@@ -190,7 +190,7 @@ private:
     QGrpcAsyncReplyShared call(const QString &method, const QByteArray &arg);
 
     //!\private
-    QGrpcSubscriptionShared subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler = {});
+    QGrpcStreamShared subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::StreamHandler &handler = {});
 
     /*!
      * \private

+ 1 - 1
src/grpc/qgrpcasyncoperationbase_p.h

@@ -41,7 +41,7 @@ namespace QtProtobuf {
 /*!
  * \ingroup QtGrpc
  * \private
- * \brief The QGrpcAsyncOperationBase class implements subscription logic
+ * \brief The QGrpcAsyncOperationBase class implements stream logic
  */
 class Q_GRPC_EXPORT QGrpcAsyncOperationBase : public QObject
 {

+ 22 - 22
src/grpc/qgrpcchannel.cpp

@@ -46,7 +46,7 @@
 #include "qabstractgrpccredentials.h"
 #include "qgrpcasyncreply.h"
 #include "qgrpcstatus.h"
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpccredentials.h"
 #include "qprotobufserializerregistry_p.h"
@@ -78,7 +78,7 @@ static inline void parseQByteArray(const QByteArray &bytearray, grpc::ByteBuffer
     buffer.Swap(&tmp);
 }
 
-QGrpcChannelSubscription::QGrpcChannelSubscription(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent) : QObject(parent)
+QGrpcChannelStream::QGrpcChannelStream(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent) : QObject(parent)
 {
     grpc::ByteBuffer request;
     parseQByteArray(data, request);
@@ -117,15 +117,15 @@ QGrpcChannelSubscription::QGrpcChannelSubscription(grpc::Channel *channel, const
         return; // exit thread
     });
 
-    connect(thread, &QThread::finished, this, &QGrpcChannelSubscription::finished);
+    connect(thread, &QThread::finished, this, &QGrpcChannelStream::finished);
 }
 
-void QGrpcChannelSubscription::start()
+void QGrpcChannelStream::start()
 {
     thread->start();
 }
 
-QGrpcChannelSubscription::~QGrpcChannelSubscription()
+QGrpcChannelStream::~QGrpcChannelStream()
 {
     cancel();
     thread->wait();
@@ -136,9 +136,9 @@ QGrpcChannelSubscription::~QGrpcChannelSubscription()
     }
 }
 
-void QGrpcChannelSubscription::cancel() {
+void QGrpcChannelStream::cancel() {
     // TODO: check thread safety
-    qProtoDebug() << "Subscription thread terminated";
+    qProtoDebug() << "Stream thread terminated";
     context.TryCancel();
 }
 
@@ -256,29 +256,29 @@ QGrpcStatus QGrpcChannelPrivate::call(const QString &method, const QString &serv
     return call.status;
 }
 
-void QGrpcChannelPrivate::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
+void QGrpcChannelPrivate::subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client)
 {
-    assert(subscription != nullptr);
+    assert(stream != nullptr);
 
-    QString rpcName = QString("/%1/%2").arg(service).arg(subscription->method());
+    QString rpcName = QString("/%1/%2").arg(service).arg(stream->method());
 
-    std::shared_ptr<QGrpcChannelSubscription> sub;
+    std::shared_ptr<QGrpcChannelStream> sub;
     std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> readConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> clientConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
 
     sub.reset(
-        new QGrpcChannelSubscription(m_channel.get(), rpcName, subscription->arg(), subscription),
-        [](QGrpcChannelSubscription * sub) { sub->deleteLater(); }
+        new QGrpcChannelStream(m_channel.get(), rpcName, stream->arg(), stream),
+        [](QGrpcChannelStream * sub) { sub->deleteLater(); }
     );
 
-    *readConnection = QObject::connect(sub.get(), &QGrpcChannelSubscription::dataReady, subscription, [subscription](const QByteArray &data) {
-        subscription->handler(data);
+    *readConnection = QObject::connect(sub.get(), &QGrpcChannelStream::dataReady, stream, [stream](const QByteArray &data) {
+        stream->handler(data);
     });
 
-    *connection = QObject::connect(sub.get(), &QGrpcChannelSubscription::finished, subscription, [sub, subscription, readConnection, abortConnection, service, connection, clientConnection](){
-        qProtoDebug() << "Subscription ended with server closing connection";
+    *connection = QObject::connect(sub.get(), &QGrpcChannelStream::finished, stream, [sub, stream, readConnection, abortConnection, service, connection, clientConnection](){
+        qProtoDebug() << "Stream ended with server closing connection";
 
         QObject::disconnect(*connection);
         QObject::disconnect(*readConnection);
@@ -287,12 +287,12 @@ void QGrpcChannelPrivate::subscribe(QGrpcSubscription *subscription, const QStri
 
         if (sub->status.code() != QGrpcStatus::Ok)
         {
-            subscription->error(sub->status);
+            stream->error(sub->status);
         }
     });
 
-    *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, sub.get(), [connection, abortConnection, readConnection, sub, clientConnection] {
-        qProtoDebug() << "Subscription client was finished";
+    *abortConnection = QObject::connect(stream, &QGrpcStream::finished, sub.get(), [connection, abortConnection, readConnection, sub, clientConnection] {
+        qProtoDebug() << "Stream client was finished";
 
         QObject::disconnect(*connection);
         QObject::disconnect(*readConnection);
@@ -335,9 +335,9 @@ void QGrpcChannel::call(const QString &method, const QString &service, const QBy
     dPtr->call(method, service, args, reply);
 }
 
-void QGrpcChannel::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
+void QGrpcChannel::subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client)
 {
-    dPtr->subscribe(subscription, service, client);
+    dPtr->subscribe(stream, service, client);
 }
 
 std::shared_ptr<QAbstractProtobufSerializer> QGrpcChannel::serializer() const

+ 1 - 1
src/grpc/qgrpcchannel.h

@@ -58,7 +58,7 @@ public:
 
     QGrpcStatus call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret) override;
     void call(const QString &method, const QString &service, const QByteArray &args, QtProtobuf::QGrpcAsyncReply *reply) override;
-    void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) override;
+    void subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client) override;
     std::shared_ptr<QAbstractProtobufSerializer> serializer() const override;
 
 private:

+ 5 - 5
src/grpc/qgrpcchannel_p.h

@@ -34,7 +34,7 @@
 
 #include "qabstractgrpccredentials.h"
 #include "qgrpcasyncreply.h"
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpccredentials.h"
 #include "qprotobufserializerregistry_p.h"
@@ -43,13 +43,13 @@
 namespace QtProtobuf {
 
 //! \private
-class QGrpcChannelSubscription : public QObject {
+class QGrpcChannelStream : public QObject {
     //! \private
     Q_OBJECT;
 
 public:
-    QGrpcChannelSubscription(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent = nullptr);
-    ~QGrpcChannelSubscription();
+    QGrpcChannelStream(grpc::Channel *channel, const QString &method, const QByteArray &data, QObject *parent = nullptr);
+    ~QGrpcChannelStream();
 
     void cancel();
     void start();
@@ -101,7 +101,7 @@ struct QGrpcChannelPrivate {
 
     void call(const QString &method, const QString &service, const QByteArray &args, QGrpcAsyncReply *reply);
     QGrpcStatus call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret);
-    void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client);
+    void subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client);
 };
 
 };

+ 12 - 12
src/grpc/qgrpchttp2channel.cpp

@@ -37,7 +37,7 @@
 #include <unordered_map>
 
 #include "qgrpcasyncreply.h"
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpccredentials.h"
 #include "qprotobufserializerregistry_p.h"
@@ -269,15 +269,15 @@ void QGrpcHttp2Channel::call(const QString &method, const QString &service, cons
     });
 }
 
-void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client)
+void QGrpcHttp2Channel::subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client)
 {
-    assert(subscription != nullptr);
-    QNetworkReply *networkReply = dPtr->post(subscription->method(), service, subscription->arg(), true);
+    assert(stream != nullptr);
+    QNetworkReply *networkReply = dPtr->post(stream->method(), service, stream->arg(), true);
 
     std::shared_ptr<QMetaObject::Connection> finishConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> readConnection(new QMetaObject::Connection);
-    *readConnection = QObject::connect(networkReply, &QNetworkReply::readyRead, subscription, [networkReply, subscription, this]() {
+    *readConnection = QObject::connect(networkReply, &QNetworkReply::readyRead, stream, [networkReply, stream, this]() {
         auto replyIt = dPtr->activeStreamReplies.find(networkReply);
 
         QByteArray data = networkReply->readAll();
@@ -289,7 +289,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
             qProtoDebug() << "First chunk received: " << data.size() << " expectedDataSize: " << expectedDataSize;
 
             if (expectedDataSize == 0) {
-                subscription->handler(QByteArray());
+                stream->handler(QByteArray());
                 return;
             }
 
@@ -303,7 +303,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
         qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
         while (dataContainer.container.size() >= dataContainer.expectedSize && !networkReply->isFinished()) {
             qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
-            subscription->handler(dataContainer.container.mid(GrpcMessageSizeHeaderSize, dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
+            stream->handler(dataContainer.container.mid(GrpcMessageSizeHeaderSize, dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
             dataContainer.container.remove(0, dataContainer.expectedSize);
             if (dataContainer.container.size() > GrpcMessageSizeHeaderSize) {
                 dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(dataContainer.container);
@@ -332,7 +332,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
         networkReply->deleteLater();
     });
 
-    *finishConnection = QObject::connect(networkReply, &QNetworkReply::finished, subscription, [subscription, service, networkReply, abortConnection, readConnection, finishConnection, client, this]() {
+    *finishConnection = QObject::connect(networkReply, &QNetworkReply::finished, stream, [stream, service, networkReply, abortConnection, readConnection, finishConnection, client, this]() {
         QString errorString = networkReply->errorString();
         QNetworkReply::NetworkError networkError = networkReply->error();
         if (*readConnection) {
@@ -345,22 +345,22 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
         dPtr->activeStreamReplies.erase(networkReply);
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
         networkReply->deleteLater();
-        qProtoWarning() << subscription->method() << "call" << service << "subscription finished: " << errorString;
+        qProtoWarning() << stream->method() << "call" << service << "stream finished: " << errorString;
         switch (networkError) {
         case QNetworkReply::RemoteHostClosedError:
             qProtoDebug() << "Remote server closed connection. Reconnect silently";
-            subscribe(subscription, service, client);
+            subscribe(stream, service, client);
             break;
         case QNetworkReply::NoError:
             //Reply closed without error
             break;
         default:
-            subscription->error(QGrpcStatus{StatusCodeMap.at(networkError), QString("%1 call %2 subscription failed: %3").arg(service).arg(subscription->method()).arg(errorString)});
+            stream->error(QGrpcStatus{StatusCodeMap.at(networkError), QString("%1 call %2 stream failed: %3").arg(service).arg(stream->method()).arg(errorString)});
             break;
         }
     });
 
-    *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, networkReply, [networkReply, finishConnection, abortConnection, readConnection] {
+    *abortConnection = QObject::connect(stream, &QGrpcStream::finished, networkReply, [networkReply, finishConnection, abortConnection, readConnection] {
         if (*finishConnection) {
             QObject::disconnect(*finishConnection);
         }

+ 1 - 1
src/grpc/qgrpchttp2channel.h

@@ -57,7 +57,7 @@ public:
 
     QGrpcStatus call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret) override;
     void call(const QString &method, const QString &service, const QByteArray &args, QtProtobuf::QGrpcAsyncReply *reply) override;
-    void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) override;
+    void subscribe(QGrpcStream *stream, const QString &service, QAbstractGrpcClient *client) override;
     std::shared_ptr<QAbstractProtobufSerializer> serializer() const override;
 private:
     Q_DISABLE_COPY_MOVE(QGrpcHttp2Channel)

+ 1 - 1
src/grpc/qgrpcstatus.h

@@ -51,7 +51,7 @@ class QGrpcStatusPrivate;
 
 /*!
  * \ingroup QtGrpc
- * \brief The QGrpcStatus class contains information about last gRPC operation. In case of error in call/subscription
+ * \brief The QGrpcStatus class contains information about last gRPC operation. In case of error in call/stream
  *        processing QGrpcStatus will contain code any of non-Ok QGrpcStatus::StatusCode.
  *        This class combines QGrpcStatus::StatusCode and message returned from channel or QGrpc framework.
  */

+ 5 - 5
src/grpc/qgrpcsubscription.cpp → src/grpc/qgrpcstream.cpp

@@ -23,15 +23,15 @@
  * DEALINGS IN THE SOFTWARE.
  */
 
-#include "qgrpcsubscription.h"
+#include "qgrpcstream.h"
 
 #include <qtprotobuflogging.h>
 #include <QThread>
 
 using namespace QtProtobuf;
 
-QGrpcSubscription::QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                                     const QByteArray &arg, const SubscriptionHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+QGrpcStream::QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
+                                     const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
   , m_method(method)
   , m_arg(arg)
 {
@@ -40,14 +40,14 @@ QGrpcSubscription::QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel>
     }
 }
 
-void QGrpcSubscription::addHandler(const SubscriptionHandler &handler)
+void QGrpcStream::addHandler(const StreamHandler &handler)
 {
     if (handler) {
         m_handlers.push_back(handler);
     }
 }
 
-void QGrpcSubscription::cancel()
+void QGrpcStream::cancel()
 {
     if (thread() != QThread::currentThread()) {
         QMetaObject::invokeMethod(this, [this](){m_channel->cancel(this);}, Qt::BlockingQueuedConnection);

+ 18 - 18
src/grpc/qgrpcsubscription.h → src/grpc/qgrpcstream.h

@@ -23,7 +23,7 @@
  * DEALINGS IN THE SOFTWARE.
  */
 
-#pragma once //QGrpcSubscription
+#pragma once //QGrpcStream
 
 #include <functional>
 #include <QMutex>
@@ -41,62 +41,62 @@ class QAbstractGrpcClient;
 
 /*!
  * \ingroup QtGrpc
- * \brief The QGrpcSubscription class
+ * \brief The QGrpcStream class
  */
-class Q_GRPC_EXPORT QGrpcSubscription final : public QGrpcAsyncOperationBase
+class Q_GRPC_EXPORT QGrpcStream final : public QGrpcAsyncOperationBase
 {
     Q_OBJECT
 public:
     /*!
-     * \brief Cancels this subscription and try to abort call in channel
+     * \brief Cancels this stream and try to abort call in channel
      */
     void cancel();
 
     /*!
-     * \brief Returns method for this subscription
+     * \brief Returns method for this stream
      */
     QString method() const {
         return m_method;
     }
 
     /*!
-     * \brief Returns serialized arguments for this subscription
+     * \brief Returns serialized arguments for this stream
      */
     QByteArray arg() const {
         return m_arg;
     }
 
     /*!
-     * \brief Invokes handler method assigned to this subscription.
-     * \param data updated subscription data buffer
+     * \brief Invokes handler method assigned to this stream.
+     * \param data updated stream data buffer
      * \details Should be used by QAbstractGrpcChannel implementations,
-     *          to update data in subscription and notify clients about subscription updates.
+     *          to update data in stream and notify clients about stream updates.
      */
     void handler(const QByteArray& data) {
         setData(data);
         for (auto handler : m_handlers) {
             handler(data);
         }
-        updated();
+        messageReceived();
     }
 
 signals:
     /*!
-     * \brief The signal is emitted when subscription received updated value from server
+     * \brief The signal is emitted when stream received updated value from server
      */
-    void updated();
+    void messageReceived();
 
 protected:
     //! \private
-    QGrpcSubscription(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                      const QByteArray &arg, const SubscriptionHandler &handler, QAbstractGrpcClient *parent);
+    QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
+                      const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent);
     //! \private
-    virtual ~QGrpcSubscription() = default;
+    virtual ~QGrpcStream() = default;
 
     //! \private
-    void addHandler(const SubscriptionHandler &handler);
+    void addHandler(const StreamHandler &handler);
 
-    bool operator ==(const QGrpcSubscription &other) const {
+    bool operator ==(const QGrpcStream &other) const {
         return other.method() == this->method() &&
                 other.arg() == this->arg();
     }
@@ -105,7 +105,7 @@ private:
     friend class QAbstractGrpcClient;
     QString m_method;
     QByteArray m_arg;
-    std::vector<SubscriptionHandler> m_handlers;
+    std::vector<StreamHandler> m_handlers;
 };
 
 }

+ 2 - 2
src/grpc/qtgrpcglobal.h

@@ -67,7 +67,7 @@
 
 namespace QtProtobuf {
 class QGrpcAsyncReply;
-class QGrpcSubscription;
+class QGrpcStream;
 using QGrpcAsyncReplyShared = std::shared_ptr<QGrpcAsyncReply>;
-using QGrpcSubscriptionShared = std::shared_ptr<QGrpcSubscription>;
+using QGrpcStreamShared = std::shared_ptr<QGrpcStream>;
 }

+ 2 - 2
src/grpc/quick/CMakeLists.txt

@@ -18,12 +18,12 @@ else()
 endif()
 
 file(GLOB SOURCES
-    qquickgrpcsubscription.cpp
+    qquickgrpcstream.cpp
     qtgrpcquickplugin.cpp)
 
 file(GLOB HEADERS
     qtgrpcquickplugin.h
-    qquickgrpcsubscription_p.h
+    qquickgrpcstream_p.h
     qtgrpcquick_global.h)
 
 add_library(${TARGET} ${SOURCES})

+ 21 - 21
src/grpc/quick/qquickgrpcsubscription.cpp → src/grpc/quick/qquickgrpcstream.cpp

@@ -24,34 +24,34 @@
  */
 
 
-#include "qquickgrpcsubscription_p.h"
+#include "qquickgrpcstream_p.h"
 
-#include <QGrpcSubscription>
+#include <QGrpcStream>
 #include <QJSEngine>
 #include <QQmlEngine>
 
 using namespace QtProtobuf;
 
-QQuickGrpcSubscription::QQuickGrpcSubscription(QObject *parent) : QObject(parent)
+QQuickGrpcStream::QQuickGrpcStream(QObject *parent) : QObject(parent)
   , m_enabled(false)
   , m_returnValue(nullptr)
 {
 }
 
-QQuickGrpcSubscription::~QQuickGrpcSubscription()
+QQuickGrpcStream::~QQuickGrpcStream()
 {
-    if (m_subscription) {
-        m_subscription->cancel();
+    if (m_stream) {
+        m_stream->cancel();
     }
     delete m_returnValue;
 }
 
 
-void QQuickGrpcSubscription::updateSubscription()
+void QQuickGrpcStream::updateStream()
 {
-    if (m_subscription) {
-        m_subscription->cancel();
-        m_subscription.reset();
+    if (m_stream) {
+        m_stream->cancel();
+        m_stream.reset();
     }
 
     if (m_returnValue != nullptr) {
@@ -69,14 +69,14 @@ void QQuickGrpcSubscription::updateSubscription()
     }
 }
 
-bool QQuickGrpcSubscription::subscribe()
+bool QQuickGrpcStream::subscribe()
 {
     QString uppercaseMethodName = m_method;
     uppercaseMethodName.replace(0, 1, m_method[0].toUpper());
     const QMetaObject *metaObject = m_client->metaObject();
     QMetaMethod method;
     for (int i = 0; i < metaObject->methodCount(); i++) {
-        if (QString("qmlSubscribe%1Updates_p").arg(uppercaseMethodName) == metaObject->method(i).name()) {
+        if (QString("qmlSubscribe%1_p").arg(uppercaseMethodName) == metaObject->method(i).name()) {
             method = metaObject->method(i);
             break;
         }
@@ -150,27 +150,27 @@ bool QQuickGrpcSubscription::subscribe()
         return false;
     }
 
-    QGrpcSubscriptionShared subscription = nullptr;
+    QGrpcStreamShared stream = nullptr;
     bool ok = method.invoke(m_client, Qt::DirectConnection,
-                                      QGenericReturnArgument("QtProtobuf::QGrpcSubscriptionShared", static_cast<void *>(&subscription)),
+                                      QGenericReturnArgument("QtProtobuf::QGrpcStreamShared", static_cast<void *>(&stream)),
                                       QGenericArgument(method.parameterTypes().at(0).data(), static_cast<const void *>(&argument)),
                                       QGenericArgument(method.parameterTypes().at(1).data(), static_cast<const void *>(&m_returnValue)));
-    if (!ok || subscription == nullptr) {
-        errorString = QString("Unable to call ") + m_method + " invalidate subscription.";
+    if (!ok || stream == nullptr) {
+        errorString = QString("Unable to call ") + m_method + " invalidate stream.";
         qProtoWarning() << errorString;
         error({QGrpcStatus::Unknown, errorString});
         return false;
     }
 
-    m_subscription = subscription;
+    m_stream = stream;
 
-    connect(m_subscription.get(), &QGrpcSubscription::updated, this, [this](){
-        updated(qjsEngine(this)->toScriptValue(m_returnValue));
+    connect(m_stream.get(), &QGrpcStream::messageReceived, this, [this](){
+        messageReceived(qjsEngine(this)->toScriptValue(m_returnValue));
     });
 
-    connect(m_subscription.get(), &QGrpcSubscription::error, this, &QQuickGrpcSubscription::error);//TODO: Probably it's good idea to disable subscription here
+    connect(m_stream.get(), &QGrpcStream::error, this, &QQuickGrpcStream::error);//TODO: Probably it's good idea to disable stream here
 
-    connect(m_subscription.get(), &QGrpcSubscription::finished, this, [this](){ m_subscription.reset(); setEnabled(false); });
+    connect(m_stream.get(), &QGrpcStream::finished, this, [this](){ m_stream.reset(); setEnabled(false); });
 
     return true;
 }

+ 30 - 30
src/grpc/quick/qquickgrpcsubscription_p.h → src/grpc/quick/qquickgrpcstream_p.h

@@ -31,40 +31,40 @@
 
 /*!
  * \ingroup QtGrpcQML
- * \class GrpcSubscription
- * \brief GrpcSubscription provides access to gRPC subscriptions from QML.
+ * \class GrpcStream
+ * \brief GrpcStream provides access to gRPC streams from QML.
  *
- * \details GrpcSubscription might be used from QML code to receive updates for gRPC server or bidirectional streaming methods.
+ * \details GrpcStream might be used from QML code to receive updates for gRPC server or bidirectional streaming methods.
  * Follwing properties should be provided and can not be empty, to subscribe streaming method:
  * - client
  * - method
  * - argument
- * Changing any of this properties cause subscription cancelation and re-initialization.
+ * Changing any of this properties cause stream cancelation and re-initialization.
  *
  * \subsection Properties
  * \subsubsection client QtProtobuf::QAbstractGrpcClient *client
- * \details Client used for subscription.
+ * \details Client used for stream.
  *
  * \subsubsection enabled bool enabled
- * \details Controls subscription status. If subscription is active, switch this flag to 'false'
- * to cancel subscription. Switching to 'false' keeps all fields ready to restore subscription.
+ * \details Controls stream status. If stream is active, switch this flag to 'false'
+ * to cancel stream. Switching to 'false' keeps all fields ready to restore stream.
  *
  * \subsubsection method QString method
- * \details The name of streaming method that will be used for subscription.
+ * \details The name of streaming method that will be used for stream.
  *
  * \subsubsection argument QObject *argument
- * \details Pointer to argument that will be used for subscription.
+ * \details Pointer to argument that will be used for stream.
  *
  * \subsubsection returnValue QObject *returnValue
- * \details Value returned by the subscription (Note that it is the same "return" object passed by the "updated" signal)
+ * \details Value returned by the stream (Note that it is the same "return" object passed by the "updated" signal)
  *
  * \subsection Signals
- * \subsubsection updated updated(ReturnType value)
- * \details The signal notifies about received update for subscription. It provides "return" value ready for use in QML.
+ * \subsubsection updated messageReceived(ReturnType value)
+ * \details The signal notifies about received update for stream. It provides "return" value ready for use in QML.
  * \code
- * GrpcSubscription {
+ * GrpcStream {
  *     ...
- *     onUpdated: {
+ *     onMessageReceived: {
  *         //Use value passed as argument to binding
  *         ...
  *     }
@@ -72,15 +72,15 @@
  * \endcode
  *
  * \subsubsection error error(QtProtobuf::QGrpcStatus status)
- * \details The signal notifies about error occured for subscription. Provides GrpcStatus as argument to assigned handler.
- * \note Some errors at subscription initialization phase disable GrpcSubscription
+ * \details The signal notifies about error occured for stream. Provides GrpcStatus as argument to assigned handler.
+ * \note Some errors at stream initialization phase disable GrpcStream
  *
  * \code
- * GrpcSubscription {
- *     id: mySubscription
+ * GrpcStream {
+ *     id: myStream
  *     ...
  *     onError: {
- *         console.log("Subscription error: " + status.code + " " + status.message)
+ *         console.log("Stream error: " + status.code + " " + status.message)
  *     }
  * }
  * \endcode
@@ -90,10 +90,10 @@ class QJSValue;
 
 namespace QtProtobuf {
 
-class QGrpcSubscription;
+class QGrpcStream;
 
 //! \private
-class QQuickGrpcSubscription : public QObject
+class QQuickGrpcStream : public QObject
 {
     Q_OBJECT
     Q_PROPERTY(QAbstractGrpcClient *client READ client WRITE setClient NOTIFY clientChanged)
@@ -103,8 +103,8 @@ class QQuickGrpcSubscription : public QObject
     Q_PROPERTY(QObject *returnValue READ returnValue NOTIFY returnValueChanged)
 
 public:
-    QQuickGrpcSubscription(QObject *parent = nullptr);
-    ~QQuickGrpcSubscription();
+    QQuickGrpcStream(QObject *parent = nullptr);
+    ~QQuickGrpcStream();
 
     QAbstractGrpcClient *client() const {
         return m_client;
@@ -133,7 +133,7 @@ public:
 
         m_client = client;
         emit clientChanged();
-        updateSubscription();
+        updateStream();
     }
 
     void setEnabled(bool enabled) {
@@ -143,7 +143,7 @@ public:
 
         m_enabled = enabled;
         emit enabledChanged();
-        updateSubscription();
+        updateStream();
     }
 
     void setMethod(QString method) {
@@ -153,7 +153,7 @@ public:
 
         m_method = method;
         emit methodChanged();
-        updateSubscription();
+        updateStream();
     }
 
     void setArgument(QObject *argument) {
@@ -163,11 +163,11 @@ public:
 
         m_argument = argument;
         emit argumentChanged();
-        updateSubscription();
+        updateStream();
     }
 
 signals:
-    void updated(const QJSValue &value);
+    void messageReceived(const QJSValue &value);
     void error(const QtProtobuf::QGrpcStatus &status);
 
     void clientChanged();
@@ -177,13 +177,13 @@ signals:
     void returnValueChanged();
 
 private:
-    void updateSubscription();
+    void updateStream();
     bool subscribe();
     QPointer<QAbstractGrpcClient> m_client;
     bool m_enabled;
     QString m_method;
     QPointer<QObject> m_argument;
-    std::shared_ptr<QGrpcSubscription> m_subscription;
+    std::shared_ptr<QGrpcStream> m_stream;
     QObject *m_returnValue;
 };
 

+ 2 - 2
src/grpc/quick/qtgrpcquickplugin.cpp

@@ -25,7 +25,7 @@
 
 #include "qtgrpcquickplugin.h"
 
-#include "qquickgrpcsubscription_p.h"
+#include "qquickgrpcstream_p.h"
 
 #include <QDebug>
 #include <QQmlEngine>
@@ -39,5 +39,5 @@ void QtGrpcQuickPlugin::registerTypes(const char *uri)
     Q_ASSERT(uri == QLatin1String("QtGrpc"));
 
     qmlRegisterUncreatableType<QtProtobuf::QGrpcStatus>("QtGrpc", QT_PROTOBUF_VERSION_MAJOR, QT_PROTOBUF_VERSION_MINOR, "GrpcStatus", "GrpcStatus only could be recevied from gRPC calls");
-    qmlRegisterType<QQuickGrpcSubscription>("QtGrpc", QT_PROTOBUF_VERSION_MAJOR, QT_PROTOBUF_VERSION_MINOR, "GrpcSubscription");
+    qmlRegisterType<QQuickGrpcStream>("QtGrpc", QT_PROTOBUF_VERSION_MAJOR, QT_PROTOBUF_VERSION_MINOR, "GrpcStream");
 }

+ 48 - 48
tests/test_grpc/clienttest.cpp

@@ -237,9 +237,9 @@ TEST_P(ClientTest, StringEchoStreamTest)
     QEventLoop waiter;
 
     int i = 0;
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
-        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    QObject::connect(stream.get(), &QGrpcStream::messageReceived, &m_app, [&result, &i, &waiter, stream]() {
+        SimpleStringMessage ret = stream->read<SimpleStringMessage>();
 
         ++i;
 
@@ -269,15 +269,15 @@ TEST_P(ClientTest, StringEchoStreamAbortTest)
     QEventLoop waiter;
 
     int i = 0;
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
-        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    QObject::connect(stream.get(), &QGrpcStream::messageReceived, &m_app, [&result, &i, &waiter, stream]() {
+        SimpleStringMessage ret = stream->read<SimpleStringMessage>();
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
 
         if (i == 3) {
-            subscription->cancel();
+            stream->cancel();
             QTimer::singleShot(4000, &waiter, &QEventLoop::quit);
         }
     });
@@ -301,23 +301,23 @@ TEST_P(ClientTest, StringEchoStreamAbortByTimerTest)
 
 
     int i = 0;
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    QTimer::singleShot(3500, subscription.get(), [subscription]() {
-        subscription->cancel();
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    QTimer::singleShot(3500, stream.get(), [stream]() {
+        stream->cancel();
     });
 
     bool isFinished = false;
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::finished, [&isFinished]() {
         isFinished = true;
     });
 
     bool isError = false;
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::error, [&isError]() {
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::error, [&isError]() {
         isError = true;
     });
 
-    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
-        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+    QObject::connect(stream.get(), &QGrpcStream::messageReceived, &m_app, [&result, &i, stream]() {
+        SimpleStringMessage ret = stream->read<SimpleStringMessage>();
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
@@ -333,7 +333,7 @@ TEST_P(ClientTest, StringEchoStreamAbortByTimerTest)
     testClient->deleteLater();
 }
 
-TEST_P(ClientTest, StringEchoStreamTestRetUpdates)
+TEST_P(ClientTest, StringEchoStreamTestRet)
 {
     auto testClient = (*GetParam())();
     SimpleStringMessage request;
@@ -343,7 +343,7 @@ TEST_P(ClientTest, StringEchoStreamTestRetUpdates)
 
     QEventLoop waiter;
 
-    testClient->subscribeTestMethodServerStreamUpdates(request, result);
+    testClient->subscribeTestMethodServerStream(request, result);
 
     int i = 0;
     QObject::connect(result.data(), &SimpleStringMessage::testFieldStringChanged, &m_app, [&i]() {
@@ -372,10 +372,10 @@ TEST_P(ClientTest, HugeBlobEchoStreamTest)
     QByteArray dataHash = QCryptographicHash::hash(request.testBytes(), QCryptographicHash::Sha256);
     QEventLoop waiter;
 
-    auto subscription = testClient->subscribeTestMethodBlobServerStreamUpdates(request);
+    auto stream = testClient->subscribeTestMethodBlobServerStream(request);
 
-    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &waiter, subscription]() {
-        BlobMessage ret = subscription->read<BlobMessage>();
+    QObject::connect(stream.get(), &QGrpcStream::messageReceived, &m_app, [&result, &waiter, stream]() {
+        BlobMessage ret = stream->read<BlobMessage>();
         result.setTestBytes(ret.testBytes());
         waiter.quit();
     });
@@ -570,7 +570,7 @@ TEST_P(ClientTest, AsyncReplySubscribeTest)
     testClient->deleteLater();
 }
 
-TEST_P(ClientTest, MultipleSubscriptionsTest)
+TEST_P(ClientTest, MultipleStreamsTest)
 {
     auto testClient = (*GetParam())();
     SimpleStringMessage result;
@@ -578,14 +578,14 @@ TEST_P(ClientTest, MultipleSubscriptionsTest)
     QEventLoop waiter;
     request.setTestFieldString("Stream");
 
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    auto subscriptionNext = testClient->subscribeTestMethodServerStreamUpdates(request);
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    auto streamNext = testClient->subscribeTestMethodServerStream(request);
 
-    ASSERT_EQ(subscription, subscriptionNext);
+    ASSERT_EQ(stream, streamNext);
 
     int i = 0;
-    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
-        SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+    QObject::connect(stream.get(), &QGrpcStream::messageReceived, &m_app, [&result, &i, stream]() {
+        SimpleStringMessage ret = stream->read<SimpleStringMessage>();
         ++i;
 
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
@@ -599,51 +599,51 @@ TEST_P(ClientTest, MultipleSubscriptionsTest)
     testClient->deleteLater();
 }
 
-TEST_P(ClientTest, MultipleSubscriptionsCancelTest)
+TEST_P(ClientTest, MultipleStreamsCancelTest)
 {
     auto testClient = (*GetParam())();
     SimpleStringMessage result;
     SimpleStringMessage request;
     request.setTestFieldString("Stream");
 
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    auto subscriptionNext = testClient->subscribeTestMethodServerStreamUpdates(request);
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    auto streamNext = testClient->subscribeTestMethodServerStream(request);
 
-    ASSERT_EQ(subscription, subscriptionNext);
+    ASSERT_EQ(stream, streamNext);
 
     bool isFinished = false;
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::finished, [&isFinished]() {
         isFinished = true;
     });
 
     bool isFinishedNext = false;
-    QObject::connect(subscriptionNext.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
+    QObject::connect(streamNext.get(), &QtProtobuf::QGrpcStream::finished, [&isFinishedNext]() {
         isFinishedNext = true;
     });
 
-    subscriptionNext->cancel();
+    streamNext->cancel();
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
 
-    subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    ASSERT_NE(subscription, subscriptionNext);
+    stream = testClient->subscribeTestMethodServerStream(request);
+    ASSERT_NE(stream, streamNext);
 
-    subscriptionNext = testClient->subscribeTestMethodServerStreamUpdates(request);
+    streamNext = testClient->subscribeTestMethodServerStream(request);
 
-    ASSERT_EQ(subscription, subscriptionNext);
+    ASSERT_EQ(stream, streamNext);
 
     isFinished = false;
-    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::finished, [&isFinished]() {
         isFinished = true;
     });
 
     isFinishedNext = false;
-    QObject::connect(subscriptionNext.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
+    QObject::connect(streamNext.get(), &QtProtobuf::QGrpcStream::finished, [&isFinishedNext]() {
         isFinishedNext = true;
     });
 
-    subscription->cancel();
+    stream->cancel();
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
@@ -744,19 +744,19 @@ TEST_P(ClientTest, StringEchoStreamThreadTest)
     std::shared_ptr<QThread> thread(QThread::create([&](){
         QEventLoop waiter;
         QThread *validThread = QThread::currentThread();
-        auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-        QObject::connect(subscription.get(), &QGrpcSubscription::updated, &waiter, [&result, &i, &waiter, subscription, &threadsOk, validThread]() {
-            SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+        auto stream = testClient->subscribeTestMethodServerStream(request);
+        QObject::connect(stream.get(), &QGrpcStream::messageReceived, &waiter, [&result, &i, &waiter, stream, &threadsOk, validThread]() {
+            SimpleStringMessage ret = stream->read<SimpleStringMessage>();
             result.setTestFieldString(result.testFieldString() + ret.testFieldString());
             ++i;
             if (i == 4) {
                 waiter.quit();
             }
-            threadsOk &= subscription->thread() != QThread::currentThread();
+            threadsOk &= stream->thread() != QThread::currentThread();
             threadsOk &= validThread == QThread::currentThread();
         });
 
-        threadsOk &= subscription->thread() != QThread::currentThread();
+        threadsOk &= stream->thread() != QThread::currentThread();
         QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
         waiter.exec();
     }));
@@ -796,13 +796,13 @@ TEST_P(ClientTest, StreamCancelWhileErrorTimeoutTest)
     QEventLoop waiter;
 
     bool ok = false;
-    auto subscription = testClient->subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(subscription.get(), &QGrpcSubscription::finished, &m_app, [&ok, &waiter]() {
+    auto stream = testClient->subscribeTestMethodServerStream(request);
+    QObject::connect(stream.get(), &QGrpcStream::finished, &m_app, [&ok, &waiter]() {
         ok = true;
         waiter.quit();
     });
-    subscription->cancel();
-    subscription.reset();
+    stream->cancel();
+    stream.reset();
 
     QTimer::singleShot(5000, &waiter, &QEventLoop::quit);
     waiter.exec();

+ 47 - 43
tests/test_grpc_qml/qml/tst_grpc.qml

@@ -34,7 +34,6 @@ TestCase {
     name: "gRPC client qml test"
     SimpleStringMessage {
         id: stringMsg
-        testFieldString: "Test string"
     }
 
     SimpleIntMessage {
@@ -44,11 +43,11 @@ TestCase {
 
     QtObject {
         id: returnMsg
-        property string ret: serverStreamSubscription.returnValue.testFieldString
+        property string ret: serverStream.returnValue.testFieldString
     }
 
-    GrpcSubscription {
-        id: serverStreamSubscription
+    GrpcStream {
+        id: serverStream
         property bool ok: true
         property int updateCount: 0
 
@@ -56,18 +55,18 @@ TestCase {
         client: TestServiceClient
         method: "testMethodServerStream"
         argument: stringMsg
-        onUpdated: {
+        onMessageReceived: {
             ++updateCount;
-            ok = ok && value.testFieldString === ("Test string" + updateCount) && returnMsg.ret == ("Test string" + updateCount)
+            ok = ok && value.testFieldString === ("test_serverStream" + updateCount) && returnMsg.ret == ("test_serverStream" + updateCount)
         }
         onError: {
-            console.log("Subscription error: " + status.code + " " + status.message)
+            console.log("Stream error: " + status.code + " " + status.message)
             ok = false;
         }
     }
 
-    GrpcSubscription {
-        id: serverStreamCancelSubscription
+    GrpcStream {
+        id: serverStreamCancel
         property bool ok: true
         property int updateCount: 0
 
@@ -75,27 +74,27 @@ TestCase {
         client: TestServiceClient
         method: "testMethodServerStream"
         argument: stringMsg
-        onUpdated: {
+        onMessageReceived: {
             ++updateCount;
-            ok = ok && value.testFieldString === "Test string" + updateCount
+            ok = ok && value.testFieldString === "test_serverStreamCancel" + updateCount
             if (updateCount === 3) {
-                serverStreamCancelSubscription.enabled = false;
+                serverStreamCancel.enabled = false;
             }
         }
         onError: {
-            console.log("Subscription error: " + status.code + " " + status.message)
+            console.log("Stream error: " + status.code + " " + status.message)
             ok = false;
         }
     }
 
-    GrpcSubscription {
-        id: serverStreamInvalidSubscription
+    GrpcStream {
+        id: serverStreamInvalid
         property bool ok: false
         enabled: false
         client: TestServiceClient
         method: "testMethodServerStreamNotExist"
         argument: stringMsg
-        onUpdated: {
+        onMessageReceived: {
             ok = false;
         }
         onError: {
@@ -104,29 +103,30 @@ TestCase {
     }
 
     Loader {
-        id: subscriptionLoader
+        id: streamLoader
         active: false
+        property bool ok: true
+        property int updateCount: 0
         sourceComponent: Component {
-            GrpcSubscription {
-                property bool ok: true
-                property int updateCount: 0
+            GrpcStream {
                 enabled: true
                 client: TestServiceClient
                 method: "testMethodServerStream"
                 argument: stringMsg
-                onUpdated: {
-                    ++updateCount;
-                    ok = ok && value.testFieldString === ("Test string" + updateCount)
+                onMessageReceived: {
+                    ++streamLoader.updateCount;
+                    streamLoader.ok = ok && value.testFieldString === ("test_1loader" + streamLoader.updateCount)
                 }
                 onError: {
-                    console.log("Subscription error: " + status.code + " " + status.message)
-                    ok = false;
+                    console.log("Stream error: " + status.code + " " + status.message)
+                    streamLoader.ok = false;
                 }
             }
         }
     }
 
     function test_stringEchoTest() {
+        stringMsg.testFieldString = "test_stringEchoTest";
         var called = false;
         var errorCalled = false;
         TestServiceClient.testMethod(stringMsg, function(result) {
@@ -139,6 +139,7 @@ TestCase {
     }
 
     function test_statusTest() {
+        stringMsg.testFieldString = "test_statusTest";
         var called = false;
         var errorCalled = false;
         TestServiceClient.testMethodStatusMessage(stringMsg, function(result) {
@@ -163,31 +164,32 @@ TestCase {
         compare(!called && !errorCalled, true, "testMethod was not called proper way")
         wait(1000)
         compare(called && !errorCalled, true, "testMethod was not called proper way")
-        stringMsg.testFieldString = "Test string";
     }
 
-    function test_serverStreamSubscription() {
-        serverStreamSubscription.enabled = true;
+    function test_serverStream() {
+        stringMsg.testFieldString = "test_serverStream";
+        serverStream.enabled = true;
         wait(20000);
-        compare(serverStreamSubscription.ok, true, "Subscription data failed")
-        compare(serverStreamSubscription.updateCount, 4, "Subscription failed, update was not called right amount times")
+        compare(serverStream.ok, true, "Stream data failed")
+        compare(serverStream.updateCount, 4, "Stream failed, update was not called right amount times")
     }
 
-    function test_serverStreamCancelSubscription() {
-        serverStreamCancelSubscription.enabled = true;
+    function test_serverStreamCancel() {
+        stringMsg.testFieldString = "test_serverStreamCancel";
+        serverStreamCancel.enabled = true;
         wait(20000);
-        compare(serverStreamCancelSubscription.ok, true, "Subscription data failed")
-        compare(serverStreamCancelSubscription.updateCount, 3, "Subscription failed, update was not called right amount times")
+        compare(serverStreamCancel.ok, true, "Stream data failed")
+        compare(serverStreamCancel.updateCount, 3, "Stream failed, update was not called right amount times")
     }
 
-    function test_serverStreamInvalidSubscription() {
-        serverStreamInvalidSubscription.enabled = true;
+    function test_serverStreamInvalid() {
+        serverStreamInvalid.enabled = true;
         wait(500);
-        compare(serverStreamInvalidSubscription.ok, true, "Subscription data failed")
-        compare(serverStreamInvalidSubscription.enabled, false, "Subscription data failed")
+        compare(serverStreamInvalid.ok, true, "Stream data failed")
+        compare(serverStreamInvalid.enabled, false, "Stream data failed")
     }
 
-    function test_nonCompatibleArgRet() {
+    function test_nonCompatibleArgRet() {        
         var called = false;
         var errorCalled = false;
         TestServiceClient.testMethodNonCompatibleArgRet(intMsg, function(result) {
@@ -200,11 +202,13 @@ TestCase {
     }
 
     function test_1loader() {//This test has to be called first to fail other tests in case if it fails
-        subscriptionLoader.active = true;
+        stringMsg.testFieldString = "test_1loader";
+        streamLoader.active = true;
         wait(1500);
-        compare(subscriptionLoader.item.ok, true, "Subscription data failed")
-        compare(subscriptionLoader.item.updateCount, 1, "Subscription failed, update was not called right amount times")
-        subscriptionLoader.active = false;
+        streamLoader.active = false;
+        wait(2000);
+        compare(streamLoader.ok, true, "Stream data failed")
+        compare(streamLoader.updateCount, 1, "Stream failed, update was not called right amount times")
     }
 
     SimpleStringMessage {