| Index: ipc/ipc_channel_win.cc
|
| diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc
|
| index 7b4f174c65ca74074e7e7b8b0b7abe65484b38d0..7e441b65937b17b6dda8d5f839a2c9b19057ed3c 100644
|
| --- a/ipc/ipc_channel_win.cc
|
| +++ b/ipc/ipc_channel_win.cc
|
| @@ -104,6 +104,109 @@ bool Channel::ChannelImpl::IsNamedServerInitialized(
|
| return GetLastError() == ERROR_SEM_TIMEOUT;
|
| }
|
|
|
| +
|
| +Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
|
| + char* buffer,
|
| + int buffer_len,
|
| + int* /* bytes_read */) {
|
| + if (INVALID_HANDLE_VALUE == pipe_)
|
| + return READ_FAILED;
|
| +
|
| + DWORD bytes_read = 0;
|
| + BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize,
|
| + &bytes_read, &input_state_.context.overlapped);
|
| + if (!ok) {
|
| + DWORD err = GetLastError();
|
| + if (err == ERROR_IO_PENDING) {
|
| + input_state_.is_pending = true;
|
| + return READ_PENDING;
|
| + }
|
| + LOG(ERROR) << "pipe error: " << err;
|
| + return READ_FAILED;
|
| + }
|
| +
|
| + // We could return READ_SUCCEEDED here. But the way that this code is
|
| + // structured we instead go back to the message loop. Our completion port
|
| + // will be signalled even in the "synchronously completed" state.
|
| + //
|
| + // This allows us to potentially process some outgoing messages and
|
| + // interleave other work on this thread when we're getting hammered with
|
| + // input messages. Potentially, this could be tuned to be more efficient
|
| + // with some testing.
|
| + input_state_.is_pending = true;
|
| + return READ_PENDING;
|
| +}
|
| +
|
| +bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
|
| + return true;
|
| +}
|
| +
|
| +void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
|
| + // The hello message contains one parameter containing the PID.
|
| + listener_->OnChannelConnected(MessageIterator(msg).NextInt());
|
| +}
|
| +
|
| +bool Channel::ChannelImpl::DidEmptyInputBuffers() {
|
| + return true;
|
| +}
|
| +
|
| +bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
|
| + int input_data_len) {
|
| + const char* p;
|
| + const char* end;
|
| +
|
| + // Possibly combine with the overflow buffer to make a larger buffer.
|
| + if (input_overflow_buf_.empty()) {
|
| + p = input_data;
|
| + end = input_data + input_data_len;
|
| + } else {
|
| + if (input_overflow_buf_.size() >
|
| + kMaximumMessageSize - input_data_len) {
|
| + input_overflow_buf_.clear();
|
| + LOG(ERROR) << "IPC message is too big";
|
| + return false;
|
| + }
|
| + input_overflow_buf_.append(input_data, input_data_len);
|
| + p = input_overflow_buf_.data();
|
| + end = p + input_overflow_buf_.size();
|
| + }
|
| +
|
| + // Dispatch all complete messages in the data buffer.
|
| + while (p < end) {
|
| + const char* message_tail = Message::FindNext(p, end);
|
| + if (message_tail) {
|
| + int len = static_cast<int>(message_tail - p);
|
| + Message m(p, len);
|
| + if (!WillDispatchInputMessage(&m))
|
| + return false;
|
| +
|
| + if (IsHelloMessage(m))
|
| + HandleHelloMessage(m);
|
| + else
|
| + listener_->OnMessageReceived(m);
|
| + p = message_tail;
|
| + } else {
|
| + // Last message is partial.
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // Save any partial data in the overflow buffer.
|
| + input_overflow_buf_.assign(p, end - p);
|
| +
|
| + if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
|
| + return false;
|
| + return true;
|
| +}
|
| +
|
| +bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const {
|
| + return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE;
|
| +}
|
| +
|
| +bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) {
|
| + return DispatchInputData(input_buf_, bytes_read);
|
| +}
|
| +
|
| // static
|
| const std::wstring Channel::ChannelImpl::PipeName(
|
| const std::string& channel_id) {
|
| @@ -257,89 +360,19 @@ bool Channel::ChannelImpl::ProcessConnection() {
|
| return true;
|
| }
|
|
|
| -bool Channel::ChannelImpl::ProcessIncomingMessages(
|
| - MessageLoopForIO::IOContext* context,
|
| - DWORD bytes_read) {
|
| - DCHECK(thread_check_->CalledOnValidThread());
|
| - if (input_state_.is_pending) {
|
| - input_state_.is_pending = false;
|
| - DCHECK(context);
|
| -
|
| - if (!context || !bytes_read)
|
| +bool Channel::ChannelImpl::ProcessIncomingMessages() {
|
| + while (true) {
|
| + int bytes_read = 0;
|
| + ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
|
| + &bytes_read);
|
| + if (read_state == READ_FAILED)
|
| return false;
|
| - } else {
|
| - // This happens at channel initialization.
|
| - DCHECK(!bytes_read && context == &input_state_.context);
|
| - }
|
| -
|
| - for (;;) {
|
| - if (bytes_read == 0) {
|
| - if (INVALID_HANDLE_VALUE == pipe_)
|
| - return false;
|
| -
|
| - // Read from pipe...
|
| - BOOL ok = ReadFile(pipe_,
|
| - input_buf_,
|
| - Channel::kReadBufferSize,
|
| - &bytes_read,
|
| - &input_state_.context.overlapped);
|
| - if (!ok) {
|
| - DWORD err = GetLastError();
|
| - if (err == ERROR_IO_PENDING) {
|
| - input_state_.is_pending = true;
|
| - return true;
|
| - }
|
| - LOG(ERROR) << "pipe error: " << err;
|
| - return false;
|
| - }
|
| - input_state_.is_pending = true;
|
| + if (read_state == READ_PENDING)
|
| return true;
|
| - }
|
| - DCHECK(bytes_read);
|
| -
|
| - // Process messages from input buffer.
|
| -
|
| - const char* p, *end;
|
| - if (input_overflow_buf_.empty()) {
|
| - p = input_buf_;
|
| - end = p + bytes_read;
|
| - } else {
|
| - if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
|
| - input_overflow_buf_.clear();
|
| - LOG(ERROR) << "IPC message is too big";
|
| - return false;
|
| - }
|
| - input_overflow_buf_.append(input_buf_, bytes_read);
|
| - p = input_overflow_buf_.data();
|
| - end = p + input_overflow_buf_.size();
|
| - }
|
| -
|
| - while (p < end) {
|
| - const char* message_tail = Message::FindNext(p, end);
|
| - if (message_tail) {
|
| - int len = static_cast<int>(message_tail - p);
|
| - const Message m(p, len);
|
| - DVLOG(2) << "received message on channel @" << this
|
| - << " with type " << m.type();
|
| - if (m.routing_id() == MSG_ROUTING_NONE &&
|
| - m.type() == HELLO_MESSAGE_TYPE) {
|
| - // The Hello message contains only the process id.
|
| - listener_->OnChannelConnected(MessageIterator(m).NextInt());
|
| - } else {
|
| - listener_->OnMessageReceived(m);
|
| - }
|
| - p = message_tail;
|
| - } else {
|
| - // Last message is partial.
|
| - break;
|
| - }
|
| - }
|
| - input_overflow_buf_.assign(p, end - p);
|
| -
|
| - bytes_read = 0; // Get more data.
|
| + DCHECK(bytes_read > 0);
|
| + if (!DispatchInputData(input_buf_, bytes_read))
|
| + return false;
|
| }
|
| -
|
| - return true;
|
| }
|
|
|
| bool Channel::ChannelImpl::ProcessOutgoingMessages(
|
| @@ -400,8 +433,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
|
| }
|
|
|
| void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
|
| - DWORD bytes_transfered, DWORD error) {
|
| - bool ok;
|
| + DWORD bytes_transfered,
|
| + DWORD error) {
|
| + bool ok = true;
|
| DCHECK(thread_check_->CalledOnValidThread());
|
| if (context == &input_state_.context) {
|
| if (waiting_connect_) {
|
| @@ -414,10 +448,26 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
|
| return;
|
| // else, fall-through and look for incoming messages...
|
| }
|
| - // we don't support recursion through OnMessageReceived yet!
|
| +
|
| + // We don't support recursion through OnMessageReceived yet!
|
| DCHECK(!processing_incoming_);
|
| AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true);
|
| - ok = ProcessIncomingMessages(context, bytes_transfered);
|
| +
|
| + // Process the new data.
|
| + if (input_state_.is_pending) {
|
| + // This is the normal case for everything except the initialization step.
|
| + input_state_.is_pending = false;
|
| + if (!bytes_transfered)
|
| + ok = false;
|
| + else
|
| + ok = AsyncReadComplete(bytes_transfered);
|
| + } else {
|
| + DCHECK(!bytes_transfered);
|
| + }
|
| +
|
| + // Request more data.
|
| + if (ok)
|
| + ok = ProcessIncomingMessages();
|
| } else {
|
| DCHECK(context == &output_state_.context);
|
| ok = ProcessOutgoingMessages(context, bytes_transfered);
|
|
|