From a4a5e995342818f8b81d1d7f5e5ccd09edb156d2 Mon Sep 17 00:00:00 2001 From: Matthew Chen Date: Mon, 21 May 2018 10:55:48 -0400 Subject: [PATCH] PeerConnectionClient thread safety. --- Signal/src/call/PeerConnectionClient.swift | 619 ++++++++++++--------- 1 file changed, 365 insertions(+), 254 deletions(-) diff --git a/Signal/src/call/PeerConnectionClient.swift b/Signal/src/call/PeerConnectionClient.swift index 2a03883b4..26cd26163 100644 --- a/Signal/src/call/PeerConnectionClient.swift +++ b/Signal/src/call/PeerConnectionClient.swift @@ -74,7 +74,6 @@ protocol PeerConnectionClientDelegate: class { */ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelDelegate { - let TAG = "[PeerConnectionClient]" enum Identifiers: String { case mediaStream = "ARDAMS", videoTrack = "ARDAMSv0", @@ -95,7 +94,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD // Connection - private var peerConnection: RTCPeerConnection! + private var peerConnection: RTCPeerConnection? private let iceServers: [RTCIceServer] private let connectionConstraints: RTCMediaConstraints private let configuration: RTCConfiguration @@ -121,10 +120,6 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD // RTCVideoTrack is fragile and prone to throwing exceptions and/or // causing deadlock in its destructor. Therefore we take great care // with this property. - // - // We synchronize access to this property and ensure that we never - // set or use a strong reference to the remote video track if - // peerConnection is nil. private var remoteVideoTrack: RTCVideoTrack? private var cameraConstraints: RTCMediaConstraints @@ -139,10 +134,10 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD configuration.bundlePolicy = .maxBundle configuration.rtcpMuxPolicy = .require if useTurnOnly { - Logger.debug("\(TAG) using iceTransportPolicy: relay") + Logger.debug("\(PeerConnectionClient.logTag) using iceTransportPolicy: relay") configuration.iceTransportPolicy = .relay } else { - Logger.debug("\(TAG) using iceTransportPolicy: default") + Logger.debug("\(PeerConnectionClient.logTag) using iceTransportPolicy: default") } let connectionConstraintsDict = ["DtlsSrtpKeyAgreement": "true"] @@ -175,6 +170,11 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD private func createSignalingDataChannel() { SwiftAssertIsOnMainThread(#function) + guard let peerConnection = peerConnection else { + owsFail("\(logTag) in \(#function) peerConnection was unexpectedly nil") + return + } + let configuration = RTCDataChannelConfiguration() // Insist upon an "ordered" TCP data channel for delivery reliability. configuration.isOrdered = true @@ -191,11 +191,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD fileprivate func createVideoSender() { SwiftAssertIsOnMainThread(#function) - Logger.debug("\(TAG) in \(#function)") + Logger.debug("\(logTag) in \(#function)") assert(self.videoSender == nil, "\(#function) should only be called once.") guard !Platform.isSimulator else { - Logger.warn("\(TAG) Refusing to create local video track on simulator which has no capture device.") + Logger.warn("\(logTag) Refusing to create local video track on simulator which has no capture device.") + return + } + guard let peerConnection = peerConnection else { + owsFail("\(logTag) in \(#function) peerConnection was unexpectedly nil") return } @@ -227,15 +231,16 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setCameraSource(useBackCamera: Bool) { SwiftAssertIsOnMainThread(#function) - PeerConnectionClient.signalingQueue.async { - guard let localVideoSource = self.localVideoSource else { - owsFail("\(self.logTag) in \(#function) localVideoSource was unexpectedly nil") + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + guard let localVideoSource = strongSelf.localVideoSource else { + owsFail("\(strongSelf.logTag) in \(#function) localVideoSource was unexpectedly nil") return } // certain devices, e.g. 16GB iPod touch don't have a back camera guard localVideoSource.canUseBackCamera else { - owsFail("\(self.logTag) in \(#function) canUseBackCamera was unexpectedly false") + owsFail("\(strongSelf.logTag) in \(#function) canUseBackCamera was unexpectedly false") return } @@ -246,37 +251,40 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setLocalVideoEnabled(enabled: Bool) { SwiftAssertIsOnMainThread(#function) - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + let completion = { [weak self] in + guard let strongSelf = self else { return } + guard let localVideoTrack = strongSelf.localVideoTrack else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClient(strongSelf, didUpdateLocal: enabled ? localVideoTrack : nil) + } + + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") return } - guard let localVideoTrack = self.localVideoTrack else { + guard let localVideoTrack = strongSelf.localVideoTrack else { let action = enabled ? "enable" : "disable" - Logger.error("\(self.TAG)) trying to \(action) videoTrack which doesn't exist") + Logger.error("\(strongSelf.logTag)) trying to \(action) videoTrack which doesn't exist") return } - guard let videoCaptureSession = self.videoCaptureSession else { - Logger.debug("\(self.TAG) videoCaptureSession was unexpectedly nil") + guard let videoCaptureSession = strongSelf.videoCaptureSession else { + Logger.debug("\(strongSelf.logTag) videoCaptureSession was unexpectedly nil") return } localVideoTrack.isEnabled = enabled if enabled { - Logger.debug("\(self.TAG) in \(#function) starting videoCaptureSession") + Logger.debug("\(strongSelf.logTag) in \(#function) starting videoCaptureSession") videoCaptureSession.startRunning() } else { - Logger.debug("\(self.TAG) in \(#function) stopping videoCaptureSession") + Logger.debug("\(strongSelf.logTag) in \(#function) stopping videoCaptureSession") videoCaptureSession.stopRunning() } - DispatchQueue.main.async { [weak self, weak localVideoTrack] in - guard let strongSelf = self else { return } - guard let strongLocalVideoTrack = localVideoTrack else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClient(strongSelf, didUpdateLocal: enabled ? strongLocalVideoTrack : nil) - } + DispatchQueue.main.async(execute: completion) } } @@ -285,9 +293,14 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD fileprivate func createAudioSender() { SwiftAssertIsOnMainThread(#function) - Logger.debug("\(TAG) in \(#function)") + Logger.debug("\(logTag) in \(#function)") assert(self.audioSender == nil, "\(#function) should only be called once.") + guard let peerConnection = peerConnection else { + owsFail("\(logTag) in \(#function) peerConnection was unexpectedly nil") + return + } + let audioSource = factory.audioSource(with: self.audioConstraints) let audioTrack = factory.audioTrack(with: audioSource, trackId: Identifiers.audioTrack.rawValue) @@ -307,14 +320,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setAudioEnabled(enabled: Bool) { SwiftAssertIsOnMainThread(#function) - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") return } - guard let audioTrack = self.audioTrack else { + guard let audioTrack = strongSelf.audioTrack else { let action = enabled ? "enable" : "disable" - Logger.error("\(self.TAG) trying to \(action) audioTrack which doesn't exist.") + Logger.error("\(strongSelf.logTag) trying to \(action) audioTrack which doesn't exist.") return } @@ -332,72 +346,103 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD return RTCMediaConstraints(mandatoryConstraints: mandatoryConstraints, optionalConstraints: nil) } + // TODO: Review all self.peerConnection + // TODO: Review all .async + // TODO: Review all error == nil public func createOffer() -> Promise { SwiftAssertIsOnMainThread(#function) + let completion: ((((HardenedRTCSessionDescription) -> Void), ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") + reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) + return + } + if let error = error { + reject(error) + return + } + + guard let sessionDescription = sdp else { + Logger.error("\(strongSelf.logTag) No session description was obtained, even though there was no error reported.") + let error = OWSErrorMakeUnableToProcessServerResponseError() + reject(error) + return + } + + fulfill(HardenedRTCSessionDescription(rtcSessionDescription: sessionDescription)) + } + + let workBlock : ((@escaping ((HardenedRTCSessionDescription) -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + guard let peerConnection = strongSelf.peerConnection else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") + reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) + return + } + + peerConnection.offer(for: strongSelf.defaultOfferConstraints, completionHandler: { (sdp: RTCSessionDescription?, error: Error?) in + PeerConnectionClient.signalingQueue.async { + completion(fulfill, reject, sdp, error) + } + }) + } + return Promise { fulfill, reject in SwiftAssertIsOnMainThread(#function) PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") - reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) - return - } - - self.peerConnection.offer(for: self.defaultOfferConstraints, completionHandler: { (sdp: RTCSessionDescription?, error: Error?) in - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") - reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) - return - } - guard error == nil else { - reject(error!) - return - } - - guard let sessionDescription = sdp else { - Logger.error("\(self.TAG) No session description was obtained, even though there was no error reported.") - let error = OWSErrorMakeUnableToProcessServerResponseError() - reject(error) - return - } - - fulfill(HardenedRTCSessionDescription(rtcSessionDescription: sessionDescription)) - } - }) + workBlock(fulfill, reject) } } } + // TODO: Review all promises public func setLocalSessionDescriptionInternal(_ sessionDescription: HardenedRTCSessionDescription) -> Promise { - return PromiseKit.wrap { resolve in - self.assertOnSignalingQueue() - Logger.verbose("\(self.TAG) setting local session description: \(sessionDescription)") - self.peerConnection.setLocalDescription(sessionDescription.rtcSessionDescription, completionHandler: resolve) + return PromiseKit.wrap { [weak self] resolve in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + + guard let peerConnection = peerConnection else { + owsFail("\(strongSelf.logTag) in \(#function) peerConnection was unexpectedly nil") + return + } + + Logger.verbose("\(strongSelf.logTag) setting local session description: \(sessionDescription)") + peerConnection.setLocalDescription(sessionDescription.rtcSessionDescription, completionHandler: resolve) } } + // TODO: Review all self. public func setLocalSessionDescription(_ sessionDescription: HardenedRTCSessionDescription) -> Promise { SwiftAssertIsOnMainThread(#function) + let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + guard let peerConnection = strongSelf.peerConnection else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") + reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) + return + } + + Logger.verbose("\(strongSelf.logTag) setting local session description: \(sessionDescription)") + peerConnection.setLocalDescription(sessionDescription.rtcSessionDescription, + completionHandler: { error in + guard error == nil else { + reject(error!) + return + } + fulfill() + }) + } + return Promise { fulfill, reject in PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") - reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) - return - } - Logger.verbose("\(self.TAG) setting local session description: \(sessionDescription)") - self.peerConnection.setLocalDescription(sessionDescription.rtcSessionDescription, - completionHandler: { error in - guard error == nil else { - reject(error!) - return - } - fulfill() - }) + workBlock(fulfill, reject) } } } @@ -406,30 +451,41 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD SwiftAssertIsOnMainThread(#function) return setRemoteSessionDescription(remoteDescription) - .then(on: PeerConnectionClient.signalingQueue) { - return self.negotiateAnswerSessionDescription(constraints: constraints) + .then(on: PeerConnectionClient.signalingQueue) { [weak self] in + guard let strongSelf = self else { + return Promise { _, reject in + reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) + } + } + return strongSelf.negotiateAnswerSessionDescription(constraints: constraints) } } public func setRemoteSessionDescription(_ sessionDescription: RTCSessionDescription) -> Promise { SwiftAssertIsOnMainThread(#function) + let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + guard let peerConnection = strongSelf.peerConnection else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") + reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) + return + } + Logger.verbose("\(strongSelf.logTag) setting remote description: \(sessionDescription)") + peerConnection.setRemoteDescription(sessionDescription, + completionHandler: { error in + guard error == nil else { + reject(error!) + return + } + fulfill() + }) + } + return Promise { fulfill, reject in PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") - reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) - return - } - Logger.verbose("\(self.TAG) setting remote description: \(sessionDescription)") - self.peerConnection.setRemoteDescription(sessionDescription, - completionHandler: { error in - guard error == nil else { - reject(error!) - return - } - fulfill() - }) + workBlock(fulfill, reject) } } } @@ -437,78 +493,87 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD private func negotiateAnswerSessionDescription(constraints: RTCMediaConstraints) -> Promise { assertOnSignalingQueue() - return Promise { fulfill, reject in - assertOnSignalingQueue() - - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + let completion : ((@escaping ((HardenedRTCSessionDescription) -> Void), @escaping ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) return } + if let error = error { + reject(error) + return + } - Logger.debug("\(self.TAG) negotiating answer session.") + guard let sessionDescription = sdp else { + Logger.error("\(strongSelf.logTag) unexpected empty session description, even though no error was reported.") + let error = OWSErrorMakeUnableToProcessServerResponseError() + reject(error) + return + } - peerConnection.answer(for: constraints, completionHandler: { (sdp: RTCSessionDescription?, error: Error?) in - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") - reject(NSError(domain: "Obsolete client", code: 0, userInfo: nil)) - return - } - guard error == nil else { - reject(error!) - return - } + let hardenedSessionDescription = HardenedRTCSessionDescription(rtcSessionDescription: sessionDescription) - guard let sessionDescription = sdp else { - Logger.error("\(self.TAG) unexpected empty session description, even though no error was reported.") - let error = OWSErrorMakeUnableToProcessServerResponseError() - reject(error) - return - } + strongSelf.setLocalSessionDescriptionInternal(hardenedSessionDescription) + .then(on: PeerConnectionClient.signalingQueue) { _ in + fulfill(hardenedSessionDescription) + }.catch { error in + reject(error) + } + } - let hardenedSessionDescription = HardenedRTCSessionDescription(rtcSessionDescription: sessionDescription) + return Promise { [weak self] fulfill, reject in + guard let strongSelf = self else { return } + strongSelf.assertOnSignalingQueue() - self.setLocalSessionDescriptionInternal(hardenedSessionDescription) - .then(on: PeerConnectionClient.signalingQueue) { - fulfill(hardenedSessionDescription) - }.catch { error in - reject(error) - } + guard let peerConnection = strongSelf.peerConnection else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") + return + } + + Logger.debug("\(strongSelf.logTag) negotiating answer session.") + + peerConnection.answer(for: constraints, completionHandler: { (sdp: RTCSessionDescription?, error: Error?) in + PeerConnectionClient.signalingQueue.async { + completion(fulfill, reject, sdp, error) } }) } } public func addRemoteIceCandidate(_ candidate: RTCIceCandidate) { - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard let peerConnection = strongSelf.peerConnection else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") return } - Logger.info("\(self.TAG) adding remote ICE candidate: \(candidate.sdp)") - self.peerConnection.add(candidate) + Logger.info("\(strongSelf.logTag) adding remote ICE candidate: \(candidate.sdp)") + peerConnection.add(candidate) } } public func terminate() { SwiftAssertIsOnMainThread(#function) - Logger.debug("\(TAG) in \(#function)") + Logger.debug("\(logTag) in \(#function)") // Clear the delegate immediately so that we can guarantee that // no delegate methods are called after terminate() returns. delegate = nil - PeerConnectionClient.signalingQueue.async { - assert(self.peerConnection != nil) - self.terminateInternal() + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + strongSelf.terminateInternal() } } private func terminateInternal() { assertOnSignalingQueue() - Logger.debug("\(TAG) in \(#function)") + Logger.debug("\(logTag) in \(#function)") // Some notes on preventing crashes while disposing of peerConnection for video calls // from: https://groups.google.com/forum/#!searchin/discuss-webrtc/objc$20crash$20dealloc%7Csort:relevance/discuss-webrtc/7D-vk5yLjn8/rBW2D6EW4GYJ @@ -523,9 +588,6 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD // become nil when it was only a weak property. So we retain it and manually nil the reference here, because // we are likely to crash if we retain any peer connection properties when the peerconnection is released - // See the comments on the remoteVideoTrack property. - objc_sync_enter(self) - localVideoTrack?.isEnabled = false remoteVideoTrack?.isEnabled = false @@ -537,11 +599,11 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD localVideoTrack = nil remoteVideoTrack = nil - peerConnection.delegate = nil - peerConnection.close() + if let peerConnection = peerConnection { + peerConnection.delegate = nil + peerConnection.close() + } peerConnection = nil - - objc_sync_exit(self) } // MARK: - Data Channel @@ -557,31 +619,33 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func sendDataChannelMessage(data: Data, description: String, isCritical: Bool) { SwiftAssertIsOnMainThread(#function) - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client: \(description)") + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client: \(description)") return } - guard let dataChannel = self.dataChannel else { + guard let dataChannel = strongSelf.dataChannel else { if isCritical { - Logger.info("\(self.TAG) in \(#function) enqueuing critical data channel message for after we have a dataChannel: \(description)") - self.pendingDataChannelMessages.append(PendingDataChannelMessage(data: data, description: description, isCritical: isCritical)) + Logger.info("\(strongSelf.logTag) in \(#function) enqueuing critical data channel message for after we have a dataChannel: \(description)") + strongSelf.pendingDataChannelMessages.append(PendingDataChannelMessage(data: data, description: description, isCritical: isCritical)) } else { - Logger.error("\(self.TAG) in \(#function) ignoring sending \(data) for nil dataChannel: \(description)") + Logger.error("\(strongSelf.logTag) in \(#function) ignoring sending \(data) for nil dataChannel: \(description)") } return } - Logger.debug("\(self.TAG) sendDataChannelMessage trying: \(description)") + Logger.debug("\(strongSelf.logTag) sendDataChannelMessage trying: \(description)") let buffer = RTCDataBuffer(data: data, isBinary: false) let result = dataChannel.sendData(buffer) if result { - Logger.debug("\(self.TAG) sendDataChannelMessage succeeded: \(description)") + Logger.debug("\(strongSelf.logTag) sendDataChannelMessage succeeded: \(description)") } else { - Logger.warn("\(self.TAG) sendDataChannelMessage failed: \(description)") + Logger.warn("\(strongSelf.logTag) sendDataChannelMessage failed: \(description)") if isCritical { OWSProdError(OWSAnalyticsEvents.peerConnectionClientErrorSendDataChannelMessageFailed(), file: #file, function: #function, line: #line) } @@ -593,177 +657,224 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** The data channel state changed. */ internal func dataChannelDidChangeState(_ dataChannel: RTCDataChannel) { - Logger.debug("\(TAG) dataChannelDidChangeState: \(dataChannel)") + Logger.debug("\(logTag) dataChannelDidChangeState: \(dataChannel)") } /** The data channel successfully received a data buffer. */ internal func dataChannel(_ dataChannel: RTCDataChannel, didReceiveMessageWith buffer: RTCDataBuffer) { - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + + let completion: (OWSWebRTCProtosData) -> Void = { [weak self] (dataChannelMessage) in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClient(strongSelf, received: dataChannelMessage) + } + + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard strongSelf.peerConnection != nil else { + Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") return } - Logger.debug("\(self.TAG) dataChannel didReceiveMessageWith buffer:\(buffer)") + Logger.debug("\(strongSelf.logTag) dataChannel didReceiveMessageWith buffer:\(buffer)") guard let dataChannelMessage = OWSWebRTCProtosData.parse(from: buffer.data) else { // TODO can't proto parsings throw an exception? Is it just being lost in the Objc->Swift? - Logger.error("\(self.TAG) failed to parse dataProto") + Logger.error("\(strongSelf.logTag) failed to parse dataProto") return } - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClient(strongSelf, received: dataChannelMessage) + DispatchQueue.main.async { + completion(dataChannelMessage) } } } /** The data channel's |bufferedAmount| changed. */ internal func dataChannel(_ dataChannel: RTCDataChannel, didChangeBufferedAmount amount: UInt64) { - Logger.debug("\(TAG) didChangeBufferedAmount: \(amount)") + Logger.debug("\(logTag) didChangeBufferedAmount: \(amount)") } // MARK: - RTCPeerConnectionDelegate /** Called when the SignalingState changed. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didChange stateChanged: RTCSignalingState) { - Logger.debug("\(TAG) didChange signalingState:\(stateChanged.debugDescription)") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange stateChanged: RTCSignalingState) { + Logger.debug("\(logTag) didChange signalingState:\(stateChanged.debugDescription)") } + // TODO: Review all peerConnection params + // TODO: There's work being done here on the wrong queue. /** Called when media is received on a new stream from remote peer. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didAdd stream: RTCMediaStream) { - guard stream.videoTracks.count > 0 else { - return - } - let remoteVideoTrack = stream.videoTracks[0] - Logger.debug("\(self.TAG) didAdd stream:\(stream) video tracks: \(stream.videoTracks.count) audio tracks: \(stream.audioTracks.count)") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didAdd stream: RTCMediaStream) { - // See the comments on the remoteVideoTrack property. - // - // We only set the remoteVideoTrack property if peerConnection is non-nil. - objc_sync_enter(self) - if self.peerConnection != nil { - self.remoteVideoTrack = remoteVideoTrack + let completion: (RTCVideoTrack) -> Void = { [weak self] (remoteVideoTrack) in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + + // TODO: Consider checking for termination here. + + strongDelegate.peerConnectionClient(strongSelf, didUpdateRemote: remoteVideoTrack) } - objc_sync_exit(self) - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard let peerConnection = strongSelf.peerConnection else { + owsFail("\(strongSelf.logTag) in \(#function) peerConnection was unexpectedly nil") return } + guard peerConnection == peerConnectionParam else { + owsFail("\(strongSelf.logTag) in \(#function) mismatched peerConnection callback.") + return + } + guard stream.videoTracks.count > 0 else { + owsFail("\(strongSelf.logTag) in \(#function) didAdd stream missing stream.") + return + } + let remoteVideoTrack = stream.videoTracks[0] + Logger.debug("\(strongSelf.logTag) didAdd stream:\(stream) video tracks: \(stream.videoTracks.count) audio tracks: \(stream.audioTracks.count)") - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - - // See the comments on the remoteVideoTrack property. - // - // We only access the remoteVideoTrack property if peerConnection is non-nil. - var remoteVideoTrack: RTCVideoTrack? - objc_sync_enter(strongSelf) - if strongSelf.peerConnection != nil { - remoteVideoTrack = strongSelf.remoteVideoTrack - } - objc_sync_exit(strongSelf) + strongSelf.remoteVideoTrack = remoteVideoTrack - strongDelegate.peerConnectionClient(strongSelf, didUpdateRemote: remoteVideoTrack) + DispatchQueue.main.async { + completion(remoteVideoTrack) } } } /** Called when a remote peer closes a stream. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didRemove stream: RTCMediaStream) { - Logger.debug("\(TAG) didRemove Stream:\(stream)") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove stream: RTCMediaStream) { + Logger.debug("\(logTag) didRemove Stream:\(stream)") } /** Called when negotiation is needed, for example ICE has restarted. */ - internal func peerConnectionShouldNegotiate(_ peerConnection: RTCPeerConnection) { - Logger.debug("\(TAG) shouldNegotiate") + internal func peerConnectionShouldNegotiate(_ peerConnectionParam: RTCPeerConnection) { + Logger.debug("\(logTag) shouldNegotiate") } /** Called any time the IceConnectionState changes. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didChange newState: RTCIceConnectionState) { - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceConnectionState) { + + let connectedCompletion : () -> Void = { [weak self] in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClientIceConnected(strongSelf) + } + let failedCompletion : () -> Void = { [weak self] in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClientIceFailed(strongSelf) + } + let disconnectedCompletion : () -> Void = { [weak self] in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClientIceDisconnected(strongSelf) + } + + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard let peerConnection = strongSelf.peerConnection else { + owsFail("\(strongSelf.logTag) in \(#function) peerConnection was unexpectedly nil") return } - Logger.info("\(self.TAG) didChange IceConnectionState:\(newState.debugDescription)") + guard peerConnection == peerConnectionParam else { + owsFail("\(strongSelf.logTag) in \(#function) mismatched peerConnection callback.") + return + } + + Logger.info("\(strongSelf.logTag) didChange IceConnectionState:\(newState.debugDescription)") switch newState { case .connected, .completed: - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClientIceConnected(strongSelf) - } + DispatchQueue.main.async(execute: connectedCompletion) case .failed: - Logger.warn("\(self.TAG) RTCIceConnection failed.") - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClientIceFailed(strongSelf) - } + Logger.warn("\(strongSelf.logTag) RTCIceConnection failed.") + DispatchQueue.main.async(execute: failedCompletion) case .disconnected: - Logger.warn("\(self.TAG) RTCIceConnection disconnected.") - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClientIceDisconnected(strongSelf) - } + Logger.warn("\(strongSelf.logTag) RTCIceConnection disconnected.") + DispatchQueue.main.async(execute: disconnectedCompletion) default: - Logger.debug("\(self.TAG) ignoring change IceConnectionState:\(newState.debugDescription)") + Logger.debug("\(strongSelf.logTag) ignoring change IceConnectionState:\(newState.debugDescription)") } } } /** Called any time the IceGatheringState changes. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didChange newState: RTCIceGatheringState) { - Logger.info("\(TAG) didChange IceGatheringState:\(newState.debugDescription)") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceGatheringState) { + Logger.info("\(logTag) didChange IceGatheringState:\(newState.debugDescription)") } /** New ice candidate has been found. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didGenerate candidate: RTCIceCandidate) { - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didGenerate candidate: RTCIceCandidate) { + + let completion: (RTCIceCandidate) -> Void = { [weak self] (candidate) in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + guard let strongDelegate = strongSelf.delegate else { return } + strongDelegate.peerConnectionClient(strongSelf, addedLocalIceCandidate: candidate) + } + + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard let peerConnection = strongSelf.peerConnection else { + owsFail("\(strongSelf.logTag) in \(#function) peerConnection was unexpectedly nil") + return + } + guard peerConnection == peerConnectionParam else { + owsFail("\(strongSelf.logTag) in \(#function) mismatched peerConnection callback.") return } - Logger.info("\(self.TAG) adding local ICE candidate:\(candidate.sdp)") - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - guard let strongDelegate = strongSelf.delegate else { return } - strongDelegate.peerConnectionClient(strongSelf, addedLocalIceCandidate: candidate) + Logger.info("\(strongSelf.logTag) adding local ICE candidate:\(candidate.sdp)") + DispatchQueue.main.async { + completion(candidate) } } } /** Called when a group of local Ice candidates have been removed. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didRemove candidates: [RTCIceCandidate]) { - Logger.debug("\(TAG) didRemove IceCandidates:\(candidates)") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove candidates: [RTCIceCandidate]) { + Logger.debug("\(logTag) didRemove IceCandidates:\(candidates)") } /** New data channel has been opened. */ - internal func peerConnection(_ peerConnection: RTCPeerConnection, didOpen dataChannel: RTCDataChannel) { - PeerConnectionClient.signalingQueue.async { - guard self.peerConnection != nil else { - Logger.debug("\(self.TAG) \(#function) Ignoring obsolete event in terminated client") + internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didOpen dataChannel: RTCDataChannel) { + + let completion: ([PendingDataChannelMessage]) -> Void = { [weak self] (pendingMessages) in + SwiftAssertIsOnMainThread(#function) + guard let strongSelf = self else { return } + pendingMessages.forEach { message in + strongSelf.sendDataChannelMessage(data: message.data, description: message.description, isCritical: message.isCritical) + } + } + + PeerConnectionClient.signalingQueue.async { [weak self] in + guard let strongSelf = self else { return } + + guard let peerConnection = strongSelf.peerConnection else { + owsFail("\(strongSelf.logTag) in \(#function) peerConnection was unexpectedly nil") + return + } + guard peerConnection == peerConnectionParam else { + owsFail("\(strongSelf.logTag) in \(#function) mismatched peerConnection callback.") return } - Logger.info("\(self.TAG) didOpen dataChannel:\(dataChannel)") - assert(self.dataChannel == nil) - self.dataChannel = dataChannel + Logger.info("\(strongSelf.logTag) didOpen dataChannel:\(dataChannel)") + if strongSelf.dataChannel != nil { + owsFail("\(strongSelf.logTag) in \(#function) dataChannel unexpectedly set twice.") + } + strongSelf.dataChannel = dataChannel dataChannel.delegate = self - let pendingMessages = self.pendingDataChannelMessages - self.pendingDataChannelMessages = [] - DispatchQueue.main.async { [weak self] in - guard let strongSelf = self else { return } - - pendingMessages.forEach { message in - strongSelf.sendDataChannelMessage(data: message.data, description: message.description, isCritical: message.isCritical) - } + let pendingMessages = strongSelf.pendingDataChannelMessages + strongSelf.pendingDataChannelMessages = [] + DispatchQueue.main.async { + completion(pendingMessages) } } } @@ -785,7 +896,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD var result: RTCPeerConnection? = nil PeerConnectionClient.signalingQueue.sync { result = peerConnection - Logger.info("\(self.TAG) called \(#function)") + Logger.info("\(self.logTag) called \(#function)") } return result! } @@ -796,7 +907,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD var result: RTCDataChannel? = nil PeerConnectionClient.signalingQueue.sync { result = dataChannel - Logger.info("\(self.TAG) called \(#function)") + Logger.info("\(self.logTag) called \(#function)") } return result! }