#include #include #include #include #include "SCPatchPrivate.h" #include "SCPatchMessenger.h" //------------------------------------------------------------------------------------------------------------- SCPatchMessenger::SCPatchMessenger(void) { mThreaded = false; mMessagePort = NULL; mMessageSource = NULL; pthread_mutex_init(&mMessageQueueMutex, NULL); mMessageRetryInterval = DEFAULT_MESSAGE_RETRY_INTERVAL; mMessageRetryLimit = DEFAULT_MESSAGE_RETRY_LIMIT; mMessageQueueDict = CFDictionaryCreateMutable(NULL, 0, &kCFTypeDictionaryKeyCallBacks, NULL); mMessageThreadDict = CFDictionaryCreateMutable(NULL, 0, NULL, &kCFTypeDictionaryValueCallBacks); } //------------------------------------------------------------------------------------------------------------- SCPatchMessenger::~SCPatchMessenger(void) { CFRelease(mMessageQueueDict); CFRelease(mMessageThreadDict); } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::StartListening(CFStringRef bundleIdentifier, Boolean isPatch, Boolean threaded) { OSErr err = noErr; CFStringRef messagePortName; CFMessagePortContext messageContext = { 0 }; Boolean messageShouldFree; EventLoopRef mainLoop; mThreaded = threaded; // Create a port to receive messages messageContext.info = this; if(isPatch) messagePortName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, getpid()); else messagePortName = CFStringCreateCopy(NULL, bundleIdentifier); if(messagePortName == NULL) return memFullErr; if(messagePortName) mMessagePort = CFMessagePortCreateLocal(NULL, messagePortName, ReceiveCallback, &messageContext, &messageShouldFree); CFRelease(messagePortName); if(mMessagePort == NULL) return destPortErr; // Set up to receive messages from the controller if((mMessageSource = CFMessagePortCreateRunLoopSource(NULL, mMessagePort, 0)) != NULL && (mainLoop = GetMainEventLoop()) != NULL) { OSErr err; AEDesc desc; OSType sig = 'SCPM'; CFRunLoopAddSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes); // Tell the patch code that it has officially been started. err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStart, typeApplSignature, &sig, sizeof(sig), kAutoGenerateReturnID, kAnyTransactionID, &desc, NULL, ""); ReceiveMessage(&desc); } else { CFRelease(mMessagePort); mMessagePort = NULL; } return err; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::StopListening(void) { EventLoopRef mainLoop; OSErr err; AEDesc desc; OSType sig = 'SCPM'; if(mMessageSource && (mainLoop = GetMainEventLoop()) != NULL) { // Tell the patch code that it's about to be stopped. err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStop, typeApplSignature, &sig, sizeof(sig), kAutoGenerateReturnID, kAnyTransactionID, &desc, NULL, ""); ReceiveMessage(&desc); CFRunLoopRemoveSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes); CFRelease(mMessageSource); mMessageSource = NULL; } if(mMessagePort) { CFRelease(mMessagePort); mMessagePort = NULL; } return noErr; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::SendPing(CFStringRef bundleIdentifier, ProcessSerialNumber *psn) { OSErr err; AEDesc desc; OSType sig = 'SCPM'; err = AEBuildAppleEvent(kSCMessageClass, kSCPing, typeApplSignature, &sig, sizeof(sig), kAutoGenerateReturnID, kAnyTransactionID, &desc, NULL, ""); if(err == noErr) err = SendMessage(bundleIdentifier, psn, &desc, false); return err; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier, ProcessSerialNumber *psn, AEEventClass evtClass, AEEventID evtID, const char *paramsFmt, ...) { OSErr err; AEDesc desc; va_list args; OSType sig = 'SCPM'; va_start(args, paramsFmt); // fprintf(stderr, "Sending message %.4s:%.4s\n", &evtClass, &evtID); err = vAEBuildAppleEvent(evtClass, evtID, typeApplSignature, &sig, sizeof(sig), kAutoGenerateReturnID, kAnyTransactionID, &desc, NULL, paramsFmt, args); va_end(args); if(err == noErr) err = SendMessage(bundleIdentifier, psn, &desc, mThreaded); return err; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier, ProcessSerialNumber *psn, const AppleEvent *theAE, Boolean threaded) { OSErr err; pid_t pid = 0; AEMessageInfo *info; if(psn && (err = GetProcessPID(psn, &pid)) != noErr) return err; // If we don't succeed or get an error, there's not enough memory err = memFullErr; if((info = (AEMessageInfo *)NewPtrClear(sizeof(AEMessageInfo))) != NULL && (info->descSize = AEGetDescDataSize(theAE)) > 0 && (info->descData = NewPtr(info->descSize)) != NULL && AEGetDescData(theAE, info->descData, info->descSize) == noErr) { if(pid) info->portName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, pid); else info->portName = CFStringCreateCopy(NULL, bundleIdentifier); if(info->portName) { info->threaded = threaded; if(!threaded) { err = SendMessage(info); DisposePtr(info->descData); DisposePtr((Ptr)info); } else if((err = QueueMessage(info)) != noErr) { err = queueFull; } } } return err; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::SendMessage(AEMessageInfo *info) { SInt32 result = kCFMessagePortTransportError; OSErr err = noErr; CFDataRef dataRef; CFMessagePortRef portRef; if((dataRef = CFDataCreate(NULL, (unsigned char*)info->descData, info->descSize)) != NULL) { if((portRef = CFMessagePortCreateRemote(NULL, info->portName)) != NULL) { result = CFMessagePortSendRequest(portRef, kSCMessageClass, dataRef, info->threaded ? 0.01 : 5, 0, NULL, NULL); CFRelease(portRef); } else { err = destPortErr; } CFRelease(dataRef); } else { err = memFullErr; } // If we're not going to retry, if we've retried too many times, if the message was sent successfully, // or there was some unrecoverable error like destPortErr (no one's listening) give up and let the // error pass to the thread loop so it can clean up. if(!info->threaded || info->retryCount >= mMessageRetryLimit || (err == noErr && result == kCFMessagePortSuccess) || err == destPortErr || err == memFullErr) { if(err == noErr && result != kCFMessagePortSuccess) err = (result == kCFMessagePortSendTimeout ? noResponseErr : networkErr); #if 0 if(err != noErr || result != kCFMessagePortSuccess) syslog(LOG_ALERT, "%d: Message to %s failed (err = %d result = %d)\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), err, result); else syslog(LOG_ALERT, "%d: Message sent to %s\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman)); #endif } else if(info->threaded) // drop the sending thread's priority and hang around for mMessageRetryInterval { int policy; struct sched_param param; struct timeval startTime, endTime; info->retryCount++; pthread_getschedparam(pthread_self(), &policy, ¶m); param.sched_priority = sched_get_priority_min(policy); pthread_setschedparam(pthread_self(), policy, ¶m); gettimeofday(&startTime, NULL); for(;;) { gettimeofday(&endTime, NULL); if((endTime.tv_sec - startTime.tv_sec) * 1000000L + (endTime.tv_usec - startTime.tv_usec) > mMessageRetryInterval * 1000000) { // syslog(LOG_ALERT, "%d: waiting to resend on %s (%d)\n", // getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), info->retryCount); break; } } err = channelBusy; } return err; } //------------------------------------------------------------------------------------------------------------- void *SCPatchMessenger::SendThreadEntryPoint(void *infoPtr) { SCPatchMessenger *self = (SCPatchMessenger *)infoPtr; AEMessageInfo *info; OSStatus err; while((info = self->GetNextMessage(pthread_self())) != NULL) { // This will block for mMessageRetryInterval if the send fails err = self->SendMessage(info); // If we've retried too many times, if the message was sent successfully, or if there // was some unrecoverable error like destPortErr (no one's listening) give up. if(err == noErr || err == destPortErr || err == memFullErr || err == noResponseErr) { if((info = self->DequeueMessage(pthread_self())) != NULL) { CFRelease(info->portName); DisposePtr(info->descData); DisposePtr((Ptr)info); } } } return (void *)err; } //------------------------------------------------------------------------------------------------------------- OSErr SCPatchMessenger::QueueMessage(AEMessageInfo *info) { CFMutableArrayRef queue; OSErr result = noErr; pthread_mutex_lock(&mMessageQueueMutex); if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, info->portName)) == NULL) { if((queue = CFArrayCreateMutable(NULL, 0, NULL)) != NULL) { pthread_t messageThread; CFDictionaryAddValue(mMessageQueueDict, info->portName, queue); pthread_create(&messageThread, NULL, SendThreadEntryPoint, (void *)this); pthread_detach(messageThread); CFDictionaryAddValue(mMessageThreadDict, messageThread, info->portName); } } if(queue) CFArrayAppendValue(queue, info); else result = queueFull; pthread_mutex_unlock(&mMessageQueueMutex); return result; } //------------------------------------------------------------------------------------------------------------- AEMessageInfo *SCPatchMessenger::GetNextMessage(pthread_t threadID) { CFMutableArrayRef queue; CFStringRef queueName; AEMessageInfo *result = NULL; pthread_mutex_lock(&mMessageQueueMutex); if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL) if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL) if(CFArrayGetCount(queue) > 0) result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0); pthread_mutex_unlock(&mMessageQueueMutex); return result; } //------------------------------------------------------------------------------------------------------------- AEMessageInfo *SCPatchMessenger::DequeueMessage(pthread_t threadID) { CFMutableArrayRef queue; CFStringRef queueName; Boolean stopThread = false; AEMessageInfo *result = NULL; pthread_mutex_lock(&mMessageQueueMutex); if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL && (queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL) { if(CFArrayGetCount(queue) > 0) { result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0); CFArrayRemoveValueAtIndex(queue, 0); } if(CFArrayGetCount(queue) == 0) { CFDictionaryRemoveValue(mMessageThreadDict, threadID); CFDictionaryRemoveValue(mMessageQueueDict, queueName); stopThread = true; } } pthread_mutex_unlock(&mMessageQueueMutex); if(stopThread) pthread_cancel(threadID); return result; } //------------------------------------------------------------------------------------------------------------- CFDataRef SCPatchMessenger::ReceiveCallback(CFMessagePortRef local, SInt32 message, CFDataRef dataRef, void *info) { #pragma unused(local) SCPatchMessenger *self = (SCPatchMessenger *)info; Ptr descData = NULL; AEDesc desc; long length; // fprintf(stderr, "%d: Receiving started\n", getpid()); if(message == kSCMessageClass) { if((length = CFDataGetLength(dataRef)) > 0 && (descData = NewPtr(length)) != NULL) { CFDataGetBytes(dataRef, CFRangeMake(0, length), (unsigned char *)descData); if(AECreateDesc(typeAppleEvent, descData, length, &desc) == noErr) { // fprintf(stderr, "%d: handling message\n", getpid()); if (self->HandleMessage(&desc) != noErr) self->ReceiveMessage(&desc); // fprintf(stderr, "%d: done handling message\n", getpid()); AEDisposeDesc(&desc); } else { fprintf(stderr, "%d: Error %d building AppleEvent", getpid(), memFullErr); } } } if(descData) DisposePtr(descData); // fprintf(stderr, "%d: Receiving finished\n", getpid()); return NULL; }