source: trunk/Cocoa/F-Script Anywhere/Source/SCPatch/Common/SCPatchMessenger.cp@ 579

Last change on this file since 579 was 407, checked in by Nicholas Riley, 17 years ago

Fix a compiler warning.

File size: 12.8 KB
Line 
1#include <Carbon/Carbon.h>
2#include <sys/time.h>
3#include <syslog.h>
4#include <unistd.h>
5#include "SCPatchPrivate.h"
6#include "SCPatchMessenger.h"
7
8//-------------------------------------------------------------------------------------------------------------
9SCPatchMessenger::SCPatchMessenger(void)
10{
11 mThreaded = false;
12 mMessagePort = NULL;
13 mMessageSource = NULL;
14 pthread_mutex_init(&mMessageQueueMutex, NULL);
15 mMessageRetryInterval = DEFAULT_MESSAGE_RETRY_INTERVAL;
16 mMessageRetryLimit = DEFAULT_MESSAGE_RETRY_LIMIT;
17
18 mMessageQueueDict = CFDictionaryCreateMutable(NULL, 0, &kCFTypeDictionaryKeyCallBacks, NULL);
19 mMessageThreadDict = CFDictionaryCreateMutable(NULL, 0, NULL, &kCFTypeDictionaryValueCallBacks);
20}
21
22//-------------------------------------------------------------------------------------------------------------
23SCPatchMessenger::~SCPatchMessenger(void)
24{
25 CFRelease(mMessageQueueDict);
26 CFRelease(mMessageThreadDict);
27}
28
29//-------------------------------------------------------------------------------------------------------------
30OSErr SCPatchMessenger::StartListening(CFStringRef bundleIdentifier,
31 Boolean isPatch, Boolean threaded)
32{
33 OSErr err = noErr;
34 CFStringRef messagePortName;
35 CFMessagePortContext messageContext = { 0 };
36 Boolean messageShouldFree;
37 EventLoopRef mainLoop;
38
39 mThreaded = threaded;
40
41 // Create a port to receive messages
42 messageContext.info = this;
43
44 if(isPatch)
45 messagePortName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, getpid());
46 else
47 messagePortName = CFStringCreateCopy(NULL, bundleIdentifier);
48
49 if(messagePortName == NULL)
50 return memFullErr;
51
52 if(messagePortName)
53 mMessagePort = CFMessagePortCreateLocal(NULL, messagePortName, ReceiveCallback, &messageContext, &messageShouldFree);
54
55 CFRelease(messagePortName);
56
57 if(mMessagePort == NULL)
58 return destPortErr;
59
60 // Set up to receive messages from the controller
61 if((mMessageSource = CFMessagePortCreateRunLoopSource(NULL, mMessagePort, 0)) != NULL &&
62 (mainLoop = GetMainEventLoop()) != NULL)
63 {
64 OSErr err;
65 AEDesc desc;
66 OSType sig = 'SCPM';
67
68 CFRunLoopAddSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
69
70 // Tell the patch code that it has officially been started.
71 err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStart,
72 typeApplSignature, &sig, sizeof(sig),
73 kAutoGenerateReturnID, kAnyTransactionID,
74 &desc, NULL, "");
75 ReceiveMessage(&desc);
76 }
77 else
78 {
79 CFRelease(mMessagePort);
80 mMessagePort = NULL;
81 }
82
83 return err;
84}
85
86//-------------------------------------------------------------------------------------------------------------
87OSErr SCPatchMessenger::StopListening(void)
88{
89 EventLoopRef mainLoop;
90 OSErr err;
91 AEDesc desc;
92 OSType sig = 'SCPM';
93
94 if(mMessageSource && (mainLoop = GetMainEventLoop()) != NULL)
95 {
96 // Tell the patch code that it's about to be stopped.
97 err = AEBuildAppleEvent(kEventClassSCPatchMessenger, kEventSCPatchStop,
98 typeApplSignature, &sig, sizeof(sig),
99 kAutoGenerateReturnID, kAnyTransactionID,
100 &desc, NULL, "");
101 ReceiveMessage(&desc);
102
103 CFRunLoopRemoveSource((CFRunLoopRef)GetCFRunLoopFromEventLoop(mainLoop), mMessageSource, kCFRunLoopCommonModes);
104 CFRelease(mMessageSource);
105 mMessageSource = NULL;
106 }
107
108 if(mMessagePort)
109 {
110 CFRelease(mMessagePort);
111 mMessagePort = NULL;
112 }
113 return noErr;
114}
115
116//-------------------------------------------------------------------------------------------------------------
117OSErr SCPatchMessenger::SendPing(CFStringRef bundleIdentifier, ProcessSerialNumber *psn)
118{
119 OSErr err;
120 AEDesc desc;
121 OSType sig = 'SCPM';
122
123 err = AEBuildAppleEvent(kSCMessageClass, kSCPing,
124 typeApplSignature, &sig, sizeof(sig),
125 kAutoGenerateReturnID, kAnyTransactionID,
126 &desc, NULL, "");
127
128 if(err == noErr)
129 err = SendMessage(bundleIdentifier, psn, &desc, false);
130
131 return err;
132}
133
134//-------------------------------------------------------------------------------------------------------------
135OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier,
136 ProcessSerialNumber *psn,
137 AEEventClass evtClass, AEEventID evtID,
138 const char *paramsFmt, ...)
139{
140 OSErr err;
141 AEDesc desc;
142 va_list args;
143 OSType sig = 'SCPM';
144
145 va_start(args, paramsFmt);
146 // fprintf(stderr, "Sending message %.4s:%.4s\n", &evtClass, &evtID);
147 err = vAEBuildAppleEvent(evtClass, evtID,
148 typeApplSignature, &sig, sizeof(sig),
149 kAutoGenerateReturnID, kAnyTransactionID,
150 &desc, NULL, paramsFmt, args);
151 va_end(args);
152
153 if(err == noErr)
154 err = SendMessage(bundleIdentifier, psn, &desc, mThreaded);
155
156 return err;
157}
158
159
160//-------------------------------------------------------------------------------------------------------------
161OSErr SCPatchMessenger::SendMessage(CFStringRef bundleIdentifier,
162 ProcessSerialNumber *psn,
163 const AppleEvent *theAE,
164 Boolean threaded)
165{
166 OSErr err;
167 pid_t pid = 0;
168 AEMessageInfo *info;
169
170 if(psn && (err = GetProcessPID(psn, &pid)) != noErr)
171 return err;
172
173 // If we don't succeed or get an error, there's not enough memory
174 err = memFullErr;
175
176 if((info = (AEMessageInfo *)NewPtrClear(sizeof(AEMessageInfo))) != NULL &&
177 (info->descSize = AEGetDescDataSize(theAE)) > 0 &&
178 (info->descData = NewPtr(info->descSize)) != NULL &&
179 AEGetDescData(theAE, info->descData, info->descSize) == noErr)
180 {
181 if(pid)
182 info->portName = CFStringCreateWithFormat(NULL, NULL, CFSTR("%@.%ld"), bundleIdentifier, pid);
183 else
184 info->portName = CFStringCreateCopy(NULL, bundleIdentifier);
185
186 if(info->portName)
187 {
188 info->threaded = threaded;
189
190 if(!threaded)
191 {
192 err = SendMessage(info);
193 DisposePtr(info->descData);
194 DisposePtr((Ptr)info);
195 }
196 else if((err = QueueMessage(info)) != noErr)
197 {
198 err = queueFull;
199 }
200
201 }
202 }
203
204 return err;
205}
206
207//-------------------------------------------------------------------------------------------------------------
208OSErr SCPatchMessenger::SendMessage(AEMessageInfo *info)
209{
210 SInt32 result = kCFMessagePortTransportError;
211 OSErr err = noErr;
212 CFDataRef dataRef;
213 CFMessagePortRef portRef;
214
215 if((dataRef = CFDataCreate(NULL, (unsigned char*)info->descData, info->descSize)) != NULL)
216 {
217 if((portRef = CFMessagePortCreateRemote(NULL, info->portName)) != NULL)
218 {
219 result = CFMessagePortSendRequest(portRef, kSCMessageClass, dataRef, info->threaded ? 0.01 : 5, 0, NULL, NULL);
220 CFRelease(portRef);
221 }
222 else
223 {
224 err = destPortErr;
225 }
226 CFRelease(dataRef);
227 }
228 else
229 {
230 err = memFullErr;
231 }
232
233 // If we're not going to retry, if we've retried too many times, if the message was sent successfully,
234 // or there was some unrecoverable error like destPortErr (no one's listening) give up and let the
235 // error pass to the thread loop so it can clean up.
236 if(!info->threaded || info->retryCount >= mMessageRetryLimit ||
237 (err == noErr && result == kCFMessagePortSuccess) ||
238 err == destPortErr || err == memFullErr)
239 {
240 if(err == noErr && result != kCFMessagePortSuccess)
241 err = (result == kCFMessagePortSendTimeout ? noResponseErr : networkErr);
242#if 0
243 if(err != noErr || result != kCFMessagePortSuccess)
244 syslog(LOG_ALERT, "%d: Message to %s failed (err = %d result = %d)\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), err, result);
245 else
246 syslog(LOG_ALERT, "%d: Message sent to %s\n", getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman));
247
248#endif
249 }
250 else if(info->threaded) // drop the sending thread's priority and hang around for mMessageRetryInterval
251 {
252 int policy;
253 struct sched_param param;
254 struct timeval startTime, endTime;
255
256 info->retryCount++;
257 pthread_getschedparam(pthread_self(), &policy, &param);
258 param.sched_priority = sched_get_priority_min(policy);
259 pthread_setschedparam(pthread_self(), policy, &param);
260 gettimeofday(&startTime, NULL);
261 for(;;)
262 {
263 gettimeofday(&endTime, NULL);
264 if((endTime.tv_sec - startTime.tv_sec) * 1000000L + (endTime.tv_usec - startTime.tv_usec) >
265 mMessageRetryInterval * 1000000)
266 {
267 // syslog(LOG_ALERT, "%d: waiting to resend on %s (%d)\n",
268 // getpid(), CFStringGetCStringPtr(info->portName, kCFStringEncodingMacRoman), info->retryCount);
269 break;
270 }
271 }
272
273 err = channelBusy;
274 }
275
276 return err;
277}
278
279//-------------------------------------------------------------------------------------------------------------
280void *SCPatchMessenger::SendThreadEntryPoint(void *infoPtr)
281{
282 SCPatchMessenger *self = (SCPatchMessenger *)infoPtr;
283 AEMessageInfo *info;
284 OSStatus err = noErr;
285
286 while((info = self->GetNextMessage(pthread_self())) != NULL)
287 {
288 // This will block for mMessageRetryInterval if the send fails
289 err = self->SendMessage(info);
290
291 // If we've retried too many times, if the message was sent successfully, or if there
292 // was some unrecoverable error like destPortErr (no one's listening) give up.
293 if(err == noErr || err == destPortErr || err == memFullErr || err == noResponseErr)
294 {
295 if((info = self->DequeueMessage(pthread_self())) != NULL)
296 {
297 CFRelease(info->portName);
298 DisposePtr(info->descData);
299 DisposePtr((Ptr)info);
300 }
301 }
302 }
303
304 return (void *)err;
305}
306
307//-------------------------------------------------------------------------------------------------------------
308OSErr SCPatchMessenger::QueueMessage(AEMessageInfo *info)
309{
310 CFMutableArrayRef queue;
311 OSErr result = noErr;
312
313 pthread_mutex_lock(&mMessageQueueMutex);
314
315 if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, info->portName)) == NULL)
316 {
317 if((queue = CFArrayCreateMutable(NULL, 0, NULL)) != NULL)
318 {
319 pthread_t messageThread;
320
321 CFDictionaryAddValue(mMessageQueueDict, info->portName, queue);
322 pthread_create(&messageThread, NULL, SendThreadEntryPoint, (void *)this);
323 pthread_detach(messageThread);
324 CFDictionaryAddValue(mMessageThreadDict, messageThread, info->portName);
325 }
326 }
327
328 if(queue)
329 CFArrayAppendValue(queue, info);
330 else
331 result = queueFull;
332
333 pthread_mutex_unlock(&mMessageQueueMutex);
334 return result;
335}
336
337//-------------------------------------------------------------------------------------------------------------
338AEMessageInfo *SCPatchMessenger::GetNextMessage(pthread_t threadID)
339{
340 CFMutableArrayRef queue;
341 CFStringRef queueName;
342 AEMessageInfo *result = NULL;
343
344 pthread_mutex_lock(&mMessageQueueMutex);
345
346 if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL)
347 if((queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
348 if(CFArrayGetCount(queue) > 0)
349 result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
350
351 pthread_mutex_unlock(&mMessageQueueMutex);
352
353 return result;
354}
355
356//-------------------------------------------------------------------------------------------------------------
357AEMessageInfo *SCPatchMessenger::DequeueMessage(pthread_t threadID)
358{
359 CFMutableArrayRef queue;
360 CFStringRef queueName;
361 Boolean stopThread = false;
362 AEMessageInfo *result = NULL;
363
364 pthread_mutex_lock(&mMessageQueueMutex);
365
366 if((queueName = (CFStringRef)CFDictionaryGetValue(mMessageThreadDict, threadID)) != NULL &&
367 (queue = (CFMutableArrayRef)CFDictionaryGetValue(mMessageQueueDict, queueName)) != NULL)
368 {
369 if(CFArrayGetCount(queue) > 0)
370 {
371 result = (AEMessageInfo *)CFArrayGetValueAtIndex(queue, 0);
372 CFArrayRemoveValueAtIndex(queue, 0);
373 }
374 if(CFArrayGetCount(queue) == 0)
375 {
376 CFDictionaryRemoveValue(mMessageThreadDict, threadID);
377 CFDictionaryRemoveValue(mMessageQueueDict, queueName);
378 stopThread = true;
379 }
380 }
381
382 pthread_mutex_unlock(&mMessageQueueMutex);
383
384 if(stopThread)
385 pthread_cancel(threadID);
386
387 return result;
388}
389
390//-------------------------------------------------------------------------------------------------------------
391CFDataRef SCPatchMessenger::ReceiveCallback(CFMessagePortRef local,
392 SInt32 message,
393 CFDataRef dataRef,
394 void *info)
395{
396 #pragma unused(local)
397
398 SCPatchMessenger *self = (SCPatchMessenger *)info;
399 Ptr descData = NULL;
400 AEDesc desc;
401 long length;
402
403 // fprintf(stderr, "%d: Receiving started\n", getpid());
404 if(message == kSCMessageClass)
405 {
406 if((length = CFDataGetLength(dataRef)) > 0 && (descData = NewPtr(length)) != NULL)
407 {
408 CFDataGetBytes(dataRef, CFRangeMake(0, length), (unsigned char *)descData);
409
410 if(AECreateDesc(typeAppleEvent, descData, length, &desc) == noErr)
411 {
412 // fprintf(stderr, "%d: handling message\n", getpid());
413 if (self->HandleMessage(&desc) != noErr)
414 self->ReceiveMessage(&desc);
415 // fprintf(stderr, "%d: done handling message\n", getpid());
416 AEDisposeDesc(&desc);
417 }
418 else
419 {
420 fprintf(stderr, "%d: Error %d building AppleEvent", getpid(), memFullErr);
421 }
422 }
423 }
424
425 if(descData)
426 DisposePtr(descData);
427
428 // fprintf(stderr, "%d: Receiving finished\n", getpid());
429 return NULL;
430}
431
Note: See TracBrowser for help on using the repository browser.