Convert all compressors to use a producer/consumer thread model (VTCompressionSessions sometimes block!)

When restoring an NDI source, asynchronously wait for the stored source name to appear on the network; the underlying framework doesn't populate the available list immediately on program start.
Don't double-restore live layout on launch.
If an input is designated as 'allowDedup', do not cache it unless the user explicitly turns the cache setting on.
This commit is contained in:
Zakk 2020-04-07 18:26:13 -04:00
parent b4ad6f56d0
commit c5bbb1b48a
17 changed files with 408 additions and 367 deletions

View file

@ -12,6 +12,7 @@
#import "NDIAudioOutputDelegateProtocol.h"
@interface CSNDICapture : CSCaptureBase <CSCaptureSourceProtocol, NDIVideoOutputDelegateProtocol, NDIAudioOutputDelegateProtocol>
{
NDIlib_v3 *_ndi_dispatch;

View file

@ -103,7 +103,7 @@
return @"NewTek NDI";
}
-(NDIlib_source_t *)current_ndi_sources:(uint32_t *)out_count
-(NDIlib_source_t *)current_ndi_sources:(uint32_t *)out_count asyncBloc:(void (^)(NDIlib_source_t *sources, uint32_t srcCount))asyncBlock
{
NDIlib_find_instance_t finder = NULL;
if (!_ndi_dispatch)
@ -118,6 +118,29 @@
}
uint32_t source_count = 0;
const NDIlib_source_t *sources = _ndi_dispatch->NDIlib_find_get_current_sources(finder, &source_count);
if (!source_count && asyncBlock)
{
NDIlib_v3 *dispatcher = _ndi_dispatch;
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
//This is probably a bad idea...
int tryCnt = 0;
while(tryCnt < 10 && dispatcher)
{
uint32_t retry_src_cnt = 0;
dispatcher->NDIlib_find_wait_for_sources(finder, 100);
const NDIlib_source_t *retrySrcs = dispatcher->NDIlib_find_get_current_sources(finder, &retry_src_cnt);
if (retry_src_cnt > 0)
{
asyncBlock(retrySrcs, retry_src_cnt);
break;
}
tryCnt++;
}
});
sources = _ndi_dispatch->NDIlib_find_get_current_sources(finder, &source_count);
}
*out_count = source_count;
return sources;
}
@ -128,16 +151,8 @@
uint32_t source_count = 0;
const NDIlib_source_t *sources = [self current_ndi_sources:&source_count];
if (!_ndi_dispatch)
{
return nil;
}
const NDIlib_source_t *sources = [self current_ndi_sources:&source_count asyncBloc:nil];
NSMutableArray *ret = [NSMutableArray array];
for (int i=0; i < source_count; i++)
{
NDIlib_source_t ndi_src = sources[i];
@ -236,57 +251,82 @@
-(NDIlib_source_t)find_ndi_source:(NSString *)forName
{
uint32_t source_count = 0;
const NDIlib_source_t *sources = [self current_ndi_sources:&source_count];
for (int i = 0; i < source_count; i++)
if (forName)
{
NDIlib_source_t ndi_src = sources[i];
if (!strncmp(ndi_src.p_ndi_name, forName.UTF8String, strlen(ndi_src.p_ndi_name)))
uint32_t source_count = 0;
const NDIlib_source_t *sources = [self current_ndi_sources:&source_count asyncBloc:nil];
for (int i = 0; i < source_count; i++)
{
return ndi_src;
NDIlib_source_t ndi_src = sources[i];
if (!strncmp(ndi_src.p_ndi_name, forName.UTF8String, strlen(ndi_src.p_ndi_name)))
{
return ndi_src;
}
}
}
return (NDIlib_source_t){0};
}
// Tears down any existing NDI receiver and starts capturing from ndiSource.
// Creates the serial video/audio delegate queues on first use, stops the
// previous receiver (if any), then registers self as both delegates and
// starts the new capture. All receiver state is guarded by @synchronized(self).
// NOTE(review): assumes ndiSource is non-nil; confirm all call sites check
// before invoking.
-(void)setupNdiSource:(CSNDISource *)ndiSource
{
// Displayed capture name tracks the selected NDI source's name.
self.captureName = ndiSource.name;
CSNDIReceiver *newRecv = [[CSNDIReceiver alloc] initWithSource:ndiSource];
@synchronized(self)
{
// Lazily create the serial queues that frame/audio callbacks arrive on.
if (!_video_thread)
{
_video_thread = dispatch_queue_create("NDI Video Capture Delegate", DISPATCH_QUEUE_SERIAL);
}
if (!_audio_thread)
{
_audio_thread = dispatch_queue_create("NDI Audio Capture Delegate", DISPATCH_QUEUE_SERIAL);
}
// Stop the old receiver before swapping in the new one so two receivers
// never deliver frames concurrently.
if (_current_receiver)
{
[_current_receiver stopCapture];
}
_current_receiver = newRecv;
[_current_receiver registerVideoDelegate:self withQueue:_video_thread];
[_current_receiver registerAudioDelegate:self withQueue:_audio_thread];
[_current_receiver startCapture];
}
}
-(void)setActiveVideoDevice:(CSAbstractCaptureDevice *)activeVideoDevice
{
[super setActiveVideoDevice:activeVideoDevice];
CSNDISource *ndiSource = activeVideoDevice.captureDevice;
if (ndiSource)
if (activeVideoDevice)
{
self.captureName = ndiSource.name;
CSNDISource *ndiSource = activeVideoDevice.captureDevice;
CSNDIReceiver *newRecv = [[CSNDIReceiver alloc] initWithSource:ndiSource];
@synchronized(self)
if (ndiSource)
{
if (!_video_thread)
{
_video_thread = dispatch_queue_create("NDI Video Capture Delegate", DISPATCH_QUEUE_SERIAL);
}
if (!_audio_thread)
{
_audio_thread = dispatch_queue_create("NDI Audio Capture Delegate", DISPATCH_QUEUE_SERIAL);
}
if (_current_receiver)
{
[_current_receiver stopCapture];
}
_current_receiver = newRecv;
[_current_receiver registerVideoDelegate:self withQueue:_video_thread];
[_current_receiver registerAudioDelegate:self withQueue:_audio_thread];
[_current_receiver startCapture];
dispatch_async(dispatch_get_main_queue(), ^{
[self setupNdiSource:ndiSource];
});
return;
}
} else if (self.savedUniqueID) {
uint32_t srcCnt = 0;
NDIlib_source_t *srcs = [self current_ndi_sources:&srcCnt asyncBloc:^(NDIlib_source_t *sources, uint32_t srcCount) {
NDIlib_source_t ndiSrc = [self find_ndi_source:self.savedUniqueID];
if (ndiSrc.p_ndi_name)
{
CSNDISource *ndiSource = [[CSNDISource alloc] initWithSource:ndiSrc];
dispatch_async(dispatch_get_main_queue(), ^{
[self setDeviceForUniqueID:self.savedUniqueID];
});
}
}];
}
}
@ -319,4 +359,5 @@
}
@end

