2 * Copyright (C) 2011 Google Inc. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
14 * * Neither the name of Google Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived from
16 * this software without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
35 #include "WorkerThreadableWebSocketChannel.h"
37 #include "ArrayBuffer.h"
39 #include "CrossThreadTask.h"
40 #include "PlatformString.h"
41 #include "ScriptExecutionContext.h"
42 #include "ThreadableWebSocketChannelClientWrapper.h"
43 #include "WebSocketChannel.h"
44 #include "WebSocketChannelClient.h"
45 #include "WorkerContext.h"
46 #include "WorkerLoaderProxy.h"
47 #include "WorkerRunLoop.h"
48 #include "WorkerThread.h"
49 #include <wtf/MainThread.h>
50 #include <wtf/PassRefPtr.h>
// Constructs the worker-side channel: stores the worker context, wraps
// `client` in a ThreadableWebSocketChannelClientWrapper, and creates the
// Bridge from that wrapper, the worker context and `taskMode`.
54 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode)
55 : m_workerContext(context)
56 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
57 , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode))
// Destructor: disconnects the bridge.
// NOTE(review): a line between the signature and this call is not visible in
// this excerpt (possibly a null guard on m_bridge) - confirm against the full file.
61 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
64 m_bridge->disconnect();
// Returns the client wrapper's useHixie76Protocol() value.
// The wrapper is asserted to be non-null.
67 bool WorkerThreadableWebSocketChannel::useHixie76Protocol()
69 ASSERT(m_workerClientWrapper);
70 return m_workerClientWrapper->useHixie76Protocol();
// Forwards the connect request (`url`, `protocol`) to the bridge.
// NOTE(review): one line before the call is not visible in this excerpt
// (possibly a null guard on m_bridge) - confirm against the full file.
73 void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
76 m_bridge->connect(url, protocol);
// Returns the subprotocol string held by the client wrapper.
// The wrapper is asserted to be non-null.
79 String WorkerThreadableWebSocketChannel::subprotocol()
81 ASSERT(m_workerClientWrapper);
82 return m_workerClientWrapper->subprotocol();
// Sends a text message through the bridge and returns the bridge's result.
// NOTE(review): lines between the signature and the return are not visible in
// this excerpt (presumably a guard for a missing bridge) - confirm.
85 bool WorkerThreadableWebSocketChannel::send(const String& message)
89 return m_bridge->send(message);
// Sends ArrayBuffer data through the bridge and returns the bridge's result.
// NOTE(review): lines between the signature and the return are not visible in
// this excerpt (presumably a guard for a missing bridge) - confirm.
92 bool WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData)
96 return m_bridge->send(binaryData);
// Sends Blob data through the bridge and returns the bridge's result.
// NOTE(review): lines between the signature and the return are not visible in
// this excerpt (presumably a guard for a missing bridge) - confirm.
99 bool WorkerThreadableWebSocketChannel::send(const Blob& binaryData)
103 return m_bridge->send(binaryData);
// Returns the buffered amount reported by the bridge.
// NOTE(review): lines between the signature and the return are not visible in
// this excerpt (presumably a guard for a missing bridge) - confirm.
106 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
110 return m_bridge->bufferedAmount();
// Forwards a close request with `code` and `reason` to the bridge.
// NOTE(review): one line before the call is not visible in this excerpt
// (possibly a null guard on m_bridge) - confirm against the full file.
113 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
116 m_bridge->close(code, reason);
// Forwards a failure with the given `reason` to the bridge.
// NOTE(review): one line before the call is not visible in this excerpt
// (possibly a null guard on m_bridge) - confirm against the full file.
119 void WorkerThreadableWebSocketChannel::fail(const String& reason)
122 m_bridge->fail(reason);
// Disconnects the bridge.
// NOTE(review): lines after this call are not visible in this excerpt -
// confirm against the full file whether more cleanup follows.
125 void WorkerThreadableWebSocketChannel::disconnect()
127 m_bridge->disconnect();
// Suspends the worker-side client wrapper.
// NOTE(review): lines after this call are not visible in this excerpt
// (presumably the bridge is suspended as well) - confirm against the full file.
131 void WorkerThreadableWebSocketChannel::suspend()
133 m_workerClientWrapper->suspend();
// Resumes the worker-side client wrapper.
// NOTE(review): lines after this call are not visible in this excerpt
// (presumably the bridge is resumed as well) - confirm against the full file.
138 void WorkerThreadableWebSocketChannel::resume()
140 m_workerClientWrapper->resume();
// Constructs the peer: keeps the client wrapper, the loader proxy and the task
// mode, and creates a WebSocketChannel for `context`, passing this Peer as the
// second argument. Asserted to run on the main thread.
145 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode)
146 : m_workerClientWrapper(clientWrapper)
147 , m_loaderProxy(loaderProxy)
148 , m_mainWebSocketChannel(WebSocketChannel::create(context, this))
149 , m_taskMode(taskMode)
151 ASSERT(isMainThread());
// Destructor: disconnects the main-thread channel if it is still set.
// Asserted to run on the main thread.
154 WorkerThreadableWebSocketChannel::Peer::~Peer()
156 ASSERT(isMainThread());
157 if (m_mainWebSocketChannel)
158 m_mainWebSocketChannel->disconnect();
// Returns the main-thread channel's useHixie76Protocol() value.
// Asserts main thread and a non-null channel.
161 bool WorkerThreadableWebSocketChannel::Peer::useHixie76Protocol()
163 ASSERT(isMainThread());
164 ASSERT(m_mainWebSocketChannel);
165 return m_mainWebSocketChannel->useHixie76Protocol();
// Asks the main-thread channel to connect to `url` with `protocol`.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; do not read the connect() call as the
// guarded statement without confirming against the full file.
168 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
170 ASSERT(isMainThread());
171 if (!m_mainWebSocketChannel)
173 m_mainWebSocketChannel->connect(url, protocol);
// Callback task: stores the result of a send request on the client wrapper.
// `context` is only used to assert that this runs in a worker context.
176 static void workerContextDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sendRequestResult)
178 ASSERT_UNUSED(context, context->isWorkerContext());
179 workerClientWrapper->setSendRequestResult(sendRequestResult);
// Sends a text message on the main-thread channel and posts the boolean result
// back to the worker context (workerContextDidSend) in m_taskMode.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
182 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
184 ASSERT(isMainThread());
185 if (!m_mainWebSocketChannel || !m_workerClientWrapper)
187 bool sendRequestResult = m_mainWebSocketChannel->send(message);
188 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
// Sends ArrayBuffer data on the main-thread channel and posts the boolean
// result back to the worker context (workerContextDidSend) in m_taskMode.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
191 void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
193 ASSERT(isMainThread());
194 if (!m_mainWebSocketChannel || !m_workerClientWrapper)
196 bool sendRequestResult = m_mainWebSocketChannel->send(binaryData);
197 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
// Sends Blob data on the main-thread channel and posts the boolean result back
// to the worker context (workerContextDidSend) in m_taskMode.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
200 void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData)
202 ASSERT(isMainThread());
203 if (!m_mainWebSocketChannel || !m_workerClientWrapper)
205 bool sendRequestResult = m_mainWebSocketChannel->send(binaryData);
206 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
// Callback task: stores the reported buffered amount on the client wrapper.
// `context` is only used to assert that this runs in a worker context.
209 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
211 ASSERT_UNUSED(context, context->isWorkerContext());
212 workerClientWrapper->setBufferedAmount(bufferedAmount);
// Reads the main-thread channel's buffered amount and posts it back to the
// worker context (workerContextDidGetBufferedAmount) in m_taskMode.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
215 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
217 ASSERT(isMainThread());
218 if (!m_mainWebSocketChannel || !m_workerClientWrapper)
220 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
221 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
// Closes the main-thread channel with `code` and `reason`.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
224 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
226 ASSERT(isMainThread());
227 if (!m_mainWebSocketChannel)
229 m_mainWebSocketChannel->close(code, reason);
// Fails the main-thread channel with the given `reason`.
// Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
232 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
234 ASSERT(isMainThread());
235 if (!m_mainWebSocketChannel)
237 m_mainWebSocketChannel->fail(reason);
// Disconnects the main-thread channel and then clears the peer's reference to
// it (m_mainWebSocketChannel = 0). Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
240 void WorkerThreadableWebSocketChannel::Peer::disconnect()
242 ASSERT(isMainThread());
243 if (!m_mainWebSocketChannel)
245 m_mainWebSocketChannel->disconnect();
246 m_mainWebSocketChannel = 0;
// Suspends the main-thread channel. Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
249 void WorkerThreadableWebSocketChannel::Peer::suspend()
251 ASSERT(isMainThread());
252 if (!m_mainWebSocketChannel)
254 m_mainWebSocketChannel->suspend();
// Resumes the main-thread channel. Asserted to run on the main thread.
// NOTE(review): the statement guarded by the null check is not visible in this
// excerpt - presumably an early return; confirm against the full file.
257 void WorkerThreadableWebSocketChannel::Peer::resume()
259 ASSERT(isMainThread());
260 if (!m_mainWebSocketChannel)
262 m_mainWebSocketChannel->resume();
// Callback task: records the negotiated subprotocol on the client wrapper
// first, then calls its didConnect(). `context` is only used to assert that
// this runs in a worker context.
265 static void workerContextDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol)
267 ASSERT_UNUSED(context, context->isWorkerContext());
268 workerClientWrapper->setSubprotocol(subprotocol);
269 workerClientWrapper->didConnect();
// Posts workerContextDidConnect to the worker context in m_taskMode, passing
// the main-thread channel's subprotocol. Asserted to run on the main thread.
272 void WorkerThreadableWebSocketChannel::Peer::didConnect()
274 ASSERT(isMainThread());
275 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol()), m_taskMode);
// Callback task: hands a received text message to the client wrapper.
// `context` is only used to assert that this runs in a worker context.
278 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
280 ASSERT_UNUSED(context, context->isWorkerContext());
281 workerClientWrapper->didReceiveMessage(message);
// Posts workerContextDidReceiveMessage with `message` to the worker context in
// m_taskMode. Asserted to run on the main thread.
284 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
286 ASSERT(isMainThread());
287 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
// Callback task: hands received binary data (passed as PassOwnPtr) to the
// client wrapper. `context` is only used to assert that this runs in a worker
// context.
290 static void workerContextDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
292 ASSERT_UNUSED(context, context->isWorkerContext());
293 workerClientWrapper->didReceiveBinaryData(binaryData);
// Posts workerContextDidReceiveBinaryData with `binaryData` to the worker
// context in m_taskMode. Asserted to run on the main thread.
296 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
298 ASSERT(isMainThread());
299 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode);
// Callback task: tells the client wrapper that the closing handshake started.
// `context` is only used to assert that this runs in a worker context.
302 static void workerContextDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
304 ASSERT_UNUSED(context, context->isWorkerContext());
305 workerClientWrapper->didStartClosingHandshake();
// Posts workerContextDidStartClosingHandshake to the worker context in
// m_taskMode. Asserted to run on the main thread.
308 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
310 ASSERT(isMainThread());
311 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
// Callback task: passes the close details (unhandled buffered amount,
// handshake completion status, code, reason) to the client wrapper.
// `context` is only used to assert that this runs in a worker context.
314 static void workerContextDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
316 ASSERT_UNUSED(context, context->isWorkerContext());
317 workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
// Clears the peer's reference to the main-thread channel, then posts
// workerContextDidClose with the close details to the worker context in
// m_taskMode. Asserted to run on the main thread.
320 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
322 ASSERT(isMainThread());
323 m_mainWebSocketChannel = 0;
324 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode);
// Callback task: stores `peer` on the bridge (`thisPtr`), records the protocol
// flag on the client wrapper, and marks the wrapper's sync method as done.
// `context` is only used to assert that this runs in a worker context.
327 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool useHixie76Protocol)
329 ASSERT_UNUSED(context, context->isWorkerContext());
330 thisPtr->m_peer = peer;
331 workerClientWrapper->setUseHixie76Protocol(useHixie76Protocol);
332 workerClientWrapper->setSyncMethodDone();
// Main-thread task (asserted; `context` is asserted to be a document):
// creates the Peer, then posts Bridge::setWebSocketChannel back to the worker
// context in `taskMode` with the bridge pointer, the new peer, the client
// wrapper and the peer's protocol flag.
335 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode)
337 ASSERT(isMainThread());
338 ASSERT_UNUSED(context, context->isDocument());
// Take the PassRefPtr into a RefPtr so the wrapper can be used more than once below.
340 RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
342 Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode);
343 thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(
344 createCallbackTask(&Bridge::setWebSocketChannel,
345 AllowCrossThreadAccess(thisPtr),
346 AllowCrossThreadAccess(peer), clientWrapper, peer->useHixie76Protocol()), taskMode);
// Constructs the bridge: stores the client wrapper, worker context, the worker
// thread's loader proxy and the task mode. It then clears the wrapper's
// sync-done flag, posts mainThreadCreateWebSocketChannel to the loader, and
// waits (via waitForMethodCompletion) until the wrapper reports completion.
349 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode)
350 : m_workerClientWrapper(workerClientWrapper)
351 , m_workerContext(workerContext)
352 , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
353 , m_taskMode(taskMode)
356 ASSERT(m_workerClientWrapper.get());
357 setMethodNotCompleted();
358 m_loaderProxy.postTaskToLoader(
359 createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel,
360 AllowCrossThreadAccess(this), m_workerClientWrapper, m_taskMode));
361 waitForMethodCompletion();
// Destructor.
// NOTE(review): the body is not visible in this excerpt - confirm what
// cleanup it performs against the full file.
365 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
// Main-thread task (asserted; `context` is asserted to be a document):
// forwards the connect request to `peer`.
// NOTE(review): lines between the assertions and the call are not visible in
// this excerpt - confirm how a null `peer` is handled.
370 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& protocol)
372 ASSERT(isMainThread());
373 ASSERT_UNUSED(context, context->isDocument());
376 peer->connect(url, protocol);
// Posts mainThreadConnect to the loader with the peer pointer, `url` and
// `protocol`. The client wrapper is asserted to be non-null.
// NOTE(review): one line before the post is not visible in this excerpt
// (possibly a check on m_peer) - confirm against the full file.
379 void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
381 ASSERT(m_workerClientWrapper);
383 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
// Main-thread task (asserted; `context` is asserted to be a document) for
// sending a text message.
// NOTE(review): the lines that use `peer` and `message` are not visible in
// this excerpt - presumably the message is forwarded to `peer`; confirm.
386 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
388 ASSERT(isMainThread());
389 ASSERT_UNUSED(context, context->isDocument());
// Main-thread task (asserted; `context` is asserted to be a document):
// builds an ArrayBuffer from the bytes in `data` and sends it through `peer`.
// NOTE(review): lines between the assertions and the ArrayBuffer creation are
// not visible in this excerpt - confirm how a null `peer` is handled.
395 void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char> > data)
397 ASSERT(isMainThread());
398 ASSERT_UNUSED(context, context->isDocument());
401 RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
402 peer->send(*arrayBuffer);
// Main-thread task (asserted; `context` is asserted to be a document):
// creates a Blob from `url`, `type` and `size`.
// NOTE(review): the lines that follow the Blob creation are not visible in
// this excerpt - presumably the blob is sent through `peer`; confirm.
405 void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& type, long long size)
407 ASSERT(isMainThread());
408 ASSERT_UNUSED(context, context->isDocument());
411 RefPtr<Blob> blob = Blob::create(url, type, size);
// Synchronous send of a text message: clears the wrapper's sync-done flag,
// posts mainThreadSend to the loader, waits for completion, and returns the
// wrapper's sendRequestResult() (false if the wrapper is gone afterwards).
// NOTE(review): the statement guarded by the first null check is not visible
// in this excerpt - presumably `return false;`; confirm against the full file.
415 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
417 if (!m_workerClientWrapper)
420 setMethodNotCompleted();
421 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
// Keep this bridge referenced across the wait (see the note on waitForMethodCompletion).
422 RefPtr<Bridge> protect(this);
423 waitForMethodCompletion();
// Re-read the wrapper after the wait and null-check it before use.
424 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
425 return clientWrapper && clientWrapper->sendRequestResult();
// Synchronous send of ArrayBuffer data: copies the bytes into a Vector<char>,
// posts mainThreadSendArrayBuffer to the loader, waits for completion, and
// returns the wrapper's sendRequestResult() (false if the wrapper is gone).
// NOTE(review): the statement guarded by the first null check is not visible
// in this excerpt - presumably `return false;`; confirm against the full file.
428 bool WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData)
430 if (!m_workerClientWrapper)
433 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
434 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(binaryData.byteLength()));
// Skip the copy entirely for an empty buffer.
435 if (binaryData.byteLength())
436 memcpy(data->data(), binaryData.data(), binaryData.byteLength());
437 setMethodNotCompleted();
438 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release()));
// Keep this bridge referenced across the wait (see the note on waitForMethodCompletion).
439 RefPtr<Bridge> protect(this);
440 waitForMethodCompletion();
// Re-read the wrapper after the wait and null-check it before use.
441 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
442 return clientWrapper && clientWrapper->sendRequestResult();
// Synchronous send of Blob data: posts mainThreadSendBlob to the loader with
// the blob's url, type and size (not the Blob object itself), waits for
// completion, and returns the wrapper's sendRequestResult() (false if the
// wrapper is gone afterwards).
// NOTE(review): the statement guarded by the first null check is not visible
// in this excerpt - presumably `return false;`; confirm against the full file.
445 bool WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData)
447 if (!m_workerClientWrapper)
450 setMethodNotCompleted();
451 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size()));
// Keep this bridge referenced across the wait (see the note on waitForMethodCompletion).
452 RefPtr<Bridge> protect(this);
453 waitForMethodCompletion();
// Re-read the wrapper after the wait and null-check it before use.
454 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
455 return clientWrapper && clientWrapper->sendRequestResult();
// Main-thread task (asserted; `context` is asserted to be a document):
// asks `peer` to report its buffered amount.
// NOTE(review): lines between the assertions and the call are not visible in
// this excerpt - confirm how a null `peer` is handled.
458 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
460 ASSERT(isMainThread());
461 ASSERT_UNUSED(context, context->isDocument());
464 peer->bufferedAmount();
// Synchronous query: clears the wrapper's sync-done flag, posts
// mainThreadBufferedAmount to the loader, waits for completion, and returns
// the buffered amount stored on the client wrapper.
// NOTE(review): the statement guarded by the first null check, the line before
// the final return, and the line after it are not visible in this excerpt
// (presumably `return 0;`, a null check on clientWrapper, and a fallback
// `return 0;`) - confirm against the full file.
467 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
469 if (!m_workerClientWrapper)
472 setMethodNotCompleted();
473 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
// Keep this bridge referenced across the wait (see the note on waitForMethodCompletion).
474 RefPtr<Bridge> protect(this);
475 waitForMethodCompletion();
476 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
478 return clientWrapper->bufferedAmount();
// Main-thread task (asserted; `context` is asserted to be a document):
// forwards the close request (`code`, `reason`) to `peer`.
// NOTE(review): lines between the assertions and the call are not visible in
// this excerpt - confirm how a null `peer` is handled.
482 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String&reason)
484 ASSERT(isMainThread());
485 ASSERT_UNUSED(context, context->isDocument());
488 peer->close(code, reason);
// Posts mainThreadClose to the loader with the peer pointer, `code` and
// `reason`; does not wait for a result.
// NOTE(review): one line before the post is not visible in this excerpt
// (possibly a check on m_peer) - confirm against the full file.
491 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
494 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason));
// Main-thread task (asserted; `context` is asserted to be a document) for
// failing the channel.
// NOTE(review): the lines that use `peer` and `reason` are not visible in this
// excerpt - presumably the failure is forwarded to `peer`; confirm.
497 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
499 ASSERT(isMainThread());
500 ASSERT_UNUSED(context, context->isDocument());
// Posts mainThreadFail to the loader with the peer pointer and `reason`;
// does not wait for a result.
// NOTE(review): one line before the post is not visible in this excerpt
// (possibly a check on m_peer) - confirm against the full file.
506 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
509 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
// Main-thread task (asserted; `context` is asserted to be a document) posted
// by Bridge::disconnect with the peer pointer.
// NOTE(review): the lines that use `peer` are not visible in this excerpt -
// presumably the peer is destroyed here, going by the function name; confirm.
512 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
514 ASSERT(isMainThread());
515 ASSERT_UNUSED(context, context->isDocument());
// Detaches the client from the wrapper (clearClientWrapper) and posts
// mainThreadDestroy to the loader with a peer pointer.
// NOTE(review): the declaration of the local `peer` and any surrounding guard
// or follow-up cleanup are not visible in this excerpt - confirm against the
// full file.
521 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
523 clearClientWrapper();
527 m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, AllowCrossThreadAccess(peer)));
// Main-thread task (asserted; `context` is asserted to be a document) for
// suspending the channel.
// NOTE(review): the lines that use `peer` are not visible in this excerpt -
// presumably the peer is suspended; confirm.
532 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
534 ASSERT(isMainThread());
535 ASSERT_UNUSED(context, context->isDocument());
// Posts mainThreadSuspend to the loader with the peer pointer; does not wait
// for a result.
// NOTE(review): one line before the post is not visible in this excerpt
// (possibly a check on m_peer) - confirm against the full file.
541 void WorkerThreadableWebSocketChannel::Bridge::suspend()
544 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
// Main-thread task (asserted; `context` is asserted to be a document) for
// resuming the channel.
// NOTE(review): the lines that use `peer` are not visible in this excerpt -
// presumably the peer is resumed; confirm.
547 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
549 ASSERT(isMainThread());
550 ASSERT_UNUSED(context, context->isDocument());
// Posts mainThreadResume to the loader with the peer pointer; does not wait
// for a result.
// NOTE(review): one line before the post is not visible in this excerpt
// (possibly a check on m_peer) - confirm against the full file.
556 void WorkerThreadableWebSocketChannel::Bridge::resume()
559 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
// Calls clearClient() on the client wrapper. The wrapper object itself stays
// referenced by the bridge.
562 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
564 m_workerClientWrapper->clearClient();
// Clears the wrapper's sync-done flag; called before posting a task whose
// completion waitForMethodCompletion() will wait for. Wrapper asserted non-null.
567 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
569 ASSERT(m_workerClientWrapper);
570 m_workerClientWrapper->clearSyncMethodDone();
// Runs the worker run loop in m_taskMode until the client wrapper reports
// syncMethodDone(), the run loop returns MessageQueueTerminated, or the worker
// context / client wrapper becomes null.
573 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
574 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
575 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
// NOTE(review): the statement guarded by this null check is not visible in
// this excerpt - presumably an early return; confirm against the full file.
577 if (!m_workerContext)
579 WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
580 MessageQueueWaitResult result = MessageQueueMessageReceived;
581 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
582 while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
583 result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
// Re-read the wrapper each iteration before the loop condition dereferences it again.
584 clientWrapper = m_workerClientWrapper.get();
588 } // namespace WebCore
590 #endif // ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)