Index: net/base/file_stream_win.cc |
=================================================================== |
--- net/base/file_stream_win.cc (revision 147715) |
+++ net/base/file_stream_win.cc (working copy) |
@@ -8,6 +8,7 @@ |
#include "base/file_path.h" |
#include "base/logging.h" |
+#include "base/memory/ref_counted.h" |
#include "base/message_loop.h" |
#include "base/metrics/histogram.h" |
#include "base/synchronization/waitable_event.h" |
@@ -40,160 +41,460 @@ |
SetOffset(overlapped, offset); |
} |
-int RecordAndMapError(int error, |
- FileErrorSource source, |
- bool record_uma, |
- const net::BoundNetLog& bound_net_log) { |
- net::Error net_error = MapSystemError(error); |
+} // namespace |
- bound_net_log.AddEvent( |
- net::NetLog::TYPE_FILE_STREAM_ERROR, |
- base::Bind(&NetLogFileStreamErrorCallback, |
- source, error, net_error)); |
+// FileStreamWin::AsyncContext ---------------------------------------------- |
- RecordFileError(error, source, record_uma); |
+class FileStreamWin::AsyncContext : public MessageLoopForIO::IOHandler { |
+ public: |
+ explicit AsyncContext(const BoundNetLog& bound_net_log); |
+ AsyncContext(base::PlatformFile file, |
+ const BoundNetLog& bound_net_log, |
+ int open_flags); |
+ // Destroys the context. It can be deleted in the method or deletion can be |
+ // deferred to WorkerPool if some asynchronous operation is now in progress |
+ // or if auto-closing is needed. |
+ void Destroy(); |
+ |
+ bool record_uma() { return record_uma_; } |
+ void set_record_uma(bool value) { record_uma_ = value; } |
+ base::PlatformFile file() { return file_; } |
+ bool async_in_progress() { return async_in_progress_; } |
+ |
+ int RecordAndMapError(int error, FileErrorSource source); |
+ |
+ // Sync and async versions of all operations |
+ void OpenAsync(const FilePath& path, |
+ int open_flags, |
+ const CompletionCallback& callback); |
+ int OpenSync(const FilePath& path, int open_flags); |
+ |
+ void CloseAsync(const CompletionCallback& callback); |
+ void CloseSync(); |
+ |
+ void SeekAsync(Whence whence, |
+ int64 offset, |
+ const Int64CompletionCallback& callback); |
+ int64 SeekSync(Whence whence, int64 offset); |
+ |
+ int ReadAsync(IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback); |
+ int ReadSync(char* buf, int buf_len); |
+ |
+ int WriteAsync(IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback); |
+ int WriteSync(const char* buf, int buf_len); |
+ |
+ private: |
+ // Map system error into network error code and log it with |bound_net_log_|. |
+ // Method should be called with |net_log_lock_| locked. |
+ int MapAndLogError(int error, FileErrorSource source); |
+ |
+ // Opens a file with some network logging. |
+ // The result code is written to |result|. |
+ void OpenFileImpl(const FilePath& path, int open_flags, int* result); |
+ |
+ // Called when asynchronous Open() is completed. |
+ void OnOpenCompleted(const CompletionCallback& callback, int* result); |
+ |
+ // Called after any Open() is completed on thread where AsyncContext |
+ // is created. |
+ void RegisterInMessageLoop(); |
+ |
+ // Closes a file with some network logging. |
+ void CloseFileImpl(); |
+ |
+ // A helper method for Seek. |
+ void SeekFileImpl(Whence whence, int64 offset, int64* result); |
+ |
+ void IOCompletionIsPending(const CompletionCallback& callback, |
+ IOBuffer* buf); |
+ |
+ // Implementation of MessageLoopForIO::IOHandler |
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, |
+ DWORD bytes_read, |
+ DWORD error) OVERRIDE; |
+ |
+ // Called when asynchronous Open(), Close() or Seek() |
+ // is completed. |result| contains the result or a network error code. |
+ template <typename R> |
+ void OnAsyncCompleted(const base::Callback<void(R)>& callback, R* result); |
+ |
+ // Delete the context with asynchronous closing if necessary. |
+ void DeleteAbandoned(); |
+ |
+ MessageLoopForIO::IOContext io_context_; |
+ CompletionCallback callback_; |
+ scoped_refptr<IOBuffer> in_flight_buf_; |
+ base::PlatformFile file_; |
+ bool record_uma_; |
+ bool async_in_progress_; |
+ bool destroyed_; |
+ base::Lock net_log_lock_; |
+ const net::BoundNetLog bound_net_log_; |
+ FileErrorSource error_source_; |
+}; |
+ |
+FileStreamWin::AsyncContext::AsyncContext(const BoundNetLog& bound_net_log) |
+ : io_context_(), |
+ file_(base::kInvalidPlatformFileValue), |
+ record_uma_(false), |
+ async_in_progress_(false), |
+ destroyed_(false), |
+ bound_net_log_(bound_net_log), |
+ error_source_(FILE_ERROR_SOURCE_COUNT) { |
+ io_context_.handler = this; |
+} |
+ |
+FileStreamWin::AsyncContext::AsyncContext(base::PlatformFile file, |
+ const BoundNetLog& bound_net_log, |
+ int open_flags) |
+ : io_context_(), |
+ file_(file), |
+ record_uma_(false), |
+ async_in_progress_(false), |
+ destroyed_(false), |
+ bound_net_log_(bound_net_log), |
+ error_source_(FILE_ERROR_SOURCE_COUNT) { |
+ io_context_.handler = this; |
+ if (open_flags & base::PLATFORM_FILE_ASYNC) |
+ RegisterInMessageLoop(); |
+} |
+ |
+void FileStreamWin::AsyncContext::Destroy() { |
+ { |
+ // By locking we don't allow any operation with |bound_net_log_| to be |
+ // in progress while this method is executed. Attempt to do something |
+ // with |bound_net_log_| will be done either before this method or after |
+ // we switch |context_->destroyed_| which will prohibit any operation on |
+ // |bound_net_log_|. |
+ base::AutoLock locked(net_log_lock_); |
+ destroyed_ = true; |
+ } |
+ CancelIo(file_); |
+ if (!async_in_progress_) |
+ DeleteAbandoned(); |
+} |
+ |
+int FileStreamWin::AsyncContext::RecordAndMapError(int error, |
+ FileErrorSource source) { |
+ int net_error; |
+ { |
+ base::AutoLock locked(net_log_lock_); |
+ net_error = MapAndLogError(error, source); |
+ } |
+ RecordFileError(error, source, record_uma_); |
return net_error; |
} |
-// Opens a file with some network logging. |
-// The opened file and the result code are written to |file| and |result|. |
-void OpenFile(const FilePath& path, |
- int open_flags, |
- bool record_uma, |
- base::PlatformFile* file, |
- int* result, |
- const net::BoundNetLog& bound_net_log) { |
- std::string file_name = path.AsUTF8Unsafe(); |
- bound_net_log.BeginEvent( |
- net::NetLog::TYPE_FILE_STREAM_OPEN, |
- NetLog::StringCallback("file_name", &file_name)); |
+void FileStreamWin::AsyncContext::OpenAsync( |
+ const FilePath& path, |
+ int open_flags, |
+ const CompletionCallback& callback) { |
+ DCHECK(!async_in_progress_); |
- *file = base::CreatePlatformFile(path, open_flags, NULL, NULL); |
- if (*file == base::kInvalidPlatformFileValue) { |
+ int* result = new int(OK); |
+ const bool posted = base::WorkerPool::PostTaskAndReply( |
+ FROM_HERE, |
+ base::Bind(&AsyncContext::OpenFileImpl, base::Unretained(this), |
+ path, open_flags, result), |
+ base::Bind(&AsyncContext::OnOpenCompleted, |
+ base::Unretained(this), |
+ callback, base::Owned(result)), |
+ true /* task_is_slow */); |
+ DCHECK(posted); |
+ |
+ async_in_progress_ = true; |
+} |
+ |
+int FileStreamWin::AsyncContext::OpenSync(const FilePath& path, |
+ int open_flags) { |
+ int result = OK; |
+ OpenFileImpl(path, open_flags, &result); |
+ // TODO(satorux): Remove this once all async clients are migrated to use |
+ // Open(). crbug.com/114783 |
+ if (open_flags & base::PLATFORM_FILE_ASYNC) |
+ RegisterInMessageLoop(); |
+ return result; |
+} |
+ |
+void FileStreamWin::AsyncContext::CloseAsync( |
+ const CompletionCallback& callback) { |
+ DCHECK(!async_in_progress_); |
+ |
+ // Value OK will never be changed in AsyncContext::CloseFile() and is needed |
+ // here just to use the same AsyncContext::OnAsyncCompleted(). |
+ int* result = new int(OK); |
+ const bool posted = base::WorkerPool::PostTaskAndReply( |
+ FROM_HERE, |
+ base::Bind(&AsyncContext::CloseFileImpl, base::Unretained(this)), |
+ base::Bind(&AsyncContext::OnAsyncCompleted<int>, |
+ base::Unretained(this), |
+ callback, base::Owned(result)), |
+ true /* task_is_slow */); |
+ DCHECK(posted); |
+ |
+ async_in_progress_ = true; |
+} |
+ |
+void FileStreamWin::AsyncContext::CloseSync() { |
+ DCHECK(!async_in_progress_); |
+ CloseFileImpl(); |
+} |
+ |
+void FileStreamWin::AsyncContext::SeekAsync( |
+ Whence whence, |
+ int64 offset, |
+ const Int64CompletionCallback& callback) { |
+ DCHECK(!async_in_progress_); |
+ |
+ int64* result = new int64(-1); |
+ const bool posted = base::WorkerPool::PostTaskAndReply( |
+ FROM_HERE, |
+ base::Bind(&AsyncContext::SeekFileImpl, base::Unretained(this), |
+ whence, offset, result), |
+ base::Bind(&AsyncContext::OnAsyncCompleted<int64>, |
+ base::Unretained(this), |
+ callback, base::Owned(result)), |
+ true /* task is slow */); |
+ DCHECK(posted); |
+ |
+ async_in_progress_ = true; |
+} |
+ |
+int64 FileStreamWin::AsyncContext::SeekSync(Whence whence, int64 offset) { |
+ int64 result = -1; |
+ SeekFileImpl(whence, offset, &result); |
+ return result; |
+} |
+ |
+int FileStreamWin::AsyncContext::ReadAsync( |
+ IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback) { |
+ DCHECK(!async_in_progress_); |
+ error_source_ = FILE_ERROR_SOURCE_READ; |
+ |
+ int rv = 0; |
+ |
+ DWORD bytes_read; |
+ if (!ReadFile(file_, buf->data(), buf_len, |
+ &bytes_read, &io_context_.overlapped)) { |
DWORD error = GetLastError(); |
- LOG(WARNING) << "Failed to open file: " << error; |
- *result = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_OPEN, |
- record_uma, |
- bound_net_log); |
- bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); |
- return; |
+ if (error == ERROR_IO_PENDING) { |
+ IOCompletionIsPending(callback, buf); |
+ rv = ERR_IO_PENDING; |
+ } else if (error == ERROR_HANDLE_EOF) { |
+ rv = 0; // Report EOF by returning 0 bytes read. |
+ } else { |
+ LOG(WARNING) << "ReadFile failed: " << error; |
+ rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ); |
+ } |
+ } else { |
+ IOCompletionIsPending(callback, buf); |
+ rv = ERR_IO_PENDING; |
} |
+ return rv; |
} |
-// Closes a file with some network logging. |
-void CloseFile(base::PlatformFile file, |
- const net::BoundNetLog& bound_net_log) { |
- bound_net_log.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE); |
- if (file == base::kInvalidPlatformFileValue) |
- return; |
+int FileStreamWin::AsyncContext::ReadSync(char* buf, int buf_len) { |
+ base::ThreadRestrictions::AssertIOAllowed(); |
- CancelIo(file); |
+ int rv = 0; |
- if (!base::ClosePlatformFile(file)) |
- NOTREACHED(); |
- bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); |
+ DWORD bytes_read; |
+ if (!ReadFile(file_, buf, buf_len, &bytes_read, NULL)) { |
+ DWORD error = GetLastError(); |
+ if (error == ERROR_HANDLE_EOF) { |
+ rv = 0; // Report EOF by returning 0 bytes read. |
+ } else { |
+ LOG(WARNING) << "ReadFile failed: " << error; |
+ rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ); |
+ } |
+ } else { |
+ rv = static_cast<int>(bytes_read); |
+ } |
+ return rv; |
} |
-// Closes a file with CloseFile() and signals the completion. |
-void CloseFileAndSignal(base::PlatformFile* file, |
- base::WaitableEvent* on_io_complete, |
- const net::BoundNetLog& bound_net_log) { |
- CloseFile(*file, bound_net_log); |
- *file = base::kInvalidPlatformFileValue; |
- on_io_complete->Signal(); |
+int FileStreamWin::AsyncContext::WriteAsync( |
+ IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback) { |
+ error_source_ = FILE_ERROR_SOURCE_WRITE; |
+ |
+ int rv = 0; |
+ DWORD bytes_written = 0; |
+ if (!WriteFile(file_, buf->data(), buf_len, |
+ &bytes_written, &io_context_.overlapped)) { |
+ DWORD error = GetLastError(); |
+ if (error == ERROR_IO_PENDING) { |
+ IOCompletionIsPending(callback, buf); |
+ rv = ERR_IO_PENDING; |
+ } else { |
+ LOG(WARNING) << "WriteFile failed: " << error; |
+ rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE); |
+ } |
+ } else { |
+ IOCompletionIsPending(callback, buf); |
+ rv = ERR_IO_PENDING; |
+ } |
+ return rv; |
} |
-// Invokes a given closure and signals the completion. |
-void InvokeAndSignal(const base::Closure& closure, |
- base::WaitableEvent* on_io_complete) { |
- closure.Run(); |
- on_io_complete->Signal(); |
+int FileStreamWin::AsyncContext::WriteSync(const char* buf, int buf_len) { |
+ base::ThreadRestrictions::AssertIOAllowed(); |
+ |
+ int rv = 0; |
+ DWORD bytes_written = 0; |
+ if (!WriteFile(file_, buf, buf_len, &bytes_written, NULL)) { |
+ DWORD error = GetLastError(); |
+ LOG(WARNING) << "WriteFile failed: " << error; |
+ rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE); |
+ } else { |
+ rv = static_cast<int>(bytes_written); |
+ } |
+ return rv; |
} |
-} // namespace |
+int FileStreamWin::AsyncContext::MapAndLogError(int error, |
+ FileErrorSource source) { |
+ net_log_lock_.AssertAcquired(); |
+ // The following check is against incorrect use or bug. File descriptor |
+ // shouldn't ever be closed outside of FileStream while it still tries to do |
+ // something with it. |
+ DCHECK(error != ERROR_INVALID_HANDLE); |
+ net::Error net_error = MapSystemError(error); |
-// FileStreamWin::AsyncContext ---------------------------------------------- |
- |
-class FileStreamWin::AsyncContext : public MessageLoopForIO::IOHandler { |
- public: |
- explicit AsyncContext(const net::BoundNetLog& bound_net_log) |
- : context_(), is_closing_(false), |
- record_uma_(false), bound_net_log_(bound_net_log), |
- error_source_(FILE_ERROR_SOURCE_COUNT) { |
- context_.handler = this; |
+ if (!destroyed_) { |
+ bound_net_log_.AddEvent( |
+ net::NetLog::TYPE_FILE_STREAM_ERROR, |
+ base::Bind(&NetLogFileStreamErrorCallback, |
+ source, error, net_error)); |
} |
- ~AsyncContext(); |
- void IOCompletionIsPending(const CompletionCallback& callback, |
- IOBuffer* buf); |
+ return net_error; |
+} |
- OVERLAPPED* overlapped() { return &context_.overlapped; } |
- const CompletionCallback& callback() const { return callback_; } |
+void FileStreamWin::AsyncContext::OpenFileImpl(const FilePath& path, |
+ int open_flags, |
+ int* result) { |
+ std::string file_name = path.AsUTF8Unsafe(); |
+ { |
+ base::AutoLock locked(net_log_lock_); |
+ // Bail out quickly if operation was already canceled |
+ if (destroyed_) |
+ return; |
- void set_error_source(FileErrorSource source) { error_source_ = source; } |
+ bound_net_log_.BeginEvent( |
+ net::NetLog::TYPE_FILE_STREAM_OPEN, |
+ NetLog::StringCallback("file_name", &file_name)); |
+ } |
- void EnableErrorStatistics() { |
- record_uma_ = true; |
+ file_ = base::CreatePlatformFile(path, open_flags, NULL, NULL); |
+ if (file_ == base::kInvalidPlatformFileValue) { |
+ DWORD error = GetLastError(); |
+ LOG(WARNING) << "Failed to open file: " << error; |
+ { |
+ base::AutoLock locked(net_log_lock_); |
+ if (!destroyed_) |
+ bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); |
+ *result = MapAndLogError(error, FILE_ERROR_SOURCE_OPEN); |
+ } |
+ RecordFileError(error, FILE_ERROR_SOURCE_OPEN, record_uma_); |
} |
+} |
- private: |
- virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, |
- DWORD bytes_read, DWORD error) OVERRIDE; |
+void FileStreamWin::AsyncContext::OnOpenCompleted( |
+ const CompletionCallback& callback, |
+ int* result) { |
+ if (!destroyed_) |
+ RegisterInMessageLoop(); |
+ OnAsyncCompleted(callback, result); |
+} |
- MessageLoopForIO::IOContext context_; |
- CompletionCallback callback_; |
- scoped_refptr<IOBuffer> in_flight_buf_; |
- bool is_closing_; |
- bool record_uma_; |
- const net::BoundNetLog bound_net_log_; |
- FileErrorSource error_source_; |
-}; |
+void FileStreamWin::AsyncContext::RegisterInMessageLoop() { |
+ if (file_ != base::kInvalidPlatformFileValue) |
+ MessageLoopForIO::current()->RegisterIOHandler(file_, this); |
+} |
-FileStreamWin::AsyncContext::~AsyncContext() { |
- is_closing_ = true; |
- bool waited = false; |
- base::TimeTicks start = base::TimeTicks::Now(); |
- while (!callback_.is_null()) { |
- waited = true; |
- MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); |
+void FileStreamWin::AsyncContext::CloseFileImpl() { |
+ { |
+ base::AutoLock locked(net_log_lock_); |
+ if (!destroyed_) |
+ bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE); |
} |
- if (waited) { |
- // We want to see if we block the message loop for too long. |
- UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", |
- base::TimeTicks::Now() - start); |
+ |
+ if (file_ == base::kInvalidPlatformFileValue) |
+ return; |
+ if (!base::ClosePlatformFile(file_)) |
+ NOTREACHED(); |
+ file_ = base::kInvalidPlatformFileValue; |
+ |
+ { |
+ base::AutoLock locked(net_log_lock_); |
+ if (!destroyed_) |
+ bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); |
} |
} |
+void FileStreamWin::AsyncContext::SeekFileImpl(Whence whence, |
+ int64 offset, |
+ int64* result) { |
+ base::ThreadRestrictions::AssertIOAllowed(); |
+ |
+ // If context has been already destroyed nobody waits for operation results. |
+ if (destroyed_) |
+ return; |
+ |
+ LARGE_INTEGER distance, res; |
+ distance.QuadPart = offset; |
+ DWORD move_method = static_cast<DWORD>(whence); |
+ if (!SetFilePointerEx(file_, distance, &res, move_method)) { |
+ DWORD error = GetLastError(); |
+ LOG(WARNING) << "SetFilePointerEx failed: " << error; |
+ *result = RecordAndMapError(error, FILE_ERROR_SOURCE_SEEK); |
+ return; |
+ } |
+ SetOffset(&io_context_.overlapped, res); |
+ *result = res.QuadPart; |
+} |
+ |
void FileStreamWin::AsyncContext::IOCompletionIsPending( |
const CompletionCallback& callback, |
IOBuffer* buf) { |
DCHECK(callback_.is_null()); |
callback_ = callback; |
in_flight_buf_ = buf; // Hold until the async operation ends. |
+ async_in_progress_ = true; |
} |
void FileStreamWin::AsyncContext::OnIOCompleted( |
- MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) { |
- DCHECK_EQ(&context_, context); |
+ MessageLoopForIO::IOContext* context, |
+ DWORD bytes_read, |
+ DWORD error) { |
+ DCHECK_EQ(&io_context_, context); |
DCHECK(!callback_.is_null()); |
- if (is_closing_) { |
+ if (destroyed_) { |
callback_.Reset(); |
in_flight_buf_ = NULL; |
+ DeleteAbandoned(); |
return; |
} |
int result = static_cast<int>(bytes_read); |
- if (error && error != ERROR_HANDLE_EOF) { |
- result = RecordAndMapError(error, error_source_, record_uma_, |
- bound_net_log_); |
- } |
+ if (error && error != ERROR_HANDLE_EOF) |
+ result = RecordAndMapError(error, error_source_); |
if (bytes_read) |
- IncrementOffset(&context->overlapped, bytes_read); |
+ IncrementOffset(&io_context_.overlapped, bytes_read); |
+ // Reset this before Run() as Run() may issue a new async operation. |
+ async_in_progress_ = false; |
CompletionCallback temp_callback = callback_; |
callback_.Reset(); |
scoped_refptr<IOBuffer> temp_buf = in_flight_buf_; |
@@ -201,114 +502,89 @@ |
temp_callback.Run(result); |
} |
+template <typename R> |
+void FileStreamWin::AsyncContext::OnAsyncCompleted( |
+ const base::Callback<void(R)>& callback, |
+ R* result) { |
+ if (destroyed_) { |
+ DeleteAbandoned(); |
+ } else { |
+ // Reset this before Run() as Run() may issue a new async operation. |
+ async_in_progress_ = false; |
+ callback.Run(*result); |
+ } |
+} |
+ |
+void FileStreamWin::AsyncContext::DeleteAbandoned() { |
+ if (file_ != base::kInvalidPlatformFileValue) { |
+ const bool posted = base::WorkerPool::PostTask( |
+ FROM_HERE, |
+ // Context should be deleted after closing, thus Owned(). |
+ base::Bind(&AsyncContext::CloseFileImpl, base::Owned(this)), |
+ true /* task_is_slow */); |
+ DCHECK(posted); |
+ } else { |
+ delete this; |
+ } |
+} |
+ |
// FileStream ------------------------------------------------------------ |
FileStreamWin::FileStreamWin(net::NetLog* net_log) |
- : file_(base::kInvalidPlatformFileValue), |
+ : context_(NULL), |
open_flags_(0), |
- auto_closed_(true), |
- record_uma_(false), |
bound_net_log_(net::BoundNetLog::Make(net_log, |
- net::NetLog::SOURCE_FILESTREAM)), |
- weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
+ net::NetLog::SOURCE_FILESTREAM)) { |
+ context_ = new AsyncContext(bound_net_log_); |
+ |
bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE); |
} |
-FileStreamWin::FileStreamWin( |
- base::PlatformFile file, int flags, net::NetLog* net_log) |
- : file_(file), |
+FileStreamWin::FileStreamWin(base::PlatformFile file, |
+ int flags, |
+ net::NetLog* net_log) |
+ : context_(NULL), |
open_flags_(flags), |
- auto_closed_(false), |
- record_uma_(false), |
bound_net_log_(net::BoundNetLog::Make(net_log, |
- net::NetLog::SOURCE_FILESTREAM)), |
- weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
+ net::NetLog::SOURCE_FILESTREAM)) { |
+ context_ = new AsyncContext(file, bound_net_log_, open_flags_); |
+ |
bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE); |
- |
- // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to |
- // make sure we will perform asynchronous File IO to it. |
- if (flags & base::PLATFORM_FILE_ASYNC) { |
- async_context_.reset(new AsyncContext(bound_net_log_)); |
- MessageLoopForIO::current()->RegisterIOHandler(file_, |
- async_context_.get()); |
- } |
} |
FileStreamWin::~FileStreamWin() { |
- if (open_flags_ & base::PLATFORM_FILE_ASYNC) { |
- // Block until the in-flight open/close operation is complete. |
- // TODO(satorux): Ideally we should not block. crbug.com/115067 |
- WaitForIOCompletion(); |
+ if (IsOpen() && !is_async()) |
+ CloseSync(); |
+ context_->Destroy(); |
- // Block until the last read/write operation is complete. |
- async_context_.reset(); |
- } |
- |
- if (auto_closed_) { |
- if (open_flags_ & base::PLATFORM_FILE_ASYNC) { |
- // Close the file in the background. |
- if (IsOpen()) { |
- const bool posted = base::WorkerPool::PostTask( |
- FROM_HERE, |
- base::Bind(&CloseFile, file_, bound_net_log_), |
- true /* task_is_slow */); |
- DCHECK(posted); |
- } |
- } else { |
- CloseSync(); |
- } |
- } |
- |
bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE); |
} |
void FileStreamWin::Close(const CompletionCallback& callback) { |
- DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); |
- DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
- DCHECK(!on_io_complete_.get()); |
- on_io_complete_.reset(new base::WaitableEvent( |
- false /* manual_reset */, false /* initially_signaled */)); |
- |
- // Passing &file_ to a thread pool looks unsafe but it's safe here as the |
- // destructor ensures that the close operation is complete with |
- // WaitForIOCompletion(). See also the destructor. |
- const bool posted = base::WorkerPool::PostTaskAndReply( |
- FROM_HERE, |
- base::Bind(&CloseFileAndSignal, &file_, on_io_complete_.get(), |
- bound_net_log_), |
- base::Bind(&FileStreamWin::OnClosed, |
- weak_ptr_factory_.GetWeakPtr(), |
- callback), |
- true /* task_is_slow */); |
- DCHECK(posted); |
+ DCHECK(is_async()); |
+ context_->CloseAsync(callback); |
} |
void FileStreamWin::CloseSync() { |
- // The logic here is similar to CloseFile() but async_context_.reset() is |
- // caled in this function. |
+ // CloseSync() should be called on the correct thread even if it eventually |
+ // ends up inside CloseAndCancelAsync(). |
+ base::ThreadRestrictions::AssertIOAllowed(); |
- // Block until the in-flight open operation is complete. |
- // TODO(satorux): Replace this with a DCHECK(open_flags & ASYNC) once this |
- // once all async clients are migrated to use Close(). crbug.com/114783 |
- WaitForIOCompletion(); |
- |
- bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE); |
- if (file_ != base::kInvalidPlatformFileValue) |
- CancelIo(file_); |
- |
- // Block until the last read/write operation is complete. |
- async_context_.reset(); |
- |
- if (file_ != base::kInvalidPlatformFileValue) { |
- if (!base::ClosePlatformFile(file_)) |
- NOTREACHED(); |
- file_ = base::kInvalidPlatformFileValue; |
- |
- bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); |
+ // TODO(satorux): Replace the following async stuff with a |
+ // DCHECK(is_async()) once all async clients are migrated to |
+ // use Close(). crbug.com/114783 |
+ if (!context_->async_in_progress()) { |
+ context_->CloseSync(); |
+ } else { |
+ AsyncContext* old_ctx = context_; |
+ context_ = new AsyncContext(bound_net_log_); |
+ context_->set_record_uma(old_ctx->record_uma()); |
+ old_ctx->Destroy(); |
} |
} |
-int FileStreamWin::Open(const FilePath& path, int open_flags, |
+int FileStreamWin::Open(const FilePath& path, |
+ int open_flags, |
const CompletionCallback& callback) { |
if (IsOpen()) { |
DLOG(FATAL) << "File is already open!"; |
@@ -316,27 +592,8 @@ |
} |
open_flags_ = open_flags; |
- DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); |
- DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
- DCHECK(!on_io_complete_.get()); |
- on_io_complete_.reset(new base::WaitableEvent( |
- false /* manual_reset */, false /* initially_signaled */)); |
- |
- // Passing &file_ to a thread pool looks unsafe but it's safe here as the |
- // destructor ensures that the open operation is complete with |
- // WaitForIOCompletion(). See also the destructor. |
- int* result = new int(OK); |
- const bool posted = base::WorkerPool::PostTaskAndReply( |
- FROM_HERE, |
- base::Bind(&InvokeAndSignal, |
- base::Bind(&OpenFile, path, open_flags, record_uma_, &file_, |
- result, bound_net_log_), |
- on_io_complete_.get()), |
- base::Bind(&FileStreamWin::OnOpened, |
- weak_ptr_factory_.GetWeakPtr(), |
- callback, base::Owned(result)), |
- true /* task_is_slow */); |
- DCHECK(posted); |
+ DCHECK(is_async()); |
+ context_->OpenAsync(path, open_flags, callback); |
return ERR_IO_PENDING; |
} |
@@ -347,27 +604,15 @@ |
} |
open_flags_ = open_flags; |
- |
- int result = OK; |
- OpenFile(path, open_flags_, record_uma_, &file_, &result, bound_net_log_); |
- if (result != OK) |
- return result; |
- |
- // TODO(satorux): Remove this once all async clients are migrated to use |
- // Open(). crbug.com/114783 |
- if (open_flags_ & base::PLATFORM_FILE_ASYNC) { |
- async_context_.reset(new AsyncContext(bound_net_log_)); |
- if (record_uma_) |
- async_context_->EnableErrorStatistics(); |
- MessageLoopForIO::current()->RegisterIOHandler(file_, |
- async_context_.get()); |
- } |
- |
- return OK; |
+ // TODO(satorux): Put a DCHECK once all async clients are migrated |
+ // to use Open(). crbug.com/114783 |
+ // |
+ // DCHECK(!is_async()); |
+ return context_->OpenSync(path, open_flags_); |
} |
bool FileStreamWin::IsOpen() const { |
- return file_ != base::kInvalidPlatformFileValue; |
+ return context_->file() != base::kInvalidPlatformFileValue; |
} |
int FileStreamWin::Seek(Whence whence, int64 offset, |
@@ -376,27 +621,8 @@ |
return ERR_UNEXPECTED; |
// Make sure we're async and we have no other in-flight async operations. |
- DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); |
- DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
- DCHECK(!on_io_complete_.get()); |
- |
- int64* result = new int64(-1); |
- on_io_complete_.reset(new base::WaitableEvent( |
- false /* manual_reset */, false /* initially_signaled */)); |
- |
- const bool posted = base::WorkerPool::PostTaskAndReply( |
- FROM_HERE, |
- base::Bind(&InvokeAndSignal, |
- // Unretained should be fine as we wait for a signal on |
- // on_io_complete_ at the destructor. |
- base::Bind(&FileStreamWin::SeekFile, base::Unretained(this), |
- whence, offset, result), |
- on_io_complete_.get()), |
- base::Bind(&FileStreamWin::OnSeeked, |
- weak_ptr_factory_.GetWeakPtr(), |
- callback, base::Owned(result)), |
- true /* task is slow */); |
- DCHECK(posted); |
+ DCHECK(is_async()); |
+ context_->SeekAsync(whence, offset, callback); |
return ERR_IO_PENDING; |
} |
@@ -404,10 +630,8 @@ |
if (!IsOpen()) |
return ERR_UNEXPECTED; |
- DCHECK(!async_context_.get() || async_context_->callback().is_null()); |
- int64 result = -1; |
- SeekFile(whence, offset, &result); |
- return result; |
+ DCHECK(!is_async() || !context_->async_in_progress()); |
+ return context_->SeekSync(whence, offset); |
} |
int64 FileStreamWin::Available() { |
@@ -421,86 +645,31 @@ |
return cur_pos; |
LARGE_INTEGER file_size; |
- if (!GetFileSizeEx(file_, &file_size)) { |
+ if (!GetFileSizeEx(context_->file(), &file_size)) { |
DWORD error = GetLastError(); |
LOG(WARNING) << "GetFileSizeEx failed: " << error; |
- return RecordAndMapError(error, |
- FILE_ERROR_SOURCE_GET_SIZE, |
- record_uma_, |
- bound_net_log_); |
+ return context_->RecordAndMapError(error, FILE_ERROR_SOURCE_GET_SIZE); |
} |
return file_size.QuadPart - cur_pos; |
} |
-int FileStreamWin::Read( |
- IOBuffer* buf, int buf_len, const CompletionCallback& callback) { |
- DCHECK(async_context_.get()); |
- |
+int FileStreamWin::Read(IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback) { |
if (!IsOpen()) |
return ERR_UNEXPECTED; |
DCHECK(open_flags_ & base::PLATFORM_FILE_READ); |
- |
- OVERLAPPED* overlapped = NULL; |
- DCHECK(!callback.is_null()); |
- DCHECK(async_context_->callback().is_null()); |
- overlapped = async_context_->overlapped(); |
- async_context_->set_error_source(FILE_ERROR_SOURCE_READ); |
- |
- int rv = 0; |
- |
- DWORD bytes_read; |
- if (!ReadFile(file_, buf->data(), buf_len, &bytes_read, overlapped)) { |
- DWORD error = GetLastError(); |
- if (error == ERROR_IO_PENDING) { |
- async_context_->IOCompletionIsPending(callback, buf); |
- rv = ERR_IO_PENDING; |
- } else if (error == ERROR_HANDLE_EOF) { |
- rv = 0; // Report EOF by returning 0 bytes read. |
- } else { |
- LOG(WARNING) << "ReadFile failed: " << error; |
- rv = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_READ, |
- record_uma_, |
- bound_net_log_); |
- } |
- } else if (overlapped) { |
- async_context_->IOCompletionIsPending(callback, buf); |
- rv = ERR_IO_PENDING; |
- } else { |
- rv = static_cast<int>(bytes_read); |
- } |
- return rv; |
+ return context_->ReadAsync(buf, buf_len, callback); |
} |
int FileStreamWin::ReadSync(char* buf, int buf_len) { |
- DCHECK(!async_context_.get()); |
- base::ThreadRestrictions::AssertIOAllowed(); |
- |
if (!IsOpen()) |
return ERR_UNEXPECTED; |
DCHECK(open_flags_ & base::PLATFORM_FILE_READ); |
- |
- int rv = 0; |
- |
- DWORD bytes_read; |
- if (!ReadFile(file_, buf, buf_len, &bytes_read, NULL)) { |
- DWORD error = GetLastError(); |
- if (error == ERROR_HANDLE_EOF) { |
- rv = 0; // Report EOF by returning 0 bytes read. |
- } else { |
- LOG(WARNING) << "ReadFile failed: " << error; |
- rv = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_READ, |
- record_uma_, |
- bound_net_log_); |
- } |
- } else { |
- rv = static_cast<int>(bytes_read); |
- } |
- return rv; |
+ return context_->ReadSync(buf, buf_len); |
} |
int FileStreamWin::ReadUntilComplete(char *buf, int buf_len) { |
@@ -524,67 +693,22 @@ |
return bytes_total; |
} |
-int FileStreamWin::Write( |
- IOBuffer* buf, int buf_len, const CompletionCallback& callback) { |
- DCHECK(async_context_.get()); |
- |
+int FileStreamWin::Write(IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback) { |
if (!IsOpen()) |
return ERR_UNEXPECTED; |
DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); |
- |
- OVERLAPPED* overlapped = NULL; |
- DCHECK(!callback.is_null()); |
- DCHECK(async_context_->callback().is_null()); |
- overlapped = async_context_->overlapped(); |
- async_context_->set_error_source(FILE_ERROR_SOURCE_WRITE); |
- |
- int rv = 0; |
- DWORD bytes_written = 0; |
- if (!WriteFile(file_, buf->data(), buf_len, &bytes_written, overlapped)) { |
- DWORD error = GetLastError(); |
- if (error == ERROR_IO_PENDING) { |
- async_context_->IOCompletionIsPending(callback, buf); |
- rv = ERR_IO_PENDING; |
- } else { |
- LOG(WARNING) << "WriteFile failed: " << error; |
- rv = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_WRITE, |
- record_uma_, |
- bound_net_log_); |
- } |
- } else if (overlapped) { |
- async_context_->IOCompletionIsPending(callback, buf); |
- rv = ERR_IO_PENDING; |
- } else { |
- rv = static_cast<int>(bytes_written); |
- } |
- return rv; |
+ return context_->WriteAsync(buf, buf_len, callback); |
} |
-int FileStreamWin::WriteSync( |
- const char* buf, int buf_len) { |
- DCHECK(!async_context_.get()); |
- base::ThreadRestrictions::AssertIOAllowed(); |
- |
+int FileStreamWin::WriteSync(const char* buf, int buf_len) { |
if (!IsOpen()) |
return ERR_UNEXPECTED; |
DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); |
- |
- int rv = 0; |
- DWORD bytes_written = 0; |
- if (!WriteFile(file_, buf, buf_len, &bytes_written, NULL)) { |
- DWORD error = GetLastError(); |
- LOG(WARNING) << "WriteFile failed: " << error; |
- rv = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_WRITE, |
- record_uma_, |
- bound_net_log_); |
- } else { |
- rv = static_cast<int>(bytes_written); |
- } |
- return rv; |
+ return context_->WriteSync(buf, buf_len); |
} |
int FileStreamWin::Flush() { |
@@ -594,14 +718,11 @@ |
return ERR_UNEXPECTED; |
DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); |
- if (FlushFileBuffers(file_)) { |
+ if (FlushFileBuffers(context_->file())) { |
return OK; |
} |
- return RecordAndMapError(GetLastError(), |
- FILE_ERROR_SOURCE_FLUSH, |
- record_uma_, |
- bound_net_log_); |
+ return context_->RecordAndMapError(GetLastError(), FILE_ERROR_SOURCE_FLUSH); |
} |
int64 FileStreamWin::Truncate(int64 bytes) { |
@@ -619,14 +740,11 @@ |
return ERR_UNEXPECTED; |
// And truncate the file. |
- BOOL result = SetEndOfFile(file_); |
+ BOOL result = SetEndOfFile(context_->file()); |
if (!result) { |
DWORD error = GetLastError(); |
LOG(WARNING) << "SetEndOfFile failed: " << error; |
- return RecordAndMapError(error, |
- FILE_ERROR_SOURCE_SET_EOF, |
- record_uma_, |
- bound_net_log_); |
+ return context_->RecordAndMapError(error, FILE_ERROR_SOURCE_SET_EOF); |
} |
// Success. |
@@ -634,10 +752,7 @@ |
} |
void FileStreamWin::EnableErrorStatistics() { |
- record_uma_ = true; |
- |
- if (async_context_.get()) |
- async_context_->EnableErrorStatistics(); |
+ context_->set_record_uma(true); |
} |
void FileStreamWin::SetBoundNetLogSource( |
@@ -661,71 +776,7 @@ |
} |
base::PlatformFile FileStreamWin::GetPlatformFileForTesting() { |
- return file_; |
+ return context_->file(); |
} |
-void FileStreamWin::OnClosed(const CompletionCallback& callback) { |
- file_ = base::kInvalidPlatformFileValue; |
- |
- // Reset this before Run() as Run() may issue a new async operation. |
- ResetOnIOComplete(); |
- callback.Run(OK); |
-} |
- |
-void FileStreamWin::SeekFile(Whence whence, int64 offset, int64* result) { |
- LARGE_INTEGER distance, res; |
- distance.QuadPart = offset; |
- DWORD move_method = static_cast<DWORD>(whence); |
- if (!SetFilePointerEx(file_, distance, &res, move_method)) { |
- DWORD error = GetLastError(); |
- LOG(WARNING) << "SetFilePointerEx failed: " << error; |
- *result = RecordAndMapError(error, |
- FILE_ERROR_SOURCE_SEEK, |
- record_uma_, |
- bound_net_log_); |
- return; |
- } |
- if (async_context_.get()) { |
- async_context_->set_error_source(FILE_ERROR_SOURCE_SEEK); |
- SetOffset(async_context_->overlapped(), res); |
- } |
- *result = res.QuadPart; |
-} |
- |
-void FileStreamWin::OnOpened(const CompletionCallback& callback, int* result) { |
- if (*result == OK) { |
- async_context_.reset(new AsyncContext(bound_net_log_)); |
- if (record_uma_) |
- async_context_->EnableErrorStatistics(); |
- MessageLoopForIO::current()->RegisterIOHandler(file_, |
- async_context_.get()); |
- } |
- |
- // Reset this before Run() as Run() may issue a new async operation. |
- ResetOnIOComplete(); |
- callback.Run(*result); |
-} |
- |
-void FileStreamWin::OnSeeked( |
- const Int64CompletionCallback& callback, |
- int64* result) { |
- // Reset this before Run() as Run() may issue a new async operation. |
- ResetOnIOComplete(); |
- callback.Run(*result); |
-} |
- |
-void FileStreamWin::ResetOnIOComplete() { |
- on_io_complete_.reset(); |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
-} |
- |
-void FileStreamWin::WaitForIOCompletion() { |
- // http://crbug.com/115067 |
- base::ThreadRestrictions::ScopedAllowWait allow_wait; |
- if (on_io_complete_.get()) { |
- on_io_complete_->Wait(); |
- on_io_complete_.reset(); |
- } |
-} |
- |
} // namespace net |