Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(482)

Unified Diff: remoting/protocol/jingle_session.cc

Issue 2417913002: Process incoming IQs in the same order that they were sent. (Closed)
Patch Set: Rebase Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « remoting/protocol/jingle_session.h ('k') | remoting/protocol/jingle_session_manager.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: remoting/protocol/jingle_session.cc
diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc
index 49b1f13deeaa98a3ed24c3ae1e95c1972fdc313d..6e6457abe14fc14294f4d79c3085ea4a04b46cb9 100644
--- a/remoting/protocol/jingle_session.cc
+++ b/remoting/protocol/jingle_session.cc
@@ -10,10 +10,9 @@
#include <utility>
#include "base/bind.h"
-#include "base/rand_util.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
-#include "base/strings/string_number_conversions.h"
+#include "base/strings/string_split.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "remoting/base/constants.h"
@@ -25,6 +24,7 @@
#include "remoting/protocol/transport.h"
#include "remoting/signaling/iq_sender.h"
#include "third_party/webrtc/libjingle/xmllite/xmlelement.h"
+#include "third_party/webrtc/libjingle/xmpp/constants.h"
#include "third_party/webrtc/p2p/base/candidate.h"
using buzz::XmlElement;
@@ -47,6 +47,13 @@ const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3;
// Timeout for the transport-info messages.
const int kTransportInfoTimeout = 10 * 60;
+// Special value for an invalid sequential ID for an incoming IQ.
+const int kInvalid = -1;
+
+// Special value indicating that any sequential ID is valid for the next
+// incoming IQ.
+const int kAny = -1;
+
ErrorCode AuthRejectionReasonToErrorCode(
Authenticator::RejectionReason reason) {
switch (reason) {
@@ -63,15 +70,107 @@ ErrorCode AuthRejectionReasonToErrorCode(
return UNKNOWN_ERROR;
}
+// Extracts a sequential id from the id attribute of the IQ stanza.
+int GetSequentialId(const std::string& id) {
+ std::vector<std::string> tokens =
+ SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
+ // Legacy endpoints does not encode the IQ ordering in the ID attribute
+ if (tokens.size() != 2) {
+ return kInvalid;
+ }
+
+ int result = kInvalid;
+ if (!base::StringToInt(tokens[1].c_str(), &result)) {
+ return kInvalid;
+ }
+ return result;
+}
+
} // namespace
+// A Queue that sorts incoming messages and returns them in the ascending order
+// of sequence ids. The sequence id can be extracted from the ID attribute of
+// an IQ stanza, which have the following format <opaque_string>_<sequence_id>.
+//
+// Background:
+// The chromoting signaling channel does not guarantee that the incoming IQs are
+// delivered in the order that it is sent.
+//
+// This behavior leads to transient session setup failures. For instance,
+// a <transport-info> that is sent after a <session-info> message is sometimes
+// delivered to the client out of order, causing the client to close the
+// session due to an unexpected request.
+class JingleSession::OrderedMessageQueue {
+ public:
+ OrderedMessageQueue() {}
+ ~OrderedMessageQueue() {}
+
+ // Returns the list of messages ordered by their sequential IDs.
+ std::vector<std::unique_ptr<PendingMessage>> OnIncomingMessage(
+ const std::string& id,
+ std::unique_ptr<PendingMessage>);
+
+ private:
+ // Implements an ordered list by using map with the |sequence_id| as the key,
+ // so that |queue_| is always sorted by |sequence_id|.
+ std::map<int, std::unique_ptr<PendingMessage>> queue_;
+
+ int next_incoming_ = kAny;
+
+ DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue);
+};
+
+std::vector<std::unique_ptr<JingleSession::PendingMessage>>
+JingleSession::OrderedMessageQueue::OnIncomingMessage(
+ const std::string& id,
+ std::unique_ptr<JingleSession::PendingMessage> message) {
+ std::vector<std::unique_ptr<JingleSession::PendingMessage>> result;
+ int current = GetSequentialId(id);
+ // If there is no sequencing order encoded in the id, just return the
+ // message.
+ if (current == kInvalid) {
+ result.push_back(std::move(message));
+ return result;
+ }
+
+ if (next_incoming_ == kAny) {
+ next_incoming_ = current;
+ }
+
+ // Ensure there are no duplicate sequence ids.
+ DCHECK_GE(current, next_incoming_);
+ DCHECK(queue_.find(current) == queue_.end());
+
+ queue_.insert(std::make_pair(current, std::move(message)));
+
+ auto it = queue_.begin();
+ while (it != queue_.end() && it->first == next_incoming_) {
+ result.push_back(std::move(it->second));
+ it = queue_.erase(it);
+ next_incoming_++;
+ }
+
+ if (current - next_incoming_ >= 3) {
+ LOG(WARNING) << "Multiple messages are missing: expected= "
+ << next_incoming_ << " current= " << current;
+ }
+ return result;
+};
+
+JingleSession::PendingMessage::PendingMessage(
+ std::unique_ptr<JingleMessage> message,
+ const ReplyCallback& reply_callback)
+ : message(std::move(message)), reply_callback(reply_callback) {}
+
+JingleSession::PendingMessage::~PendingMessage() {}
+
JingleSession::JingleSession(JingleSessionManager* session_manager)
: session_manager_(session_manager),
event_handler_(nullptr),
state_(INITIALIZING),
error_(OK),
- weak_factory_(this) {
-}
+ message_queue_(new OrderedMessageQueue),
+ weak_factory_(this) {}
JingleSession::~JingleSession() {
session_manager_->SessionDestroyed(this);
@@ -220,9 +319,12 @@ void JingleSession::SendTransportInfo(
peer_address_, JingleMessage::TRANSPORT_INFO, session_id_));
message->transport_info = std::move(transport_info);
+ std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
+ stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId());
+
auto request = session_manager_->iq_sender()->SendIq(
- message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse,
- base::Unretained(this)));
+ std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse,
+ base::Unretained(this)));
if (request) {
request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout));
transport_info_requests_.push_back(std::move(request));
@@ -283,9 +385,12 @@ void JingleSession::Close(protocol::ErrorCode error) {
void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) {
DCHECK(thread_checker_.CalledOnValidThread());
+ std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
+ stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId());
+
auto request = session_manager_->iq_sender()->SendIq(
- message->ToXml(), base::Bind(&JingleSession::OnMessageResponse,
- base::Unretained(this), message->action));
+ std::move(stanza), base::Bind(&JingleSession::OnMessageResponse,
+ base::Unretained(this), message->action));
int timeout = kDefaultMessageTimeout;
if (message->action == JingleMessage::SESSION_INITIATE ||
@@ -369,8 +474,22 @@ void JingleSession::OnTransportInfoResponse(IqRequest* request,
}
}
-void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message,
+void JingleSession::OnIncomingMessage(const std::string& id,
+ std::unique_ptr<JingleMessage> message,
const ReplyCallback& reply_callback) {
+ std::unique_ptr<PendingMessage> item(
+ new PendingMessage(std::move(message), reply_callback));
+ std::vector<std::unique_ptr<PendingMessage>> ordered =
+ message_queue_->OnIncomingMessage(id, std::move(item));
+ for (auto& message : ordered) {
+ ProcessIncomingMessage(std::move(message->message),
+ message->reply_callback);
+ }
+}
+
+void JingleSession::ProcessIncomingMessage(
+ std::unique_ptr<JingleMessage> message,
+ const ReplyCallback& reply_callback) {
DCHECK(thread_checker_.CalledOnValidThread());
if (peer_address_ != message->from) {
@@ -396,8 +515,7 @@ void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message,
}
if (!message->transport_info ||
- !transport_->ProcessTransportInfo(
- message->transport_info.get())) {
+ !transport_->ProcessTransportInfo(message->transport_info.get())) {
reply_callback.Run(JingleMessageReply::BAD_REQUEST);
return;
}
@@ -609,5 +727,9 @@ bool JingleSession::is_session_active() {
state_ == AUTHENTICATING || state_ == AUTHENTICATED;
}
+std::string JingleSession::GetNextOutgoingId() {
+ return outgoing_id_prefix_ + "_" + base::IntToString(++next_outgoing_id_);
+}
+
} // namespace protocol
} // namespace remoting
« no previous file with comments | « remoting/protocol/jingle_session.h ('k') | remoting/protocol/jingle_session_manager.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698