Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Side by Side Diff: ipc/ipc_channel_nacl.cc

Issue 10174048: PPAPI/NaCl: Speculative implementation for ipc_channel_nacl.cc (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: eliminate writer thread, make reading loop tighter Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« ipc/ipc_channel_nacl.h ('K') | « ipc/ipc_channel_nacl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_nacl.h" 5 #include "ipc/ipc_channel_nacl.h"
6 6
7 #include <errno.h>
8 #include <stddef.h>
9 #include <sys/types.h>
10
11 #include <algorithm>
12
13 #include "base/bind.h"
7 #include "base/file_util.h" 14 #include "base/file_util.h"
8 #include "base/logging.h" 15 #include "base/logging.h"
9 16 #include "base/message_loop_proxy.h"
10 // This file is currently a stub to get us linking. 17 #include "base/process_util.h"
11 // TODO(brettw) implement this. 18 #include "base/synchronization/lock.h"
19 #include "base/task_runner_util.h"
20 #include "base/threading/simple_thread.h"
21 #include "ipc/file_descriptor_set_posix.h"
22 #include "ipc/ipc_logging.h"
23 #include "sys/nacl_imc_api.h"
Mark Seaborn 2012/05/01 22:07:09 These "sys/nacl_*" headers are "system" headers fo
24 #include "sys/nacl_syscalls.h"
12 25
13 namespace IPC { 26 namespace IPC {
27 namespace {
brettw 2012/05/01 22:29:48 Style nit: blank lines around this and the closing
28 scoped_ptr<std::vector<char> > ReadDataOnReaderThread(int pipe) {
29 DCHECK(pipe >= 0);
30 scoped_ptr<std::vector<char> > null_ptr;
31
32 if (pipe < 0)
33 return null_ptr.Pass();
34
35 CHECK(Channel::kReadBufferSize);
brettw 2012/05/01 22:29:48 I'm not sure why you're asserting on this. I'd jus
dmichael (off chromium) 2012/05/03 17:06:35 That was there since I dereference buffer->at(0).
36 scoped_ptr<std::vector<char> > buffer(
37 new std::vector<char>(Channel::kReadBufferSize));
38 struct NaClImcMsgHdr msg = {0};
39 struct NaClImcMsgIoVec iov = {&buffer->at(0), buffer->size()};
40 msg.iov = &iov;
41 msg.iov_length = 1;
42
43 int bytes_read = imc_recvmsg(pipe, &msg, 0);
44 while (bytes_read < 0 && errno == EAGAIN)
Mark Seaborn 2012/05/01 22:07:09 You don't need to handle EAGAIN. I assume your Ch
brettw 2012/05/01 22:29:48 Right, we can probably not bother with EAGAIN for
dmichael (off chromium) 2012/05/03 17:06:35 My thinking was that I don't know for sure yet if
45 bytes_read = imc_recvmsg(pipe, &msg, 0);
46
47 if (bytes_read < 0) {
48 if (errno == ECONNRESET || errno == EPIPE) {
Mark Seaborn 2012/05/01 22:07:09 Ditto. It's up to Chrome what errors this NaCl de
dmichael (off chromium) 2012/05/03 17:06:35 Yeah. I think I'll leave this here for now while I
49 return null_ptr.Pass();
50 } else {
51 PLOG(ERROR) << "pipe error (" << pipe << ")";
52 return null_ptr.Pass();
53 }
54 } else if (bytes_read == 0) {
55 // The pipe has closed...
56 return null_ptr.Pass();
57 }
58 DCHECK(bytes_read);
59 buffer->resize(bytes_read);
60 return buffer.Pass();
61 }
62 } // namespace
63
64 class Channel::ChannelImpl::ReaderThreadRunner
65 : public base::DelegateSimpleThread::Delegate {
66 public:
67 // |pipe|: A file descriptor from which we will read using imc_recvmsg.
68 // |data_read_callback|: A callback we invoke (on the main thread) when we
69 // have read data. The callback is passed a buffer of
70 // data that was read.
71 // |failure_callback|: A callback we invoke when we have a failure reading
72 // from |pipe|.
73 // |main_message_loop|: A proxy for the main thread, where we will invoke the
74 // above callbacks.
75 ReaderThreadRunner(
76 int pipe,
77 base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback,
78 base::Callback<void ()> failure_callback,
79 base::MessageLoopProxy* main_message_loop);
80
81 // DelegateSimpleThread implementation. Reads data from the pipe in a loop
82 // until either we are told to quit or a read fails.
83 virtual void Run() OVERRIDE;
84
85 // Tell the thread to quit (thread-safe).
86 void MakeQuit();
87 private:
brettw 2012/05/02 18:18:56 Blank line before this.
88 int pipe_;
89 base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback_;
90 base::Callback<void ()> failure_callback_;
91 base::MessageLoopProxy* main_message_loop_;
92 bool should_quit_;
93 base::Lock quit_lock_;
94 DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner);
brettw 2012/05/02 18:18:56 Blank line before this.
95 };
14 96
15 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, 97 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
16 Mode mode, 98 Mode mode,
17 Listener* listener) 99 Listener* listener)
18 : ChannelReader(listener) { 100 : ChannelReader(listener),
101 mode_(mode),
102 peer_pid_(base::kNullProcessId),
103 waiting_connect_(true),
104 pipe_(-1),
105 pipe_name_(channel_handle.name),
106 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
107 if (!CreatePipe(channel_handle)) {
108 // The pipe may have been closed already.
109 const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client";
110 LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name
111 << "\" in " << modestr << " mode";
112 }
113 reader_thread_runner_.reset(
114 new ReaderThreadRunner(
115 pipe_,
116 base::Bind(&Channel::ChannelImpl::DidRecvMsg,
117 weak_ptr_factory_.GetWeakPtr()),
118 base::Bind(&Channel::ChannelImpl::ReadDidFail,
119 weak_ptr_factory_.GetWeakPtr()),
120 base::MessageLoopProxy::current()));
121 reader_thread_.reset(
122 new base::DelegateSimpleThread(reader_thread_runner_.get(),
123 "ipc_channel_nacl reader thread"));
19 } 124 }
20 125
21 Channel::ChannelImpl::~ChannelImpl() { 126 Channel::ChannelImpl::~ChannelImpl() {
22 Close(); 127 Close();
23 } 128 }
24 129
25 bool Channel::ChannelImpl::Connect() { 130 bool Channel::ChannelImpl::Connect() {
26 NOTIMPLEMENTED(); 131 if (pipe_ == -1) {
27 return false; 132 DLOG(INFO) << "Channel creation failed: " << pipe_name_;
133 return false;
134 }
135
136 reader_thread_->Start();
137 if (mode_ & MODE_CLIENT_FLAG) {
138 // If we are a client we want to send a hello message out immediately.
139 // In server mode we will send a hello message when we receive one from a
140 // client.
141 waiting_connect_ = false;
142 QueueHelloMessage();
143 } else if (mode_ & MODE_SERVER_FLAG) {
144 waiting_connect_ = true;
145 return true;
146 } else {
147 NOTREACHED();
148 return false;
149 }
28 } 150 }
29 151
30 void Channel::ChannelImpl::Close() { 152 void Channel::ChannelImpl::Close() {
31 NOTIMPLEMENTED(); 153 reader_thread_runner_->MakeQuit();
154 reader_thread_->Join();
155 pipe_ = -1;
156 reader_thread_runner_.reset();
157 reader_thread_.reset();
158 read_queue_.clear();
159 output_queue_.clear();
160 // TODO(dmichael): Should we be opening & closing the virtual FD, or can we
161 // rely on the trusted side to manage it?
32 } 162 }
33 163
34 bool Channel::ChannelImpl::Send(Message* message) { 164 bool Channel::ChannelImpl::Send(Message* message) {
35 NOTIMPLEMENTED(); 165 DVLOG(2) << "sending message @" << message << " on channel @" << this
36 } 166 << " with type " << message->type();
37 167 scoped_ptr<Message> message_ptr(message);
38 int Channel::ChannelImpl::GetClientFileDescriptor() const { 168
39 NOTIMPLEMENTED(); 169 #ifdef IPC_MESSAGE_LOG_ENABLED
40 return -1; 170 Logging::GetInstance()->OnSendMessage(message, "");
41 } 171 #endif // IPC_MESSAGE_LOG_ENABLED
42 172
43 int Channel::ChannelImpl::TakeClientFileDescriptor() { 173 output_queue_.push_back(linked_ptr<Message>(message));
44 NOTIMPLEMENTED(); 174 if (!waiting_connect_)
45 return -1; 175 return ProcessOutgoingMessages();
46 } 176
47 177 return true;
48 bool Channel::ChannelImpl::AcceptsConnections() const { 178 }
49 NOTIMPLEMENTED(); 179
50 return false; 180 void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<std::vector<char> > buffer) {
51 } 181 // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from
52 182 // the reader thread after Close is called. If so, we ignore it.
53 bool Channel::ChannelImpl::HasAcceptedConnection() const { 183 if (pipe_ == -1)
54 NOTIMPLEMENTED(); 184 return;
55 return false; 185 bool first_message_for_connection = waiting_connect_;
56 } 186 if (waiting_connect_) {
57 187 // In client mode, we should have already cleared waiting_connect_.
58 bool Channel::ChannelImpl::GetClientEuid(uid_t* client_euid) const { 188 DCHECK(mode_ & MODE_SERVER_FLAG);
59 NOTIMPLEMENTED(); 189 waiting_connect_ = false;
60 return false; 190 }
61 } 191
62 192 read_queue_.push_back(linked_ptr<std::vector<char> >(buffer.release()));
63 void Channel::ChannelImpl::ResetToAcceptingConnectionState() { 193
64 NOTIMPLEMENTED(); 194 // If this was the first message we received, and we're a server, there is a
65 } 195 // hello message (and possibly others) in the queue waiting to be sent.
66 196 if (first_message_for_connection && (mode_ & MODE_SERVER_FLAG))
67 Channel::ChannelImpl::ReadState 197 ProcessOutgoingMessages();
68 Channel::ChannelImpl::ReadData(char* buffer, 198 }
69 int buffer_len, 199
70 int* bytes_read) { 200 void Channel::ChannelImpl::ReadDidFail() {
71 return Channel::ChannelImpl::ReadState(); 201 Close();
202 }
203
204 Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner(
brettw 2012/05/02 18:18:56 This is confusing. Can you group all the ReaderThr
dmichael (off chromium) 2012/05/03 17:06:35 Yeah, I had it this way because originally I had R
205 int pipe,
206 base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback,
207 base::Callback<void ()> failure_callback,
208 base::MessageLoopProxy* main_message_loop)
209 : pipe_(pipe),
210 data_read_callback_(data_read_callback),
211 failure_callback_(failure_callback),
212 main_message_loop_(main_message_loop),
213 should_quit_(false) {
214 }
215
216 void Channel::ChannelImpl::ReaderThreadRunner::Run() {
217 while (true) {
218 scoped_ptr<std::vector<char> > buffer(ReadDataOnReaderThread(pipe_));
219 quit_lock_.Acquire();
brettw 2012/05/02 18:18:56 I don't see how this quitting works. Normally this
dmichael (off chromium) 2012/05/03 17:06:35 You're right, that makes more sense. Done.
220 if (should_quit_)
221 return;
222 quit_lock_.Release();
223
224 if (buffer.get()) {
225 main_message_loop_->PostTask(FROM_HERE,
226 base::Bind(data_read_callback_, base::Passed(buffer.Pass())));
227 } else {
228 main_message_loop_->PostTask(FROM_HERE, failure_callback_);
229 // Because the read failed, we know we're going to quit. Don't bother
230 // trying to read again.
231 return;
232 }
233 }
234 }
235
236 void Channel::ChannelImpl::ReaderThreadRunner::MakeQuit() {
237 base::AutoLock lock(quit_lock_);
238 should_quit_ = true;
239 }
240
241 bool Channel::ChannelImpl::CreatePipe(
242 const IPC::ChannelHandle& channel_handle) {
243 DCHECK(pipe_ == -1);
244
245 // There's one possible case in NaCl:
246 // 1) It's a channel wrapping a pipe that is given to us.
247 // We don't support these:
248 // 2) It's for a named channel.
249 // 3) It's for a client that we implement ourself.
250 // 4) It's the initial IPC channel.
251
252 if (channel_handle.socket.fd == -1) {
253 NOTIMPLEMENTED();
254 return false;
255 }
256 pipe_ = channel_handle.socket.fd;
257 return true;
258 }
259
260 bool Channel::ChannelImpl::ProcessOutgoingMessages() {
261 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
262 // no connection?
263 if (output_queue_.empty())
264 return true;
265
266 if (pipe_ == -1)
267 return false;
268
269 // Write out all the messages. The trusted implementation is guaranteed to not
270 // block. See the Chrome-side implementation of NaClCustomDesc
271 // TODO(dmichael): Update this comment to be more specific.
272 while (!output_queue_.empty()) {
273 linked_ptr<Message> msg = output_queue_.front();
274 output_queue_.pop_front();
275
276 struct NaClImcMsgHdr msgh = {0};
277 struct NaClImcMsgIoVec iov = {const_cast<void*>(msg->data()), msg->size()};
278 msgh.iov = &iov;
279 msgh.iov_length = 1;
280 // imc_sendmsg is implemented by <TODO(dmichael), reference class here>,
281 // and is guaranteed to not block. It is also assumed that each message
282 // sent via imc_sendmsg represents a complete IPC::Message (no more no
283 // less), so if you update this code to do something different, you must
284 // also update <TODO(dmichael), reference class here>.
285 ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0);
286
287 if (bytes_written < 0) {
288 // The trusted side should only ever give us an error of EPIPE. We
289 // should never be interrupted, nor should we get EAGAIN.
290 DCHECK(errno == EPIPE);
291 Close();
292 PLOG(ERROR) << "pipe_ error on "
293 << pipe_
294 << " Currently writing message of size: "
295 << msg->size();
296 return false;
297 }
298
299 // Message sent OK!
300 DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type()
301 << " on fd " << pipe_;
302 }
303 return true;
304 }
305
306 int Channel::ChannelImpl::GetHelloMessageProcId() {
307 // TODO(dmichael): Is this right in NaCl?
Mark Seaborn 2012/05/01 22:07:09 Nope. NaCl doesn't expose process IDs. Well, "sh
dmichael (off chromium) 2012/05/03 17:06:35 Do you have a bug to track that? I would think tha
308 int pid = base::GetCurrentProcId();
309 return pid;
310 }
311
312 void Channel::ChannelImpl::QueueHelloMessage() {
313 // Create the Hello message
314 scoped_ptr<Message> msg(new Message(MSG_ROUTING_NONE,
315 HELLO_MESSAGE_TYPE,
316 IPC::Message::PRIORITY_NORMAL));
317 if (!msg->WriteInt(GetHelloMessageProcId())) {
318 NOTREACHED() << "Unable to pickle hello message proc id";
319 return;
320 }
321 Send(msg.release());
322 }
323
324 Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
325 char* buffer,
326 int buffer_len,
327 int* bytes_read) {
328 *bytes_read = 0;
329 if (pipe_ == -1)
330 return READ_FAILED;
331 if (read_queue_.empty())
332 return READ_PENDING;
333 while (!read_queue_.empty() && *bytes_read < buffer_len) {
334 linked_ptr<std::vector<char> > vec(read_queue_.front());
335 int bytes_to_read = buffer_len - *bytes_read;
336 if (vec->size() <= bytes_to_read) {
337 // We can read and discard the entire vector.
338 std::copy(vec->begin(), vec->end(), buffer + *bytes_read);
339 *bytes_read += vec->size();
340 read_queue_.pop_front();
341 } else {
342 // Read all the bytes we can and discard them from the front of the
343 // vector. (This can be slowish, since erase has to move the back of the
344 // vector to the front, but it's hopefully a temporary hack and it keeps
345 // the code simple).
346 std::copy(vec->begin(), vec->begin() + bytes_to_read,
347 buffer + *bytes_read);
348 vec->erase(vec->begin(), vec->begin() + bytes_to_read);
349 *bytes_read += bytes_to_read;
350 }
351 }
352 return READ_SUCCEEDED;
72 } 353 }
73 354
74 bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { 355 bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
75 return false; 356 return true;
76 } 357 }
77 358
78 bool Channel::ChannelImpl::DidEmptyInputBuffers() { 359 bool Channel::ChannelImpl::DidEmptyInputBuffers() {
79 return false; 360 return true;
80 } 361 }
81 362
82 void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { 363 void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
83 } 364 // The Hello message contains only the process id.
84 365 PickleIterator iter(msg);
85 // static 366 int pid;
86 bool Channel::ChannelImpl::IsNamedServerInitialized( 367 if (!msg.ReadInt(&iter, &pid))
87 const std::string& channel_id) { 368 NOTREACHED();
88 return false; //file_util::PathExists(FilePath(channel_id)); 369 peer_pid_ = pid;
370 QueueHelloMessage();
371 listener()->OnChannelConnected(pid);
89 } 372 }
90 373
91 //------------------------------------------------------------------------------ 374 //------------------------------------------------------------------------------
92 // Channel's methods simply call through to ChannelImpl. 375 // Channel's methods simply call through to ChannelImpl.
93 376
94 Channel::Channel(const IPC::ChannelHandle& channel_handle, 377 Channel::Channel(const IPC::ChannelHandle& channel_handle,
95 Mode mode, 378 Mode mode,
96 Listener* listener) 379 Listener* listener)
97 : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) { 380 : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) {
98 } 381 }
(...skipping 11 matching lines...) Expand all
110 } 393 }
111 394
112 void Channel::set_listener(Listener* listener) { 395 void Channel::set_listener(Listener* listener) {
113 channel_impl_->set_listener(listener); 396 channel_impl_->set_listener(listener);
114 } 397 }
115 398
116 bool Channel::Send(Message* message) { 399 bool Channel::Send(Message* message) {
117 return channel_impl_->Send(message); 400 return channel_impl_->Send(message);
118 } 401 }
119 402
120 int Channel::GetClientFileDescriptor() const {
121 return channel_impl_->GetClientFileDescriptor();
122 }
123
124 int Channel::TakeClientFileDescriptor() {
125 return channel_impl_->TakeClientFileDescriptor();
126 }
127
128 bool Channel::AcceptsConnections() const {
129 return channel_impl_->AcceptsConnections();
130 }
131
132 bool Channel::HasAcceptedConnection() const {
133 return channel_impl_->HasAcceptedConnection();
134 }
135
136 bool Channel::GetClientEuid(uid_t* client_euid) const {
137 return channel_impl_->GetClientEuid(client_euid);
138 }
139
140 void Channel::ResetToAcceptingConnectionState() {
141 channel_impl_->ResetToAcceptingConnectionState();
142 }
143
144 base::ProcessId Channel::peer_pid() const { return 0; }
145
146 // static
147 bool Channel::IsNamedServerInitialized(const std::string& channel_id) {
148 return ChannelImpl::IsNamedServerInitialized(channel_id);
149 }
150
151 // static 403 // static
152 std::string Channel::GenerateVerifiedChannelID(const std::string& prefix) { 404 std::string Channel::GenerateVerifiedChannelID(const std::string& prefix) {
153 // A random name is sufficient validation on posix systems, so we don't need 405 // A random name is sufficient validation on posix systems, so we don't need
154 // an additional shared secret. 406 // an additional shared secret.
155 std::string id = prefix; 407 std::string id = prefix;
156 if (!id.empty()) 408 if (!id.empty())
157 id.append("."); 409 id.append(".");
158 410
159 return id.append(GenerateUniqueRandomChannelID()); 411 return id.append(GenerateUniqueRandomChannelID());
160 } 412 }
161 413
162 } // namespace IPC 414 } // namespace IPC
OLDNEW
« ipc/ipc_channel_nacl.h ('K') | « ipc/ipc_channel_nacl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698