Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(233)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifdef __GLIBC__
6 #include "net/TcpSocket.h"
7
8 #include <assert.h>
9 #include <string.h>
10
11 #include "net/SocketSubSystem.h"
12 #include "ppapi/c/pp_errors.h"
13 #include "ppapi/cpp/module.h"
14 #include "util/DebugPrint.h"
15
16 TCPSocket::TCPSocket(SocketSubSystem* sys, int oflag)
17 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
18 read_buf_(kBufSize), write_sent_(false), sys_(sys) {
19 }
20
21 TCPSocket::~TCPSocket() {
22 assert(!socket_);
23 assert(!ref_);
24 }
25
26 void TCPSocket::addref() {
27 ++ref_;
28 }
29
30 void TCPSocket::release() {
31 if (!--ref_)
32 delete this;
33 }
34
35 bool TCPSocket::connect(const char* host, uint16_t port) {
36 int32_t result = PP_OK_COMPLETIONPENDING;
37 pp::Module::Get()->core()->CallOnMainThread(0,
38 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
39 dbgprintf("host: %s, port: %d\n", host, port);
Dmitry Polukhin 2012/05/31 14:06:45 Don't use debug functions directly use macro that
Evgeniy Stepanov 2012/06/01 07:59:23 AFAIK we don't have such macro atm. I've been usin
vissi 2012/06/01 09:12:57 Left as is for now.
40 while (result == PP_OK_COMPLETIONPENDING) {
41 sys_->cond().wait(sys_->mutex());
42 }
43 return result == PP_OK;
44 }
45
46 bool TCPSocket::acceptFrom(PP_Resource resource) {
47 int32_t result = PP_OK_COMPLETIONPENDING;
48 pp::Module::Get()->core()->CallOnMainThread(0,
49 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
50 while (result == PP_OK_COMPLETIONPENDING) {
51 sys_->cond().wait(sys_->mutex());
52 }
53 return result == PP_OK;
54 }
55
56 void TCPSocket::close() {
57 if (socket_) {
58 int32_t result = PP_OK_COMPLETIONPENDING;
59 pp::Module::Get()->core()->CallOnMainThread(0,
60 factory_.NewCallback(&TCPSocket::Close, &result));
61 while (result == PP_OK_COMPLETIONPENDING) {
62 sys_->cond().wait(sys_->mutex());
63 }
64 }
65 }
66
67 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
68 Mutex::Lock lock(sys_->mutex());
69 if (!is_open())
70 return EIO;
71
72 if (is_block()) {
73 while (in_buf_.empty() && is_open())
74 sys_->cond().wait(sys_->mutex());
75 }
76
77 *nread = 0;
78 while (*nread < count) {
79 if (in_buf_.empty())
80 break;
81
82 buf[(*nread)++] = in_buf_.front();
83 in_buf_.pop_front();
84 }
85
86 if (*nread == 0) {
87 if (!is_open()) {
88 return 0;
89 } else {
90 *nread = -1;
91 return EAGAIN;
92 }
93 }
94
95 return 0;
96 }
97
98 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
99 Mutex::Lock lock(sys_->mutex());
100 if (!is_open())
101 return EIO;
102
103 out_buf_.insert(out_buf_.end(), buf, buf + count);
104 if (is_block()) {
105 int32_t result = PP_OK_COMPLETIONPENDING;
106 pp::Module::Get()->core()->CallOnMainThread(0,
107 factory_.NewCallback(&TCPSocket::Write, &result));
108 while (result == PP_OK_COMPLETIONPENDING)
109 sys_->cond().wait(sys_->mutex());
110 if ((size_t)result != count) {
111 *nwrote = -1;
112 return EIO;
113 } else {
114 *nwrote = count;
115 return 0;
116 }
117 } else {
118 if (!write_sent_) {
119 write_sent_ = true;
120 pp::Module::Get()->core()->CallOnMainThread(0,
121 factory_.NewCallback(&TCPSocket::Write,
122 reinterpret_cast<int32_t*>(NULL)));
123 }
124 *nwrote = count;
125 return 0;
126 }
127 }
128
129 int TCPSocket::fcntl(int cmd, va_list ap) {
130 if (cmd == F_GETFL) {
131 return oflag_;
132 } else if (cmd == F_SETFL) {
133 oflag_ = va_arg(ap, long);
134 return 0;
135 } else {
136 return -1;
137 }
138 }
139
140 bool TCPSocket::is_read_ready() {
141 return !is_open() || !in_buf_.empty();
142 }
143
144 bool TCPSocket::is_write_ready() {
145 return !is_open() || out_buf_.size() < kBufSize;
146 }
147
148 bool TCPSocket::is_exception() {
149 return !is_open();
150 }
151
152 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
153 int32_t* pres) {
154 Mutex::Lock lock(sys_->mutex());
155 assert(!socket_);
156 socket_ = new pp::TCPSocketPrivate(sys_->instance());
157 *pres = socket_->Connect(host, port,
158 factory_.NewCallback(&TCPSocket::OnConnect, pres));
159 if (*pres != PP_OK_COMPLETIONPENDING) {
160 sys_->cond().broadcast();
161 }
162 }
163
164 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
165 Mutex::Lock lock(sys_->mutex());
166 if (result == PP_OK) {
167 Read(PP_OK, NULL);
168 } else {
169 delete socket_;
170 socket_ = NULL;
171 }
172 *pres = result;
173 sys_->cond().broadcast();
174 }
175
176 void TCPSocket::Read(int32_t result, int32_t* pres) {
177 Mutex::Lock lock(sys_->mutex());
178 if (!is_open())
179 return;
180
181 result = socket_->Read(&read_buf_[0], read_buf_.size(),
182 factory_.NewCallback(&TCPSocket::OnRead, pres));
183 if (result != PP_OK_COMPLETIONPENDING) {
184 delete socket_;
185 socket_ = NULL;
186 if (pres)
187 *pres = result;
188 sys_->cond().broadcast();
189 }
190 }
191
192 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
193 Mutex::Lock lock(sys_->mutex());
194 if (!is_open())
195 return;
196
197 if (result > 0) {
198 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
199 Read(PP_OK, NULL);
200 } else {
201 delete socket_;
202 socket_ = NULL;
203 }
204 if (pres)
205 *pres = result;
206 sys_->cond().broadcast();
207 }
208
209 void TCPSocket::Write(int32_t result, int32_t* pres) {
210 Mutex::Lock lock(sys_->mutex());
211 if (!is_open())
212 return;
213
214 if (write_buf_.size()) {
215 // Previous write operation is in progress.
216 pp::Module::Get()->core()->CallOnMainThread(1,
217 factory_.NewCallback(&TCPSocket::Write, &result));
218 return;
219 }
220 assert(out_buf_.size());
221 write_buf_.swap(out_buf_);
222 result = socket_->Write(&write_buf_[0], write_buf_.size(),
223 factory_.NewCallback(&TCPSocket::OnWrite, pres));
224 if (result != PP_OK_COMPLETIONPENDING) {
225 LOG("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
226 delete socket_;
227 socket_ = NULL;
228 if (pres)
229 *pres = result;
230 sys_->cond().broadcast();
231 }
232 write_sent_ = false;
233 }
234
235 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
236 Mutex::Lock lock(sys_->mutex());
237 if (!is_open())
238 return;
239
240 if (result < 0 || (size_t)result > write_buf_.size()) {
241 // Write error.
242 LOG("TCPSocket::OnWrite: close socket\n");
243 delete socket_;
244 socket_ = NULL;
245 } else if ((size_t)result < write_buf_.size()) {
246 // Partial write. Insert remaining bytes at the beginning of out_buf_.
247 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
248 }
249 if (pres)
250 *pres = result;
251 write_buf_.clear();
252 sys_->cond().broadcast();
253 }
254
255 void TCPSocket::Close(int32_t result, int32_t* pres) {
256 Mutex::Lock lock(sys_->mutex());
257 delete socket_;
258 socket_ = NULL;
259 if (pres)
260 *pres = PP_OK;
261 sys_->cond().broadcast();
262 }
263
264 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
265 Mutex::Lock lock(sys_->mutex());
266 assert(!socket_);
267 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
268 Read(PP_OK, NULL);
269 *pres = PP_OK;
270 sys_->cond().broadcast();
271 return true;
272 }
273
274 #endif // __GLIBC__
275
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698