Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: libraries/nacl-mounts/net/TcpSocket.cc

Issue 10392070: Socket subsystem implementation (Closed) Base URL: http://naclports.googlecode.com/svn/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « libraries/nacl-mounts/net/TcpSocket.h ('k') | libraries/nacl-mounts/util/SlotAllocator.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/TcpSocket.h"
6
7 #include <assert.h>
8 #include <string.h>
9
10 #include "net/SocketSubSystem.h"
11 #include "ppapi/c/pp_errors.h"
12 #include "ppapi/cpp/module.h"
13 #include "util/DebugPrint.h"
14
15 TCPSocket::TCPSocket(BaseSocketSubSystem* sys, int oflag)
Evgeniy Stepanov 2012/05/25 15:08:08 Make it SocketSubSustem. This will need to be ref
vissi 2012/05/28 13:34:26 Done.
16 : ref_(1), oflag_(oflag), factory_(this), socket_(NULL),
17 read_buf_(kBufSize), write_sent_(false), sys(sys) {
18 }
19
20 TCPSocket::~TCPSocket() {
21 assert(!socket_);
22 assert(!ref_);
23 }
24
25 void TCPSocket::addref() {
26 ++ref_;
27 }
28
29 void TCPSocket::release() {
30 if (!--ref_)
31 delete this;
32 }
33
34 bool TCPSocket::connect(const char* host, uint16_t port) {
35 int32_t result = PP_OK_COMPLETIONPENDING;
36 pp::Module::Get()->core()->CallOnMainThread(0,
37 factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
38 dbgprintf("host: %s, port: %d\n", host, port);
39 while (result == PP_OK_COMPLETIONPENDING)
40 sys->cond().wait(sys->mutex());
41 return result == PP_OK;
42 }
43
44 bool TCPSocket::acceptFrom(PP_Resource resource) {
45 int32_t result = PP_OK_COMPLETIONPENDING;
46 pp::Module::Get()->core()->CallOnMainThread(0,
47 factory_.NewCallback(&TCPSocket::Accept, resource, &result));
48 while (result == PP_OK_COMPLETIONPENDING)
49 sys->cond().wait(sys->mutex());
50 return result == PP_OK;
51 }
52
53 void TCPSocket::close() {
54 if (socket_) {
55 int32_t result = PP_OK_COMPLETIONPENDING;
56 pp::Module::Get()->core()->CallOnMainThread(0,
57 factory_.NewCallback(&TCPSocket::Close, &result));
58 while (result == PP_OK_COMPLETIONPENDING)
59 sys->cond().wait(sys->mutex());
60 }
61 }
62
63 int TCPSocket::read(char* buf, size_t count, size_t* nread) {
64 if (!is_open())
65 return EIO;
66
67 if (is_block()) {
68 while (in_buf_.empty() && is_open())
69 sys->cond().wait(sys->mutex());
70 }
71
72 *nread = 0;
73 while (*nread < count) {
74 if (in_buf_.empty())
75 break;
76
77 buf[(*nread)++] = in_buf_.front();
78 in_buf_.pop_front();
79 }
80
81 if (*nread == 0) {
82 if (!is_open()) {
83 return 0;
84 } else {
85 *nread = -1;
86 return EAGAIN;
87 }
88 }
89
90 return 0;
91 }
92
93 int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
94 if (!is_open())
95 return EIO;
96
97 out_buf_.insert(out_buf_.end(), buf, buf + count);
98 if (is_block()) {
99 int32_t result = PP_OK_COMPLETIONPENDING;
100 pp::Module::Get()->core()->CallOnMainThread(0,
101 factory_.NewCallback(&TCPSocket::Write, &result));
102 while (result == PP_OK_COMPLETIONPENDING)
103 sys->cond().wait(sys->mutex());
104 if ((size_t)result != count) {
105 *nwrote = -1;
106 return EIO;
107 } else {
108 *nwrote = count;
109 return 0;
110 }
111 } else {
112 if (!write_sent_) {
113 write_sent_ = true;
114 pp::Module::Get()->core()->CallOnMainThread(0,
115 factory_.NewCallback(&TCPSocket::Write,
116 reinterpret_cast<int32_t*>(NULL)));
117 }
118 *nwrote = count;
119 return 0;
120 }
121 }
122
123 int TCPSocket::fcntl(int cmd, va_list ap) {
124 if (cmd == F_GETFL) {
125 return oflag_;
126 } else if (cmd == F_SETFL) {
127 oflag_ = va_arg(ap, long);
128 return 0;
129 } else {
130 return -1;
131 }
132 }
133
134 bool TCPSocket::is_read_ready() {
135 return !is_open() || !in_buf_.empty();
136 }
137
138 bool TCPSocket::is_write_ready() {
139 return !is_open() || out_buf_.size() < kBufSize;
140 }
141
142 bool TCPSocket::is_exception() {
143 return !is_open();
144 }
145
146 void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
147 int32_t* pres) {
148 Mutex::Lock lock(sys->mutex());
149 assert(!socket_);
150 socket_ = new pp::TCPSocketPrivate(sys->instance());
151 *pres = socket_->Connect(host, port,
152 factory_.NewCallback(&TCPSocket::OnConnect, pres));
153 if (*pres != PP_OK_COMPLETIONPENDING)
154 sys->cond().broadcast();
155 }
156
157 void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
158 Mutex::Lock lock(sys->mutex());
159 if (result == PP_OK) {
160 Read(PP_OK, NULL);
161 } else {
162 delete socket_;
163 socket_ = NULL;
164 }
165 *pres = result;
166 sys->cond().broadcast();
167 }
168
169 void TCPSocket::Read(int32_t result, int32_t* pres) {
170 Mutex::Lock lock(sys->mutex());
171 if (!is_open())
172 return;
173
174 result = socket_->Read(&read_buf_[0], read_buf_.size(),
175 factory_.NewCallback(&TCPSocket::OnRead, pres));
176 if (result != PP_OK_COMPLETIONPENDING) {
177 delete socket_;
178 socket_ = NULL;
179 if (pres)
180 *pres = result;
181 sys->cond().broadcast();
182 }
183 }
184
185 void TCPSocket::OnRead(int32_t result, int32_t* pres) {
186 Mutex::Lock lock(sys->mutex());
187 if (!is_open())
188 return;
189
190 if (result > 0) {
191 in_buf_.insert(in_buf_.end(), &read_buf_[0], &read_buf_[0]+result);
192 Read(PP_OK, NULL);
193 } else {
194 delete socket_;
195 socket_ = NULL;
196 }
197 if (pres)
198 *pres = result;
199 sys->cond().broadcast();
200 }
201
202 void TCPSocket::Write(int32_t result, int32_t* pres) {
203 Mutex::Lock lock(sys->mutex());
204 if (!is_open())
205 return;
206
207 if (write_buf_.size()) {
208 // Previous write operation is in progress.
209 pp::Module::Get()->core()->CallOnMainThread(1,
210 factory_.NewCallback(&TCPSocket::Write, &result));
211 return;
212 }
213 assert(out_buf_.size());
214 write_buf_.swap(out_buf_);
215 result = socket_->Write(&write_buf_[0], write_buf_.size(),
216 factory_.NewCallback(&TCPSocket::OnWrite, pres));
217 if (result != PP_OK_COMPLETIONPENDING) {
218 LOG("TCPSocket::Write: failed %d %d\n", result, write_buf_.size());
219 delete socket_;
220 socket_ = NULL;
221 if (pres)
222 *pres = result;
223 sys->cond().broadcast();
224 }
225 write_sent_ = false;
226 }
227
228 void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
229 Mutex::Lock lock(sys->mutex());
230 if (!is_open())
231 return;
232
233 if (result < 0 || (size_t)result > write_buf_.size()) {
234 // Write error.
235 LOG("TCPSocket::OnWrite: close socket\n");
236 delete socket_;
237 socket_ = NULL;
238 } else if ((size_t)result < write_buf_.size()) {
239 // Partial write. Insert remaining bytes at the beginning of out_buf_.
240 out_buf_.insert(out_buf_.begin(), &write_buf_[result], &*write_buf_.end());
241 }
242 if (pres)
243 *pres = result;
244 write_buf_.clear();
245 sys->cond().broadcast();
246 }
247
248 void TCPSocket::Close(int32_t result, int32_t* pres) {
249 Mutex::Lock lock(sys->mutex());
250 delete socket_;
251 socket_ = NULL;
252 if (pres)
253 *pres = PP_OK;
254 sys->cond().broadcast();
255 }
256
257 bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
258 Mutex::Lock lock(sys->mutex());
259 assert(!socket_);
260 socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
261 Read(PP_OK, NULL);
262 *pres = PP_OK;
263 sys->cond().broadcast();
264 return true;
265 }
266
OLDNEW
« no previous file with comments | « libraries/nacl-mounts/net/TcpSocket.h ('k') | libraries/nacl-mounts/util/SlotAllocator.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698