Index: media/cast/transport/transport/transport.cc |
diff --git a/media/cast/transport/transport/transport.cc b/media/cast/transport/transport/transport.cc |
index dfa155a4c72cad8e3cb46f3117e50ae0e5aff560..3050e8b5d1a61f0a3a565f6361b59dd6c3632397 100644 |
--- a/media/cast/transport/transport/transport.cc |
+++ b/media/cast/transport/transport/transport.cc |
@@ -4,6 +4,7 @@ |
#include "media/cast/transport/transport/transport.h" |
+#include <algorithm> |
#include <string> |
#include "base/bind.h" |
@@ -20,184 +21,131 @@ namespace media { |
namespace cast { |
namespace transport { |
+namespace { |
const int kMaxPacketSize = 1500; |
-class LocalUdpTransportData; |
- |
-void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) { |
- net::IPAddressNumber ip_number; |
- bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number); |
- if (!rv) |
- return; |
- *address = net::IPEndPoint(ip_number, port); |
+bool IsEmpty(const net::IPEndPoint& addr) { |
+ net::IPAddressNumber empty_addr(addr.address().size()); |
+ return std::equal(empty_addr.begin(), |
+ empty_addr.end(), |
+ addr.address().begin()); |
} |
-class LocalUdpTransportData |
- : public base::RefCountedThreadSafe<LocalUdpTransportData> { |
- public: |
- LocalUdpTransportData(net::UDPServerSocket* udp_socket, |
- scoped_refptr<base::TaskRunner> io_thread_proxy) |
- : udp_socket_(udp_socket), |
- buffer_(new net::IOBufferWithSize(kMaxPacketSize)), |
- io_thread_proxy_(io_thread_proxy) { |
- } |
- |
- void ListenTo(net::IPEndPoint bind_address) { |
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
+bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) { |
+ return addr1.port() == addr2.port() && |
+ std::equal(addr1.address().begin(), |
+ addr1.address().end(), |
+ addr2.address().begin()); |
+} |
- bind_address_ = bind_address; |
- io_thread_proxy_->PostTask(FROM_HERE, |
- base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this)); |
- } |
+} // namespace |
+ |
+UdpTransport::UdpTransport( |
+ const scoped_refptr<base::TaskRunner>& io_thread_proxy, |
+ const net::IPEndPoint& local_end_point, |
+ const net::IPEndPoint& remote_end_point) |
+ : io_thread_proxy_(io_thread_proxy), |
+ local_addr_(local_end_point), |
+ remote_addr_(remote_end_point), |
+ udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), |
+ recv_buf_(new net::IOBuffer(kMaxPacketSize)), |
+ packet_receiver_(NULL), |
+ weak_factory_(this) { |
+} |
- void DeletePacket(uint8* data) { |
- // Should be called from the receiver (not on the transport thread). |
- DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread())); |
- delete [] data; |
- } |
+UdpTransport::~UdpTransport() { |
+} |
- void PacketReceived(int size) { |
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- // Got a packet with length result. |
- uint8* data = new uint8[size]; |
- memcpy(data, buffer_->data(), size); |
- packet_receiver_->ReceivedPacket(data, size, |
- base::Bind(&LocalUdpTransportData::DeletePacket, this, data)); |
- RecvFromSocketLoop(); |
+void UdpTransport::StartReceiving(PacketReceiver* packet_receiver) { |
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
+ DCHECK(!packet_receiver_); |
- } |
+ packet_receiver_ = packet_receiver; |
+ udp_socket_->AllowAddressReuse(); |
+ udp_socket_->SetMulticastLoopbackMode(true); |
+ udp_socket_->Listen(local_addr_); |
+ ReceiveOnePacket(); |
+} |
- void RecvFromSocketLoop() { |
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- // Callback should always trigger with a packet. |
- int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize, |
- &bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived, |
- this)); |
- DCHECK(res >= net::ERR_IO_PENDING); |
- if (res > 0) { |
- PacketReceived(res); |
- } |
- } |
+void UdpTransport::ReceiveOnePacket() { |
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- void set_packet_receiver(PacketReceiver* packet_receiver) { |
- packet_receiver_ = packet_receiver; |
+ int result = udp_socket_->RecvFrom( |
+ recv_buf_, |
+ kMaxPacketSize, |
+ &recv_addr_, |
+ base::Bind(&UdpTransport::OnReceived, |
+ weak_factory_.GetWeakPtr())); |
mikhal1
2014/01/07 20:50:06
nit:Can't this be in one line?
Alpha Left Google
2014/01/08 00:37:11
Done.
|
+ if (result > 0) { |
+ OnReceived(result); |
+ } else if (result != net::ERR_IO_PENDING) { |
+ LOG(ERROR) << "Failed to receive packet: " << result << "." |
+ << " Stop receiving packets."; |
} |
+} |
- void Close() { |
- udp_socket_->Close(); |
- } |
+void UdpTransport::OnReceived(int result) { |
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- protected: |
- virtual ~LocalUdpTransportData() {} |
- |
- private: |
- friend class base::RefCountedThreadSafe<LocalUdpTransportData>; |
- |
- net::UDPServerSocket* udp_socket_; |
- net::IPEndPoint bind_address_; |
- PacketReceiver* packet_receiver_; |
- scoped_refptr<net::IOBufferWithSize> buffer_; |
- scoped_refptr<base::TaskRunner> io_thread_proxy_; |
- |
- DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData); |
-}; |
- |
-class LocalPacketSender : public PacketSender, |
- public base::RefCountedThreadSafe<LocalPacketSender> { |
- public: |
- LocalPacketSender(net::UDPServerSocket* udp_socket, |
- scoped_refptr<base::TaskRunner> io_thread_proxy) |
- : udp_socket_(udp_socket), |
- send_address_(), |
- io_thread_proxy_(io_thread_proxy) {} |
- |
- virtual bool SendPacket(const Packet& packet) OVERRIDE { |
- io_thread_proxy_->PostTask(FROM_HERE, |
- base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet)); |
- return true; |
+ if (result < 0) { |
+ LOG(ERROR) << "Failed to receive packet: " << result << "." |
+ << " Stop receiving packets."; |
+ return; |
} |
- virtual void SendPacketToNetwork(const Packet& packet) { |
- DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- const uint8* data = &packet[0]; |
- scoped_refptr<net::WrappedIOBuffer> buffer( |
- new net::WrappedIOBuffer(reinterpret_cast<const char*>(data))); |
- udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()), |
- send_address_, |
- base::Bind(&LocalPacketSender::OnSendCompleted, |
- base::Unretained(this))); |
+ if (IsEmpty(remote_addr_)) { |
+ remote_addr_ = recv_addr_; |
+ VLOG(1) << "First packet received from: " |
+ << remote_addr_.ToString() << "."; |
+ } else if (!IsEqual(remote_addr_, recv_addr_)) { |
+ VLOG(1) << "Received from an unrecognized address: " |
+ << recv_addr_.ToString() << "."; |
+ return; |
} |
- virtual void OnSendCompleted(int result) { |
- if (result < 0) { |
- // TODO(mikhal): Add to error messages. |
- VLOG(0) << "Send failed on UDP socket : " << result; |
- } |
- } |
+ // TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy. |
+ uint8* data = new uint8[result]; |
+ memcpy(data, recv_buf_->data(), result); |
+ packet_receiver_->ReceivedPacket( |
+ data, |
+ result, |
+ base::Bind(&PacketReceiver::DeletePacket, data)); |
+ ReceiveOnePacket(); |
+} |
- virtual bool SendPackets(const PacketList& packets) OVERRIDE { |
- bool out_val = true; |
- for (size_t i = 0; i < packets.size(); ++i) { |
- const Packet& packet = packets[i]; |
- out_val |= SendPacket(packet); |
- } |
- return out_val; |
- } |
+bool UdpTransport::SendPackets(const PacketList& packets) { |
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- void SetSendAddress(const net::IPEndPoint& send_address) { |
- send_address_ = send_address; |
+ bool result = true; |
+ for (size_t i = 0; i < packets.size(); ++i) { |
+ result |= SendPacket(packets[i]); |
} |
- |
- protected: |
- virtual ~LocalPacketSender() {} |
- |
- private: |
- friend class base::RefCountedThreadSafe<LocalPacketSender>; |
- |
- net::UDPServerSocket* udp_socket_; // Not owned by this class. |
- net::IPEndPoint send_address_; |
- scoped_refptr<base::TaskRunner> io_thread_proxy_; |
-}; |
- |
-Transport::Transport( |
- scoped_refptr<base::TaskRunner> io_thread_proxy) |
- : udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())), |
- local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(), |
- io_thread_proxy)), |
- packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)), |
- io_thread_proxy_(io_thread_proxy) {} |
- |
-Transport::~Transport() {} |
- |
-PacketSender* Transport::packet_sender() { |
- return static_cast<PacketSender*>(packet_sender_.get()); |
+ return result; |
} |
-void Transport::StopReceiving() { |
- local_udp_transport_data_->Close(); |
-} |
- |
-void Transport::SetLocalReceiver(PacketReceiver* packet_receiver, |
- std::string ip_address, |
- std::string local_ip_address, |
- int port) { |
+bool UdpTransport::SendPacket(const Packet& packet) { |
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
- net::IPEndPoint bind_address, local_bind_address; |
- CreateUDPAddress(ip_address, port, &bind_address); |
- CreateUDPAddress(local_ip_address, port, &local_bind_address); |
- local_udp_transport_data_->set_packet_receiver(packet_receiver); |
- udp_socket_->AllowAddressReuse(); |
- udp_socket_->SetMulticastLoopbackMode(true); |
- udp_socket_->Listen(local_bind_address); |
- // Start listening once receiver has been set. |
- local_udp_transport_data_->ListenTo(bind_address); |
+ // TODO(hclam): This interface should take a net::IOBuffer to minimize |
+ // memcpy. |
+ scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(packet.size()); |
+ memcpy(buf->data(), &packet[0], packet.size()); |
+ bool ret = udp_socket_->SendTo( |
+ buf, |
+ static_cast<int>(packet.size()), |
+ remote_addr_, |
+ base::Bind(&UdpTransport::OnSent, |
+ weak_factory_.GetWeakPtr(), buf)); |
+ return ret == net::OK; |
} |
-void Transport::SetSendDestination(std::string ip_address, int port) { |
- net::IPEndPoint send_address; |
- CreateUDPAddress(ip_address, port, &send_address); |
- packet_sender_->SetSendAddress(send_address); |
+void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, |
+ int result) { |
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); |
+ |
+ if (result < 0) { |
+ VLOG(1) << "Failed to send packet: " << result << "."; |
+ } |
} |
} // namespace transport |