View file

@ -125,7 +125,6 @@
-(void)dealloc
{
[[NSNotificationCenter defaultCenter] removeObserver:self];
if (_capture_session)
{
@ -353,21 +352,6 @@
}
-(void)frameTick
{
if (self.renderType == kCSRenderOnFrameTick)
{
[self updateLayersWithBlock:^(CALayer *layer) {
[layer setNeedsDisplay];
}];
}
}
-(NSSize)captureSize
{

View file

@ -91,21 +91,6 @@
}
-(void)frameTick
{
if (self.renderType == kCSRenderOnFrameTick)
{
/*
[self updateLayersWithBlock:^(CALayer *layer) {
[((CSIOSurfaceLayer *)layer) setNeedsDisplay];
}];*/
}
}
-(CALayer *)createNewLayer
{

View file

@ -226,7 +226,7 @@
-(void)dealloc
{
[self flush];
//[self flush];
if (_pendingTimer)
{
dispatch_source_cancel(_pendingTimer);

View file

@ -536,7 +536,6 @@
}
*/
start_t = [CaptureController.sharedCaptureController mach_time_seconds];
if (newFrame && self.compressors && self.compressors.count > 0)
{
@ -565,6 +564,8 @@
newData.videoFrame = newFrame;
int used_compressor_count = 0;
start_t = [CaptureController.sharedCaptureController mach_time_seconds];
NSMutableDictionary *useCompressors = self.compressors.copy;
for(id cKey in useCompressors)
@ -573,8 +574,16 @@
id <VideoCompressor> compressor;
compressor = useCompressors[cKey];
CapturedFrameData *newFrameData = newData.copy;
double c_start = [CaptureController.sharedCaptureController mach_time_seconds];
[compressor compressFrame:newFrameData];
double c_end = [CaptureController.sharedCaptureController mach_time_seconds];
double c_elapsed = c_end - c_start;
if (c_elapsed > 1.0f/60.0f)
{
NSLog(@"COMPRESSOR %@ TOOK %f %@", compressor, c_elapsed, self.layout.name);
}
if ([compressor hasOutputs])
{
used_compressor_count++;
@ -582,9 +591,9 @@
}
end_t = [CaptureController.sharedCaptureController mach_time_seconds];
elapsed_t = end_t - start_t;
//if (elapsed_t > 1.0f/60.0f)
if (elapsed_t > 1.0f/60.0f)
{
//NSLog(@"COMPRESSOR STUFF TOOK %f %@", elapsed_t, self.layout.name);
NSLog(@"COMPRESSOR STUFF TOOK %f %@", elapsed_t, self.layout.name);
}
CVPixelBufferRelease(newFrame);

View file

@ -2622,7 +2622,7 @@ NSString *const CSAppearanceSystem = @"CSAppearanceSystem";
}
self.liveLayout.name = @"Live Layout";
self.stagingLayout.name = @"Staging Layout";
[self.liveLayout restoreSourceList:nil];
//[self.liveLayout restoreSourceList:nil];
[self setupMainRecorder];
}

View file

@ -125,7 +125,7 @@ OSStatus VTCompressionSessionCopySupportedPropertyDictionary(VTCompressionSessio
{
VTSessionSetProperty(session, kVTCompressionPropertyKey_AllowFrameReordering, kCFBooleanFalse);
VTSessionSetProperty(session, (__bridge CFStringRef)@"RealTime", kCFBooleanTrue);
VTSessionSetProperty(session, kVTCompressionPropertyKey_RealTime, kCFBooleanTrue);
int real_keyframe_interval = 2;
if (self.keyframe_interval && self.keyframe_interval > 0)

View file

@ -88,7 +88,10 @@ OSStatus VTCompressionSessionCopySupportedPropertyDictionary(VTCompressionSessio
}
-(void) reset
-(void) internal_reset
{
_resetPending = YES;
@ -126,7 +129,7 @@ void PixelBufferRelease( void *releaseRefCon, const void *baseAddress )
}
-(bool)compressFrame:(CapturedFrameData *)frameData
-(bool)real_compressFrame:(CapturedFrameData *)frameData
{
if (_resetPending)
@ -193,9 +196,8 @@ void PixelBufferRelease( void *releaseRefCon, const void *baseAddress )
//CVPixelBufferRelease(imageBuffer);
VTCompressionSessionEncodeFrame(_compression_session, frameData.videoFrame, frameData.videoPTS, frameData.videoDuration, frameProperties, (__bridge_retained void *)(frameData), NULL);
VTCompressionSessionEncodeFrame(_compression_session, frameData.videoFrame, frameData.videoPTS, frameData.videoDuration, frameProperties, (__bridge_retained void *)(frameData), NULL);
if (frameProperties)
{
CFRelease(frameProperties);
@ -207,6 +209,11 @@ void PixelBufferRelease( void *releaseRefCon, const void *baseAddress )
-(bool)needsSetup
{
return !_compression_session;
}
- (bool)setupCompressor:(CapturedFrameData *)videoFrame
{
@ -286,7 +293,6 @@ void VideoCompressorReceiveFrame(void *VTref, void *VTFrameRef, OSStatus status,
//@autoreleasepool {
if(!sampleBuffer)
return;

View file

@ -73,7 +73,7 @@
}
-(bool)compressFrame:(CapturedFrameData *)imageBuffer
-(bool)real_compressFrame:(CapturedFrameData *)imageBuffer
{
if (![self hasOutputs])

View file

@ -16,6 +16,10 @@
@interface CompressorBase : NSObject <VideoCompressor, NSCoding>
{
NSMutableArray *_audioBuffer;
bool _reset_flag;
dispatch_queue_t _consumerThread;
NSMutableArray *_compressQueue;
dispatch_semaphore_t _queueSemaphore;
}
@ -45,6 +49,8 @@
-(int) drainOutputBufferFrame;
-(void) reset;
-(void)reconfigureCompressor;
-(bool)needsSetup;
-(BOOL) setupResolution:(CVImageBufferRef)withFrame;
-(id <CSCompressorViewControllerProtocol>)getConfigurationView;

View file

@ -16,7 +16,9 @@
{
if (self = [super init])
{
_queueSemaphore = dispatch_semaphore_create(0);
_compressQueue = [NSMutableArray array];
_reset_flag = NO;
self.errored = NO;
self.name = [@"" mutableCopy];
@ -78,6 +80,10 @@
// Consumer-thread reset hook: the drain loop in -startConsumerThread calls
// this when _reset_flag is raised. The base implementation just forwards to
// the public -reset; subclasses with real encoder state override it.
// NOTE(review): if a subclass's -reset raises _reset_flag to request an
// asynchronous reset, this forwarding would re-arm the flag from inside the
// consumer loop — confirm the intended reset/internal_reset direction.
-(void)internal_reset
{
[self reset];
}
-(void) reset
{
@ -111,10 +117,123 @@
return self;
}
// Producer side of the compressor's producer/consumer model: appends
// frameData to _compressQueue and wakes the consumer via _queueSemaphore.
// Starts the consumer thread lazily on first use. Always returns YES.
// NOTE(review): the !_consumerThread check happens outside any lock; if more
// than one thread can queue frames, the first calls could race — confirm
// this is only ever driven by a single producer thread.
-(bool)queueFramedata:(CapturedFrameData *)frameData
{
if (!_consumerThread)
{
[self startConsumerThread];
}
@synchronized (self) {
[_compressQueue addObject:frameData];
dispatch_semaphore_signal(_queueSemaphore);
}
return YES;
}
// Drops every queued-but-not-yet-compressed frame; used during reset so
// stale frames are never fed to a freshly configured session. Deliberately
// does not rebalance _queueSemaphore: the consumer loop tolerates spurious
// wakeups by re-checking the queue and returning to the wait.
-(void)clearFrameQueue
{
@synchronized (self) {
[_compressQueue removeAllObjects];
}
}
// Consumer side of the frame queue: atomically removes and returns the
// oldest queued frame, or nil when the queue is empty.
-(CapturedFrameData *)consumeframeData
{
    CapturedFrameData *dequeued = nil;
    @synchronized (self) {
        if (_compressQueue.count > 0)
        {
            dequeued = _compressQueue.firstObject;
            [_compressQueue removeObjectAtIndex:0];
        }
    }
    return dequeued;
}
// Lazily creates the serial consumer queue and starts the drain loop that
// services _compressQueue. Each iteration: (1) honours a pending reset
// request, (2) pops the oldest frame and hands it to -real_compressFrame:,
// or (3) blocks on _queueSemaphore until a producer signals new work.
// The loop runs forever and the block strongly captures self, so the
// compressor stays alive for the life of the process.
// NOTE(review): there is no shutdown path for this loop — confirm whether a
// compressor is ever expected to be deallocated while capturing.
-(void)startConsumerThread
{
    if (!_consumerThread)
    {
        _consumerThread = dispatch_queue_create("Compressor consumer", DISPATCH_QUEUE_SERIAL);
        dispatch_async(_consumerThread, ^{
            while (1)
            {
                @autoreleasepool {
                    @synchronized (self) {
                        if (self->_reset_flag)
                        {
                            [self clearFrameQueue];
                            [self internal_reset];
                            // BUGFIX: acknowledge the reset request. The flag was
                            // previously never cleared, so once set the loop re-ran
                            // clearFrameQueue/internal_reset on every iteration.
                            self->_reset_flag = NO;
                        }
                    }
                    CapturedFrameData *useData = [self consumeframeData];
                    if (!useData)
                    {
                        // Queue drained: sleep until a producer (or a reset request)
                        // signals the semaphore.
                        dispatch_semaphore_wait(self->_queueSemaphore, DISPATCH_TIME_FOREVER);
                    } else {
                        [self real_compressFrame:useData];
                    }
                }
            }
        });
    }
}
-(bool) compressFrame:(CapturedFrameData *)imageBuffer
// Public entry point, called on the capture/render thread. Validates state,
// performs one-time compressor setup if required, then enqueues the frame
// for the consumer thread (-real_compressFrame: does the actual encode).
// Returns NO when no outputs are attached or the compressor is not (or can
// no longer be) set up; returns YES once the frame has been queued.
-(bool)compressFrame:(CapturedFrameData *)frameData
{
if (![self hasOutputs])
{
return NO;
}
if ([self needsSetup] && !self.errored)
{
BOOL setupOK;
setupOK = [self setupCompressor:frameData];
if (!setupOK)
{
self.errored = YES;
return NO;
}
}
// BUGFIX: if setup is still required here, the setup branch above was
// skipped because the compressor previously errored. Don't queue frames
// into a compressor with no session; this restores the guard the old
// per-subclass implementation had ("else if (!_av_codec) return NO").
if ([self needsSetup])
{
return NO;
}
[self reconfigureCompressor];
/*
if (frameData.videoFrame)
{
CVPixelBufferRetain(frameData.videoFrame);
}*/
[self queueFramedata:frameData];
return YES;
}
// Hook invoked from -compressFrame: before each frame is queued. The base
// implementation is a deliberate no-op; subclasses override it to apply
// pending settings changes on the producer side.
-(void)reconfigureCompressor
{
return;
}
// Consumer-thread encode hook: the drain loop in -startConsumerThread calls
// this for every dequeued frame. The base implementation is a no-op that
// reports success; concrete compressors override it to perform the encode.
-(bool) real_compressFrame:(CapturedFrameData *)imageBuffer
{
return YES;
}

View file

@ -34,10 +34,6 @@
VTPixelTransferSessionRef _vtpt_ref;
double _next_keyframe_time;
int64_t _last_pts;
bool _reset_flag;
dispatch_queue_t _consumerThread;
NSMutableArray *_compressQueue;
dispatch_semaphore_t _queueSemaphore;
CFMutableDictionaryRef _formatExtensions;

View file

@ -159,15 +159,14 @@
-(void) reset
-(bool)needsSetup
{
@synchronized (self) {
_reset_flag = YES;
dispatch_semaphore_signal(_queueSemaphore);
}
return !_av_codec;
}
-(void) internal_reset
{
//_compressor_queue = nil;
@ -176,7 +175,6 @@
_last_pts = 0;
[self clearFrameQueue];
if (_av_codec_ctx)
{
@ -208,113 +206,12 @@
}
-(bool)queueFramedata:(CapturedFrameData *)frameData
{
if (!_consumerThread)
{
[self startConsumerThread];
}
@synchronized (self) {
[_compressQueue addObject:frameData];
dispatch_semaphore_signal(_queueSemaphore);
}
return YES;
}
-(void)clearFrameQueue
{
@synchronized (self) {
[_compressQueue removeAllObjects];
}
}
-(CapturedFrameData *)consumeframeData
{
CapturedFrameData *retData = nil;
@synchronized (self) {
if (_compressQueue.count > 0)
{
retData = [_compressQueue objectAtIndex:0];
[_compressQueue removeObjectAtIndex:0];
}
}
return retData;
}
-(void)startConsumerThread
{
if (!_consumerThread)
{
_consumerThread = dispatch_queue_create("x264 consumer", DISPATCH_QUEUE_SERIAL);
dispatch_async(_consumerThread, ^{
while (1)
{
@autoreleasepool {
@synchronized (self) {
if (self->_reset_flag)
{
[self internal_reset];
}
}
CapturedFrameData *useData = [self consumeframeData];
if (!useData)
{
dispatch_semaphore_wait(self->_queueSemaphore, DISPATCH_TIME_FOREVER);
} else {
[self real_compressFrame:useData];
}
}
}
});
}
}
-(bool)compressFrame:(CapturedFrameData *)frameData
{
if (![self hasOutputs])
{
return NO;
}
if (!_av_codec && !self.errored)
{
BOOL setupOK;
setupOK = [self setupCompressor:frameData];
if (!setupOK)
{
self.errored = YES;
return NO;
}
} else if (!_av_codec) {
return NO;
}
[self reconfigureCompressor];
if (frameData.videoFrame)
{
CVPixelBufferRetain(frameData.videoFrame);
}
[self queueFramedata:frameData];
return YES;
}
- (bool)real_compressFrame:(CapturedFrameData *)frameData
@ -394,7 +291,7 @@
VTPixelTransferSessionTransferImage(self->_vtpt_ref, imageBuffer, converted_frame);
CVPixelBufferRelease(imageBuffer);
//CVPixelBufferRelease(imageBuffer);
imageBuffer = nil;
//poke the frameData so it releases the video buffer

View file

@ -1566,7 +1566,6 @@ static NSArray *_sourceTypes = nil;
[self deregisterVideoInput:vInput];
}
self.layer = nil;
}

View file

@ -116,6 +116,7 @@
-(void)encodeWithCoder:(NSCoder *)aCoder
{
[aCoder encodeObject:self.activeVideoDevice.uniqueID forKey:@"active_uniqueID"];
[aCoder encodeBool:self.allowDedup forKey:@"allowDedup"];
[aCoder encodeBool:self.cachePersistent forKey:@"cachePersistent"];
@ -244,10 +245,7 @@
self.activeVideoDevice = nil;
} else {
self.activeVideoDevice = [currentAvailableDevices objectAtIndex:sidx];
if (self.allowDedup)
{
[[SourceCache sharedCache] cacheSourcePersistent:self];
}
}
}