Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(473)

Unified Diff: media/cast/transport/transport/transport.cc

Issue 125713002: Implement UdpTransport for Cast (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/cast/transport/transport/transport.cc
diff --git a/media/cast/transport/transport/transport.cc b/media/cast/transport/transport/transport.cc
index dfa155a4c72cad8e3cb46f3117e50ae0e5aff560..3050e8b5d1a61f0a3a565f6361b59dd6c3632397 100644
--- a/media/cast/transport/transport/transport.cc
+++ b/media/cast/transport/transport/transport.cc
@@ -4,6 +4,7 @@
#include "media/cast/transport/transport/transport.h"
+#include <algorithm>
#include <string>
#include "base/bind.h"
@@ -20,184 +21,131 @@ namespace media {
namespace cast {
namespace transport {
+namespace {
const int kMaxPacketSize = 1500;
-class LocalUdpTransportData;
-
-void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) {
- net::IPAddressNumber ip_number;
- bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number);
- if (!rv)
- return;
- *address = net::IPEndPoint(ip_number, port);
+bool IsEmpty(const net::IPEndPoint& addr) {
+ net::IPAddressNumber empty_addr(addr.address().size());
+ return std::equal(empty_addr.begin(),
+ empty_addr.end(),
+ addr.address().begin());
}
-class LocalUdpTransportData
- : public base::RefCountedThreadSafe<LocalUdpTransportData> {
- public:
- LocalUdpTransportData(net::UDPServerSocket* udp_socket,
- scoped_refptr<base::TaskRunner> io_thread_proxy)
- : udp_socket_(udp_socket),
- buffer_(new net::IOBufferWithSize(kMaxPacketSize)),
- io_thread_proxy_(io_thread_proxy) {
- }
-
- void ListenTo(net::IPEndPoint bind_address) {
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
+ return addr1.port() == addr2.port() &&
+ std::equal(addr1.address().begin(),
+ addr1.address().end(),
+ addr2.address().begin());
+}
- bind_address_ = bind_address;
- io_thread_proxy_->PostTask(FROM_HERE,
- base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this));
- }
+} // namespace
+
+UdpTransport::UdpTransport(
+ const scoped_refptr<base::TaskRunner>& io_thread_proxy,
+ const net::IPEndPoint& local_end_point,
+ const net::IPEndPoint& remote_end_point)
+ : io_thread_proxy_(io_thread_proxy),
+ local_addr_(local_end_point),
+ remote_addr_(remote_end_point),
+ udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())),
+ recv_buf_(new net::IOBuffer(kMaxPacketSize)),
+ packet_receiver_(NULL),
+ weak_factory_(this) {
+}
- void DeletePacket(uint8* data) {
- // Should be called from the receiver (not on the transport thread).
- DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread()));
- delete [] data;
- }
+UdpTransport::~UdpTransport() {
+}
- void PacketReceived(int size) {
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- // Got a packet with length result.
- uint8* data = new uint8[size];
- memcpy(data, buffer_->data(), size);
- packet_receiver_->ReceivedPacket(data, size,
- base::Bind(&LocalUdpTransportData::DeletePacket, this, data));
- RecvFromSocketLoop();
+void UdpTransport::StartReceiving(PacketReceiver* packet_receiver) {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+ DCHECK(!packet_receiver_);
- }
+ packet_receiver_ = packet_receiver;
+ udp_socket_->AllowAddressReuse();
+ udp_socket_->SetMulticastLoopbackMode(true);
+ udp_socket_->Listen(local_addr_);
+ ReceiveOnePacket();
+}
- void RecvFromSocketLoop() {
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- // Callback should always trigger with a packet.
- int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize,
- &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived,
- this));
- DCHECK(res >= net::ERR_IO_PENDING);
- if (res > 0) {
- PacketReceived(res);
- }
- }
+void UdpTransport::ReceiveOnePacket() {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- void set_packet_receiver(PacketReceiver* packet_receiver) {
- packet_receiver_ = packet_receiver;
+ int result = udp_socket_->RecvFrom(
+ recv_buf_,
+ kMaxPacketSize,
+ &recv_addr_,
+ base::Bind(&UdpTransport::OnReceived,
+ weak_factory_.GetWeakPtr()));
mikhal1 2014/01/07 20:50:06 nit:Can't this be in one line?
Alpha Left Google 2014/01/08 00:37:11 Done.
+ if (result > 0) {
+ OnReceived(result);
+ } else if (result != net::ERR_IO_PENDING) {
+ LOG(ERROR) << "Failed to receive packet: " << result << "."
+ << " Stop receiving packets.";
}
+}
- void Close() {
- udp_socket_->Close();
- }
+void UdpTransport::OnReceived(int result) {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- protected:
- virtual ~LocalUdpTransportData() {}
-
- private:
- friend class base::RefCountedThreadSafe<LocalUdpTransportData>;
-
- net::UDPServerSocket* udp_socket_;
- net::IPEndPoint bind_address_;
- PacketReceiver* packet_receiver_;
- scoped_refptr<net::IOBufferWithSize> buffer_;
- scoped_refptr<base::TaskRunner> io_thread_proxy_;
-
- DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData);
-};
-
-class LocalPacketSender : public PacketSender,
- public base::RefCountedThreadSafe<LocalPacketSender> {
- public:
- LocalPacketSender(net::UDPServerSocket* udp_socket,
- scoped_refptr<base::TaskRunner> io_thread_proxy)
- : udp_socket_(udp_socket),
- send_address_(),
- io_thread_proxy_(io_thread_proxy) {}
-
- virtual bool SendPacket(const Packet& packet) OVERRIDE {
- io_thread_proxy_->PostTask(FROM_HERE,
- base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet));
- return true;
+ if (result < 0) {
+ LOG(ERROR) << "Failed to receive packet: " << result << "."
+ << " Stop receiving packets.";
+ return;
}
- virtual void SendPacketToNetwork(const Packet& packet) {
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- const uint8* data = &packet[0];
- scoped_refptr<net::WrappedIOBuffer> buffer(
- new net::WrappedIOBuffer(reinterpret_cast<const char*>(data)));
- udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()),
- send_address_,
- base::Bind(&LocalPacketSender::OnSendCompleted,
- base::Unretained(this)));
+ if (IsEmpty(remote_addr_)) {
+ remote_addr_ = recv_addr_;
+ VLOG(1) << "First packet received from: "
+ << remote_addr_.ToString() << ".";
+ } else if (!IsEqual(remote_addr_, recv_addr_)) {
+ VLOG(1) << "Received from an unrecognized address: "
+ << recv_addr_.ToString() << ".";
+ return;
}
- virtual void OnSendCompleted(int result) {
- if (result < 0) {
- // TODO(mikhal): Add to error messages.
- VLOG(0) << "Send failed on UDP socket : " << result;
- }
- }
+ // TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy.
+ uint8* data = new uint8[result];
+ memcpy(data, recv_buf_->data(), result);
+ packet_receiver_->ReceivedPacket(
+ data,
+ result,
+ base::Bind(&PacketReceiver::DeletePacket, data));
+ ReceiveOnePacket();
+}
- virtual bool SendPackets(const PacketList& packets) OVERRIDE {
- bool out_val = true;
- for (size_t i = 0; i < packets.size(); ++i) {
- const Packet& packet = packets[i];
- out_val |= SendPacket(packet);
- }
- return out_val;
- }
+bool UdpTransport::SendPackets(const PacketList& packets) {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- void SetSendAddress(const net::IPEndPoint& send_address) {
- send_address_ = send_address;
+ bool result = true;
+ for (size_t i = 0; i < packets.size(); ++i) {
+ result |= SendPacket(packets[i]);
}
-
- protected:
- virtual ~LocalPacketSender() {}
-
- private:
- friend class base::RefCountedThreadSafe<LocalPacketSender>;
-
- net::UDPServerSocket* udp_socket_; // Not owned by this class.
- net::IPEndPoint send_address_;
- scoped_refptr<base::TaskRunner> io_thread_proxy_;
-};
-
-Transport::Transport(
- scoped_refptr<base::TaskRunner> io_thread_proxy)
- : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())),
- local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(),
- io_thread_proxy)),
- packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)),
- io_thread_proxy_(io_thread_proxy) {}
-
-Transport::~Transport() {}
-
-PacketSender* Transport::packet_sender() {
- return static_cast<PacketSender*>(packet_sender_.get());
+ return result;
}
-void Transport::StopReceiving() {
- local_udp_transport_data_->Close();
-}
-
-void Transport::SetLocalReceiver(PacketReceiver* packet_receiver,
- std::string ip_address,
- std::string local_ip_address,
- int port) {
+bool UdpTransport::SendPacket(const Packet& packet) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- net::IPEndPoint bind_address, local_bind_address;
- CreateUDPAddress(ip_address, port, &bind_address);
- CreateUDPAddress(local_ip_address, port, &local_bind_address);
- local_udp_transport_data_->set_packet_receiver(packet_receiver);
- udp_socket_->AllowAddressReuse();
- udp_socket_->SetMulticastLoopbackMode(true);
- udp_socket_->Listen(local_bind_address);
- // Start listening once receiver has been set.
- local_udp_transport_data_->ListenTo(bind_address);
+ // TODO(hclam): This interface should take a net::IOBuffer to minimize
+ // memcpy.
+ scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(packet.size());
+ memcpy(buf->data(), &packet[0], packet.size());
+ bool ret = udp_socket_->SendTo(
+ buf,
+ static_cast<int>(packet.size()),
+ remote_addr_,
+ base::Bind(&UdpTransport::OnSent,
+ weak_factory_.GetWeakPtr(), buf));
+ return ret == net::OK;
}
-void Transport::SetSendDestination(std::string ip_address, int port) {
- net::IPEndPoint send_address;
- CreateUDPAddress(ip_address, port, &send_address);
- packet_sender_->SetSendAddress(send_address);
+void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
+ int result) {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+
+ if (result < 0) {
+ VLOG(1) << "Failed to send packet: " << result << ".";
+ }
}
} // namespace transport

Powered by Google App Engine
This is Rietveld 408576698