Index: ipc/ipc_channel_nacl.cc |
diff --git a/ipc/ipc_channel_nacl.cc b/ipc/ipc_channel_nacl.cc |
index 683353edcde2b05b569ea54523fbc5a4ead4b082..a5bb4cd44ab859a5d198c29cbdf4576ba63046f6 100644 |
--- a/ipc/ipc_channel_nacl.cc |
+++ b/ipc/ipc_channel_nacl.cc |
@@ -4,18 +4,148 @@ |
#include "ipc/ipc_channel_nacl.h" |
+#include <errno.h> |
+#include <stddef.h> |
+#include <sys/nacl_imc_api.h> |
+#include <sys/nacl_syscalls.h> |
+#include <sys/types.h> |
+ |
+#include <algorithm> |
+ |
+#include "base/bind.h" |
#include "base/file_util.h" |
#include "base/logging.h" |
- |
-// This file is currently a stub to get us linking. |
-// TODO(brettw) implement this. |
+#include "base/message_loop_proxy.h" |
+#include "base/process_util.h" |
+#include "base/synchronization/lock.h" |
+#include "base/task_runner_util.h" |
+#include "base/threading/simple_thread.h" |
+#include "ipc/file_descriptor_set_posix.h" |
+#include "ipc/ipc_logging.h" |
namespace IPC { |
+namespace { |
+ |
+scoped_ptr<std::vector<char> > ReadDataOnReaderThread(int pipe) { |
+ DCHECK(pipe >= 0); |
+ scoped_ptr<std::vector<char> > null_ptr; |
+ |
+ if (pipe < 0) |
+ return null_ptr.Pass(); |
+ |
+ scoped_ptr<std::vector<char> > buffer( |
+ new std::vector<char>(Channel::kReadBufferSize)); |
+ struct NaClImcMsgHdr msg = {0}; |
+ struct NaClImcMsgIoVec iov = {&buffer->at(0), buffer->size()}; |
+ msg.iov = &iov; |
+ msg.iov_length = 1; |
+ |
+ int bytes_read = imc_recvmsg(pipe, &msg, 0); |
+ |
+ if (bytes_read < 0) { |
+ // TODO(dmichael): Define what errors actually happen in the NaClCustomDesc |
+ // implementation and handle them appropriately. These are |
+ // placeholders. |
+ if (errno == ECONNRESET || errno == EPIPE) { |
+ return null_ptr.Pass(); |
+ } else { |
+ PLOG(ERROR) << "pipe error (" << pipe << ")"; |
+ return null_ptr.Pass(); |
+ } |
+ } else if (bytes_read == 0) { |
+ // The pipe has closed... |
+ return null_ptr.Pass(); |
+ } |
+ DCHECK(bytes_read); |
+ buffer->resize(bytes_read); |
+ return buffer.Pass(); |
+} |
+ |
+} // namespace |
+ |
+class Channel::ChannelImpl::ReaderThreadRunner |
+ : public base::DelegateSimpleThread::Delegate { |
+ public: |
+ // |pipe|: A file descriptor from which we will read using imc_recvmsg. |
+ // |data_read_callback|: A callback we invoke (on the main thread) when we |
+ // have read data. The callback is passed a buffer of |
+ // data that was read. |
+ // |failure_callback|: A callback we invoke when we have a failure reading |
+ // from |pipe|. |
+ // |main_message_loop|: A proxy for the main thread, where we will invoke the |
+ // above callbacks. |
+ ReaderThreadRunner( |
+ int pipe, |
+ base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, |
+ base::Callback<void ()> failure_callback, |
+ base::MessageLoopProxy* main_message_loop); |
+ |
+ // DelegateSimpleThread implementation. Reads data from the pipe in a loop |
+ // until either we are told to quit or a read fails. |
+ virtual void Run() OVERRIDE; |
+ |
+ private: |
+ int pipe_; |
+ base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback_; |
+ base::Callback<void ()> failure_callback_; |
+ base::MessageLoopProxy* main_message_loop_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner); |
+}; |
+ |
+Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner( |
+ int pipe, |
+ base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, |
+ base::Callback<void ()> failure_callback, |
+ base::MessageLoopProxy* main_message_loop) |
+ : pipe_(pipe), |
+ data_read_callback_(data_read_callback), |
+ failure_callback_(failure_callback), |
+ main_message_loop_(main_message_loop) { |
+} |
+ |
+void Channel::ChannelImpl::ReaderThreadRunner::Run() { |
+ while (true) { |
+ scoped_ptr<std::vector<char> > buffer(ReadDataOnReaderThread(pipe_)); |
+ if (buffer.get()) { |
+ main_message_loop_->PostTask(FROM_HERE, |
+ base::Bind(data_read_callback_, base::Passed(buffer.Pass()))); |
+ } else { |
+ main_message_loop_->PostTask(FROM_HERE, failure_callback_); |
+ // Because the read failed, we know we're going to quit. Don't bother |
+ // trying to read again. |
+ return; |
+ } |
+ } |
+} |
Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, |
- Mode mode, |
- Listener* listener) |
- : ChannelReader(listener) { |
+ Mode mode, |
+ Listener* listener) |
+ : ChannelReader(listener), |
+ mode_(mode), |
+ peer_pid_(base::kNullProcessId), |
+ waiting_connect_(true), |
+ pipe_(-1), |
+ pipe_name_(channel_handle.name), |
+ weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
+ if (!CreatePipe(channel_handle)) { |
+ // The pipe may have been closed already. |
+ const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client"; |
+ LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name |
+ << "\" in " << modestr << " mode"; |
+ } |
+ reader_thread_runner_.reset( |
+ new ReaderThreadRunner( |
+ pipe_, |
+ base::Bind(&Channel::ChannelImpl::DidRecvMsg, |
+ weak_ptr_factory_.GetWeakPtr()), |
+ base::Bind(&Channel::ChannelImpl::ReadDidFail, |
+ weak_ptr_factory_.GetWeakPtr()), |
+ base::MessageLoopProxy::current())); |
+ reader_thread_.reset( |
+ new base::DelegateSimpleThread(reader_thread_runner_.get(), |
+ "ipc_channel_nacl reader thread")); |
} |
Channel::ChannelImpl::~ChannelImpl() { |
@@ -23,69 +153,211 @@ Channel::ChannelImpl::~ChannelImpl() { |
} |
bool Channel::ChannelImpl::Connect() { |
- NOTIMPLEMENTED(); |
- return false; |
+ if (pipe_ == -1) { |
+ DLOG(INFO) << "Channel creation failed: " << pipe_name_; |
+ return false; |
+ } |
+ |
+ reader_thread_->Start(); |
+ if (mode_ & MODE_CLIENT_FLAG) { |
+ // If we are a client we want to send a hello message out immediately. |
+ // In server mode we will send a hello message when we receive one from a |
+ // client. |
+ waiting_connect_ = false; |
+ QueueHelloMessage(); |
+ } else if (mode_ & MODE_SERVER_FLAG) { |
+ waiting_connect_ = true; |
+ return true; |
+ } else { |
+ NOTREACHED(); |
+ return false; |
+ } |
} |
void Channel::ChannelImpl::Close() { |
- NOTIMPLEMENTED(); |
+ close(pipe_); |
Mark Seaborn
2012/05/03 21:42:22
Doing close() before joining the thread is not the
|
+ reader_thread_->Join(); |
+ pipe_ = -1; |
+ reader_thread_runner_.reset(); |
+ reader_thread_.reset(); |
+ read_queue_.clear(); |
+ output_queue_.clear(); |
} |
bool Channel::ChannelImpl::Send(Message* message) { |
- NOTIMPLEMENTED(); |
+ DVLOG(2) << "sending message @" << message << " on channel @" << this |
+ << " with type " << message->type(); |
+ scoped_ptr<Message> message_ptr(message); |
+ |
+#ifdef IPC_MESSAGE_LOG_ENABLED |
+ Logging::GetInstance()->OnSendMessage(message, ""); |
+#endif // IPC_MESSAGE_LOG_ENABLED |
+ |
+ output_queue_.push_back(linked_ptr<Message>(message)); |
+ if (!waiting_connect_) |
+ return ProcessOutgoingMessages(); |
+ |
+ return true; |
} |
-int Channel::ChannelImpl::GetClientFileDescriptor() const { |
- NOTIMPLEMENTED(); |
- return -1; |
+void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<std::vector<char> > buffer) { |
+ // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from |
+ // the reader thread after Close is called. If so, we ignore it. |
+ if (pipe_ == -1) |
+ return; |
+ bool first_message_for_connection = waiting_connect_; |
+ if (waiting_connect_) { |
+ // In client mode, we should have already cleared waiting_connect_. |
+ DCHECK(mode_ & MODE_SERVER_FLAG); |
+ waiting_connect_ = false; |
+ } |
+ |
+ read_queue_.push_back(linked_ptr<std::vector<char> >(buffer.release())); |
+ |
+ // If this was the first message we received, and we're a server, there is a |
+ // hello message (and possibly others) in the queue waiting to be sent. |
+ if (first_message_for_connection && (mode_ & MODE_SERVER_FLAG)) |
+ ProcessOutgoingMessages(); |
} |
-int Channel::ChannelImpl::TakeClientFileDescriptor() { |
- NOTIMPLEMENTED(); |
- return -1; |
+void Channel::ChannelImpl::ReadDidFail() { |
+ Close(); |
} |
-bool Channel::ChannelImpl::AcceptsConnections() const { |
- NOTIMPLEMENTED(); |
- return false; |
+bool Channel::ChannelImpl::CreatePipe( |
+ const IPC::ChannelHandle& channel_handle) { |
+ DCHECK(pipe_ == -1); |
+ |
+ // There's one possible case in NaCl: |
+ // 1) It's a channel wrapping a pipe that is given to us. |
+ // We don't support these: |
+ // 2) It's for a named channel. |
+ // 3) It's for a client that we implement ourself. |
+ // 4) It's the initial IPC channel. |
+ |
+ if (channel_handle.socket.fd == -1) { |
+ NOTIMPLEMENTED(); |
+ return false; |
+ } |
+ pipe_ = channel_handle.socket.fd; |
+ return true; |
} |
-bool Channel::ChannelImpl::HasAcceptedConnection() const { |
- NOTIMPLEMENTED(); |
- return false; |
+bool Channel::ChannelImpl::ProcessOutgoingMessages() { |
+ DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
+ // no connection? |
+ if (output_queue_.empty()) |
+ return true; |
+ |
+ if (pipe_ == -1) |
+ return false; |
+ |
+ // Write out all the messages. The trusted implementation is guaranteed to not |
+ // block. See the Chrome-side implementation of NaClCustomDesc. |
+ // TODO(dmichael): Update this comment to be more specific. |
+ while (!output_queue_.empty()) { |
+ linked_ptr<Message> msg = output_queue_.front(); |
+ output_queue_.pop_front(); |
+ |
+ struct NaClImcMsgHdr msgh = {0}; |
+ struct NaClImcMsgIoVec iov = {const_cast<void*>(msg->data()), msg->size()}; |
+ msgh.iov = &iov; |
+ msgh.iov_length = 1; |
+ // imc_sendmsg is implemented by <TODO(dmichael), reference class here>, |
+ // and is guaranteed to not block. It is also assumed that each message |
+ // sent via imc_sendmsg represents a complete IPC::Message (no more no |
+ // less), so if you update this code to do something different, you must |
+ // also update <TODO(dmichael), reference class here>. |
+ ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0); |
+ |
+ if (bytes_written < 0) { |
+ // The trusted side should only ever give us an error of EPIPE. We |
+ // should never be interrupted, nor should we get EAGAIN. |
+ DCHECK(errno == EPIPE); |
+ Close(); |
+ PLOG(ERROR) << "pipe_ error on " |
+ << pipe_ |
+ << " Currently writing message of size: " |
+ << msg->size(); |
+ return false; |
+ } |
+ |
+ // Message sent OK! |
+ DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type() |
+ << " on fd " << pipe_; |
+ } |
+ return true; |
} |
-bool Channel::ChannelImpl::GetClientEuid(uid_t* client_euid) const { |
- NOTIMPLEMENTED(); |
- return false; |
+int Channel::ChannelImpl::GetHelloMessageProcId() { |
+ // TODO(dmichael): Is there any sensible thing to return here? |
+ return 0; |
} |
-void Channel::ChannelImpl::ResetToAcceptingConnectionState() { |
- NOTIMPLEMENTED(); |
+void Channel::ChannelImpl::QueueHelloMessage() { |
+ // TODO(dmichael): Does it actually make sense to do the Hello message in the |
+ // NaCl channel? The underlying channel that implements |
+ // NaClCustomDesc should already take care of the |
+ // client/server handshake. |
+ // Create the Hello message. |
+ scoped_ptr<Message> msg(new Message(MSG_ROUTING_NONE, |
+ HELLO_MESSAGE_TYPE, |
+ IPC::Message::PRIORITY_NORMAL)); |
+ if (!msg->WriteInt(GetHelloMessageProcId())) { |
+ NOTREACHED() << "Unable to pickle hello message proc id"; |
+ return; |
+ } |
+ Send(msg.release()); |
} |
-Channel::ChannelImpl::ReadState |
- Channel::ChannelImpl::ReadData(char* buffer, |
- int buffer_len, |
- int* bytes_read) { |
- return Channel::ChannelImpl::ReadState(); |
+Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( |
+ char* buffer, |
+ int buffer_len, |
+ int* bytes_read) { |
+ *bytes_read = 0; |
+ if (pipe_ == -1) |
+ return READ_FAILED; |
+ if (read_queue_.empty()) |
+ return READ_PENDING; |
+ while (!read_queue_.empty() && *bytes_read < buffer_len) { |
+ linked_ptr<std::vector<char> > vec(read_queue_.front()); |
+ int bytes_to_read = buffer_len - *bytes_read; |
+ if (vec->size() <= bytes_to_read) { |
+ // We can read and discard the entire vector. |
+ std::copy(vec->begin(), vec->end(), buffer + *bytes_read); |
+ *bytes_read += vec->size(); |
+ read_queue_.pop_front(); |
+ } else { |
+ // Read all the bytes we can and discard them from the front of the |
+ // vector. (This can be slowish, since erase has to move the back of the |
+ // vector to the front, but it's hopefully a temporary hack and it keeps |
+ // the code simple). |
+ std::copy(vec->begin(), vec->begin() + bytes_to_read, |
+ buffer + *bytes_read); |
+ vec->erase(vec->begin(), vec->begin() + bytes_to_read); |
+ *bytes_read += bytes_to_read; |
+ } |
+ } |
+ return READ_SUCCEEDED; |
} |
bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { |
- return false; |
+ return true; |
} |
bool Channel::ChannelImpl::DidEmptyInputBuffers() { |
- return false; |
+ return true; |
} |
void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { |
-} |
- |
-// static |
-bool Channel::ChannelImpl::IsNamedServerInitialized( |
- const std::string& channel_id) { |
- return false; //file_util::PathExists(FilePath(channel_id)); |
+ // The Hello message contains only the process id. |
+ PickleIterator iter(msg); |
+ int pid; |
+ if (!msg.ReadInt(&iter, &pid)) |
+ NOTREACHED(); |
+ peer_pid_ = pid; |
+ QueueHelloMessage(); |
+ listener()->OnChannelConnected(pid); |
} |
//------------------------------------------------------------------------------ |
@@ -117,37 +389,6 @@ bool Channel::Send(Message* message) { |
return channel_impl_->Send(message); |
} |
-int Channel::GetClientFileDescriptor() const { |
- return channel_impl_->GetClientFileDescriptor(); |
-} |
- |
-int Channel::TakeClientFileDescriptor() { |
- return channel_impl_->TakeClientFileDescriptor(); |
-} |
- |
-bool Channel::AcceptsConnections() const { |
- return channel_impl_->AcceptsConnections(); |
-} |
- |
-bool Channel::HasAcceptedConnection() const { |
- return channel_impl_->HasAcceptedConnection(); |
-} |
- |
-bool Channel::GetClientEuid(uid_t* client_euid) const { |
- return channel_impl_->GetClientEuid(client_euid); |
-} |
- |
-void Channel::ResetToAcceptingConnectionState() { |
- channel_impl_->ResetToAcceptingConnectionState(); |
-} |
- |
-base::ProcessId Channel::peer_pid() const { return 0; } |
- |
-// static |
-bool Channel::IsNamedServerInitialized(const std::string& channel_id) { |
- return ChannelImpl::IsNamedServerInitialized(channel_id); |
-} |
- |
// static |
std::string Channel::GenerateVerifiedChannelID(const std::string& prefix) { |
// A random name is sufficient validation on posix systems, so we don't need |