SampleUploader.swift 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. //
  2. // SampleUploader.swift
  3. // Broadcast Extension
  4. //
  5. // Created by Alex-Dan Bumbu on 22/03/2021.
  6. // Copyright © 2021 8x8, Inc. All rights reserved.
  7. //
  8. import Foundation
  9. import ReplayKit
  /// File-private tuning values used by `SampleUploader`.
  private enum Constants {
      /// Upper bound, in bytes, on the size of a single chunk handed to the
      /// connection in one write call (10 KiB).
      static let bufferMaxLength: Int = 10 * 1024
  }
  /// Serializes captured sample buffers and writes them, chunk by chunk, to a
  /// `SocketConnection`.
  ///
  /// At most one sample is in flight at a time: `send(sample:)` rejects new
  /// samples while `isReady` is `false`. Readiness is turned back on by the
  /// connection callbacks installed in `setupConnection()`.
  class SampleUploader {

      /// Shared Core Image context used when encoding frames to JPEG.
      /// It is never reassigned, so it is declared as a constant.
      private static let imageContext = CIContext(options: nil)

      /// `true` when a new sample may be accepted by `send(sample:)`.
      @Atomic private var isReady = false

      /// Connection the serialized samples are written to.
      private var connection: SocketConnection

      /// Serialized payload of the sample currently being written, or `nil`
      /// when nothing is pending.
      private var dataToSend: Data?

      /// Offset into `dataToSend` of the next byte to write.
      private var byteIndex = 0

      /// Queue on which chunks are written to the connection.
      private let serialQueue: DispatchQueue

      /// Creates an uploader bound to `connection` and installs its callbacks.
      ///
      /// - Parameter connection: The socket connection samples are written to.
      init(connection: SocketConnection) {
          self.connection = connection
          self.serialQueue = DispatchQueue(label: "org.jitsi.meet.broadcast.sampleUploader")

          setupConnection()
      }

      /// Serializes `buffer` and schedules its first chunk for writing.
      ///
      /// - Parameter buffer: The captured sample to upload.
      /// - Returns: `true` when the sample was accepted and queued; `false`
      ///   when the uploader is busy / not yet open, or when the sample could
      ///   not be serialized.
      @discardableResult func send(sample buffer: CMSampleBuffer) -> Bool {
          guard isReady else {
              return false
          }

          // Block further samples while this one is being prepared and sent.
          isReady = false

          guard let payload = prepare(sample: buffer) else {
              // Nothing will be written for this sample, so no write-driven
              // callback is going to flip `isReady` back on. Restore it here,
              // otherwise every subsequent sample would be rejected.
              isReady = true
              return false
          }

          dataToSend = payload
          byteIndex = 0

          serialQueue.async { [weak self] in
              self?.sendDataChunk()
          }

          return true
      }
  }
  // MARK: - Private helpers

  private extension SampleUploader {

      /// Installs the connection callbacks that drive the upload state.
      ///
      /// - `didOpen` marks the uploader as ready.
      /// - `streamHasSpaceAvailable` writes the next chunk on `serialQueue`;
      ///   the uploader becomes ready again only when there was nothing left
      ///   to send (i.e. `sendDataChunk()` returned `false`).
      func setupConnection() {
          connection.didOpen = { [weak self] in
              self?.isReady = true
          }

          connection.streamHasSpaceAvailable = { [weak self] in
              self?.serialQueue.async {
                  guard let uploader = self else {
                      return
                  }
                  let hadPendingData = uploader.sendDataChunk()
                  uploader.isReady = !hadPendingData
              }
          }
      }

      /// Writes the next chunk (at most `Constants.bufferMaxLength` bytes) of
      /// the pending payload to the connection.
      ///
      /// - Returns: `false` when there is no pending payload; `true` otherwise,
      ///   including when the write itself reported no bytes written.
      @discardableResult func sendDataChunk() -> Bool {
          guard let pending = dataToSend else {
              return false
          }

          let remaining = pending.count - byteIndex
          let chunkSize = min(remaining, Constants.bufferMaxLength)
          let chunk = pending[byteIndex..<(byteIndex + chunkSize)]

          let written: Int = chunk.withUnsafeBytes { rawBuffer in
              guard let base = rawBuffer.bindMemory(to: UInt8.self).baseAddress else {
                  return 0
              }
              return connection.writeToStream(buffer: base, maxLength: chunkSize)
          }

          guard written > 0 else {
              // The payload and offset are left untouched, so the same chunk
              // is attempted again on the next call.
              print("writeBufferToStream failure")
              return true
          }

          byteIndex += written

          if remaining - written == 0 {
              // Whole payload has been written; reset to the idle state.
              dataToSend = nil
              byteIndex = 0
          }

          return true
      }

      /// Encodes a sample as a half-size JPEG wrapped in a serialized HTTP
      /// response message carrying the frame's size and orientation headers.
      ///
      /// - Parameter buffer: The captured sample.
      /// - Returns: The serialized message, or `nil` when the sample has no
      ///   image buffer or JPEG encoding fails.
      func prepare(sample buffer: CMSampleBuffer) -> Data? {
          guard let pixelBuffer = CMSampleBufferGetImageBuffer(buffer) else {
              print("image buffer not available")
              return nil
          }

          let downscale = 2.0
          let divisor = Int(downscale)
          let ratio = CGFloat(1.0 / downscale)

          CVPixelBufferLockBaseAddress(pixelBuffer, .readOnly)

          let scaledWidth = CVPixelBufferGetWidth(pixelBuffer) / divisor
          let scaledHeight = CVPixelBufferGetHeight(pixelBuffer) / divisor
          // Falls back to 0 when the sample carries no orientation attachment.
          let orientation = CMGetAttachment(buffer, key: RPVideoSampleOrientationKey as CFString, attachmentModeOut: nil)?.uintValue ?? 0
          let encoded = jpegData(from: pixelBuffer, scale: CGAffineTransform(scaleX: ratio, y: ratio))

          CVPixelBufferUnlockBaseAddress(pixelBuffer, .readOnly)

          guard let jpeg = encoded else {
              print("corrupted image buffer")
              return nil
          }

          let message = CFHTTPMessageCreateResponse(nil, 200, nil, kCFHTTPVersion1_1).takeRetainedValue()

          let headers: [(field: String, value: String)] = [
              ("Content-Length", String(jpeg.count)),
              ("Buffer-Width", String(scaledWidth)),
              ("Buffer-Height", String(scaledHeight)),
              ("Buffer-Orientation", String(orientation))
          ]
          for header in headers {
              CFHTTPMessageSetHeaderFieldValue(message, header.field as CFString, header.value as CFString)
          }

          CFHTTPMessageSetBody(message, jpeg as CFData)

          return CFHTTPMessageCopySerializedMessage(message)?.takeRetainedValue() as Data?
      }

      /// Renders `buffer` through `scaleTransform` and encodes the result as
      /// JPEG with the lossy compression quality option set to 1.0.
      ///
      /// - Parameters:
      ///   - buffer: Pixel buffer holding the frame.
      ///   - scaleTransform: Transform applied to the image before encoding.
      /// - Returns: JPEG data, or `nil` when the image has no color space or
      ///   encoding fails.
      func jpegData(from buffer: CVPixelBuffer, scale scaleTransform: CGAffineTransform) -> Data? {
          let scaledImage = CIImage(cvPixelBuffer: buffer).transformed(by: scaleTransform)

          guard let colorSpace = scaledImage.colorSpace else {
              return nil
          }

          let qualityKey = kCGImageDestinationLossyCompressionQuality as CIImageRepresentationOption
          let options: [CIImageRepresentationOption: Float] = [qualityKey: 1.0]

          return SampleUploader.imageContext.jpegRepresentation(of: scaledImage, colorSpace: colorSpace, options: options)
      }
  }