Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(373)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/TcpSocket.h"
6
7 #include <assert.h>
8 #include <string.h>
9
10 #include "net/SocketSubSystem.h"
11 #include "ppapi/c/pp_errors.h"
12 #include "ppapi/cpp/module.h"
13 #include "util/DebugPrint.h"
14
15 TCPSocket::TCPSocket(SocketSubSystem* sys, int oflag)
16 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
17 read_buf_(kBufSize), write_sent_(false), sys(sys) {
18 }
19
20 TCPSocket::~TCPSocket() {
21 assert(!socket_);
22 assert(!ref_);
23 }
24
25 void TCPSocket::addref() {
26 ++ref_;
27 }
28
29 void TCPSocket::release() {
30 if (!--ref_)
31 delete this;
32 }
33
34 bool TCPSocket::connect(const char* host, uint16_t port) {
35 int32_t result = PP_OK_COMPLETIONPENDING;
36 pp::Module::Get()->core()->CallOnMainThread(0,
37 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
38 dbgprintf("host: %s, port: %d\n", host, port);
39 while (result == PP_OK_COMPLETIONPENDING)
40 sys->cond().wait(sys->mutex());
41 return result == PP_OK;
42 }
43
44 bool TCPSocket::acceptFrom(PP_Resource resource) {
45 int32_t result = PP_OK_COMPLETIONPENDING;
46 pp::Module::Get()->core()->CallOnMainThread(0,
47 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
48 while (result == PP_OK_COMPLETIONPENDING)
49 sys->cond().wait(sys->mutex());
50 return result == PP_OK;
51 }
52
53 void TCPSocket::close() {
54 if (socket_) {
55 int32_t result = PP_OK_COMPLETIONPENDING;
56 pp::Module::Get()->core()->CallOnMainThread(0,
57 factory_.NewCallback(&TCPSocket::Close, &result));
58 while (result == PP_OK_COMPLETIONPENDING)
59 sys->cond().wait(sys->mutex());
60 }
61 }
62
63 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
64 if (!is_open())
65 return EIO;
66
67 if (is_block()) {
68 while (in_buf_.empty() && is_open())
69 sys->cond().wait(sys->mutex());
70 }
71
72 *nread = 0;
73 while (*nread < count) {
74 if (in_buf_.empty())
75 break;
76
77 buf[(*nread)++] = in_buf_.front();
78 in_buf_.pop_front();
79 }
80
81 if (*nread == 0) {
82 if (!is_open()) {
83 return 0;
84 } else {
85 *nread = -1;
86 return EAGAIN;
87 }
88 }
89
90 return 0;
91 }
92
93 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
94 if (!is_open())
95 return EIO;
96
97 out_buf_.insert(out_buf_.end(), buf, buf + count);
98 if (is_block()) {
99 int32_t result = PP_OK_COMPLETIONPENDING;
100 pp::Module::Get()->core()->CallOnMainThread(0,
101 factory_.NewCallback(&TCPSocket::Write, &result));
102 while (result == PP_OK_COMPLETIONPENDING)
103 sys->cond().wait(sys->mutex());
104 if ((size_t)result != count) {
105 *nwrote = -1;
106 return EIO;
107 } else {
108 *nwrote = count;
109 return 0;
110 }
111 } else {
112 if (!write_sent_) {
113 write_sent_ = true;
114 pp::Module::Get()->core()->CallOnMainThread(0,
115 factory_.NewCallback(&TCPSocket::Write,
116 reinterpret_cast<int32_t*>(NULL)));
117 }
118 *nwrote = count;
119 return 0;
120 }
121 }
122
123 int TCPSocket::fcntl(int cmd, va_list ap) {
124 if (cmd == F_GETFL) {
125 return oflag_;
126 } else if (cmd == F_SETFL) {
127 oflag_ = va_arg(ap, long);
128 return 0;
129 } else {
130 return -1;
131 }
132 }
133
134 bool TCPSocket::is_read_ready() {
135 return !is_open() || !in_buf_.empty();
136 }
137
138 bool TCPSocket::is_write_ready() {
139 return !is_open() || out_buf_.size() < kBufSize;
140 }
141
142 bool TCPSocket::is_exception() {
143 return !is_open();
144 }
145
146 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
147 int32_t* pres) {
148 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
149 assert(!socket_);
150 socket_ = new pp::TCPSocketPrivate(sys->instance());
151 *pres = socket_->Connect(host, port,
152 factory_.NewCallback(&TCPSocket::OnConnect, pres));
153 delete lock_;
154 if (*pres != PP_OK_COMPLETIONPENDING)
155 {
156 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
157 sys->cond().broadcast();
158 delete lock;
159 }
160 }
161
162 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
163 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
164 if (result == PP_OK) {
165 Read(PP_OK, NULL);
166 } else {
167 delete socket_;
168 socket_ = NULL;
169 }
170 *pres = result;
171 delete lock_;
172 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
173 sys->cond().broadcast();
174 delete lock;
175 }
176
177 void TCPSocket::Read(int32_t result, int32_t* pres) {
178 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
179 if (!is_open())
180 return;
181
182 result = socket_->Read(&read_buf_[0], read_buf_.size(),
183 factory_.NewCallback(&TCPSocket::OnRead, pres));
184 delete lock_;
185 if (result != PP_OK_COMPLETIONPENDING) {
186 delete socket_;
187 socket_ = NULL;
188 if (pres)
189 *pres = result;
190 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
191 sys->cond().broadcast();
192 delete lock;
193 }
194 }
195
196 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
197 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
198 if (!is_open())
199 return;
200
201 if (result > 0) {
202 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
203 Read(PP_OK, NULL);
204 } else {
205 delete socket_;
206 socket_ = NULL;
207 }
208 if (pres)
209 *pres = result;
210 delete lock_;
211 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
212 sys->cond().broadcast();
213 delete lock;
214 }
215
216 void TCPSocket::Write(int32_t result, int32_t* pres) {
217 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
218 if (!is_open())
219 return;
220
221 if (write_buf_.size()) {
222 // Previous write operation is in progress.
223 pp::Module::Get()->core()->CallOnMainThread(1,
224 factory_.NewCallback(&TCPSocket::Write, &result));
225 return;
226 }
227 assert(out_buf_.size());
228 write_buf_.swap(out_buf_);
229 result = socket_->Write(&write_buf_[0], write_buf_.size(),
230 factory_.NewCallback(&TCPSocket::OnWrite, pres));
231 delete lock_;
232 if (result != PP_OK_COMPLETIONPENDING) {
233 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
234 LOG("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
235 delete socket_;
236 socket_ = NULL;
237 delete lock_;
238 if (pres)
239 *pres = result;
240 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
241 sys->cond().broadcast();
242 delete lock;
243 }
244 write_sent_ = false;
245 }
246
247 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
248 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
249 if (!is_open())
250 return;
251
252 if (result < 0 || (size_t)result > write_buf_.size()) {
253 // Write error.
254 LOG("TCPSocket::OnWrite: close socket\n");
255 delete socket_;
256 socket_ = NULL;
257 } else if ((size_t)result < write_buf_.size()) {
258 // Partial write. Insert remaining bytes at the beginning of out_buf_.
259 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
260 }
261 if (pres)
262 *pres = result;
263 write_buf_.clear();
264 delete lock_;
265 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
266 sys->cond().broadcast();
267 delete lock;
268 }
269
270 void TCPSocket::Close(int32_t result, int32_t* pres) {
271 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
272 delete socket_;
273 socket_ = NULL;
274 if (pres)
275 *pres = PP_OK;
276 delete lock_;
277 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
278 sys->cond().broadcast();
279 delete lock;
280 }
281
282 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
283 Mutex::Lock* lock_ = new Mutex::Lock(mutex_);
284 assert(!socket_);
285 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
286 Read(PP_OK, NULL);
287 *pres = PP_OK;
288 delete lock_;
289 Mutex::Lock* lock = new Mutex::Lock(sys->mutex());
290 sys->cond().broadcast();
291 delete lock;
292 return true;
293 }
294
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698