ソースを参照

Add disconnect for subscription

- Fix lambda memory leak for subscription, when error occured

Fixes: #157
Alexey Edelev 4 年 前
コミット
bbe7d5f81b
2 ファイル変更21 行追加14 行削除
  1. 4 2
      src/grpc/qabstractgrpcclient.cpp
  2. 17 12
      src/grpc/qgrpchttp2channel.cpp

+ 4 - 2
src/grpc/qabstractgrpcclient.cpp

@@ -149,14 +149,15 @@ QGrpcSubscriptionShared QAbstractGrpcClient::subscribe(const QString &method, co
             return *it; //If subscription already exists return it for handling
         }
 
-        connect(subscription.get(), &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
+        auto errorConnection = std::make_shared<QMetaObject::Connection>();
+        *errorConnection = connect(subscription.get(), &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription error: " << status.message();
             error(status);
             dPtr->channel->subscribe(subscription.get(), dPtr->service, this);
         });
 
         auto finishedConnection = std::make_shared<QMetaObject::Connection>();
-        *finishedConnection = connect(subscription.get(), &QGrpcSubscription::finished, this, [this, subscription, finishedConnection]() mutable {
+        *finishedConnection = connect(subscription.get(), &QGrpcSubscription::finished, this, [this, subscription, errorConnection, finishedConnection]() mutable {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription finished";
             auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscriptionShared activeSubscription) {
                return *activeSubscription == *subscription;
@@ -165,6 +166,7 @@ QGrpcSubscriptionShared QAbstractGrpcClient::subscribe(const QString &method, co
             if (it != std::end(dPtr->activeSubscriptions)) {
                 dPtr->activeSubscriptions.erase(it);
             }
+            QObject::disconnect(*errorConnection);
             QObject::disconnect(*finishedConnection);
             subscription.reset();
         });

+ 17 - 12
src/grpc/qgrpchttp2channel.cpp

@@ -110,6 +110,7 @@ struct QGrpcHttp2ChannelPrivate {
     std::unique_ptr<QAbstractGrpcCredentials> credentials;
     QSslConfiguration sslConfig;
     std::unordered_map<QNetworkReply *, ExpectedData> activeStreamReplies;
+    QObject lambdaContext;
 
     QNetworkReply *post(const QString &method, const QString &service, const QByteArray &args, bool stream = false) {
         QUrl callUrl = url;
@@ -155,6 +156,8 @@ struct QGrpcHttp2ChannelPrivate {
     static void abortNetworkReply(QNetworkReply *networkReply) {
         if (networkReply->isRunning()) {
             networkReply->abort();
+        } else {
+            networkReply->deleteLater();
         }
     }
 
@@ -192,8 +195,6 @@ struct QGrpcHttp2ChannelPrivate {
     static int getExpectedDataSize(const QByteArray &container) {
         return qFromBigEndian(*reinterpret_cast<const int *>(container.data() + 1)) + GrpcMessageSizeHeaderSize;
     }
-
-    QObject lambdaContext;
 };
 
 }
@@ -222,6 +223,7 @@ QGrpcStatus QGrpcHttp2Channel::call(const QString &method, const QString &servic
     QGrpcStatus::StatusCode grpcStatus = QGrpcStatus::StatusCode::Unknown;
     ret = dPtr->processReply(networkReply, grpcStatus);
 
+    networkReply->deleteLater();
     qProtoDebug() << __func__ << "RECV: " << ret.toHex() << "grpcStatus" << grpcStatus;
     return {grpcStatus, QString::fromUtf8(networkReply->rawHeader(GrpcStatusMessage))};
 }
@@ -251,6 +253,7 @@ void QGrpcHttp2Channel::call(const QString &method, const QString &service, cons
             reply->setData({});
             reply->error({grpcStatus, QString::fromUtf8(networkReply->rawHeader(GrpcStatusMessage))});
         }
+        networkReply->deleteLater();
     });
 
     *abortConnection = QObject::connect(reply, &QGrpcAsyncReply::error, networkReply, [networkReply, connection, abortConnection] (const QGrpcStatus &status) {
@@ -261,7 +264,7 @@ void QGrpcHttp2Channel::call(const QString &method, const QString &service, cons
             if (*abortConnection) {
                 QObject::disconnect(*abortConnection);
             }
-            QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+            networkReply->deleteLater();
         }
     });
 }
@@ -271,7 +274,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
     assert(subscription != nullptr);
     QNetworkReply *networkReply = dPtr->post(subscription->method(), service, subscription->arg(), true);
 
-    std::shared_ptr<QMetaObject::Connection> connection(new QMetaObject::Connection);
+    std::shared_ptr<QMetaObject::Connection> finishConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> abortConnection(new QMetaObject::Connection);
     std::shared_ptr<QMetaObject::Connection> readConnection(new QMetaObject::Connection);
     *readConnection = QObject::connect(networkReply, &QNetworkReply::readyRead, subscription, [networkReply, subscription, this]() {
@@ -314,21 +317,22 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
         }
     });
 
-    QObject::connect(client, &QAbstractGrpcClient::destroyed, networkReply, [networkReply, connection, abortConnection, readConnection, this]() {
+    QObject::connect(client, &QAbstractGrpcClient::destroyed, networkReply, [networkReply, finishConnection, abortConnection, readConnection, this]() {
         if (*readConnection) {
             QObject::disconnect(*readConnection);
         }
         if (*abortConnection) {
             QObject::disconnect(*abortConnection);
         }
-        if (*connection) {
-            QObject::disconnect(*connection);
+        if (*finishConnection) {
+            QObject::disconnect(*finishConnection);
         }
         dPtr->activeStreamReplies.erase(networkReply);
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+        networkReply->deleteLater();
     });
 
-    *connection = QObject::connect(networkReply, &QNetworkReply::finished, subscription, [subscription, service, networkReply, abortConnection, readConnection, client, this]() {
+    *finishConnection = QObject::connect(networkReply, &QNetworkReply::finished, subscription, [subscription, service, networkReply, abortConnection, readConnection, finishConnection, client, this]() {
         QString errorString = networkReply->errorString();
         QNetworkReply::NetworkError networkError = networkReply->error();
         if (*readConnection) {
@@ -340,7 +344,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
 
         dPtr->activeStreamReplies.erase(networkReply);
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
-
+        networkReply->deleteLater();
         qProtoWarning() << subscription->method() << "call" << service << "subscription finished: " << errorString;
         switch (networkError) {
         case QNetworkReply::RemoteHostClosedError:
@@ -356,9 +360,9 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
         }
     });
 
-    *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, networkReply, [networkReply, connection, abortConnection, readConnection] {
-        if (*connection) {
-            QObject::disconnect(*connection);
+    *abortConnection = QObject::connect(subscription, &QGrpcSubscription::finished, networkReply, [networkReply, finishConnection, abortConnection, readConnection] {
+        if (*finishConnection) {
+            QObject::disconnect(*finishConnection);
         }
         if (*readConnection) {
             QObject::disconnect(*readConnection);
@@ -367,6 +371,7 @@ void QGrpcHttp2Channel::subscribe(QGrpcSubscription *subscription, const QString
             QObject::disconnect(*abortConnection);
         }
         QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
+        networkReply->deleteLater();
     });
 }