SocketConnection.swift 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. //
  2. // SocketConnection.swift
  3. // Broadcast Extension
  4. //
  5. // Created by Alex-Dan Bumbu on 22/03/2021.
  6. // Copyright © 2021 Atlassian Inc. All rights reserved.
  7. //
  8. import Foundation
  9. class SocketConnection: NSObject {
  10. var didOpen: (() -> Void)?
  11. var didClose: ((Error?) -> Void)?
  12. var streamHasSpaceAvailable: (() -> Void)?
  13. private let filePath: String
  14. private var socketHandle: Int32 = -1
  15. private var address: sockaddr_un?
  16. private var inputStream: InputStream?
  17. private var outputStream: OutputStream?
  18. private var networkQueue: DispatchQueue?
  19. private var shouldKeepRunning = false
  20. init?(filePath path: String) {
  21. filePath = path
  22. socketHandle = Darwin.socket(AF_UNIX, SOCK_STREAM, 0)
  23. guard socketHandle != -1 else {
  24. print("failure: create socket")
  25. return nil
  26. }
  27. }
  28. func open() -> Bool {
  29. print("open socket connection")
  30. guard FileManager.default.fileExists(atPath: filePath) else {
  31. print("failure: socket file missing")
  32. return false
  33. }
  34. guard setupAddress() == true else {
  35. return false
  36. }
  37. guard connectSocket() == true else {
  38. return false
  39. }
  40. setupStreams()
  41. inputStream?.open()
  42. outputStream?.open()
  43. return true
  44. }
  45. func close() {
  46. unscheduleStreams()
  47. inputStream?.delegate = nil
  48. outputStream?.delegate = nil
  49. inputStream?.close()
  50. outputStream?.close()
  51. inputStream = nil
  52. outputStream = nil
  53. }
  54. func writeToStream(buffer: UnsafePointer<UInt8>, maxLength length: Int) -> Int {
  55. outputStream?.write(buffer, maxLength: length) ?? 0
  56. }
  57. }
  58. extension SocketConnection: StreamDelegate {
  59. func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
  60. switch eventCode {
  61. case .openCompleted:
  62. print("client stream open completed")
  63. if aStream == outputStream {
  64. didOpen?()
  65. }
  66. case .hasBytesAvailable:
  67. if aStream == inputStream {
  68. var buffer: UInt8 = 0
  69. let numberOfBytesRead = inputStream?.read(&buffer, maxLength: 1)
  70. if numberOfBytesRead == 0 && aStream.streamStatus == .atEnd {
  71. print("server socket closed")
  72. close()
  73. notifyDidClose(error: nil)
  74. }
  75. }
  76. case .hasSpaceAvailable:
  77. if aStream == outputStream {
  78. streamHasSpaceAvailable?()
  79. }
  80. case .errorOccurred:
  81. print("client stream error occured: \(String(describing: aStream.streamError))")
  82. close()
  83. notifyDidClose(error: aStream.streamError)
  84. default:
  85. break
  86. }
  87. }
  88. }
  89. private extension SocketConnection {
  90. func setupAddress() -> Bool {
  91. var addr = sockaddr_un()
  92. guard filePath.count < MemoryLayout.size(ofValue: addr.sun_path) else {
  93. print("failure: fd path is too long")
  94. return false
  95. }
  96. _ = withUnsafeMutablePointer(to: &addr.sun_path.0) { ptr in
  97. filePath.withCString {
  98. strncpy(ptr, $0, filePath.count)
  99. }
  100. }
  101. address = addr
  102. return true
  103. }
  104. func connectSocket() -> Bool {
  105. guard var addr = address else {
  106. return false
  107. }
  108. let status = withUnsafePointer(to: &addr) { ptr in
  109. ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
  110. Darwin.connect(socketHandle, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
  111. }
  112. }
  113. guard status == noErr else {
  114. print("failure: \(status)")
  115. return false
  116. }
  117. return true
  118. }
  119. func setupStreams() {
  120. var readStream: Unmanaged<CFReadStream>?
  121. var writeStream: Unmanaged<CFWriteStream>?
  122. CFStreamCreatePairWithSocket(kCFAllocatorDefault, socketHandle, &readStream, &writeStream)
  123. inputStream = readStream?.takeRetainedValue()
  124. inputStream?.delegate = self
  125. inputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
  126. outputStream = writeStream?.takeRetainedValue()
  127. outputStream?.delegate = self
  128. outputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
  129. scheduleStreams()
  130. }
  131. func scheduleStreams() {
  132. shouldKeepRunning = true
  133. networkQueue = DispatchQueue.global(qos: .userInitiated)
  134. networkQueue?.async { [weak self] in
  135. self?.inputStream?.schedule(in: .current, forMode: .common)
  136. self?.outputStream?.schedule(in: .current, forMode: .common)
  137. RunLoop.current.run()
  138. var isRunning = false
  139. repeat {
  140. isRunning = self?.shouldKeepRunning ?? false && RunLoop.current.run(mode: .default, before: .distantFuture)
  141. } while (isRunning)
  142. }
  143. }
  144. func unscheduleStreams() {
  145. networkQueue?.sync { [weak self] in
  146. self?.inputStream?.remove(from: .current, forMode: .common)
  147. self?.outputStream?.remove(from: .current, forMode: .common)
  148. }
  149. shouldKeepRunning = false
  150. }
  151. func notifyDidClose(error: Error?) {
  152. if didClose != nil {
  153. didClose?(error)
  154. }
  155. }
  156. }