Index: remoting/protocol/jingle_session.cc |
diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc |
index 49b1f13deeaa98a3ed24c3ae1e95c1972fdc313d..6e6457abe14fc14294f4d79c3085ea4a04b46cb9 100644 |
--- a/remoting/protocol/jingle_session.cc |
+++ b/remoting/protocol/jingle_session.cc |
@@ -10,10 +10,9 @@ |
#include <utility> |
#include "base/bind.h" |
-#include "base/rand_util.h" |
#include "base/single_thread_task_runner.h" |
#include "base/stl_util.h" |
-#include "base/strings/string_number_conversions.h" |
+#include "base/strings/string_split.h" |
#include "base/threading/thread_task_runner_handle.h" |
#include "base/time/time.h" |
#include "remoting/base/constants.h" |
@@ -25,6 +24,7 @@ |
#include "remoting/protocol/transport.h" |
#include "remoting/signaling/iq_sender.h" |
#include "third_party/webrtc/libjingle/xmllite/xmlelement.h" |
+#include "third_party/webrtc/libjingle/xmpp/constants.h" |
#include "third_party/webrtc/p2p/base/candidate.h" |
using buzz::XmlElement; |
@@ -47,6 +47,13 @@ const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3; |
// Timeout for the transport-info messages. |
const int kTransportInfoTimeout = 10 * 60; |
+// Special value for an invalid sequential ID for an incoming IQ. |
+const int kInvalid = -1; |
+ |
+// Special value indicating that any sequential ID is valid for the next |
+// incoming IQ. |
+const int kAny = -1; |
+ |
ErrorCode AuthRejectionReasonToErrorCode( |
Authenticator::RejectionReason reason) { |
switch (reason) { |
@@ -63,15 +70,107 @@ ErrorCode AuthRejectionReasonToErrorCode( |
return UNKNOWN_ERROR; |
} |
+// Extracts a sequential id from the id attribute of the IQ stanza. |
+int GetSequentialId(const std::string& id) { |
+ std::vector<std::string> tokens = |
+ SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); |
+ // Legacy endpoints does not encode the IQ ordering in the ID attribute |
+ if (tokens.size() != 2) { |
+ return kInvalid; |
+ } |
+ |
+ int result = kInvalid; |
+ if (!base::StringToInt(tokens[1].c_str(), &result)) { |
+ return kInvalid; |
+ } |
+ return result; |
+} |
+ |
} // namespace |
+// A Queue that sorts incoming messages and returns them in the ascending order |
+// of sequence ids. The sequence id can be extracted from the ID attribute of |
+// an IQ stanza, which have the following format <opaque_string>_<sequence_id>. |
+// |
+// Background: |
+// The chromoting signaling channel does not guarantee that the incoming IQs are |
+// delivered in the order that it is sent. |
+// |
+// This behavior leads to transient session setup failures. For instance, |
+// a <transport-info> that is sent after a <session-info> message is sometimes |
+// delivered to the client out of order, causing the client to close the |
+// session due to an unexpected request. |
+class JingleSession::OrderedMessageQueue { |
+ public: |
+ OrderedMessageQueue() {} |
+ ~OrderedMessageQueue() {} |
+ |
+ // Returns the list of messages ordered by their sequential IDs. |
+ std::vector<std::unique_ptr<PendingMessage>> OnIncomingMessage( |
+ const std::string& id, |
+ std::unique_ptr<PendingMessage>); |
+ |
+ private: |
+ // Implements an ordered list by using map with the |sequence_id| as the key, |
+ // so that |queue_| is always sorted by |sequence_id|. |
+ std::map<int, std::unique_ptr<PendingMessage>> queue_; |
+ |
+ int next_incoming_ = kAny; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); |
+}; |
+ |
+std::vector<std::unique_ptr<JingleSession::PendingMessage>> |
+JingleSession::OrderedMessageQueue::OnIncomingMessage( |
+ const std::string& id, |
+ std::unique_ptr<JingleSession::PendingMessage> message) { |
+ std::vector<std::unique_ptr<JingleSession::PendingMessage>> result; |
+ int current = GetSequentialId(id); |
+ // If there is no sequencing order encoded in the id, just return the |
+ // message. |
+ if (current == kInvalid) { |
+ result.push_back(std::move(message)); |
+ return result; |
+ } |
+ |
+ if (next_incoming_ == kAny) { |
+ next_incoming_ = current; |
+ } |
+ |
+ // Ensure there are no duplicate sequence ids. |
+ DCHECK_GE(current, next_incoming_); |
+ DCHECK(queue_.find(current) == queue_.end()); |
+ |
+ queue_.insert(std::make_pair(current, std::move(message))); |
+ |
+ auto it = queue_.begin(); |
+ while (it != queue_.end() && it->first == next_incoming_) { |
+ result.push_back(std::move(it->second)); |
+ it = queue_.erase(it); |
+ next_incoming_++; |
+ } |
+ |
+ if (current - next_incoming_ >= 3) { |
+ LOG(WARNING) << "Multiple messages are missing: expected= " |
+ << next_incoming_ << " current= " << current; |
+ } |
+ return result; |
+}; |
+ |
+JingleSession::PendingMessage::PendingMessage( |
+ std::unique_ptr<JingleMessage> message, |
+ const ReplyCallback& reply_callback) |
+ : message(std::move(message)), reply_callback(reply_callback) {} |
+ |
+JingleSession::PendingMessage::~PendingMessage() {} |
+ |
JingleSession::JingleSession(JingleSessionManager* session_manager) |
: session_manager_(session_manager), |
event_handler_(nullptr), |
state_(INITIALIZING), |
error_(OK), |
- weak_factory_(this) { |
-} |
+ message_queue_(new OrderedMessageQueue), |
+ weak_factory_(this) {} |
JingleSession::~JingleSession() { |
session_manager_->SessionDestroyed(this); |
@@ -220,9 +319,12 @@ void JingleSession::SendTransportInfo( |
peer_address_, JingleMessage::TRANSPORT_INFO, session_id_)); |
message->transport_info = std::move(transport_info); |
+ std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
+ stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId()); |
+ |
auto request = session_manager_->iq_sender()->SendIq( |
- message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse, |
- base::Unretained(this))); |
+ std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse, |
+ base::Unretained(this))); |
if (request) { |
request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout)); |
transport_info_requests_.push_back(std::move(request)); |
@@ -283,9 +385,12 @@ void JingleSession::Close(protocol::ErrorCode error) { |
void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
+ std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
+ stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId()); |
+ |
auto request = session_manager_->iq_sender()->SendIq( |
- message->ToXml(), base::Bind(&JingleSession::OnMessageResponse, |
- base::Unretained(this), message->action)); |
+ std::move(stanza), base::Bind(&JingleSession::OnMessageResponse, |
+ base::Unretained(this), message->action)); |
int timeout = kDefaultMessageTimeout; |
if (message->action == JingleMessage::SESSION_INITIATE || |
@@ -369,8 +474,22 @@ void JingleSession::OnTransportInfoResponse(IqRequest* request, |
} |
} |
-void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
+void JingleSession::OnIncomingMessage(const std::string& id, |
+ std::unique_ptr<JingleMessage> message, |
const ReplyCallback& reply_callback) { |
+ std::unique_ptr<PendingMessage> item( |
+ new PendingMessage(std::move(message), reply_callback)); |
+ std::vector<std::unique_ptr<PendingMessage>> ordered = |
+ message_queue_->OnIncomingMessage(id, std::move(item)); |
+ for (auto& message : ordered) { |
+ ProcessIncomingMessage(std::move(message->message), |
+ message->reply_callback); |
+ } |
+} |
+ |
+void JingleSession::ProcessIncomingMessage( |
+ std::unique_ptr<JingleMessage> message, |
+ const ReplyCallback& reply_callback) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
if (peer_address_ != message->from) { |
@@ -396,8 +515,7 @@ void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
} |
if (!message->transport_info || |
- !transport_->ProcessTransportInfo( |
- message->transport_info.get())) { |
+ !transport_->ProcessTransportInfo(message->transport_info.get())) { |
reply_callback.Run(JingleMessageReply::BAD_REQUEST); |
return; |
} |
@@ -609,5 +727,9 @@ bool JingleSession::is_session_active() { |
state_ == AUTHENTICATING || state_ == AUTHENTICATED; |
} |
+std::string JingleSession::GetNextOutgoingId() { |
+ return outgoing_id_prefix_ + "_" + base::IntToString(++next_outgoing_id_); |
+} |
+ |
} // namespace protocol |
} // namespace remoting |