Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/TcpSocket.h"
6
7 #include <assert.h>
8 #include <string.h>
9 #ifdef __GLIBC__
10
11 #include "../net/SocketSubSystem.h"
12 #include "ppapi/c/pp_errors.h"
13 #include "ppapi/cpp/module.h"
14 #include "../util/PthreadHelpers.h"
15 #include "../util/DebugPrint.h"
16
17 TCPSocket::TCPSocket(SocketSubSystem* sys, int oflag)
18 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
19 read_buf_(kBufSize), write_sent_(false), sys_(sys) {
20 }
21
22 TCPSocket::~TCPSocket() {
23 assert(!socket_);
24 assert(!ref_);
25 }
26
27 void TCPSocket::addref() {
28 ++ref_;
29 }
30
31 void TCPSocket::release() {
32 if (!--ref_)
33 delete this;
34 }
35
36 bool TCPSocket::connect(const char* host, uint16_t port) {
37 SimpleAutoLock lock(sys_->mutex());
38 int32_t result = PP_OK_COMPLETIONPENDING;
39 pp::Module::Get()->core()->CallOnMainThread(0,
40 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
41 dbgprintf("host: %s, port: %d\n", host, port);
42 while (result == PP_OK_COMPLETIONPENDING) {
43 sys_->cond().wait(sys_->mutex());
44 }
45 return result == PP_OK;
46 }
47
48 bool TCPSocket::acceptFrom(PP_Resource resource) {
49 SimpleAutoLock lock(sys_->mutex());
50 int32_t result = PP_OK_COMPLETIONPENDING;
51 pp::Module::Get()->core()->CallOnMainThread(0,
52 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
53 while (result == PP_OK_COMPLETIONPENDING) {
54 sys_->cond().wait(sys_->mutex());
55 }
56 return result == PP_OK;
57 }
58
59 void TCPSocket::close() {
60 SimpleAutoLock lock(sys_->mutex());
61 if (socket_) {
62 int32_t result = PP_OK_COMPLETIONPENDING;
63 pp::Module::Get()->core()->CallOnMainThread(0,
64 factory_.NewCallback(&TCPSocket::Close, &result));
65 while (result == PP_OK_COMPLETIONPENDING) {
66 sys_->cond().wait(sys_->mutex());
67 }
68 }
69 }
70
71 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
72 SimpleAutoLock lock(sys_->mutex());
73 if (!is_open())
74 return EIO;
75
76 if (is_block()) {
77 while (in_buf_.empty() && is_open())
78 sys_->cond().wait(sys_->mutex());
79 }
80
81 *nread = 0;
82 while (*nread < count) {
83 if (in_buf_.empty())
84 break;
85
86 buf[(*nread)++] = in_buf_.front();
87 in_buf_.pop_front();
88 }
89
90 if (*nread == 0) {
91 if (!is_open()) {
92 return 0;
93 } else {
94 *nread = -1;
95 return EAGAIN;
96 }
97 }
98
99 return 0;
100 }
101
102 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
103 SimpleAutoLock lock(sys_->mutex());
104 if (!is_open())
105 return EIO;
106
107 out_buf_.insert(out_buf_.end(), buf, buf + count);
108 if (is_block()) {
109 int32_t result = PP_OK_COMPLETIONPENDING;
110 pp::Module::Get()->core()->CallOnMainThread(0,
111 factory_.NewCallback(&TCPSocket::Write, &result));
112 while (result == PP_OK_COMPLETIONPENDING)
113 sys_->cond().wait(sys_->mutex());
114 if ((size_t)result != count) {
115 *nwrote = -1;
116 return EIO;
117 } else {
118 *nwrote = count;
119 return 0;
120 }
121 } else {
122 if (!write_sent_) {
123 write_sent_ = true;
124 pp::Module::Get()->core()->CallOnMainThread(0,
125 factory_.NewCallback(&TCPSocket::Write,
126 reinterpret_cast<int32_t*>(NULL)));
127 }
128 *nwrote = count;
129 return 0;
130 }
131 }
132
133 int TCPSocket::fcntl(int cmd, va_list ap) {
134 if (cmd == F_GETFL) {
135 return oflag_;
136 } else if (cmd == F_SETFL) {
137 oflag_ = va_arg(ap, long);
138 return 0;
139 } else {
140 return -1;
141 }
142 }
143
144 bool TCPSocket::is_read_ready() {
145 return !is_open() || !in_buf_.empty();
146 }
147
148 bool TCPSocket::is_write_ready() {
149 return !is_open() || out_buf_.size() < kBufSize;
150 }
151
152 bool TCPSocket::is_exception() {
153 return !is_open();
154 }
155
156 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
157 int32_t* pres) {
158 SimpleAutoLock lock(sys_->mutex());
159 assert(!socket_);
160 socket_ = new pp::TCPSocketPrivate(sys_->instance());
161 *pres = socket_->Connect(host, port,
162 factory_.NewCallback(&TCPSocket::OnConnect, pres));
163 if (*pres != PP_OK_COMPLETIONPENDING) {
164 sys_->cond().broadcast();
165 }
166 }
167
168 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
169 SimpleAutoLock lock(sys_->mutex());
170 if (result == PP_OK) {
171 Read(PP_OK, NULL);
172 } else {
173 delete socket_;
174 socket_ = NULL;
175 }
176 *pres = result;
177 sys_->cond().broadcast();
178 }
179
180 void TCPSocket::Read(int32_t result, int32_t* pres) {
181 SimpleAutoLock lock(sys_->mutex());
182 if (!is_open())
183 return;
184
185 result = socket_->Read(&read_buf_[0], read_buf_.size(),
186 factory_.NewCallback(&TCPSocket::OnRead, pres));
187 if (result != PP_OK_COMPLETIONPENDING) {
188 delete socket_;
189 socket_ = NULL;
190 if (pres)
191 *pres = result;
192 sys_->cond().broadcast();
193 }
194 }
195
196 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
197 SimpleAutoLock lock(sys_->mutex());
198 if (!is_open())
199 return;
200
201 if (result > 0) {
202 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
203 Read(PP_OK, NULL);
204 } else {
205 delete socket_;
206 socket_ = NULL;
207 }
208 if (pres)
209 *pres = result;
210 sys_->cond().broadcast();
211 }
212
213 void TCPSocket::Write(int32_t result, int32_t* pres) {
214 SimpleAutoLock lock(sys_->mutex());
215 if (!is_open())
216 return;
217
218 if (write_buf_.size()) {
219 // Previous write operation is in progress.
220 pp::Module::Get()->core()->CallOnMainThread(1,
221 factory_.NewCallback(&TCPSocket::Write, &result));
222 return;
223 }
224 assert(out_buf_.size());
225 write_buf_.swap(out_buf_);
226 result = socket_->Write(&write_buf_[0], write_buf_.size(),
227 factory_.NewCallback(&TCPSocket::OnWrite, pres));
228 if (result != PP_OK_COMPLETIONPENDING) {
229 dbgprintf("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
230 delete socket_;
231 socket_ = NULL;
232 if (pres)
233 *pres = result;
234 sys_->cond().broadcast();
235 }
236 write_sent_ = false;
237 }
238
239 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
240 SimpleAutoLock lock(sys_->mutex());
241 if (!is_open())
242 return;
243
244 if (result < 0 || (size_t)result > write_buf_.size()) {
245 // Write error.
246 dbgprintf("TCPSocket::OnWrite: close socket\n");
247 delete socket_;
248 socket_ = NULL;
249 } else if ((size_t)result < write_buf_.size()) {
250 // Partial write. Insert remaining bytes at the beginning of out_buf_.
251 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
252 }
253 if (pres)
254 *pres = result;
255 write_buf_.clear();
256 sys_->cond().broadcast();
257 }
258
259 void TCPSocket::Close(int32_t result, int32_t* pres) {
260 SimpleAutoLock lock(sys_->mutex());
261 delete socket_;
262 socket_ = NULL;
263 if (pres)
264 *pres = PP_OK;
265 sys_->cond().broadcast();
266 }
267
268 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
269 SimpleAutoLock lock(sys_->mutex());
270 assert(!socket_);
271 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
272 Read(PP_OK, NULL);
273 *pres = PP_OK;
274 sys_->cond().broadcast();
275 return true;
276 }
277
278 #endif // __GLIBC__
279
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698