Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(433)

Side by Side Diff: ipc/ipc_channel_win.cc

Issue 9568031: Refactoring on Windows IPC channel. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_win.h" 5 #include "ipc/ipc_channel_win.h"
6 6
7 #include <windows.h> 7 #include <windows.h>
8 8
9 #include "base/auto_reset.h" 9 #include "base/auto_reset.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 // static 97 // static
98 bool Channel::ChannelImpl::IsNamedServerInitialized( 98 bool Channel::ChannelImpl::IsNamedServerInitialized(
99 const std::string& channel_id) { 99 const std::string& channel_id) {
100 if (WaitNamedPipe(PipeName(channel_id).c_str(), 1)) 100 if (WaitNamedPipe(PipeName(channel_id).c_str(), 1))
101 return true; 101 return true;
102 // If ERROR_SEM_TIMEOUT occurred, the pipe exists but is handling another 102 // If ERROR_SEM_TIMEOUT occurred, the pipe exists but is handling another
103 // connection. 103 // connection.
104 return GetLastError() == ERROR_SEM_TIMEOUT; 104 return GetLastError() == ERROR_SEM_TIMEOUT;
105 } 105 }
106 106
107
108 Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
109 char* buffer,
110 int buffer_len,
111 int* /* bytes_read */) {
112 if (INVALID_HANDLE_VALUE == pipe_)
113 return READ_FAILED;
114
115 DWORD bytes_read = 0;
116 BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize,
117 &bytes_read, &input_state_.context.overlapped);
118 if (!ok) {
119 DWORD err = GetLastError();
120 if (err == ERROR_IO_PENDING) {
121 input_state_.is_pending = true;
122 return READ_PENDING;
123 }
124 LOG(ERROR) << "pipe error: " << err;
125 return READ_FAILED;
126 }
127
128 // We could return READ_SUCCEEDED here. But the way that this code is
129 // structured, but instead we go back to the message loop. Our completion
rvargas (doing something else) 2012/03/02 02:22:08 nit: this comment needs some rewording.
130 // port will be signalled even in the "synchronously completed" state.
131 //
132 // This allows us to potentially process some outgoing messages and
133 // interleave other work on this thread when we're getting hammered with
134 // input messages. Potentially, this could be tuned to be more efficient
135 // with some testing.
136 input_state_.is_pending = true;
137 return READ_PENDING;
138 }
139
140 bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
141 return true;
142 }
143
144 void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
145 // The hello message contains one parameter containing the PID.
146 listener_->OnChannelConnected(MessageIterator(msg).NextInt());
147 }
148
149 bool Channel::ChannelImpl::DidEmptyInputBuffers() {
150 return true;
151 }
152
153 bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
154 int input_data_len) {
155 const char* p;
156 const char* end;
157
158 // Possibly combine with the overflow buffer to make a larger buffer.
159 if (input_overflow_buf_.empty()) {
160 p = input_data;
161 end = input_data + input_data_len;
162 } else {
163 if (input_overflow_buf_.size() >
164 kMaximumMessageSize - input_data_len) {
165 input_overflow_buf_.clear();
166 LOG(ERROR) << "IPC message is too big";
167 return false;
168 }
169 input_overflow_buf_.append(input_data, input_data_len);
170 p = input_overflow_buf_.data();
171 end = p + input_overflow_buf_.size();
172 }
173
174 // Dispatch all complete messages in the data buffer.
175 while (p < end) {
176 const char* message_tail = Message::FindNext(p, end);
177 if (message_tail) {
178 int len = static_cast<int>(message_tail - p);
179 Message m(p, len);
180 if (!WillDispatchInputMessage(&m))
181 return false;
182
183 if (IsHelloMessage(m))
184 HandleHelloMessage(m);
185 else
186 listener_->OnMessageReceived(m);
187 p = message_tail;
188 } else {
189 // Last message is partial.
190 break;
191 }
192 }
193
194 // Save any partial data in the overflow buffer.
195 input_overflow_buf_.assign(p, end - p);
196
197 if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
198 return false;
199 return true;
200 }
201
202 bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const {
203 return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE;
204 }
205
206 bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) {
207 return DispatchInputData(input_buf_, bytes_read);
208 }
209
107 // static 210 // static
108 const std::wstring Channel::ChannelImpl::PipeName( 211 const std::wstring Channel::ChannelImpl::PipeName(
109 const std::string& channel_id) { 212 const std::string& channel_id) {
110 std::string name("\\\\.\\pipe\\chrome."); 213 std::string name("\\\\.\\pipe\\chrome.");
111 return ASCIIToWide(name.append(channel_id)); 214 return ASCIIToWide(name.append(channel_id));
112 } 215 }
113 216
114 bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle, 217 bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle,
115 Mode mode) { 218 Mode mode) {
116 DCHECK_EQ(INVALID_HANDLE_VALUE, pipe_); 219 DCHECK_EQ(INVALID_HANDLE_VALUE, pipe_);
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 // The pipe is being closed. 353 // The pipe is being closed.
251 return false; 354 return false;
252 default: 355 default:
253 NOTREACHED(); 356 NOTREACHED();
254 return false; 357 return false;
255 } 358 }
256 359
257 return true; 360 return true;
258 } 361 }
259 362
260 bool Channel::ChannelImpl::ProcessIncomingMessages( 363 bool Channel::ChannelImpl::ProcessIncomingMessages() {
261 MessageLoopForIO::IOContext* context, 364 while (true) {
262 DWORD bytes_read) { 365 int bytes_read = 0;
263 DCHECK(thread_check_->CalledOnValidThread()); 366 ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
264 if (input_state_.is_pending) { 367 &bytes_read);
brettw 2012/03/01 21:24:16 This check moved to OnIOCompleted.
265 input_state_.is_pending = false; 368 if (read_state == READ_FAILED)
266 DCHECK(context);
267
268 if (!context || !bytes_read)
269 return false; 369 return false;
270 } else { 370 if (read_state == READ_PENDING)
271 // This happens at channel initialization. 371 return true;
272 DCHECK(!bytes_read && context == &input_state_.context); 372 DCHECK(bytes_read > 0);
373 if (!DispatchInputData(input_buf_, bytes_read))
374 return false;
273 } 375 }
274
275 for (;;) {
276 if (bytes_read == 0) {
brettw 2012/03/01 21:24:16 This code moved to ReadData
277 if (INVALID_HANDLE_VALUE == pipe_)
278 return false;
279
280 // Read from pipe...
281 BOOL ok = ReadFile(pipe_,
282 input_buf_,
283 Channel::kReadBufferSize,
284 &bytes_read,
285 &input_state_.context.overlapped);
286 if (!ok) {
287 DWORD err = GetLastError();
288 if (err == ERROR_IO_PENDING) {
289 input_state_.is_pending = true;
290 return true;
291 }
292 LOG(ERROR) << "pipe error: " << err;
293 return false;
294 }
295 input_state_.is_pending = true;
296 return true;
297 }
298 DCHECK(bytes_read);
299
300 // Process messages from input buffer.
301
302 const char* p, *end;
brettw 2012/03/01 21:24:16 The rest of this loop moved to DispatchInputData
303 if (input_overflow_buf_.empty()) {
304 p = input_buf_;
305 end = p + bytes_read;
306 } else {
307 if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
308 input_overflow_buf_.clear();
309 LOG(ERROR) << "IPC message is too big";
310 return false;
311 }
312 input_overflow_buf_.append(input_buf_, bytes_read);
313 p = input_overflow_buf_.data();
314 end = p + input_overflow_buf_.size();
315 }
316
317 while (p < end) {
318 const char* message_tail = Message::FindNext(p, end);
319 if (message_tail) {
320 int len = static_cast<int>(message_tail - p);
321 const Message m(p, len);
322 DVLOG(2) << "received message on channel @" << this
323 << " with type " << m.type();
324 if (m.routing_id() == MSG_ROUTING_NONE &&
325 m.type() == HELLO_MESSAGE_TYPE) {
326 // The Hello message contains only the process id.
327 listener_->OnChannelConnected(MessageIterator(m).NextInt());
328 } else {
329 listener_->OnMessageReceived(m);
330 }
331 p = message_tail;
332 } else {
333 // Last message is partial.
334 break;
335 }
336 }
337 input_overflow_buf_.assign(p, end - p);
338
339 bytes_read = 0; // Get more data.
340 }
341
342 return true;
343 } 376 }
344 377
345 bool Channel::ChannelImpl::ProcessOutgoingMessages( 378 bool Channel::ChannelImpl::ProcessOutgoingMessages(
346 MessageLoopForIO::IOContext* context, 379 MessageLoopForIO::IOContext* context,
347 DWORD bytes_written) { 380 DWORD bytes_written) {
348 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's 381 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
349 // no connection? 382 // no connection?
350 DCHECK(thread_check_->CalledOnValidThread()); 383 DCHECK(thread_check_->CalledOnValidThread());
351 384
352 if (output_state_.is_pending) { 385 if (output_state_.is_pending) {
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
393 } 426 }
394 427
395 DVLOG(2) << "sent message @" << m << " on channel @" << this 428 DVLOG(2) << "sent message @" << m << " on channel @" << this
396 << " with type " << m->type(); 429 << " with type " << m->type();
397 430
398 output_state_.is_pending = true; 431 output_state_.is_pending = true;
399 return true; 432 return true;
400 } 433 }
401 434
402 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, 435 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
403 DWORD bytes_transfered, DWORD error) { 436 DWORD bytes_transfered,
404 bool ok; 437 DWORD error) {
438 bool ok = true;
405 DCHECK(thread_check_->CalledOnValidThread()); 439 DCHECK(thread_check_->CalledOnValidThread());
406 if (context == &input_state_.context) { 440 if (context == &input_state_.context) {
407 if (waiting_connect_) { 441 if (waiting_connect_) {
408 if (!ProcessConnection()) 442 if (!ProcessConnection())
409 return; 443 return;
410 // We may have some messages queued up to send... 444 // We may have some messages queued up to send...
411 if (!output_queue_.empty() && !output_state_.is_pending) 445 if (!output_queue_.empty() && !output_state_.is_pending)
412 ProcessOutgoingMessages(NULL, 0); 446 ProcessOutgoingMessages(NULL, 0);
413 if (input_state_.is_pending) 447 if (input_state_.is_pending)
414 return; 448 return;
415 // else, fall-through and look for incoming messages... 449 // else, fall-through and look for incoming messages...
416 } 450 }
417 // we don't support recursion through OnMessageReceived yet! 451
452 // We don't support recursion through OnMessageReceived yet!
418 DCHECK(!processing_incoming_); 453 DCHECK(!processing_incoming_);
419 AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true); 454 AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true);
420 ok = ProcessIncomingMessages(context, bytes_transfered); 455
456 // Process the new data.
457 if (input_state_.is_pending) {
458 // This is the normal case for everything except the initialization step.
459 input_state_.is_pending = false;
rvargas (doing something else) 2012/03/02 02:22:08 It may be better to move this block to AsyncReadCo
brettw 2012/03/02 05:31:18 I was going to make AsyncReadComplete cross-platfo
rvargas (doing something else) 2012/03/02 18:44:17 OK. Does that mean that AsyncReadComplete is goin
460 if (!bytes_transfered)
461 ok = false;
462 else
463 ok = AsyncReadComplete(bytes_transfered);
brettw 2012/03/01 21:24:16 ProcessIncomingMessages doesn't need any parameter
464 } else {
465 DCHECK(!bytes_transfered);
466 }
467
468 // Request more data.
469 if (ok)
470 ok = ProcessIncomingMessages();
421 } else { 471 } else {
422 DCHECK(context == &output_state_.context); 472 DCHECK(context == &output_state_.context);
423 ok = ProcessOutgoingMessages(context, bytes_transfered); 473 ok = ProcessOutgoingMessages(context, bytes_transfered);
424 } 474 }
425 if (!ok && INVALID_HANDLE_VALUE != pipe_) { 475 if (!ok && INVALID_HANDLE_VALUE != pipe_) {
426 // We don't want to re-enter Close(). 476 // We don't want to re-enter Close().
427 Close(); 477 Close();
428 listener_->OnChannelError(); 478 listener_->OnChannelError();
429 } 479 }
430 } 480 }
(...skipping 24 matching lines...) Expand all
455 bool Channel::Send(Message* message) { 505 bool Channel::Send(Message* message) {
456 return channel_impl_->Send(message); 506 return channel_impl_->Send(message);
457 } 507 }
458 508
459 // static 509 // static
460 bool Channel::IsNamedServerInitialized(const std::string& channel_id) { 510 bool Channel::IsNamedServerInitialized(const std::string& channel_id) {
461 return ChannelImpl::IsNamedServerInitialized(channel_id); 511 return ChannelImpl::IsNamedServerInitialized(channel_id);
462 } 512 }
463 513
464 } // namespace IPC 514 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698