Chromium Code Reviews| Index: ipc/ipc_channel_win.cc |
| diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc |
| index 7b4f174c65ca74074e7e7b8b0b7abe65484b38d0..2655360c0503c78f11c56441b1fbaa04c4f73c81 100644 |
| --- a/ipc/ipc_channel_win.cc |
| +++ b/ipc/ipc_channel_win.cc |
| @@ -104,6 +104,109 @@ bool Channel::ChannelImpl::IsNamedServerInitialized( |
| return GetLastError() == ERROR_SEM_TIMEOUT; |
| } |
| + |
| +Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( |
| + char* buffer, |
| + int buffer_len, |
| + int* /* bytes_read */) { |
| + if (INVALID_HANDLE_VALUE == pipe_) |
| + return READ_FAILED; |
| + |
| + DWORD bytes_read = 0; |
| + BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize, |
| + &bytes_read, &input_state_.context.overlapped); |
| + if (!ok) { |
| + DWORD err = GetLastError(); |
| + if (err == ERROR_IO_PENDING) { |
| + input_state_.is_pending = true; |
| + return READ_PENDING; |
| + } |
| + LOG(ERROR) << "pipe error: " << err; |
| + return READ_FAILED; |
| + } |
| + |
| + // We could return READ_SUCCEEDED here. But the way that this code is |
| + // structured, but instead we go back to the message loop. Our completion |
|
rvargas (doing something else)
2012/03/02 02:22:08
nit: this comment needs some rewording.
|
| + // port will be signalled even in the "synchronously completed" state. |
| + // |
| + // This allows us to potentially process some outgoing messages and |
| + // interleave other work on this thread when we're getting hammered with |
| + // input messages. Potentially, this could be tuned to be more efficient |
| + // with some testing. |
| + input_state_.is_pending = true; |
| + return READ_PENDING; |
| +} |
| + |
| +bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { |
| + return true; |
| +} |
| + |
| +void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { |
| + // The hello message contains one parameter containing the PID. |
| + listener_->OnChannelConnected(MessageIterator(msg).NextInt()); |
| +} |
| + |
| +bool Channel::ChannelImpl::DidEmptyInputBuffers() { |
| + return true; |
| +} |
| + |
| +bool Channel::ChannelImpl::DispatchInputData(const char* input_data, |
| + int input_data_len) { |
| + const char* p; |
| + const char* end; |
| + |
| + // Possibly combine with the overflow buffer to make a larger buffer. |
| + if (input_overflow_buf_.empty()) { |
| + p = input_data; |
| + end = input_data + input_data_len; |
| + } else { |
| + if (input_overflow_buf_.size() > |
| + kMaximumMessageSize - input_data_len) { |
| + input_overflow_buf_.clear(); |
| + LOG(ERROR) << "IPC message is too big"; |
| + return false; |
| + } |
| + input_overflow_buf_.append(input_data, input_data_len); |
| + p = input_overflow_buf_.data(); |
| + end = p + input_overflow_buf_.size(); |
| + } |
| + |
| + // Dispatch all complete messages in the data buffer. |
| + while (p < end) { |
| + const char* message_tail = Message::FindNext(p, end); |
| + if (message_tail) { |
| + int len = static_cast<int>(message_tail - p); |
| + Message m(p, len); |
| + if (!WillDispatchInputMessage(&m)) |
| + return false; |
| + |
| + if (IsHelloMessage(m)) |
| + HandleHelloMessage(m); |
| + else |
| + listener_->OnMessageReceived(m); |
| + p = message_tail; |
| + } else { |
| + // Last message is partial. |
| + break; |
| + } |
| + } |
| + |
| + // Save any partial data in the overflow buffer. |
| + input_overflow_buf_.assign(p, end - p); |
| + |
| + if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) |
| + return false; |
| + return true; |
| +} |
| + |
| +bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const { |
| + return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE; |
| +} |
| + |
| +bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) { |
| + return DispatchInputData(input_buf_, bytes_read); |
| +} |
| + |
| // static |
| const std::wstring Channel::ChannelImpl::PipeName( |
| const std::string& channel_id) { |
| @@ -257,89 +360,19 @@ bool Channel::ChannelImpl::ProcessConnection() { |
| return true; |
| } |
| -bool Channel::ChannelImpl::ProcessIncomingMessages( |
| - MessageLoopForIO::IOContext* context, |
| - DWORD bytes_read) { |
| - DCHECK(thread_check_->CalledOnValidThread()); |
| - if (input_state_.is_pending) { |
|
brettw
2012/03/01 21:24:16
This check moved to OnIOCompleted.
|
| - input_state_.is_pending = false; |
| - DCHECK(context); |
| - |
| - if (!context || !bytes_read) |
| +bool Channel::ChannelImpl::ProcessIncomingMessages() { |
| + while (true) { |
| + int bytes_read = 0; |
| + ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, |
| + &bytes_read); |
| + if (read_state == READ_FAILED) |
| return false; |
| - } else { |
| - // This happens at channel initialization. |
| - DCHECK(!bytes_read && context == &input_state_.context); |
| - } |
| - |
| - for (;;) { |
| - if (bytes_read == 0) { |
|
brettw
2012/03/01 21:24:16
This code moved to ReadData
|
| - if (INVALID_HANDLE_VALUE == pipe_) |
| - return false; |
| - |
| - // Read from pipe... |
| - BOOL ok = ReadFile(pipe_, |
| - input_buf_, |
| - Channel::kReadBufferSize, |
| - &bytes_read, |
| - &input_state_.context.overlapped); |
| - if (!ok) { |
| - DWORD err = GetLastError(); |
| - if (err == ERROR_IO_PENDING) { |
| - input_state_.is_pending = true; |
| - return true; |
| - } |
| - LOG(ERROR) << "pipe error: " << err; |
| - return false; |
| - } |
| - input_state_.is_pending = true; |
| + if (read_state == READ_PENDING) |
| return true; |
| - } |
| - DCHECK(bytes_read); |
| - |
| - // Process messages from input buffer. |
| - |
| - const char* p, *end; |
|
brettw
2012/03/01 21:24:16
The rest of this loop moved to DispatchInputData
|
| - if (input_overflow_buf_.empty()) { |
| - p = input_buf_; |
| - end = p + bytes_read; |
| - } else { |
| - if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { |
| - input_overflow_buf_.clear(); |
| - LOG(ERROR) << "IPC message is too big"; |
| - return false; |
| - } |
| - input_overflow_buf_.append(input_buf_, bytes_read); |
| - p = input_overflow_buf_.data(); |
| - end = p + input_overflow_buf_.size(); |
| - } |
| - |
| - while (p < end) { |
| - const char* message_tail = Message::FindNext(p, end); |
| - if (message_tail) { |
| - int len = static_cast<int>(message_tail - p); |
| - const Message m(p, len); |
| - DVLOG(2) << "received message on channel @" << this |
| - << " with type " << m.type(); |
| - if (m.routing_id() == MSG_ROUTING_NONE && |
| - m.type() == HELLO_MESSAGE_TYPE) { |
| - // The Hello message contains only the process id. |
| - listener_->OnChannelConnected(MessageIterator(m).NextInt()); |
| - } else { |
| - listener_->OnMessageReceived(m); |
| - } |
| - p = message_tail; |
| - } else { |
| - // Last message is partial. |
| - break; |
| - } |
| - } |
| - input_overflow_buf_.assign(p, end - p); |
| - |
| - bytes_read = 0; // Get more data. |
| + DCHECK(bytes_read > 0); |
| + if (!DispatchInputData(input_buf_, bytes_read)) |
| + return false; |
| } |
| - |
| - return true; |
| } |
| bool Channel::ChannelImpl::ProcessOutgoingMessages( |
| @@ -400,8 +433,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages( |
| } |
| void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
| - DWORD bytes_transfered, DWORD error) { |
| - bool ok; |
| + DWORD bytes_transfered, |
| + DWORD error) { |
| + bool ok = true; |
| DCHECK(thread_check_->CalledOnValidThread()); |
| if (context == &input_state_.context) { |
| if (waiting_connect_) { |
| @@ -414,10 +448,26 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
| return; |
| // else, fall-through and look for incoming messages... |
| } |
| - // we don't support recursion through OnMessageReceived yet! |
| + |
| + // We don't support recursion through OnMessageReceived yet! |
| DCHECK(!processing_incoming_); |
| AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true); |
| - ok = ProcessIncomingMessages(context, bytes_transfered); |
| + |
| + // Process the new data. |
| + if (input_state_.is_pending) { |
| + // This is the normal case for everything except the initialization step. |
| + input_state_.is_pending = false; |
|
rvargas (doing something else)
2012/03/02 02:22:08
It may be better to move this block to AsyncReadCo
brettw
2012/03/02 05:31:18
I was going to make AsyncReadComplete cross-platfo
rvargas (doing something else)
2012/03/02 18:44:17
OK.
Does that mean that AsyncReadComplete is goin
|
| + if (!bytes_transfered) |
| + ok = false; |
| + else |
| + ok = AsyncReadComplete(bytes_transfered); |
|
brettw
2012/03/01 21:24:16
ProcessIncomingMessages doesn't need any parameter
|
| + } else { |
| + DCHECK(!bytes_transfered); |
| + } |
| + |
| + // Request more data. |
| + if (ok) |
| + ok = ProcessIncomingMessages(); |
| } else { |
| DCHECK(context == &output_state_.context); |
| ok = ProcessOutgoingMessages(context, bytes_transfered); |