1:
37:
38:
39: package ;
40:
41: import ;
42: import ;
43: import ;
44: import ;
45: import ;
46: import ;
47: import ;
48: import ;
49: import ;
50: import ;
51: import ;
52: import ;
53:
54: import ;
55: import ;
56: import ;
57: import ;
58: import ;
59: import ;
60: import ;
61: import ;
62: import ;
63: import ;
64: import ;
65: import ;
66: import ;
67: import ;
68: import ;
69: import ;
70: import ;
71:
72: import ;
73: import ;
74: import ;
75: import ;
76: import ;
77: import ;
78: import ;
79: import ;
80: import ;
81: import ;
82: import ;
83: import ;
84: import ;
85: import ;
86: import ;
87: import ;
88: import ;
89: import ;
90: import ;
91:
92:
100: public class OrbFunctional extends OrbRestricted
101: {
102:
107: protected class portServer
108: extends Thread
109: {
110:
113: int running_threads;
114:
115:
118: int s_port;
119:
120:
123: ServerSocket service;
124:
125:
128: boolean terminated;
129:
130:
133: portServer(int _port)
134: {
135: s_port = _port;
136: setDaemon(true);
137: try
138: {
139: service = socketFactory.createServerSocket(s_port);
140: }
141: catch (IOException ex)
142: {
143: BAD_OPERATION bad = new BAD_OPERATION(
144: "Unable to open the server socket at " + s_port);
145: bad.minor = Minor.Socket;
146: bad.initCause(ex);
147: throw bad;
148: }
149: }
150:
151:
155: public void run()
156: {
157: while (running)
158: {
159: try
160: {
161: tick();
162: }
163: catch (SocketException ex)
164: {
165:
166:
167: if (terminated)
168: return;
169: }
170: catch (Exception iex)
171: {
172:
173:
174: try
175: {
176: Thread.sleep(TWAIT_SERVER_ERROR_PAUSE);
177: }
178: catch (InterruptedException ex)
179: {
180: }
181: }
182: }
183: }
184:
185:
190: void tick()
191: throws Exception
192: {
193: serve(this, service);
194: }
195:
196:
199: public void close_now()
200: {
201: try
202: {
203: terminated = true;
204: service.close();
205: }
206: catch (Exception ex)
207: {
208:
209:
210: }
211: }
212:
213:
216: protected void finalize()
217: {
218: close_now();
219: }
220: }
221:
222:
227: protected class sharedPortServer extends portServer
228: {
229:
232: sharedPortServer(int _port)
233: {
234: super(_port);
235: }
236:
237:
242: void tick() throws Exception
243: {
244: Socket request = service.accept();
245: serveStep(request, false);
246: }
247: }
248:
249:
253: public static int DEFAULT_INITIAL_PORT = 1126;
254:
255:
259: public static int RANDOM_PORT_FROM = 1024;
260:
261:
265: public static int RANDOM_PORT_TO = 4024;
266:
267:
270: public static int RANDOM_PORT_ATTEMPTS = 64;
271:
272:
276: public static final String LISTEN_ON = "gnu.classpath.CORBA.ListenOn";
277:
278:
281: public static final String REFERENCE = "org.omg.CORBA.ORBInitRef";
282:
283:
287: public static final String NS_PORT = "org.omg.CORBA.ORBInitialPort";
288:
289:
293: public static final String NS_HOST = "org.omg.CORBA.ORBInitialHost";
294:
295:
298: public static final String NAME_SERVICE = "NameService";
299:
300:
303: public static final String ORB_ID = "org.omg.CORBA.ORBid";
304:
305:
306:
309: public static final String SERVER_ID = "org.omg.CORBA.ServerId";
310:
311:
317: public static String START_READING_MESSAGE =
318: "gnu.classpath.CORBA.TOUT_START_READING_MESSAGE";
319:
320:
324: public static String WHILE_READING =
325: "gnu.classpath.CORBA.TOUT_WHILE_READING";
326:
327:
332: public static String AFTER_RECEIVING =
333: "gnu.classpath.CORBA.TOUT_AFTER_RECEIVING";
334:
335:
339: public static String SERVER_ERROR_PAUSE =
340: "gnu.classpath.CORBA.SERVER_ERROR_PAUSE";
341:
342:
345: public final String LOCAL_HOST;
346:
347:
353: public int TOUT_START_READING_MESSAGE = 20 * 1000;
354:
355:
356:
357:
361: public int TOUT_WHILE_READING = 2 * 60 * 1000;
362:
363:
368: public int TOUT_AFTER_RECEIVING = 40 * 60 * 1000;
369:
370:
374: public int TWAIT_SERVER_ERROR_PAUSE = 5000;
375:
376:
383: public static int TANDEM_REQUESTS = 7000;
384:
385:
388: public String orb_id = "orb_"+hashCode();
389:
390:
394: public static String server_id = "server_"+OrbFunctional.class.hashCode();
395:
396:
399: protected final Connected_objects connected_objects =
400: new Connected_objects();
401:
402:
406: protected Version max_version;
407:
408:
412: protected boolean running;
413:
414:
417: protected Map initial_references = new TreeMap();
418:
419:
422: protected ArrayList portServers = new ArrayList();
423:
424:
427: private String ns_host;
428:
429:
437: private static int Port = DEFAULT_INITIAL_PORT;
438:
439:
442: private int ns_port = 900;
443:
444:
447: NameParser nameParser = new NameParser();
448:
449:
453: protected Asynchron asynchron = new Asynchron();
454:
455:
458: protected LinkedList freed_ports = new LinkedList();
459:
460:
463: protected Hashtable identities = new Hashtable();
464:
465:
470: private int MAX_RUNNING_THREADS = 256;
471:
472:
475: public SocketFactory socketFactory = DefaultSocketFactory.Singleton;
476:
477:
480: public OrbFunctional()
481: {
482: try
483: {
484: LOCAL_HOST = ns_host = InetAddress.getLocalHost().getHostAddress();
485: initial_references.put("CodecFactory", new gnuCodecFactory(this));
486: }
487: catch (UnknownHostException ex)
488: {
489: BAD_OPERATION bad =
490: new BAD_OPERATION("Unable to open the server socket.");
491: bad.initCause(ex);
492: throw bad;
493: }
494: }
495:
496:
502: public void setMaxVersion(Version max_supported)
503: {
504: max_version = max_supported;
505: }
506:
507:
511: public Version getMaxVersion()
512: {
513: return max_version;
514: }
515:
516:
525: public int getFreePort()
526: throws BAD_OPERATION
527: {
528: ServerSocket s;
529: int a_port;
530:
531: try
532: {
533:
534: if (!freed_ports.isEmpty())
535: {
536: Integer free = (Integer) freed_ports.getLast();
537: freed_ports.removeLast();
538: s = socketFactory.createServerSocket(free.intValue());
539: s.close();
540: return free.intValue();
541: }
542: }
543: catch (Exception ex)
544: {
545:
546:
547:
548: }
549:
550: for (a_port = Port; a_port < Port + 20; a_port++)
551: {
552: try
553: {
554: s = socketFactory.createServerSocket(a_port);
555: s.close();
556: Port = a_port + 1;
557: return a_port;
558: }
559: catch (IOException ex)
560: {
561:
562: }
563: }
564:
565: Random rand = new Random();
566:
567: int range = RANDOM_PORT_TO - RANDOM_PORT_FROM;
568: IOException ioex = null;
569: for (int i = 0; i < RANDOM_PORT_ATTEMPTS; i++)
570: {
571: try
572: {
573: a_port = RANDOM_PORT_FROM + rand.nextInt(range);
574: s = socketFactory.createServerSocket(a_port);
575: s.close();
576: return a_port;
577: }
578: catch (IOException ex)
579: {
580:
581: ioex = ex;
582: }
583: }
584:
585: NO_RESOURCES bad = new NO_RESOURCES("Unable to open the server socket.");
586: bad.minor = Minor.Ports;
587: if (ioex != null)
588: bad.initCause(ioex);
589: throw bad;
590: }
591:
592:
601: public static void setPort(int a_Port)
602: {
603: Port = a_Port;
604: }
605:
606:
621: public void connect(org.omg.CORBA.Object object)
622: {
623: int a_port = getFreePort();
624:
625: Connected_objects.cObject ref = connected_objects.add(object, a_port);
626: IOR ior = createIOR(ref);
627: prepareObject(object, ior);
628: if (running)
629: startService(ior);
630: }
631:
632:
649: public void connect(org.omg.CORBA.Object object, byte[] key)
650: {
651: int a_port = getFreePort();
652:
653: Connected_objects.cObject ref =
654: connected_objects.add(key, object, a_port, null);
655: IOR ior = createIOR(ref);
656: prepareObject(object, ior);
657: if (running)
658: startService(ior);
659: }
660:
661:
683: public void connect_1_thread(org.omg.CORBA.Object object, byte[] key,
684: java.lang.Object identity
685: )
686: {
687: sharedPortServer shared = (sharedPortServer) identities.get(identity);
688: if (shared == null)
689: {
690: int a_port = getFreePort();
691: shared = new sharedPortServer(a_port);
692: identities.put(identity, shared);
693: if (running)
694: {
695: portServers.add(shared);
696: shared.start();
697: }
698: }
699:
700: Connected_objects.cObject ref =
701: connected_objects.add(key, object, shared.s_port, identity);
702: IOR ior = createIOR(ref);
703: prepareObject(object, ior);
704: }
705:
706:
711: public void startService(IOR ior)
712: {
713: portServer p = new portServer(ior.Internet.port);
714: portServers.add(p);
715: p.start();
716: }
717:
718:
721: public void destroy()
722: {
723: portServer p;
724: for (int i = 0; i < portServers.size(); i++)
725: {
726: p = (portServer) portServers.get(i);
727: p.close_now();
728: }
729: super.destroy();
730: }
731:
732:
741: public void disconnect(org.omg.CORBA.Object object)
742: {
743: Connected_objects.cObject rmKey = null;
744:
745:
746:
747: if (object instanceof ObjectImpl)
748: {
749: Delegate delegate = ((ObjectImpl) object)._get_delegate();
750: if (delegate instanceof SimpleDelegate)
751: {
752: byte[] key = ((SimpleDelegate) delegate).getIor().key;
753: rmKey = connected_objects.get(key);
754: }
755: }
756:
757:
758:
759: if (rmKey == null)
760: rmKey = connected_objects.getKey(object);
761: if (rmKey != null)
762: {
763:
764: portServer p;
765: StopService:
766: for (int i = 0; i < portServers.size(); i++)
767: {
768: p = (portServer) portServers.get(i);
769: if (p.s_port == rmKey.port && !(p instanceof sharedPortServer))
770: {
771: p.close_now();
772: freed_ports.addFirst(new Integer(rmKey.port));
773: break StopService;
774: }
775: connected_objects.remove(rmKey.key);
776: }
777: }
778: }
779:
780:
788: public void identityDestroyed(java.lang.Object identity)
789: {
790: if (identity == null)
791: return;
792:
793: sharedPortServer ise = (sharedPortServer) identities.get(identity);
794: if (ise != null)
795: {
796: synchronized (connected_objects)
797: {
798: ise.close_now();
799: identities.remove(identity);
800:
801: Connected_objects.cObject obj;
802: Map.Entry m;
803: Iterator iter = connected_objects.entrySet().iterator();
804: while (iter.hasNext())
805: {
806: m = (Map.Entry) iter.next();
807: obj = (Connected_objects.cObject) m.getValue();
808: if (obj.identity == identity)
809: iter.remove();
810: }
811: }
812: }
813: }
814:
815:
823: public org.omg.CORBA.Object find_local_object(IOR ior)
824: {
825:
826: if (!ior.Internet.host.equals(LOCAL_HOST))
827: return null;
828:
829: return find_connected_object(ior.key, ior.Internet.port);
830: }
831:
832:
839: public String[] list_initial_services()
840: {
841: String[] refs = new String[ initial_references.size() ];
842: int p = 0;
843:
844: Iterator iter = initial_references.keySet().iterator();
845: while (iter.hasNext())
846: {
847: refs [ p++ ] = (String) iter.next();
848: }
849: return refs;
850: }
851:
852:
869: public String object_to_string(org.omg.CORBA.Object forObject)
870: {
871:
872: if (forObject instanceof ObjectImpl)
873: {
874: Delegate delegate = ((ObjectImpl) forObject)._get_delegate();
875: if (delegate instanceof SimpleDelegate)
876: return ((SimpleDelegate) delegate).getIor().toStringifiedReference();
877: }
878:
879:
880: Connected_objects.cObject rec = connected_objects.getKey(forObject);
881:
882: if (rec == null)
883: throw new BAD_PARAM("The object " + forObject +
884: " has not been previously connected to this ORB"
885: );
886:
887: IOR ior = createIOR(rec);
888:
889: return ior.toStringifiedReference();
890: }
891:
892:
895: public IOR getLocalIor(org.omg.CORBA.Object forObject)
896: {
897: Connected_objects.cObject rec = connected_objects.getKey(forObject);
898: if (rec == null)
899: return null;
900: else
901: return createIOR(rec);
902: }
903:
904:
913: public org.omg.CORBA.Object resolve_initial_references(String name)
914: throws InvalidName
915: {
916: org.omg.CORBA.Object object = null;
917: try
918: {
919: object = (org.omg.CORBA.Object) initial_references.get(name);
920: if (object == null && name.equals(NAME_SERVICE))
921: {
922: object = getDefaultNameService();
923: if (object != null)
924: initial_references.put(NAME_SERVICE, object);
925: }
926: }
927: catch (Exception ex)
928: {
929: InvalidName err = new InvalidName(name);
930: err.initCause(ex);
931: throw err;
932: }
933: if (object != null)
934: return object;
935: else
936: throw new InvalidName("Not found: '" + name + "'");
937: }
938:
939:
945: public void run()
946: {
947: running = true;
948:
949:
950: Iterator iter = connected_objects.entrySet().iterator();
951: Map.Entry m;
952: Connected_objects.cObject obj;
953:
954: while (iter.hasNext())
955: {
956: m = (Map.Entry) iter.next();
957: obj = (Connected_objects.cObject) m.getValue();
958:
959: portServer subserver;
960:
961: if (obj.identity == null)
962: {
963: subserver = new portServer(obj.port);
964: portServers.add(subserver);
965: }
966: else
967: subserver = (portServer) identities.get(obj.identity);
968:
969: if (!subserver.isAlive())
970: {
971:
972: if (!iter.hasNext())
973: {
974:
975: iter = null;
976: subserver.run();
977: return;
978: }
979: else
980: subserver.start();
981: }
982: }
983: }
984:
985:
991: public void ensureRunning()
992: {
993: final OrbFunctional THIS = this;
994:
995: if (!running)
996: {
997: Thread t = new Thread()
998: {
999: public void run()
1000: {
1001: THIS.run();
1002: }
1003: };
1004: t.setDaemon(true);
1005: t.start();
1006: }
1007: }
1008:
1009:
1015: public void shutdown(boolean wait_for_completion)
1016: {
1017: super.shutdown(wait_for_completion);
1018: running = false;
1019:
1020: if (!wait_for_completion)
1021: {
1022: for (int i = 0; i < portServers.size(); i++)
1023: {
1024: portServer p = (portServer) portServers.get(i);
1025: p.close_now();
1026: }
1027: }
1028: }
1029:
1030:
1041: public org.omg.CORBA.Object string_to_object(String an_ior)
1042: {
1043: return nameParser.corbaloc(an_ior, this);
1044: }
1045:
1046:
1049: public org.omg.CORBA.Object ior_to_object(IOR ior)
1050: {
1051: org.omg.CORBA.Object object = find_local_object(ior);
1052: if (object == null)
1053: {
1054: ObjectImpl impl = StubLocator.search(this, ior);
1055: try
1056: {
1057: if (impl._get_delegate() == null)
1058: impl._set_delegate(new IorDelegate(this, ior));
1059: }
1060: catch (BAD_OPERATION ex)
1061: {
1062:
1063:
1064: impl._set_delegate(new IorDelegate(this, ior));
1065: }
1066:
1067: object = impl;
1068:
1069:
1070: }
1071: return object;
1072: }
1073:
1074:
1078: protected org.omg.CORBA.Object getDefaultNameService()
1079: {
1080: if (initial_references.containsKey(NAME_SERVICE))
1081: return (org.omg.CORBA.Object) initial_references.get(NAME_SERVICE);
1082:
1083: IOR ior = new IOR();
1084: ior.Id = NamingContextExtHelper.id();
1085: ior.Internet.host = ns_host;
1086: ior.Internet.port = ns_port;
1087: ior.key = NamingServiceTransient.getDefaultKey();
1088:
1089: IorObject iorc = new IorObject(this, ior);
1090: NamingContextExt namer = NamingContextExtHelper.narrow(iorc);
1091: initial_references.put(NAME_SERVICE, namer);
1092: return namer;
1093: }
1094:
1095:
1104: protected org.omg.CORBA.Object find_connected_object(byte[] key, int port)
1105: {
1106: Connected_objects.cObject ref = connected_objects.get(key);
1107: if (ref == null)
1108: return null;
1109: if (port >= 0 && ref.port != port)
1110: return null;
1111: else
1112: return ref.object;
1113: }
1114:
1115:
1124: protected void set_parameters(Applet app, Properties props)
1125: {
1126: useProperties(props);
1127:
1128: String[][] para = app.getParameterInfo();
1129: if (para != null)
1130: {
1131: for (int i = 0; i < para.length; i++)
1132: {
1133: if (para[i][0].equals(LISTEN_ON))
1134: Port = Integer.parseInt(para[i][1]);
1135: if (para[i][0].equals(REFERENCE))
1136: {
1137: StringTokenizer st = new StringTokenizer(para[i][1], "=");
1138: initial_references.put(st.nextToken(),
1139: string_to_object(st.nextToken()));
1140: }
1141:
1142: if (para[i][0].equals(ORB_ID))
1143: orb_id = para[i][1];
1144:
1145: if (para[i][0].equals(SERVER_ID))
1146: server_id = para[i][1];
1147:
1148: if (para[i][0].equals(NS_HOST))
1149: ns_host = para[i][1];
1150: if (para[i][0].equals(START_READING_MESSAGE))
1151: TOUT_START_READING_MESSAGE = Integer.parseInt(para[i][1]);
1152: if (para[i][0].equals(WHILE_READING))
1153: TOUT_WHILE_READING = Integer.parseInt(para[i][1]);
1154: if (para[i][0].equals(AFTER_RECEIVING))
1155: TOUT_AFTER_RECEIVING = Integer.parseInt(para[i][1]);
1156: try
1157: {
1158: if (para[i][0].equals(NS_PORT))
1159: ns_port = Integer.parseInt(para[i][1]);
1160: }
1161: catch (NumberFormatException ex)
1162: {
1163: BAD_PARAM bad = new BAD_PARAM("Invalid " + NS_PORT
1164: + "property, unable to parse '" + props.getProperty(NS_PORT)
1165: + "'");
1166: bad.initCause(ex);
1167: throw bad;
1168: }
1169: }
1170: }
1171: }
1172:
1173:
1184: protected void set_parameters(String[] para, Properties props)
1185: {
1186: if (para.length > 1)
1187: {
1188: for (int i = 0; i < para.length - 1; i++)
1189: {
1190: if (para[i].endsWith("ListenOn"))
1191: Port = Integer.parseInt(para[i + 1]);
1192: if (para[i].endsWith("ORBInitRef"))
1193: {
1194: StringTokenizer st = new StringTokenizer(para[i + 1], "=");
1195: initial_references.put(st.nextToken(),
1196: string_to_object(st.nextToken()));
1197: }
1198:
1199: if (para[i].endsWith("ORBInitialHost"))
1200: ns_host = para[i + 1];
1201:
1202: if (para[i].endsWith("ServerId"))
1203: server_id = para[i++];
1204: else if (para[i].endsWith("ORBid"))
1205: orb_id = para[i++];
1206:
1207: try
1208: {
1209: if (para[i].endsWith("ORBInitialPort"))
1210: ns_port = Integer.parseInt(para[i + 1]);
1211: }
1212: catch (NumberFormatException ex)
1213: {
1214: throw new BAD_PARAM("Invalid " + para[i]
1215: + "parameter, unable to parse '"
1216: + props.getProperty(para[i + 1]) + "'");
1217: }
1218: }
1219: }
1220:
1221: useProperties(props);
1222: }
1223:
1224:
1227: protected IOR createIOR(Connected_objects.cObject ref)
1228: throws BAD_OPERATION
1229: {
1230: IOR ior = new IOR();
1231: ior.key = ref.key;
1232: ior.Internet.port = ref.port;
1233:
1234: if (ref.object instanceof ObjectImpl)
1235: {
1236: ObjectImpl imp = (ObjectImpl) ref.object;
1237: if (imp._ids().length > 0)
1238: ior.Id = imp._ids() [ 0 ];
1239: }
1240: if (ior.Id == null)
1241: ior.Id = ref.object.getClass().getName();
1242: try
1243: {
1244: ior.Internet.host = InetAddress.getLocalHost().getHostAddress();
1245: ior.Internet.port = ref.port;
1246: }
1247: catch (UnknownHostException ex)
1248: {
1249: throw new BAD_OPERATION("Cannot resolve the local host address");
1250: }
1251: return ior;
1252: }
1253:
1254:
1262: protected void prepareObject(org.omg.CORBA.Object object, IOR ior)
1263: throws BAD_PARAM
1264: {
1265:
1270:
1271:
1272: if (object instanceof ObjectImpl)
1273: {
1274: ObjectImpl impl = (ObjectImpl) object;
1275: try
1276: {
1277: if (impl._get_delegate() == null)
1278: impl._set_delegate(new SimpleDelegate(this, ior));
1279: }
1280: catch (BAD_OPERATION ex)
1281: {
1282:
1283: impl._set_delegate(new SimpleDelegate(this, ior));
1284: }
1285: }
1286: }
1287:
1288:
1301: private void respond_to_client(OutputStream net_out,
1302: MessageHeader msh_request, RequestHeader rh_request,
1303: ResponseHandlerImpl handler, SystemException sysEx
1304: ) throws IOException
1305: {
1306:
1307: ReplyHeader reply = handler.reply_header;
1308:
1309: if (sysEx != null)
1310: reply.reply_status = ReplyHeader.SYSTEM_EXCEPTION;
1311: else if (handler.isExceptionReply())
1312: reply.reply_status = ReplyHeader.USER_EXCEPTION;
1313: else
1314: reply.reply_status = ReplyHeader.NO_EXCEPTION;
1315: reply.request_id = rh_request.request_id;
1316:
1317: BufferedCdrOutput out =
1318: new BufferedCdrOutput(50 + handler.getBuffer().buffer.size());
1319: out.setOrb(this);
1320:
1321: out.setOffset(msh_request.getHeaderSize());
1322:
1323: reply.write(out);
1324:
1325: if (msh_request.version.since_inclusive(1, 2))
1326: {
1327: out.align(8);
1328:
1329:
1330:
1331: }
1332: handler.getBuffer().buffer.writeTo(out);
1333:
1334: MessageHeader msh_reply = new MessageHeader();
1335:
1336: msh_reply.version = msh_request.version;
1337: msh_reply.message_type = MessageHeader.REPLY;
1338: msh_reply.message_size = out.buffer.size();
1339:
1340:
1341: msh_reply.write(net_out);
1342: out.buffer.writeTo(net_out);
1343: net_out.flush();
1344: }
1345:
1346:
1349: private void forward_request(OutputStream net_out,
1350: MessageHeader msh_request, RequestHeader rh_request, gnuForwardRequest info
1351: ) throws IOException
1352: {
1353: MessageHeader msh_forward = new MessageHeader();
1354: msh_forward.version = msh_request.version;
1355:
1356: ReplyHeader rh_forward = msh_forward.create_reply_header();
1357: msh_forward.message_type = MessageHeader.REPLY;
1358: rh_forward.reply_status = info.forwarding_code;
1359: rh_forward.request_id = rh_request.request_id;
1360:
1361:
1362: BufferedCdrOutput out = new BufferedCdrOutput();
1363: out.setOrb(this);
1364: out.setOffset(msh_forward.getHeaderSize());
1365:
1366: rh_forward.write(out);
1367:
1368: if (msh_forward.version.since_inclusive(1, 2))
1369: out.align(8);
1370: out.write_Object(info.forward_reference);
1371:
1372: msh_forward.message_size = out.buffer.size();
1373:
1374:
1375: msh_forward.write(net_out);
1376: out.buffer.writeTo(net_out);
1377: net_out.flush();
1378: }
1379:
1380:
1392: void serve(final portServer p, ServerSocket serverSocket)
1393: throws MARSHAL, IOException
1394: {
1395: final Socket service;
1396: service = serverSocket.accept();
1397:
1398:
1399: if (p.running_threads >= MAX_RUNNING_THREADS)
1400: {
1401: serveStep(service, true);
1402: return;
1403: }
1404:
1405: new Thread()
1406: {
1407: public void run()
1408: {
1409: try
1410: {
1411: synchronized (p)
1412: {
1413: p.running_threads++;
1414: }
1415: serveStep(service, false);
1416: }
1417: finally
1418: {
1419: synchronized (p)
1420: {
1421: p.running_threads--;
1422: }
1423: }
1424: }
1425: }.start();
1426: }
1427:
1428:
1439: void serveStep(Socket service, boolean no_resources)
1440: {
1441: try
1442: {
1443: Serving: while (true)
1444: {
1445: InputStream in = service.getInputStream();
1446: service.setSoTimeout(TOUT_START_READING_MESSAGE);
1447:
1448: MessageHeader msh_request = new MessageHeader();
1449:
1450: try
1451: {
1452: msh_request.read(in);
1453: }
1454: catch (MARSHAL ex)
1455: {
1456:
1457: return;
1458: }
1459:
1460: if (max_version != null)
1461: {
1462: if (!msh_request.version.until_inclusive(max_version.major,
1463: max_version.minor))
1464: {
1465: OutputStream out = service.getOutputStream();
1466: new ErrorMessage(max_version).write(out);
1467: return;
1468: }
1469: }
1470:
1471: byte[] r = msh_request.readMessage(in, service, TOUT_WHILE_READING,
1472: TOUT_AFTER_RECEIVING);
1473:
1474: if (msh_request.message_type == MessageHeader.REQUEST)
1475: {
1476: RequestHeader rh_request;
1477:
1478: BufferredCdrInput cin = new BufferredCdrInput(r);
1479: cin.setOrb(this);
1480: cin.setVersion(msh_request.version);
1481: cin.setOffset(msh_request.getHeaderSize());
1482: cin.setBigEndian(msh_request.isBigEndian());
1483:
1484: rh_request = msh_request.create_request_header();
1485:
1486:
1487: rh_request.read(cin);
1488:
1489:
1490:
1491: if (msh_request.version.since_inclusive(1, 2))
1492: {
1493: cin.align(8);
1494:
1495:
1496: }
1497:
1498: InvokeHandler target = (InvokeHandler) find_connected_object(
1499: rh_request.object_key, -1);
1500:
1501:
1502:
1503:
1504: ReplyHeader rh_reply = msh_request.create_reply_header();
1505:
1506:
1507: ResponseHandlerImpl handler = new ResponseHandlerImpl(
1508: this, msh_request, rh_reply, rh_request);
1509:
1510: SystemException sysEx = null;
1511:
1512: try
1513: {
1514: if (no_resources)
1515: {
1516: NO_RESOURCES no = new NO_RESOURCES("Too many parallel calls");
1517: no.minor = Minor.Threads;
1518: throw no;
1519: }
1520: if (target == null)
1521: throw new OBJECT_NOT_EXIST();
1522: target._invoke(rh_request.operation, cin, handler);
1523: }
1524: catch (gnuForwardRequest forwarded)
1525: {
1526: OutputStream sou = service.getOutputStream();
1527: forward_request(sou, msh_request, rh_request, forwarded);
1528: if (service != null && !service.isClosed())
1529: {
1530:
1531:
1532: service.setSoTimeout(TANDEM_REQUESTS);
1533: continue Serving;
1534: }
1535: }
1536: catch (UnknownException uex)
1537: {
1538: sysEx = new UNKNOWN("Unknown", 2,
1539: CompletionStatus.COMPLETED_MAYBE);
1540: sysEx.initCause(uex.originalEx);
1541:
1542: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1543:
1544: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1545: rh_reply.service_context, uex.originalEx, ech);
1546:
1547: ObjectCreator.writeSystemException(ech, sysEx);
1548: }
1549: catch (SystemException ex)
1550: {
1551: sysEx = ex;
1552:
1553: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1554:
1555: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1556: rh_reply.service_context, ex, ech);
1557:
1558: ObjectCreator.writeSystemException(ech, ex);
1559: }
1560: catch (Exception except)
1561: {
1562:
1563:
1564:
1565: except.printStackTrace();
1566:
1567: sysEx = new UNKNOWN("Unknown", 2,
1568: CompletionStatus.COMPLETED_MAYBE);
1569: sysEx.initCause(except);
1570:
1571: org.omg.CORBA.portable.OutputStream ech = handler.createExceptionReply();
1572:
1573: rh_reply.service_context = UnknownExceptionCtxHandler.addExceptionContext(
1574: rh_reply.service_context, except, ech);
1575:
1576: ObjectCreator.writeSystemException(ech, sysEx);
1577: }
1578:
1579:
1580: if (rh_request.isResponseExpected())
1581: {
1582: OutputStream sou = service.getOutputStream();
1583: respond_to_client(sou, msh_request, rh_request, handler,
1584: sysEx);
1585: }
1586: }
1587: else if (msh_request.message_type == MessageHeader.CLOSE_CONNECTION
1588: || msh_request.message_type == MessageHeader.MESSAGE_ERROR)
1589: {
1590: CloseMessage.close(service.getOutputStream());
1591: service.close();
1592: return;
1593: }
1594:
1595: if (service != null && !service.isClosed())
1596:
1597:
1598:
1599: service.setSoTimeout(TANDEM_REQUESTS);
1600: else
1601: return;
1602: }
1603: }
1604: catch (SocketException ex)
1605: {
1606:
1607: return;
1608: }
1609: catch (IOException ioex)
1610: {
1611:
1612:
1613: return;
1614: }
1615: finally
1616: {
1617: try
1618: {
1619: if (service!=null && !service.isClosed())
1620: service.close();
1621: }
1622: catch (IOException ioex)
1623: {
1624:
1625: }
1626: }
1627: }
1628:
1629:
1633: protected void useProperties(Properties props)
1634: {
1635: if (props != null)
1636: {
1637: if (props.containsKey(LISTEN_ON))
1638: Port = Integer.parseInt(props.getProperty(LISTEN_ON));
1639: if (props.containsKey(NS_HOST))
1640: ns_host = props.getProperty(NS_HOST);
1641: try
1642: {
1643: if (props.containsKey(NS_PORT))
1644: ns_port = Integer.parseInt(props.getProperty(NS_PORT));
1645: if (props.containsKey(START_READING_MESSAGE))
1646: TOUT_START_READING_MESSAGE =
1647: Integer.parseInt(props.getProperty(START_READING_MESSAGE));
1648: if (props.containsKey(WHILE_READING))
1649: TOUT_WHILE_READING =
1650: Integer.parseInt(props.getProperty(WHILE_READING));
1651: if (props.containsKey(AFTER_RECEIVING))
1652: TOUT_AFTER_RECEIVING =
1653: Integer.parseInt(props.getProperty(AFTER_RECEIVING));
1654: if (props.containsKey(SERVER_ERROR_PAUSE))
1655: TWAIT_SERVER_ERROR_PAUSE =
1656: Integer.parseInt(props.getProperty(SERVER_ERROR_PAUSE));
1657: }
1658: catch (NumberFormatException ex)
1659: {
1660: throw new BAD_PARAM("Invalid " + NS_PORT +
1661: "property, unable to parse '" + props.getProperty(NS_PORT) +
1662: "'"
1663: );
1664: }
1665:
1666: if (props.containsKey(SocketFactory.PROPERTY))
1667: {
1668: String factory = null;
1669: try
1670: {
1671: factory = props.getProperty(SocketFactory.PROPERTY);
1672: if (factory!=null)
1673: socketFactory = (SocketFactory)
1674: ObjectCreator.forName(factory).newInstance();
1675: }
1676: catch (Exception ex)
1677: {
1678: BAD_PARAM p = new BAD_PARAM("Bad socket factory "+factory);
1679: p.initCause(ex);
1680: throw p;
1681: }
1682: }
1683:
1684: if (props.containsKey(ORB_ID))
1685: orb_id = props.getProperty(ORB_ID);
1686:
1687: if (props.containsKey(SERVER_ID))
1688: server_id = props.getProperty(SERVER_ID);
1689:
1690: Enumeration en = props.elements();
1691: while (en.hasMoreElements())
1692: {
1693: String item = (String) en.nextElement();
1694: if (item.equals(REFERENCE))
1695: initial_references.put(item,
1696: string_to_object(props.getProperty(item))
1697: );
1698: }
1699: }
1700: }
1701:
1702:
1720: public Request get_next_response() throws org.omg.CORBA.WrongTransaction
1721: {
1722: return asynchron.get_next_response();
1723: }
1724:
1725:
1732: public boolean poll_next_response()
1733: {
1734: return asynchron.poll_next_response();
1735: }
1736:
1737:
1751: public void send_multiple_requests_deferred(Request[] requests)
1752: {
1753: asynchron.send_multiple_requests_deferred(requests);
1754: }
1755:
1756:
1765: public void send_multiple_requests_oneway(Request[] requests)
1766: {
1767: asynchron.send_multiple_requests_oneway(requests);
1768: }
1769:
1770:
1773: protected void finalize() throws java.lang.Throwable
1774: {
1775: running = false;
1776: super.finalize();
1777: }