Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(120)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/TcpSocket.h"
6
7 #include <assert.h>
8 #include <string.h>
9
10 #include "net/SocketSubSystem.h"
11 #include "ppapi/c/pp_errors.h"
12 #include "ppapi/cpp/module.h"
13 #include "util/DebugPrint.h"
14
15 TCPSocket::TCPSocket(int oflag)
16 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
17 read_buf_(kBufSize), write_sent_(false) {
18 }
19
20 TCPSocket::~TCPSocket() {
21 assert(!socket_);
22 assert(!ref_);
23 }
24
25 void TCPSocket::addref() {
26 ++ref_;
27 }
28
29 void TCPSocket::release() {
30 if (!--ref_)
31 delete this;
32 }
33
34 FileStream* TCPSocket::dup(int fd) {
Evgeniy Stepanov 2012/05/25 12:05:57 this seems unneeded
vissi 2012/05/25 13:07:09 Done.
35 return NULL;
36 }
37
38 bool TCPSocket::connect(const char* host, uint16_t port) {
39 int32_t result = PP_OK_COMPLETIONPENDING;
40 pp::Module::Get()->core()->CallOnMainThread(0,
41 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
42 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
43 dbgprintf("host: %s, port: %d\n", host, port);
44 while (result == PP_OK_COMPLETIONPENDING)
45 sys->cond().wait(sys->mutex());
46 return result == PP_OK;
47 }
48
49 bool TCPSocket::accept(PP_Resource resource) {
Evgeniy Stepanov 2012/05/25 12:05:57 call it something like fromAccept to avoid confusi
vissi 2012/05/25 13:07:09 Doen.
50 int32_t result = PP_OK_COMPLETIONPENDING;
51 pp::Module::Get()->core()->CallOnMainThread(0,
52 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
53 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
54 while (result == PP_OK_COMPLETIONPENDING)
55 sys->cond().wait(sys->mutex());
56 return result == PP_OK;
57 }
58
59 void TCPSocket::close() {
60 if (socket_) {
61 int32_t result = PP_OK_COMPLETIONPENDING;
62 pp::Module::Get()->core()->CallOnMainThread(0,
63 factory_.NewCallback(&TCPSocket::Close, &result));
64 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
65 while (result == PP_OK_COMPLETIONPENDING)
66 sys->cond().wait(sys->mutex());
67 }
68 }
69
70 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
71 if (!is_open())
72 return EIO;
73
74 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
75 if (is_block()) {
76 while (in_buf_.empty() && is_open())
77 sys->cond().wait(sys->mutex());
78 }
79
80 *nread = 0;
81 while (*nread < count) {
82 if (in_buf_.empty())
83 break;
84
85 buf[(*nread)++] = in_buf_.front();
86 in_buf_.pop_front();
87 }
88
89 if (*nread == 0) {
90 if (!is_open()) {
91 return 0;
92 } else {
93 *nread = -1;
94 return EAGAIN;
95 }
96 }
97
98 return 0;
99 }
100
101 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
102 if (!is_open())
103 return EIO;
104
105 out_buf_.insert(out_buf_.end(), buf, buf + count);
106 if (is_block()) {
107 int32_t result = PP_OK_COMPLETIONPENDING;
108 pp::Module::Get()->core()->CallOnMainThread(0,
109 factory_.NewCallback(&TCPSocket::Write, &result));
110 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
111 while (result == PP_OK_COMPLETIONPENDING)
112 sys->cond().wait(sys->mutex());
113 if ((size_t)result != count) {
114 *nwrote = -1;
115 return EIO;
116 } else {
117 *nwrote = count;
118 return 0;
119 }
120 } else {
121 if (!write_sent_) {
122 write_sent_ = true;
123 pp::Module::Get()->core()->CallOnMainThread(0,
124 factory_.NewCallback(&TCPSocket::Write,
125 reinterpret_cast<int32_t*>(NULL)));
126 }
127 *nwrote = count;
128 return 0;
129 }
130 }
131
132 int TCPSocket::fcntl(int cmd, va_list ap) {
133 if (cmd == F_GETFL) {
134 return oflag_;
135 } else if (cmd == F_SETFL) {
136 oflag_ = va_arg(ap, long);
137 return 0;
138 } else {
139 return -1;
140 }
141 }
142
143 bool TCPSocket::is_read_ready() {
144 return !is_open() || !in_buf_.empty();
145 }
146
147 bool TCPSocket::is_write_ready() {
148 return !is_open() || out_buf_.size() < kBufSize;
149 }
150
151 bool TCPSocket::is_exception() {
152 return !is_open();
153 }
154
155 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
156 int32_t* pres) {
157 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
158 Mutex::Lock lock(sys->mutex());
159 assert(!socket_);
160 socket_ = new pp::TCPSocketPrivate(sys->instance());
161 *pres = socket_->Connect(host, port,
162 factory_.NewCallback(&TCPSocket::OnConnect, pres));
163 if (*pres != PP_OK_COMPLETIONPENDING)
164 sys->cond().broadcast();
165 }
166
167 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
168 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
169 Mutex::Lock lock(sys->mutex());
170 if (result == PP_OK) {
171 Read(PP_OK, NULL);
172 } else {
173 delete socket_;
174 socket_ = NULL;
175 }
176 *pres = result;
177 sys->cond().broadcast();
178 }
179
180 void TCPSocket::Read(int32_t result, int32_t* pres) {
181 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
182 Mutex::Lock lock(sys->mutex());
183 if (!is_open())
184 return;
185
186 result = socket_->Read(&read_buf_[0], read_buf_.size(),
187 factory_.NewCallback(&TCPSocket::OnRead, pres));
188 if (result != PP_OK_COMPLETIONPENDING) {
189 delete socket_;
190 socket_ = NULL;
191 if (pres)
192 *pres = result;
193 sys->cond().broadcast();
194 }
195 }
196
197 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
198 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
199 Mutex::Lock lock(sys->mutex());
200 if (!is_open())
201 return;
202
203 if (result > 0) {
204 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
205 Read(PP_OK, NULL);
206 } else {
207 delete socket_;
208 socket_ = NULL;
209 }
210 if (pres)
211 *pres = result;
212 sys->cond().broadcast();
213 }
214
215 void TCPSocket::Write(int32_t result, int32_t* pres) {
216 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
217 Mutex::Lock lock(sys->mutex());
218 if (!is_open())
219 return;
220
221 if (write_buf_.size()) {
222 // Previous write operation is in progress.
223 pp::Module::Get()->core()->CallOnMainThread(1,
224 factory_.NewCallback(&TCPSocket::Write, &result));
225 return;
226 }
227 assert(out_buf_.size());
228 write_buf_.swap(out_buf_);
229 result = socket_->Write(&write_buf_[0], write_buf_.size(),
230 factory_.NewCallback(&TCPSocket::OnWrite, pres));
231 if (result != PP_OK_COMPLETIONPENDING) {
232 LOG("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
233 delete socket_;
234 socket_ = NULL;
235 if (pres)
236 *pres = result;
237 sys->cond().broadcast();
238 }
239 write_sent_ = false;
240 }
241
242 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
243 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
244 Mutex::Lock lock(sys->mutex());
245 if (!is_open())
246 return;
247
248 if (result < 0 || (size_t)result > write_buf_.size()) {
249 // Write error.
250 LOG("TCPSocket::OnWrite: close socket\n");
251 delete socket_;
252 socket_ = NULL;
253 } else if ((size_t)result < write_buf_.size()) {
254 // Partial write. Insert remaining bytes at the beginning of out_buf_.
255 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
256 }
257 if (pres)
258 *pres = result;
259 write_buf_.clear();
260 sys->cond().broadcast();
261 }
262
263 void TCPSocket::Close(int32_t result, int32_t* pres) {
264 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
265 Mutex::Lock lock(sys->mutex());
266 delete socket_;
267 socket_ = NULL;
268 if (pres)
269 *pres = PP_OK;
270 sys->cond().broadcast();
271 }
272
273 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
274 SocketSubSystem* sys = SocketSubSystem::GetSocketSubSystem();
275 Mutex::Lock lock(sys->mutex());
276 assert(!socket_);
277 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
278 Read(PP_OK, NULL);
279 *pres = PP_OK;
280 sys->cond().broadcast();
281 return true;
282 }
283
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698