Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/TcpSocket.h"
6
7 #include <assert.h>
8 #include <string.h>
9
10 #include "net/SocketSubSystem.h"
11 #include "ppapi/c/pp_errors.h"
12 #include "ppapi/cpp/module.h"
13 #include "util/DebugPrint.h"
14
15 TCPSocket::TCPSocket(SocketSubSystem* sys, int oflag)
16 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
17 read_buf_(kBufSize), write_sent_(false), sys(sys) {
18 }
19
20 TCPSocket::~TCPSocket() {
21 assert(!socket_);
22 assert(!ref_);
23 }
24
25 void TCPSocket::addref() {
26 ++ref_;
27 }
28
29 void TCPSocket::release() {
30 if (!--ref_)
31 delete this;
32 }
33
34 bool TCPSocket::connect(const char* host, uint16_t port) {
35 int32_t result = PP_OK_COMPLETIONPENDING;
36 pp::Module::Get()->core()->CallOnMainThread(0,
37 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
38 dbgprintf("host: %s, port: %d\n", host, port);
39 while (result == PP_OK_COMPLETIONPENDING) {
40 sys->cond().wait(sys->mutex());
41 }
42 return result == PP_OK;
43 }
44
45 bool TCPSocket::acceptFrom(PP_Resource resource) {
46 int32_t result = PP_OK_COMPLETIONPENDING;
47 pp::Module::Get()->core()->CallOnMainThread(0,
48 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
49 while (result == PP_OK_COMPLETIONPENDING) {
50 sys->cond().wait(sys->mutex());
51 }
52 return result == PP_OK;
53 }
54
55 void TCPSocket::close() {
56 if (socket_) {
57 int32_t result = PP_OK_COMPLETIONPENDING;
58 pp::Module::Get()->core()->CallOnMainThread(0,
59 factory_.NewCallback(&TCPSocket::Close, &result));
60 while (result == PP_OK_COMPLETIONPENDING) {
61 sys->cond().wait(sys->mutex());
62 }
63 }
64 }
65
66 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
67 if (!is_open())
68 return EIO;
69
70 if (is_block()) {
71 while (in_buf_.empty() && is_open())
72 sys->cond().wait(sys->mutex());
73 }
74
75 *nread = 0;
76 while (*nread < count) {
77 if (in_buf_.empty())
78 break;
79
80 buf[(*nread)++] = in_buf_.front();
81 in_buf_.pop_front();
82 }
83
84 if (*nread == 0) {
85 if (!is_open()) {
86 return 0;
87 } else {
88 *nread = -1;
89 return EAGAIN;
90 }
91 }
92
93 return 0;
94 }
95
96 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
97 if (!is_open())
98 return EIO;
99
100 out_buf_.insert(out_buf_.end(), buf, buf + count);
101 if (is_block()) {
102 int32_t result = PP_OK_COMPLETIONPENDING;
103 pp::Module::Get()->core()->CallOnMainThread(0,
104 factory_.NewCallback(&TCPSocket::Write, &result));
105 while (result == PP_OK_COMPLETIONPENDING)
106 sys->cond().wait(sys->mutex());
107 if ((size_t)result != count) {
108 *nwrote = -1;
109 return EIO;
110 } else {
111 *nwrote = count;
112 return 0;
113 }
114 } else {
115 if (!write_sent_) {
116 write_sent_ = true;
117 pp::Module::Get()->core()->CallOnMainThread(0,
118 factory_.NewCallback(&TCPSocket::Write,
119 reinterpret_cast<int32_t*>(NULL)));
120 }
121 *nwrote = count;
122 return 0;
123 }
124 }
125
126 int TCPSocket::fcntl(int cmd, va_list ap) {
127 if (cmd == F_GETFL) {
128 return oflag_;
129 } else if (cmd == F_SETFL) {
130 oflag_ = va_arg(ap, long);
131 return 0;
132 } else {
133 return -1;
134 }
135 }
136
137 bool TCPSocket::is_read_ready() {
138 return !is_open() || !in_buf_.empty();
139 }
140
141 bool TCPSocket::is_write_ready() {
142 return !is_open() || out_buf_.size() < kBufSize;
143 }
144
145 bool TCPSocket::is_exception() {
146 return !is_open();
147 }
148
149 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
150 int32_t* pres) {
151 Mutex::Lock lock(sys->mutex());
152 assert(!socket_);
153 socket_ = new pp::TCPSocketPrivate(sys->instance());
154 *pres = socket_->Connect(host, port,
155 factory_.NewCallback(&TCPSocket::OnConnect, pres));
156 if (*pres != PP_OK_COMPLETIONPENDING)
157 {
Dmitry Polukhin 2012/05/29 15:00:51 Please put { on the line with 'if'.
vissi 2012/05/30 08:10:51 Done.
158 sys->cond().broadcast();
159 }
160 }
161
162 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
163 Mutex::Lock lock(sys->mutex());
164 if (result == PP_OK) {
165 Read(PP_OK, NULL);
166 } else {
167 delete socket_;
168 socket_ = NULL;
169 }
170 *pres = result;
171 sys->cond().broadcast();
172 }
173
174 void TCPSocket::Read(int32_t result, int32_t* pres) {
175 Mutex::Lock lock(sys->mutex());
176 if (!is_open())
177 return;
178
179 result = socket_->Read(&read_buf_[0], read_buf_.size(),
180 factory_.NewCallback(&TCPSocket::OnRead, pres));
181 if (result != PP_OK_COMPLETIONPENDING) {
182 delete socket_;
183 socket_ = NULL;
184 if (pres)
185 *pres = result;
186 sys->cond().broadcast();
187 }
188 }
189
190 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
191 Mutex::Lock lock(sys->mutex());
192 if (!is_open())
193 return;
194
195 if (result > 0) {
196 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
197 Read(PP_OK, NULL);
198 } else {
199 delete socket_;
200 socket_ = NULL;
201 }
202 if (pres)
203 *pres = result;
204 sys->cond().broadcast();
205 }
206
207 void TCPSocket::Write(int32_t result, int32_t* pres) {
208 Mutex::Lock lock(sys->mutex());
209 if (!is_open())
210 return;
211
212 if (write_buf_.size()) {
213 // Previous write operation is in progress.
214 pp::Module::Get()->core()->CallOnMainThread(1,
215 factory_.NewCallback(&TCPSocket::Write, &result));
216 return;
217 }
218 assert(out_buf_.size());
219 write_buf_.swap(out_buf_);
220 result = socket_->Write(&write_buf_[0], write_buf_.size(),
221 factory_.NewCallback(&TCPSocket::OnWrite, pres));
222 if (result != PP_OK_COMPLETIONPENDING) {
223 LOG("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
224 delete socket_;
225 socket_ = NULL;
226 if (pres)
227 *pres = result;
228 sys->cond().broadcast();
229 }
230 write_sent_ = false;
231 }
232
233 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
234 Mutex::Lock lock(sys->mutex());
235 if (!is_open())
236 return;
237
238 if (result < 0 || (size_t)result > write_buf_.size()) {
239 // Write error.
240 LOG("TCPSocket::OnWrite: close socket\n");
241 delete socket_;
242 socket_ = NULL;
243 } else if ((size_t)result < write_buf_.size()) {
244 // Partial write. Insert remaining bytes at the beginning of out_buf_.
245 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
246 }
247 if (pres)
248 *pres = result;
249 write_buf_.clear();
250 sys->cond().broadcast();
251 }
252
253 void TCPSocket::Close(int32_t result, int32_t* pres) {
254 Mutex::Lock lock(sys->mutex());
255 delete socket_;
256 socket_ = NULL;
257 if (pres)
258 *pres = PP_OK;
259 sys->cond().broadcast();
260 }
261
262 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
263 Mutex::Lock lock(sys->mutex());
264 assert(!socket_);
265 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
266 Read(PP_OK, NULL);
267 *pres = PP_OK;
268 sys->cond().broadcast();
269 return true;
270 }
271
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698