home *** CD-ROM | disk | FTP | other *** search
/ Source Code 1994 March / Source_Code_CD-ROM_Walnut_Creek_March_1994.iso / compsrcs / unix / volume26 / mcast / part04 < prev    next >
Encoding:
Text File  |  1993-04-05  |  70.4 KB  |  2,068 lines

  1. Newsgroups: comp.sources.unix
  2. From: casey@gauss.llnl.gov (Casey Leedom)
  3. Subject: v26i108: mcast - LLNL IP multicast implementation, V1.2.3, Part04/04
  4. Sender: unix-sources-moderator@vix.com
  5. Approved: paul@vix.com
  6.  
  7. Submitted-By: casey@gauss.llnl.gov (Casey Leedom)
  8. Posting-Number: Volume 26, Issue 108
  9. Archive-Name: mcast/part04
  10.  
  11. #! /bin/sh
  12. # This is a shell archive.  Remove anything before this line, then unpack
  13. # it by saving it into a file and typing "sh file".  To overwrite existing
  14. # files, type "sh file -c".  You can also feed this as standard input via
  15. # unshar, or by typing "sh <file", e.g..  If this archive is complete, you
  16. # will see the following message at the end:
  17. #        "End of archive 4 (of 4)."
  18. # Contents:  doc/notes.me mcastd/mcastd.c
  19. # Wrapped by vixie@gw.home.vix.com on Tue Apr  6 12:49:52 1993
  20. PATH=/bin:/usr/bin:/usr/ucb ; export PATH
  21. if test -f 'doc/notes.me' -a "${1}" != "-c" ; then 
  22.   echo shar: Will not clobber existing file \"'doc/notes.me'\"
  23. else
  24. echo shar: Extracting \"'doc/notes.me'\" \(19495 characters\)
  25. sed "s/^X//" >'doc/notes.me' <<'END_OF_FILE'
  26. X.\" Copyright (c) 1992 The Regents of the University of California.
  27. X.\" All rights reserved.
  28. X.\"
  29. X.\" Redistribution and use in source and binary forms, with or without
  30. X.\" modification, are permitted provided that the following conditions
  31. X.\" are met:
  32. X.\" 1. Redistributions of source code must retain the above copyright
  33. X.\"    notice, this list of conditions and the following disclaimer.
  34. X.\" 2. Redistributions in binary form must reproduce the above copyright
  35. X.\"    notice, this list of conditions and the following disclaimer in the
  36. X.\"    documentation and/or other materials provided with the distribution.
  37. X.\" 3. All advertising materials mentioning features or use of this software
  38. X.\"    must display the following acknowledgement:
  39. X.\"    This product includes software developed by the University of
  40. X.\"    California, Lawrence Livermore National Laboratory and its
  41. X.\"    contributors.
  42. X.\" 4. Neither the name of the University nor the names of its contributors
  43. X.\"    may be used to endorse or promote products derived from this software
  44. X.\"    without specific prior written permission.
  45. X.\"
  46. X.\" THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
  47. X.\" ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  48. X.\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  49. X.\" ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  50. X.\" FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  51. X.\" DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  52. X.\" OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  53. X.\" HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  54. X.\" LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  55. X.\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  56. X.\" SUCH DAMAGE.
  57. X.\"
  58. X.po 1.0in
  59. X.ll 6.5in
  60. X.nr pi 0n
  61. X.de Hd
  62. X.\" .ds Vr \\$3
  63. X.ds Vr 1.2.3
  64. X.ds Dt \\$4
  65. X..
  66. X.Hd $Header: /u0/casey/src/mcast/doc/RCS/notes.me,v 1.2 93/04/06 11:27:24 casey Exp $
  67. X.of 'LLNL MCAST version \*(Vr''\*(Dt'
  68. X.ef '\*(Dt''LLNL MCAST version \*(Vr'
  69. X.sp 2in
  70. X.(c
  71. X.sz +2
  72. Implementation Notes on the LLNL MCAST Distribution
  73. X.sz -2
  74. X.)c
  75. X.\" We delay defining the header so it won't show up on the first page ...
  76. X.oh 'LLNL MCAST Implementation notes''%'
  77. X.eh '%''LLNL MCAST Implementation notes'
  78. X.sp 3v
  79. X.pp
  80. This paper contains notes regarding the LLNL MCAST multicast
  81. communication software distribution.  It covers the multicast abstraction,
  82. the API, the current implementation, thoughts about those, and our
  83. experience.
  84. X.sh 1 "The Abstraction"
  85. X.pp
  86. In this section we describe the abstraction that we present to multicast
  87. communications users.
  88. X.pp
  89. We use the ubiquitous
  90. X.i "multicast group"
  91. and
  92. X.i "multicast group membership"
  93. paradigm.  Sets of applications which wish to communicate with each
  94. other
  95. X.i subscribe
  96. to a multicast group created for that purpose and send messages
  97. amongst themselves on the multicast group.  Those messages may be either
  98. X.i broadcast
  99. X(all group members receive copies of the messages) or
  100. X.i unicast
  101. X(the message will only be received by one group member).  This closely
  102. models Ethernet and leads into our the next section of our abstraction
  103. outline ...
  104. X.sh 2 "Multicast groups and group members"
  105. X.pp
  106. The abstraction that we present is that a multicast group forms a virtual
  107. network which consists of some subset of a physical network.  This virtual
  108. network consists of those hosts which are subscribed to the multicast
  109. group.  A host would typically be subscribed to a multicast group because
  110. some application running on the host has requested to be subscribed to the
  111. group.
  112. X.pp
  113. XEvery multicast group has a multicast
  114. X.i "group address"
  115. unique from all other multicast groups.  Additionally, every entity
  116. participating in communication on a multicast group is a
  117. X.i member
  118. of that group and has a unique
  119. X.i "member address"
  120. within the multicast group.
  121. X.pp
  122. A multicast group is uniquely identified by the conjunction of its group
  123. address and the transport protocol being used.  Thus, the creation of a
  124. new group is basically equivalent to creating a new address.  At this
  125. point, only two transport protocols are envisioned: unreliable datagram
  126. and reliable datagram.  The receiving hosts may provide further
  127. presentation semantics via the access they provide applications to
  128. the received data.  e.g. reliable, sequenced byte stream; unreliable,
  129. sequenced message; etc.
  130. X.sh 2 "Potential usage patterns"
  131. X.pp
  132. It is expected that multicast groups will be created on demand and then
  133. thrown away just as easily.  One potential use is to treat multicast
  134. groups in much the same way as UNIX shells treat pipes: i.e.  some
  135. process creates one or more multicast groups, forks one or more children,
  136. and passes the multicast groups to the children for use in communicating
  137. among themselves.
  138. X.pp
  139. Since multicast group creations and member subscriptions and
  140. unsubscriptions may be very dynamic, it must be possible to generate new
  141. group and new member addresses very easily.  In fact, it must be possible
  142. to generate these new group and member addresses in a totally distributed
  143. manner.  There is no room for negotiation, voting, or any other means of
  144. coordinating group and member address assignments.  More information on
  145. this topic is contained below in a section on distributed address
  146. creation.
  147. X.pp
  148. It is also expected that well known groups and members will also be
  149. needed for services like distributed naming, service location, routing,
  150. and a variety of other applications.
  151. X.pp
  152. One of the most important services will be a distributed service that
  153. will allow other advertised services to be found.  Those names will
  154. represent both long term and transient objects.  For instance, an
  155. application may start up, create a multicast group, register its address
  156. with a well known name, operate for a while, and then terminate.
  157. X.pp
  158. While a distributed naming service lives above the multicast transport
  159. and therefore outside the main focus of our current work, it is important
  160. enough that it should be dealt with.  At a minimum, reserved addresses,
  161. etc. will have to be set aside.  Unfortunately, the current
  162. implementation does not address this issue at all.  The current scheme to
  163. translate names to multicast group addresses is a complete hack designed
  164. merely to get things up.  One possibility that seems attractive is to do
  165. something like the IP/Ethernet ARP and subnet mask discovery protocols
  166. where one simply broadcasts requests for resolution to a hardwired
  167. multicast group which all hosts subscribed to.  However, many different
  168. schemes are possible and we just haven't taken the time to work on this
  169. yet.  The need to handle wide area networking only makes the problem more
  170. difficult.
  171. X.sh 2 "Distributed address creation"
  172. X.pp
  173. We have implemented a scheme where addresses are formed by combining the
  174. creating host's ID, the host's idea of the current time, and an
  175. intra-time-tick sequence number to fulfill these requirements.  This
  176. requires a rather large address space, but it's difficult to imagine any
  177. scheme that fulfills the distributed creation requirements that can do so
  178. in a small address space.
  179. X.pp
  180. As it turns out, many distributed applications need to be able to
  181. generate unique identifiers in a distributed manner.  As such, we should
  182. have just invented a more generic distributed identifier.  However, since
  183. we wanted to get on with our work and since the identifiers directly
  184. corresponded to our multicast addresses, we decided not to lose time
  185. working on a more general scheme.
  186. X.sh 2 "Distributed address size and mapping onto link layer multicast addresses"
  187. X.pp
  188. Our current implementation of the distributed address creation algorithm
  189. lives in the user's application so a process identifier must be added to
  190. further distinguish distributed addresses.  This yields a four word
  191. distributed address of 128 bits on our 32 bit machines:
  192. X.(b
  193. X.ta \w'struct'u +\w'unsigned longXXXX'u +\w'sequenceXXXX'u
  194. struct mc_addr { /*MACHINE DEPENDENT */
  195. X    unsigned long    host;    /* host ID (probably too small) */
  196. X    unsigned long    process;    /* process ID */
  197. X    unsigned long    time;    /* time (definitely too small) */
  198. X    unsigned long    sequence;    /* sequence number (probably too small) */
  199. X};
  200. X.)b
  201. One could certainly put the distributed address creation algorithm in the
  202. host's kernel which would easily save one word and bring the address size
  203. down to 96 bits.  It's conceivable that with sufficient effort, another
  204. word might eventually be shaved off without impacting the distributed
  205. creation requirements bringing the total address size down to 64 bits.
  206. And perhaps one could even take a certain risk of address collision and
  207. squeeze addresses down to 48 bits.
  208. X.pp
  209. But even with that incredible optimism, you would still be stuck with two
  210. facts:
  211. X.ip 1.
  212. XEven 48 bits exceeds the number of address bits available for
  213. multicasting in most network link layers.
  214. X.ip 2.
  215. The number of hosts on today's networks are increasing at an ever more
  216. rapid rate.  The number of bits necessary to distinguish hosts is going
  217. to get bigger, not smaller.  There is even a fair chance that host
  218. addresses may be variable length in the future ...
  219. X.pp
  220. Some might still claim that small fixed size multicast group and group
  221. member addresses are possible, but we remain skeptical until proven
  222. wrong.  As such, we have decided to face the problem head on and just
  223. accept that multiple unique multicast groups will be mapped onto the same
  224. network link layer multicast addresses.  Thus, network link layer
  225. multicast facilities will provide a first level filtering of multicast
  226. packets after which host kernels will have to sort through those packets
  227. making it through to see which ones are really destined for applications
  228. on the host.  This is an unpleasant, but seemingly inescapable situation ...
  229. at least until network link layer multicast addresses become larger.
  230. One benefit of facing the problem and accepting its consequences is that
  231. it now becomes trivially easy to support multicast transport over link
  232. layer technologies which don't provide any multicast facilities.
  233. X.pp
  234. Note that while our multicast address contains information specific to
  235. the host the address was created on, the address, once created, isn't
  236. attached to the creating host in any way.  A process could freely migrate
  237. to another host and continue to use an address created on another host.
  238. X.sh 2 "Packet format"
  239. X.pp
  240. At this point it's difficult to be very specific about the format of
  241. multicast transport protocol packets.  The current implementation
  242. depends on a server for most of the transport semantics (see
  243. implementation section below).  The current packet format has no
  244. provisions for error detection, error correction, encryption, options
  245. processing, control, status, etc.
  246. X.pp
  247. However, what is in the packet header is very important: a multicast
  248. group address, a source member address, and a destination member address.
  249. One of the most important aspects of this is that there is only one
  250. multicast group address.  This leads to some interesting stretching of
  251. the Berkeley socket paradigm we chose to use for an application
  252. programming interface.  See the section below on the application
  253. programming interface for enlightenment on this topic ...
  254. X.pp
  255. A broadcast to a multicast group is accomplished by using a destination
  256. address of MCADDR_BROADCAST.  A unicast to a specified group member is
  257. handled by filling in the desired member address in the destination
  258. address field.  The source address is always filled in with the sending
  259. member's group address.
  260. X.pp
  261. This last point leads to the conclusion that an application must be a
  262. member of a multicast group before it can send messages on that group.
  263. This is an undesirable feature according to Dr. Kenneth Birman in his
  264. latest writings as of winter 1992.  We didn't allow this simply because
  265. we couldn't see the need and wanted to keep things simple.  If it becomes
  266. necessary to support such unsubscribed sending, perhaps it could be
  267. accommodated by filling in the source address with MCADDR_ANY ...
  268. X.pp
  269. The current client/server multicast transport protocol packet format is
  270. as follows:
  271. X.(b
  272. X.ta \w'struct'u +\w'struct mc_addrXXXX'u +\w'destinationXXXX'u
  273. struct mc_header { /* MACHINE DEPENDENT */
  274. X    unsigned int    version:8;    /* protocol version */
  275. X    unsigned int    hlength:8;    /* header length */
  276. X    unsigned int    :16;    /* pad to 32 bits */
  277. X    unsigned int    length:32;    /* length of header + data */
  278. X    struct mc_addr    group;    /* multicast group */
  279. X    struct mc_addr    source;    /* source of this message */
  280. X    struct mc_addr    destination;    /* destination of this message */
  281. X    /* message data follows this minimum header */
  282. X};
  283. X.)b
  284. Note that even the version and hlength fields serve very little purpose.
  285. They are generated and checked, but are always the same values for all
  286. packets.  Their main use is for detecting client/server communications
  287. which have lost framing synchronization.  It sure would be nice if
  288. transport protocols like TCP offered applications the ability to insert
  289. application framing marks!!!!
  290. X.sh 1 "The Application Programming Interface"
  291. X.pp
  292. In this section we describe the application programming interface we have
  293. provided to the abstraction outlined above.
  294. X.pp
  295. We chose to use the Berkeley socket interface for two simple reasons:
  296. X.ip 1.
  297. It's relatively simple (once you throw away the documentation) and widely
  298. known.  We hoped this would speed up development of the interface and
  299. help people already experienced in developing network applications with
  300. Berkeley get started with multicast programming.
  301. X.ip 2.
  302. It seemed to fit without too much bending.  The biggest problem came up
  303. in the area of binding naming information to multicast sockets.
  304. X.pp
  305. A complete Berkeley socket-style interface has been provided.  It uses
  306. names like
  307. X.i mc_socket ,
  308. X.i mc_bind ,
  309. and
  310. X.i mc_close .
  311. These emulation routines take the same arguments as the native
  312. X.i socket ,
  313. X.i bind ,
  314. X.i close ,
  315. etc.  There are a few routines special to the API.  Those routines'
  316. names start with
  317. X.i mcast_ .
  318. X.sh 2 "Multicast socket naming and the Berkeley socket paradigm"
  319. X.pp
  320. In traditional protocols supported under the Berkeley socket paradigm, a
  321. communicating socket is fully named by a protocol family, a transport
  322. protocol within that family, a local address part, and a remote address
  323. part.  The local and remote address parts usually consist of the
  324. conjunction of some form of host identifier and a port address.
  325. X.pp
  326. This name is built up in pieces: the protocol family and transport
  327. protocol are specified at socket creation time, the local address part is
  328. partially or fully defined with a bind call, and finally the remote
  329. address part and perhaps some remaining unspecified portion of the local
  330. address part are defined with a connect or accept call.  The local and
  331. remote address parts can also be specified obscurely via sendto calls,
  332. but we'll ignore that complication for now.
  333. X.pp
  334. This model of naming works pretty well for the multicast abstraction
  335. we've defined, but breaks down in two areas:
  336. X.ip 1.
  337. In multicast communication one doesn't tend to connect to or accept
  338. connections from multicast group peer members.  One tends to subscribe to
  339. a multicast group and then communicate with other subscribers of that
  340. group.  In our manual pages we sometimes refer to a multicast socket as
  341. being
  342. X.i connected
  343. when it's associated with a multicast group.  This is probably a
  344. dreadful misnomer that's destined only to confuse people.
  345. X.ip 2.
  346. XFor multicast sockets, the concepts of protocol family, transport
  347. protocol, local address part, and remote address part work pretty well.
  348. The protocol family is PF_MCAST, the protocol is either reliable or
  349. unreliable, the local address part is the application's group member
  350. address, and the remote address part is either MCADDR_BROADCAST or a
  351. specific group member's address.  But where is there room for the
  352. multicast group address in that naming???
  353. X.pp
  354. The first problem points to the need of a symmetric naming concept in the
  355. Berkeley socket paradigm.  We addressed this problem by allowing a bind
  356. X(mc_bind) call to put a multicast socket into a fully communicating
  357. state.  Since a bind call gets both the multicast group and the member
  358. addresses in a single call (see below), it can fill in the only remaining
  359. name component, the remote part, with a default value of MCADDR_BROADCAST.
  360. X.pp
  361. XFor the second problem, if we look at the traditional Berkeley socket
  362. naming sequence, we can see that we bind symmetric name components to the
  363. socket at socket creation time, followed by asymmetric components in
  364. later bind, connect, and accept calls.  In this abstraction, it's clear
  365. that the multicast group address is a symmetric naming component, but
  366. there's nowhere to specify symmetric components except in the socket
  367. creation call and nowhere in that call to specify per protocol
  368. information.
  369. X.pp
  370. We solved this problem by specifying the multicast group address in all
  371. of the bind (mc_bind), connect (mc_connect), and sendto (mc_sendto)
  372. calls.  Once the multicast group address has been specified with any of
  373. these mechanisms, it may not be changed again.  Thus, the multicast group
  374. address offered to all but the first call is superfluous.
  375. X.pp
  376. A much better approach might be to allow per protocol naming information
  377. to be specified at socket creation time.  We didn't do this because we
  378. didn't want to step too far outside the Berkeley socket paradigm.  Also,
  379. we weren't at all certain that it might make even more sense to have a
  380. completely different scheme of providing socket naming information.
  381. Options range from specifying all naming information at socket creation
  382. time (this is similar to the
  383. X.i open
  384. call) to allowing all information to be built up incrementally (this
  385. is similar to declaring a structure and then filling it in a piece at
  386. a time).
  387. X.pp
  388. Also note that this wouldn't be a problem if multicast transport packets
  389. contained a source and a destination multicast group.  This could
  390. probably be done and make sense logically, but the routing issues could
  391. be frightening ...
  392. X.pp
  393. XFinally, regardless of the approach used to specify a socket's name
  394. components, it became obvious to us that a nice high level open-style
  395. routine is highly desirable.  For our multicast sockets, we developed
  396. mcast_sopen.  This simple routine allows applications to connect up to a
  397. multicast group and start communicating with just a few lines of code.
  398. We think that equivalent inet_sopen, ns_sopen, osi_sopen calls would be
  399. equally well received ...
  400. X.sh 1 "The Implementation"
  401. X.pp
  402. The current implementation of the multicast transport uses a
  403. communication server to emulate multicast message passing semantics.
  404. Clients connect to the server, subscribe to a multicast group, and
  405. then commence sending and receiving messages on that group.  A Berkeley
  406. socket-style API (Application Programming Interface) is provided to
  407. hide the details of this interaction.
  408. X.pp
  409. One of the main reasons for developing the client/server implementation of
  410. the multicast communication system was to rapidly prototype and experiment
  411. with multicast communication abstractions and application programming
  412. interfaces (APIs).  We also wanted to enable the rest of our project team
  413. to use multicast communication paradigms without having to wait for a
  414. final implementation.  A later, more complete implementation is destined
  415. to take the place of this initial implementation [hopefully]
  416. transparently under the defined API.
  417. END_OF_FILE
  418. if test 19495 -ne `wc -c <'doc/notes.me'`; then
  419.     echo shar: \"'doc/notes.me'\" unpacked with wrong size!
  420. fi
  421. # end of 'doc/notes.me'
  422. fi
  423. if test -f 'mcastd/mcastd.c' -a "${1}" != "-c" ; then 
  424.   echo shar: Will not clobber existing file \"'mcastd/mcastd.c'\"
  425. else
  426. echo shar: Extracting \"'mcastd/mcastd.c'\" \(48856 characters\)
  427. sed "s/^X//" >'mcastd/mcastd.c' <<'END_OF_FILE'
  428. X/*
  429. X * $Header: /u0/casey/src/mcast/mcastd/RCS/mcastd.c,v 1.7 93/03/17 09:38:39 casey Exp $
  430. X */
  431. X
  432. X/*
  433. X * Copyright (c) 1992 The Regents of the University of California.
  434. X * All rights reserved.
  435. X *
  436. X * Redistribution and use in source and binary forms, with or without
  437. X * modification, are permitted provided that the following conditions
  438. X * are met:
  439. X * 1. Redistributions of source code must retain the above copyright
  440. X *    notice, this list of conditions and the following disclaimer.
  441. X * 2. Redistributions in binary form must reproduce the above copyright
  442. X *    notice, this list of conditions and the following disclaimer in the
  443. X *    documentation and/or other materials provided with the distribution.
  444. X * 3. All advertising materials mentioning features or use of this software
  445. X *    must display the following acknowledgement:
  446. X *    This product includes software developed by the University of
  447. X *    California, Lawrence Livermore National Laboratory and its
  448. X *    contributors.
  449. X * 4. Neither the name of the University nor the names of its contributors
  450. X *    may be used to endorse or promote products derived from this software
  451. X *    without specific prior written permission.
  452. X *
  453. X * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
  454. X * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  455. X * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  456. X * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  457. X * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  458. X * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  459. X * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  460. X * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  461. X * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  462. X * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  463. X * SUCH DAMAGE.
  464. X */
  465. X
  466. X#ifndef lint
  467. static char rcsid[] = "$Header: /u0/casey/src/mcast/mcastd/RCS/mcastd.c,v 1.7 93/03/17 09:38:39 casey Exp $";
  468. static char copyright[] =
  469. X    "Copyright (c) 1992 The Regents of the University of California.\n"
  470. X    "All rights reserved.\n";
  471. static char classification[] =
  472. X    "Unclassified\n";
  473. X#endif
  474. X
  475. X
  476. X/*
  477. X * This is a multicast communication simulation server.  It simulates the
  478. X * semantics of joining multicast groups and sending and reciving messages
  479. X * to and from a set of peer multicast group members.
  480. X *
  481. X * Clients connect to the server, subscribe to a group, and then commence
  482. X * sending and receiving messages on that group.  A Berkeley socket-style
  483. X * API (Application Programming Interface) is provided to hide the details
  484. X * of this interaction.  See the manual pages in ../man and the library
  485. X * API routines in ../libmcast for details on that API.
  486. X *
  487. X * Clients are only allowed to subscribe to one multicast group at a time.
  488. X * Thus, a process wishing to subscribe to more than one multicast group
  489. X * must open a separate connection to the server for each group.
  490. X *
  491. X * This server operates as a member of every multicast group with member
  492. X * address MCD_MCADDR.  Messages to and from the server are used for
  493. X * connection administration.  This includes things like subscribing members
  494. X * to multicast groups, implementing fcntl(2) style controls, etc.
  495. X */
  496. X
  497. X/*
  498. X * Special note:
  499. X *
  500. X * Migration isn't supported in this implementation since migration would
  501. X * require extra mechanism and migration isn't currently one of the goals
  502. X * of our project.  Thus we haven't spent the time to do it.
  503. X *
  504. X * To do client migration under this implementation a method would have to
  505. X * be developed to ``keep a client's place'' in a group's output queue
  506. X * during the client's migration.  One way to do this would be to have
  507. X * clients use some form of explicit shutdown protocol with the server.  A
  508. X * shutdown message could indicate that the connection should be closed and
  509. X * whether the client is ``done,'' or intends to reconnect momentarily.
  510. X *
  511. X * In the first case, any state associated with the client could then be
  512. X * freely dropped.  In the second case, a reconnection timer could be
  513. X * started.  If the client managed to reconnect before the timer expired,
  514. X * well and good.  If not ...
  515. X *
  516. X * But this is all pie in the sky thinking and no efforts have been spent
  517. X * in any real design work.
  518. X */
  519. X
  520. X
  521. X/*
  522. X * In general the compilation environment for this program is assumed to be
  523. X * ANSI C-1989 and POSIX 1003.1-1990 with Berkeley network extensions.  No
  524. X * effort has been spent to make this compile under any other environment.
  525. X * Maybe later ...  This program is known to compile under SGI IRIX 3.3.2.
  526. X */
  527. X
  528. X/*
  529. X * ANSI C and POSIX includes
  530. X */
  531. X#include <errno.h>
  532. X#include <fcntl.h>
  533. X#include <setjmp.h>
  534. X#include <signal.h>
  535. X#include <stdarg.h>
  536. X#include <stddef.h>
  537. X#include <stdio.h>
  538. X#include <stdlib.h>
  539. X#include <string.h>
  540. X#include <unistd.h>
  541. X#include <sys/types.h>
  542. X
  543. X#ifdef NEED_OFFSETOF
  544. X/*
  545. X * offsetof is supposed to be defined in <stddef.h> according to the ANSI C
  546. X * X3.159-1989 specification, but Sun OS 4.1.1 fails to define them ...
  547. X */
  548. X#define    offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
  549. X#endif
  550. X
  551. X#ifdef NEED_EXIT_CODES
  552. X/*
  553. X * EXIT_SUCCESS and EXIT_FAILURE are supposed to be defined in <stdlib.h>
  554. X * according to the ANSI C X3.159-1989 specification, but Sun OS 4.1.1
  555. X * fails to define them ...
  556. X */
  557. X#define    EXIT_SUCCESS    0
  558. X#define    EXIT_FAILURE    1
  559. X#endif
  560. X
  561. X#ifdef NEED_STRERROR
  562. X/*
  563. X * strerror is supposed to be defined in <string.h> and supplied in the
  564. X * standard C library according to the ANSI C X3.159-1989 specification,
  565. X * but Sun OS 4.1.1 fails to define or supply it ...  Unfortunately the only
  566. X * way we can control this is with an externally supplied define.
  567. X */
  568. extern int    errno;            /* system error number */
  569. extern char    *sys_errlist[];        /* system error messages */
  570. extern int    sys_nerr;        /* number of entries in sys_errlist */
  571. X
  572. char *
  573. strerror(int err)
  574. X{
  575. X    if (err < 0 || err >= sys_nerr) {
  576. X        static char msg[100];
  577. X
  578. X        sprintf(msg, "system error number %d", err);
  579. X        return(msg);
  580. X    }
  581. X    return(sys_errlist[err]);
  582. X}
  583. X#endif
  584. X
  585. X
  586. X/*
  587. X * BSD networking includes
  588. X *
  589. X * We use sockets, TCP/IP, select (and therefore fd sets), syslog, getservent,
  590. X * and htonl and friends.
  591. X */
  592. X#include <netdb.h>
  593. X#include <syslog.h>
  594. X#include <sys/uio.h>
  595. X#include <sys/socket.h>
  596. X#include <sys/time.h>
  597. X#include <netinet/in.h>
  598. X#include <arpa/inet.h>
  599. X
  600. X/*
  601. X * Multicast transport includes
  602. X */
  603. X#include <netmcast/mcast.h>
  604. X#include <netmcast/mcastd.h>
  605. X
  606. X
  607. X/*
  608. X *    Arguments
  609. X *    =========
  610. X */
  611. char *usage = "usage: %s [-d] [-l loglevel] [-p port]\n";
  612. X
  613. char *myname;            /* name we were invoked by */
  614. int debug;            /* debug mode -- don't disconnect from tty */
  615. int loglevel = LOG_NOTICE;    /* default syslog logging level */
  616. unsigned short port;        /* TCP port to set up mcast service on */
  617. X
  618. X
  619. X/*
  620. X *    Queue management
  621. X *    ================
  622. X *
  623. X * Used for generic queues.  Queue pointers usually point at Queue structures
  624. X * within the bodies of other structures.  Queue pointers are translated to
  625. X * pointers to the enclosing base type via the macro baseof().  There is
  626. X * usually some Queue structure designated as the head which will be
  627. X * instantiated before any other queue elements and won't be destroyed until
  628. X * all other elements are dequeued and there is no further need for the queue.
  629. X */
  630. typedef struct _Queue { 
  631. X    struct _Queue    *next;        /* next queue element */
  632. X    struct _Queue    *prev;        /* previous queue element */
  633. X} Queue;
  634. X
  635. X
  636. X/*
  637. X *    Message management
  638. X *    ==================
  639. X *
  640. X * Messages are strung together on a queue off of the multicast group that they
  641. X * are destined for.
  642. X *
  643. X * The field "references" actually counts the number of clients that are
  644. X * pointing at the message or some predecessor in the queue.  Thus,
  645. X * "references" is really a sum of the number of clients who are looking at or
  646. X * will look at the message.
  647. X *
  648. X * The mc_header "message" field *MUST* be last.  When messages are read in
  649. X * the Message storage will be realloc()'ed to accommodate the message body
  650. X * which will be stored immediately following the header.
  651. X */
  652. typedef struct _Message {
  653. X    Queue        queue;        /* message queue */
  654. X    int        references;    /* number of "references" */
  655. X    mc_header    msg;        /* MUST BE LAST: the message itself */
  656. X    /* message data follows header */
  657. X} Message;
  658. X
  659. X
  660. X/*
  661. X *    Multicast group management
  662. X *    ==========================
  663. X *
  664. X * Multicast groups are strung together on hash chains.  Currently we only
  665. X * have one hash chain, but all the mechanism is in place to do hashing if the
  666. X * server starts slowing down on multicast group subscriptions because there
  667. X * are too many multicast groups to search through.  Not likely, but it
  668. X * doesn't add much complexity to fake it in case it does become necessary.
  669. X *
  670. X * Clients subscribing to a multicast group are attached to the "clients"
  671. X * queue.  The clients queue is traversed whenever a new message arrives
  672. X * destined for the group.  Any client which is in an output idle state  will
  673. X * be checked to determine if it wants see the message.  If so, its write
  674. X * buffer will be pointed at the newly attached message and its output
  675. X * interrupts will be turned back on (see "pending").  If this traversal starts
  676. X * to become a problem a second client chain can be set up to separate clients
  677. X * in "output" and "idle" states, but it's very unlikely that it will become
  678. X * a problem.
  679. X *
  680. X * "nclients" counts the number of clients subscribed to the multicast group.
  681. X * It's used primarily to initialize the reference count on any incoming
  682. X * message.  A secondary use is to tell when there are no more client
  683. X * subscribed to the group, but that could just as easily be done be checking
  684. X * to see if the "clients" queue is empty.
  685. X *
  686. X * "messages" is an ordered queue of all messages sent to the multicast group
  687. X * which have a non-zero "reference" count.  (See description of "Message"
  688. X * above.)
  689. X */
  690. typedef struct _Group {
  691. X    Queue        hash;        /* group id hash chain */
  692. X    mc_addr        address;    /* group multicast address */
  693. X    Queue        clients;    /* group clients queue head */
  694. X    int        nclients;    /* number of clients */
  695. X    Queue        messages;    /* group messages queue head */
  696. X} Group;
  697. X
  698. X#define    GHASHSIZE    1        /* only one hash bucket for now */
  699. Queue    groups[GHASHSIZE];        /* group hash bucket heads */
  700. X
  701. X#define    ghash(gaddrp)    &groups[0]    /* mcast group address to hash chain */
  702. X
  703. X
  704. X/*
  705. X *    Client management
  706. X *    =================
  707. X *
  708. X * The clients of a multicast group are strung together via the "peers" queue
  709. X * and linked into the multicast group's "clients" field.
  710. X *
  711. X * "group" points back at the client's subscribed multicast group.  It's
  712. X * primary use is to detect the heads of the "peers" and "messages" queues
  713. X * as &group->clients and &group->messages.  It's also used to clean up group
  714. X * state when a client unsubscribes to the multicast group.
  715. X *
  716. X * The "fd" field contains the file descriptor the client is on.  It's used to
  717. X * translate from a client structure pointer to its index in the "clients" 
  718. X * array.
  719. X *
  720. X * The "flags" field is used to keep track of special client state.  Currently
  721. X * this is only used to tell whether the client wants its own multicast
  722. X * messages looped back to it and whether the client wants to operate in a
  723. X * ``send only'' mode (though messages directed explicitly at the client will
  724. X * still be delivered -- see ClientWantsMessage).  The default is not to send
  725. X * a client's messages back to it and to deliver general group messages.
  726. X *
  727. X * The fields "rbuf" and "wbuf" are used to keep track of clients message I/O.
  728. X *
  729. X * "rbuf.message" is either NULL or points at a message that's in the process
  730. X * of being read from the client.  A Message in the process of being read in
  731. X * will always have its "queue" linked to itself and its "reference" count set
  732. X * to 1.  This is not necessary, but it means that messages are always in a
  733. X * consistent state and applying FreeMessage() will be meaningful.: The
  734. X * client's receive buffer is the *ONLY* reference to a message being received.
  735. X * Thus it is always safe to simply free() it.
  736. X *
  737. X * "wbuf.message" is either NULL or points at a message in the group's message
  738. X * queue.  If wbuf.message is non-NULL, the client holds a reference to that
  739. X * message and all following messages.
  740. X *
  741. X * The "offset" subfield in each of "rbuf" and "wbuf" is used to keep track of
  742. X * the current I/O offset within the message.
  743. X */
  744. typedef struct _MIO { /* client message I/O information */
  745. X    Message        *message;    /* message being dealt with */
  746. X    int        offset;        /* I/O offset into message->msg */
  747. X} MIO;
  748. X
  749. typedef struct _Client {
  750. X    Queue        peers;        /* multicast group peers */
  751. X    Group        *group;        /* multicast group */
  752. X    mc_addr        local;        /* client's local multicast address */
  753. X    mc_addr        remote;        /* client's remote connected address */
  754. X    int        fd;        /* client's file descriptor */
  755. X    int        flags;        /* control information (see below) */
  756. X    MIO        rbuf;        /* input message control */
  757. X    MIO        wbuf;        /* output message control */
  758. X} Client;
  759. X
  760. X/* flags */
  761. X#define    CLIENT_CONNECTED    0x0001    /* client connected to another client */
  762. X#define    CLIENT_LOOPBACK        0x0002    /* client sees its own messages */
  763. X#define    CLIENT_SHUTDOWNRECV    0x0004    /* client doesn't want to receive */
  764. X
  765. X#define    MAX_CLIENTS    FD_SETSIZE    /* we use select! */
  766. Client    *clients[MAX_CLIENTS];        /* multicast clients */
  767. X
  768. X
  769. X/*
  770. X *    Server State and Connection management
  771. X *    ======================================
  772. X */
  773. int    mcastd;                /* incoming connection socket */
  774. fd_set    connections;            /* mcastd + all clients connections */
  775. fd_set    pending;            /* all clients with output pending */
  776. int    max_connection;            /* maximum fd in connections */
  777. jmp_buf    dead_client;            /* where to go if a client dies */
  778. jmp_buf    server_reset;            /* where to go if we receive a reset */
  779. X
  780. X#ifndef MCD_BUFSIZ
  781. X#define    MCD_BUFSIZ    (32*1024)    /* send/receive buffer size to attempt
  782. X                       to set on all client connections */
  783. X
  784. X#endif
  785. X
  786. X
  787. X/*
  788. X *    Internal routines
  789. X *    =================
  790. X */
  791. int main(int argc, char *argv[]);    /* main program */
  792. void BecomeDaemon(void);        /* fork and disconnect from tty */
  793. void SetupServerSocket(void);        /* just what it says ... */
  794. void ShutdownServer(void);        /* shutdown all server operations */
  795. void Server(void);            /* main server loop */
  796. void GetConnection(void);        /* accept connection on mcastd */
  797. void CloseConnection(int fd);        /* close down client connection */
  798. void ReadFromClient(int fd);        /* read data from client */
  799. void WriteToClient(int fd);        /* write pending output to client */
  800. void ProcessClientRbuf(int fd);        /* deal with a newly read message */
  801. void HandleClientRequest(int fd, Message *mp);
  802. X                    /* handle client request to server */
  803. void QueueClientMessage(int fd, Message *mp);
  804. X                    /* queue client message to group */
  805. int Subscribe(int fd, const mc_addr *addr);
  806. X                    /* add client to multicast group */
  807. void Unsubscribe(int fd);        /* delete client from current group */
  808. int ClientWantsMessage(const Client *cp, const Message *mp);
  809. X                    /* return TRUE if the message should be
  810. X                       sent to the client */
  811. void FreeMessage(Message *mp);        /* drop a reference to message */
  812. void Log(int level, const char *fmt, ...);
  813. X                    /* internal log routine */
  814. void SigDeadClient(int sig);        /* dead client catcher ... */
  815. void SigReset(int sig);            /* reset signal catcher */
  816. void SigShutdown(int sig);        /* termination signal catcher */
  817. int FD_FFS(const fd_set *fds);        /* first fd_set bit set */
  818. X
  819. X
  820. X/*
  821. X *    Miscellaneous macros, etc.
  822. X *    ==========================
  823. X */
  824. X
  825. X/*
  826. X * Test run-time self consistency assertion.  If assertion expression returns
  827. X * zero, log error and abort execution.  We can't use ANSI C assert macro
  828. X * because we won't have stdout if we fork off as a daemon.
  829. X *
  830. X * (The check for __STDC__ is obnoxious because we *assume* ANSI C
  831. X * everywhere else.  It's needed -- hopefully temporarily -- because IRIX
  832. X * 3.3.2 offers an almost, but not quite, full ANSI C compiler.  They
  833. X * completely fell down with respect to the default cpp and their standard
  834. X * header files can't aren't set up to be processed by their ANSI C acpp.
  835. X * (sigh))  ...
  836. X *
  837. X * later that ``night'', SGI came out with their next brain damage in IRIX
  838. X * 4.0.*: an ANSI standard CPP is now the default, but __STDC__ isn't.
  839. X * They view that __STDC__ should only be defined if one is absolutely and
  840. X * mind-bogglingly, to the letter standard with no extensions -- regardless
  841. X * of whether those extensions affect the semantics of the standard
  842. X * construct subsets or not. (sigh) Sometimes you just want to hit people
  843. X * and tell them ``No.  That's not it.  Try again.'' For now we use an SGI
  844. X * specific define __ANSI_CPP__ which they define when an ANSI cpp is being
  845. X * used ...
  846. X */
  847. X#ifdef NDEBUG
  848. X#define    Assert(e)    ((void)0)
  849. X#else
  850. X#if defined(__STDC__) || (defined(sgi) && defined(__ANSI_CPP__))
  851. X#define    Assert(e) \
  852. X    if (!(e)) { \
  853. X        Log(LOG_EMERG, "assertion failed: file %s, line %d: %s", \
  854. X            __FILE__, __LINE__, #e); \
  855. X        (void)abort(); \
  856. X    }
  857. X#else
  858. X#define    Assert(e) \
  859. X    if (!(e)) { \
  860. X        Log(LOG_EMERG, "assertion failed: file %s, line %d: %s", \
  861. X            __FILE__, __LINE__, "e"); \
  862. X        (void)abort(); \
  863. X    }
  864. X#endif
  865. X#endif
  866. X
  867. X/*
  868. X * Return a pointer to the containing base type given a pointer to one of
  869. X * the base type's fields.
  870. X */
  871. X#define    baseof(type, field, fieldp) \
  872. X    (type *)(void *)((char *)(fieldp) - offsetof(type, field))
  873. X
  874. X/*
  875. X *    The Program ...
  876. X *    ===============
  877. X */
  878. X
  879. int
  880. main(int argc, char *argv[])
  881. X    /*
  882. X     * Process command line arguments, set up server socket, initialize
  883. X     * various data structures, and enter main server loop.
  884. X     */
  885. X{
  886. X    int ch, i;
  887. X    extern int getopt(int, char **, char *);
  888. X    extern char *optarg;
  889. X    extern int optind;
  890. X
  891. X    /*
  892. X     * Parse command line arguments.
  893. X     */
  894. X    myname = strrchr(argv[0], '/');
  895. X    if (myname != NULL)
  896. X        myname++;
  897. X    else
  898. X        myname = argv[0];
  899. X    while ((ch = getopt(argc, argv, "dl:p:")) != EOF)
  900. X        switch ((char)ch) {
  901. X            case '?':
  902. X            (void)fprintf(stderr, usage, myname);
  903. X            exit(EXIT_FAILURE);
  904. X            /*NOTREACHED*/
  905. X            case 'd':
  906. X            debug = 1;
  907. X            break;
  908. X            case 'l':
  909. X            loglevel = atoi(optarg);
  910. X            break;
  911. X            case 'p':
  912. X            port = atoi(optarg);
  913. X            break;
  914. X        }
  915. X
  916. X    /*
  917. X     * Set up ``catch'' for SIGHUP server reset handler.
  918. X     */
  919. X    if (sigsetjmp(server_reset, 1) == 0) {
  920. X        struct sigaction sig;
  921. X
  922. X        sig.sa_handler = SigReset;
  923. X        (void)sigemptyset(&sig.sa_mask);
  924. X        sig.sa_flags = 0;
  925. X        (void)sigaction(SIGHUP, &sig, NULL);
  926. X    } else
  927. X        Log(LOG_EMERG, "reset: restarting operations");
  928. X
  929. X    /*
  930. X     * Start up server.
  931. X     */
  932. X    if (!debug)
  933. X        BecomeDaemon();
  934. X    SetupServerSocket();
  935. X    FD_ZERO(&connections);
  936. X    FD_ZERO(&pending);
  937. X    FD_SET(mcastd, &connections);
  938. X    max_connection = mcastd;
  939. X    for (i = 0; i < GHASHSIZE; i++) {
  940. X        groups[i].next = &groups[i];
  941. X        groups[i].prev = &groups[i];
  942. X    }
  943. X    for (i = 0; i < MAX_CLIENTS; i++) {
  944. X        clients[i] = NULL;
  945. X    }
  946. X    Server();
  947. X    /*NOTREACHED*/
  948. X}
  949. X
  950. X
  951. void
  952. BecomeDaemon(void)
  953. X    /*
  954. X     * Do all the standard stuff necessary to become a daemon.
  955. X     * Fundamentally consists of forking, parent exits, child disassociate
  956. X     * itself from the tty and set up for logging messages to syslogd.  The
  957. X     * demonic child returns to take on the duties of the daemon.
  958. X     */
  959. X{
  960. X    int pid;
  961. X
  962. X    pid = fork();
  963. X    if (pid < 0) {
  964. X        perror(myname);
  965. X        exit(EXIT_FAILURE);
  966. X        /*NOTREACHED*/
  967. X    }
  968. X    if (pid) {
  969. X        exit(EXIT_SUCCESS);
  970. X        /*NOTREACHED*/
  971. X    }
  972. X    (void)close(0);
  973. X    (void)close(1);
  974. X    (void)close(2);
  975. X    (void)setsid();
  976. X    openlog(myname, LOG_PID|LOG_CONS|LOG_NDELAY, LOG_DAEMON);
  977. X    (void)setlogmask(LOG_UPTO(loglevel));
  978. X}
  979. X
  980. X
  981. void
  982. SetupServerSocket(void)
  983. X    /*
  984. X     * Set up multicast daemon server socket.  If no port was specified on
  985. X     * the command line, look for service MCD_SERVICE_NAME/"tcp" in the
  986. X     * services table.  Fall back to MCD_SERVICE_PORT if service is not
  987. X     * registered.
  988. X     */
  989. X{
  990. X    int s;
  991. X    int on;
  992. X    struct sockaddr_in server;
  993. X
  994. X    /* figure out what port we're going to use */
  995. X    if (port)
  996. X        server.sin_port = htons(port);
  997. X    else {
  998. X        char *service_name;
  999. X        struct servent *sp;
  1000. X        extern struct servent *getservbyname(const char *, const char *);
  1001. X
  1002. X        service_name = getenv(MCD_SERVICE_ENV);
  1003. X        if (service_name == NULL)
  1004. X            service_name = MCD_SERVICE_NAME;
  1005. X        sp = getservbyname(service_name, "tcp");
  1006. X        if (sp != NULL)
  1007. X            server.sin_port = sp->s_port;
  1008. X        else {
  1009. X            server.sin_port = htons(MCD_SERVICE_PORT);
  1010. X            Log(LOG_WARNING, "can't find \"" MCD_SERVICE_NAME "\""
  1011. X                " in services table; using %d", MCD_SERVICE_PORT);
  1012. X        }
  1013. X    }
  1014. X
  1015. X    /* before we take off, see if there's already a server running ... */
  1016. X    s = socket(PF_INET, SOCK_STREAM, 0);
  1017. X    if (s < 0) {
  1018. X        Log(LOG_ERR, "exit: %s", strerror(errno));
  1019. X        exit(EXIT_FAILURE);
  1020. X        /*NOTREACHED*/
  1021. X    }
  1022. X    server.sin_family = AF_INET;
  1023. X    server.sin_addr.s_addr = INADDR_LOOPBACK;
  1024. X    if (connect(s, &server, sizeof(server)) >= 0) {
  1025. X        Log(LOG_EMERG, "exit: found a server registered at %d/tcp;\n"
  1026. X            "\tversion of %s already running?",
  1027. X            ntohs(server.sin_port), myname);
  1028. X        (void)close(s);
  1029. X        exit(EXIT_FAILURE);
  1030. X        /*NOTREACHED*/
  1031. X    }
  1032. X    if (errno != ECONNREFUSED) {
  1033. X        Log(LOG_WARNING, "got weird server exclusion result: %s",
  1034. X            strerror(errno));
  1035. X    }
  1036. X
  1037. X    /* we appear to be alone ... */
  1038. X    on = 1;
  1039. X    server.sin_addr.s_addr = INADDR_ANY;
  1040. X    if ((mcastd = socket(PF_INET, SOCK_STREAM, 0)) < 0
  1041. X        || setsockopt(mcastd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0
  1042. X        || bind(mcastd, &server, sizeof(server)) < 0) {
  1043. X        Log(LOG_ERR, "exit: %s", strerror(errno));
  1044. X        exit(EXIT_FAILURE);
  1045. X        /*NOTREACHED*/
  1046. X    }
  1047. X    (void)listen(mcastd, 5);
  1048. X}
  1049. X
  1050. X
  1051. void
  1052. ShutdownServer(void)
  1053. X    /*
  1054. X     * Gracefully shutdown all server operations.
  1055. X     */
  1056. X{
  1057. X    int fd;
  1058. X
  1059. X    (void)close(mcastd);
  1060. X    FD_CLR(mcastd, &connections);
  1061. X    FD_CLR(mcastd, &connections);
  1062. X    while ((fd = FD_FFS(&connections)) >= 0) {
  1063. X        (void)shutdown(fd, 2);
  1064. X        (void)close(fd);
  1065. X        FD_CLR(fd, &connections);
  1066. X    }
  1067. X}
  1068. X
  1069. X
  1070. void
  1071. Server(void)
  1072. X    /*
  1073. X     * Main server loop.  Loops forever accepting connection requests,
  1074. X     * reading data from clients and writing data to clients.
  1075. X     */
  1076. X{
  1077. X    int fd, nfds;
  1078. X    fd_set readfds, writefds;
  1079. X    struct sigaction sig;
  1080. X#ifndef _AIX
  1081. X    /*
  1082. X     * Patch around problem in IBM's AIX 3.2 where they define select
  1083. X     * with void pointers in <sys/select.h>.
  1084. X     */
  1085. X    extern int select(int, fd_set *, fd_set *, fd_set *, struct timeval *);
  1086. X#endif
  1087. X
  1088. X    Log(LOG_INFO, "server started");
  1089. X    sig.sa_handler = SigShutdown;
  1090. X    (void)sigemptyset(&sig.sa_mask);
  1091. X    sig.sa_flags = 0;
  1092. X    (void)sigaction(SIGTERM, &sig, NULL);
  1093. X    for (;;) {
  1094. X        readfds = connections;
  1095. X        writefds = pending;
  1096. X        nfds = select(max_connection+1,
  1097. X            &readfds, &writefds, (fd_set *)0,
  1098. X            (struct timeval *)0);
  1099. X        if (nfds < 0) {
  1100. X            Log(LOG_ERR, "%s", strerror(errno));
  1101. X            (void)sleep(5);
  1102. X            continue;
  1103. X        }
  1104. X        if (nfds == 0) {
  1105. X            Log(LOG_WARNING, "select returned 0");
  1106. X            (void)sleep(5);
  1107. X            continue;
  1108. X        }
  1109. X
  1110. X        /*
  1111. X         * Handle incoming connection requests.
  1112. X         */
  1113. X        if (FD_ISSET(mcastd, &readfds)) {
  1114. X            GetConnection();
  1115. X            FD_CLR(mcastd, &readfds);
  1116. X        }
  1117. X        /*
  1118. X         * Handle all incoming data.
  1119. X         */
  1120. X        while ((fd = FD_FFS(&readfds)) >= 0) {
  1121. X            ReadFromClient(fd);
  1122. X            FD_CLR(fd, &readfds);
  1123. X            if (!FD_ISSET(fd, &connections)) {
  1124. X                /* lost the connection during the read ... */
  1125. X                FD_CLR(fd, &writefds);
  1126. X            }
  1127. X        }
  1128. X        /*
  1129. X         * Now handle clients ready to receive data.
  1130. X         */
  1131. X        while ((fd = FD_FFS(&writefds)) >= 0) {
  1132. X            WriteToClient(fd);
  1133. X            FD_CLR(fd, &writefds);
  1134. X        }
  1135. X    }
  1136. X    /*NOTREACHED*/
  1137. X}
  1138. X
  1139. X
  1140. void
  1141. GetConnection(void)
  1142. X    /*
  1143. X     * A client is knocking on our door.  Accept the connection request and
  1144. X     * set up all associated client state.
  1145. X     */
  1146. X{
  1147. X    int fd;
  1148. X    struct sockaddr_in client;
  1149. X    int client_len, opt, optlen;
  1150. X    Client *cp;
  1151. X
  1152. X    client_len = sizeof(client);
  1153. X    fd = accept(mcastd, &client, &client_len);
  1154. X    if (fd < 0) {
  1155. X        Log(LOG_ERR, "accept: %s", strerror(errno));
  1156. X        return;
  1157. X    }
  1158. X    /*
  1159. X     * When logging the host that the incoming connection is coming from
  1160. X     * we don't want to use gethostbyaddr and gethostbyname because they
  1161. X     * can hang in some environments which would cause the server to hang.
  1162. X     * If it becomes important to put the hostname in the log an alarm
  1163. X     * could be set or a child could be forked.  And don't forget to do
  1164. X     * gethostbyname(gethostbyaddr(addr)) to avoid security hack name
  1165. X     * spoofing!
  1166. X     */
  1167. X#ifdef BROKEN_INET_NTOA
  1168. X    Log(LOG_DEBUG, "fd %d: connection request from %s", fd,
  1169. X        inet_ntoa(&client.sin_addr));
  1170. X#else
  1171. X    Log(LOG_DEBUG, "fd %d: connection request from %s", fd,
  1172. X        inet_ntoa(client.sin_addr));
  1173. X#endif
  1174. X    if (fd >= MAX_CLIENTS) {
  1175. X        /* don't laugh -- this can actually happen under 4.4BSD */
  1176. X        Log(LOG_ALERT, "fd %d: client fd too big -- dropping"
  1177. X            " connection", fd);
  1178. X        (void)close(fd);
  1179. X        return;
  1180. X    }
  1181. X
  1182. X    /*
  1183. X     * Set up internal data structures for new client.
  1184. X     */
  1185. X    cp = (Client *)malloc(sizeof(Client));
  1186. X    if (cp == NULL) {
  1187. X        Log(LOG_ALERT, "fd %d: unable to allocate storage for new"
  1188. X            " client -- dropping connection", fd);
  1189. X        (void)close(fd);
  1190. X        return;
  1191. X    }
  1192. X    cp->peers.next = &cp->peers;
  1193. X    cp->peers.prev = &cp->peers;
  1194. X    (void)memset(&cp->local, 0, sizeof(cp->local));
  1195. X    cp->group = NULL;
  1196. X    cp->local = MCADDR_ANY;
  1197. X    cp->remote = MCADDR_ANY;
  1198. X    cp->fd = fd;
  1199. X    cp->flags = 0;
  1200. X    cp->rbuf.message = NULL;
  1201. X    cp->rbuf.offset = 0;
  1202. X    cp->wbuf.message = NULL;
  1203. X    cp->wbuf.offset = 0;
  1204. X    clients[fd] = cp;
  1205. X    FD_SET(fd, &connections);
  1206. X    if (fd > max_connection)
  1207. X        max_connection = fd;
  1208. X
  1209. X    /*
  1210. X     * Set the send and receive buffer sizes to MCD_BUFSIZ.
  1211. X     */
  1212. X    opt = MCD_BUFSIZ;
  1213. X    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) {
  1214. X        optlen = sizeof(opt);
  1215. X        (void)getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
  1216. X        Log(LOG_WARNING, "fd %d: unable to set send buffer size to %d,"
  1217. X            " using %d", fd, MCD_BUFSIZ, opt);
  1218. X    }
  1219. X    opt = MCD_BUFSIZ;
  1220. X    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) {
  1221. X        optlen = sizeof(opt);
  1222. X        (void)getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen);
  1223. X        Log(LOG_WARNING, "fd %d: unable to set receive buffer size to"
  1224. X            " %d, using %d", fd, MCD_BUFSIZ, opt);
  1225. X    }
  1226. X
  1227. X    /*
  1228. X     * Use non-blocking I/O to increase throughput, lower latency and avoid
  1229. X     * deadlocks.  This primarily affects reads and writes of very large
  1230. X     * messages and/or writes to a slow receiver.  A write to file
  1231. X     * descriptor with only a small amount of output space available could
  1232. X     * block waiting for sufficient output space to become available.
  1233. X     * Using non-blocking I/O we only write what can be accommodated and
  1234. X     * the remainder will have to wait for more output space to open up.
  1235. X     * This improves throughput performance because the multiplexing server
  1236. X     * isn't stuck waiting on a particular client when it could be working
  1237. X     * on something else.  It prevents deadlocks because the multiplexing
  1238. X     * server won't be waiting for a write to a client to finish while that
  1239. X     * client may be waiting for a write to the multiplexing server to
  1240. X     * finish ...
  1241. X     */
  1242. X    if (fcntl(fd, F_SETFL, O_NONBLOCK))
  1243. X        Log(LOG_WARNING, "fd %d: unable to set non-blocking mode --"
  1244. X            " suboptimal performance and deadlocks possible", fd);
  1245. X
  1246. X#ifdef BROKEN_INET_NTOA
  1247. X    Log(LOG_INFO, "fd %d: connection from %s established --"
  1248. X        " connections[0] = %#x, max = %d", fd, inet_ntoa(&client.sin_addr),
  1249. X        connections.fds_bits[0], max_connection);
  1250. X#else
  1251. X    Log(LOG_INFO, "fd %d: connection from %s established --"
  1252. X        " connections[0] = %#x, max = %d", fd, inet_ntoa(client.sin_addr),
  1253. X        connections.fds_bits[0], max_connection);
  1254. X#endif
  1255. X}
  1256. X
  1257. X
  1258. void
  1259. CloseConnection(int fd)
  1260. X    /*
  1261. X     * A client went away.  Clean up all associated state.
  1262. X     */
  1263. X{
  1264. X    Client *cp = clients[fd];
  1265. X
  1266. X    Log(LOG_INFO, "fd %d: shutting down connection", fd);
  1267. X    (void)shutdown(fd, 2);
  1268. X    (void)close(fd);
  1269. X    FD_CLR(fd, &connections);
  1270. X    FD_CLR(fd, &pending);
  1271. X    if (fd == max_connection) {
  1272. X        int i;
  1273. X
  1274. X        for (i = max_connection-1; i >= 0; i--)
  1275. X            if (FD_ISSET(i, &connections))
  1276. X                break;
  1277. X        max_connection = i;
  1278. X    }
  1279. X
  1280. X    /* clean up group state associated with client */
  1281. X    if (cp->group != NULL)
  1282. X        Unsubscribe(fd);
  1283. X
  1284. X    clients[fd] = NULL;
  1285. X    free(cp);
  1286. X}
  1287. X
  1288. X
  1289. void
  1290. ReadFromClient(int fd)
  1291. X    /*
  1292. X     * Read data from a client.  This may involve continuing reading a
  1293. X     * message already partially received or starting on a new message if
  1294. X     * none is in progress.  If we complete a message, we get to hand it
  1295. X     * off to ProcessClientRbuf ...
  1296. X     *
  1297. X     * Note that we never read more than one message's worth of data.  This
  1298. X     * may be a feature or a bug.  It's a feature if you don't want one
  1299. X     * client hogging the multicast server.  It's a bug if you can't get
  1300. X     * the throughput you need because the multicast server is effectively
  1301. X     * polling at the message level ...
  1302. X     */
  1303. X{
  1304. X    Client *cp = clients[fd];
  1305. X    Message *mp = cp->rbuf.message;
  1306. X    void *buf;
  1307. X    int len, nread;
  1308. X
  1309. X    /* if not currently reading a message, start on a new one */
  1310. X    if (mp == NULL) {
  1311. X        mp = (Message *)malloc(sizeof(Message));
  1312. X        if (mp == NULL) {
  1313. X            Log(LOG_ALERT, "fd %d: unable to allocate storage for"
  1314. X                " incoming message -- dropping connection!!", fd);
  1315. X            CloseConnection(fd);
  1316. X            return;
  1317. X        }
  1318. X        mp->queue.next = &mp->queue;
  1319. X        mp->queue.prev = &mp->queue;
  1320. X        mp->references = 1;
  1321. X        cp->rbuf.offset = 0;
  1322. X        cp->rbuf.message = mp;
  1323. X    }
  1324. X
  1325. X    /* read chunk of message in */
  1326. X    buf = (char *)&mp->msg + cp->rbuf.offset;
  1327. X    if (cp->rbuf.offset < MCHEADER_CORE) {
  1328. X        /* reading in core message header (which includes "length") */
  1329. X        len = MCHEADER_CORE - cp->rbuf.offset;
  1330. X    } else {
  1331. X        /* header read in -- safe to use "length" field in header */
  1332. X        len = ntohl(mp->msg.length) - cp->rbuf.offset;
  1333. X    }
  1334. X    Assert(len > 0);
  1335. X    nread = read(fd, buf, len);
  1336. X
  1337. X    /* find out what we actually got */
  1338. X    if (nread < 0) {
  1339. X        Log(LOG_ERR, "fd %d: read failed: %s", fd, strerror(errno));
  1340. X        return;
  1341. X    }
  1342. X    if (nread == 0) {
  1343. X        Log(LOG_INFO, "fd %d: read returned 0 -- peer dead", fd);
  1344. X        CloseConnection(fd);
  1345. X        return;
  1346. X    }
  1347. X    cp->rbuf.offset += nread;
  1348. X    Log(LOG_DEBUG, "fd %d: read %d bytes of message, %d requested", fd,
  1349. X        nread, len);
  1350. X    if (nread < len) {
  1351. X        /* still more to read for the message */
  1352. X        return;
  1353. X    }
  1354. X    ProcessClientRbuf(fd);
  1355. X    /*
  1356. X     * If the receive buffer still has a message attached to it, that
  1357. X     * means that ProcessClientRbuf enlarged the message from just a
  1358. X     * core message header to a message header followed by message body.
  1359. X     * We may as well see if there's more to read ...
  1360. X     */
  1361. X    if (cp->rbuf.message != NULL)
  1362. X        ReadFromClient(fd);
  1363. X}
  1364. X
  1365. X
  1366. void
  1367. WriteToClient(int fd)
  1368. X    /*
  1369. X     * Write data to a client.  This may involve continuing to write a
  1370. X     * message already partially transmitted or starting on a new message
  1371. X     * if none is in progress.  If we complete sending a message, we drop
  1372. X     * our reference to it and move on to the next in the queue.  If there
  1373. X     * are no more messages to send to the client, we drop the client from
  1374. X     * the pending output ``list.''
  1375. X     */
  1376. X{
  1377. X    Client *cp = clients[fd];
  1378. X    Message *mp = cp->wbuf.message;
  1379. X    void *buf;
  1380. X    int len, nwrite;
  1381. X    struct sigaction sig, osig;
  1382. X
  1383. X    /* set up to catch a SIGPIPE in case peer has died */
  1384. X    if (sigsetjmp(dead_client, 1) != 0) {
  1385. X        /* note: osig will be set by the time this is executed */
  1386. X        (void) sigaction(SIGPIPE, &osig, (struct sigaction *)0);
  1387. X        Log(LOG_INFO, "fd %d: write got SIGPIPE -- peer died", fd);
  1388. X        CloseConnection(fd);
  1389. X        return;
  1390. X    }
  1391. X    sig.sa_handler = SigDeadClient;
  1392. X    (void)sigemptyset(&sig.sa_mask);
  1393. X    sig.sa_flags = 0;
  1394. X    (void)sigaction(SIGPIPE, &sig, &osig);
  1395. X
  1396. X    /* write next chunk of message out */
  1397. X    Assert(mp != NULL);
  1398. X    buf = (char *)&mp->msg + cp->wbuf.offset;
  1399. X    len = ntohl(mp->msg.length) - cp->wbuf.offset;
  1400. X    Assert(len > 0);
  1401. X    nwrite = write(fd, buf, len);
  1402. X
  1403. X    /* reset SIGPIPE signal state and find out what the write did */
  1404. X    (void)sigaction(SIGPIPE, &osig, (struct sigaction *)0);
  1405. X    if (nwrite < 0) {
  1406. X        Log(LOG_ERR, "fd %d: write failed: %s", fd, strerror(errno));
  1407. X        return;
  1408. X    }
  1409. X    if (nwrite == 0) {
  1410. X        Log(LOG_WARNING, "fd %d: write returned 0", fd);
  1411. X        return;
  1412. X    }
  1413. X    if (nwrite < len) {
  1414. X        /* still more to write on the message */
  1415. X        cp->wbuf.offset += nwrite;
  1416. X        Log(LOG_DEBUG, "fd %d: wrote %d bytes of message", fd, nwrite);
  1417. X        return;
  1418. X    }
  1419. X
  1420. X    /* finished writing message -- go on to next */
  1421. X    Log(LOG_INFO, "fd %d: finished writing %d byte message", fd,
  1422. X        ntohl(mp->msg.length));
  1423. X    cp->wbuf.offset = 0;
  1424. X    for (;;) {
  1425. X        Queue *mqp = mp->queue.next;
  1426. X
  1427. X        FreeMessage(mp);
  1428. X        if (mqp == &cp->group->messages) {
  1429. X            /* that was the last message in the queue */
  1430. X            cp->wbuf.message = NULL;
  1431. X            FD_CLR(fd, &pending);
  1432. X            break;
  1433. X        }
  1434. X        mp = baseof(Message, queue, mqp);
  1435. X        if (ClientWantsMessage(cp, mp)) {
  1436. X            cp->wbuf.message = mp;
  1437. X            break;
  1438. X        }
  1439. X    }
  1440. X}
  1441. X
  1442. X
  1443. void
  1444. ProcessClientRbuf(int fd)
  1445. X    /*
  1446. X     * Process a newly received client message.  ProcessClientRbuf is
  1447. X     * called when the current expected message length has been read in.
  1448. X     * If that length only covers the core message header,
  1449. X     * ProcessClientRbuf will delve into the message header to determine
  1450. X     * how large the entire message is.  It will then reallocate the
  1451. X     * message storage to cover the message and leave the message connected
  1452. X     * to the client's receive buffer for further input.
  1453. X     *
  1454. X     * If the message is complete, ProcessClientRbuf will disconnect the
  1455. X     * message from the client's receive buffer and dispatch it to either
  1456. X     * HandleClientRequest or QueueClientMessage depending on whether the
  1457. X     * message was or was not directed at the server (us).
  1458. X     */
  1459. X{
  1460. X    Client *cp = clients[fd];
  1461. X    Message *mp = cp->rbuf.message;
  1462. X
  1463. X    /*
  1464. X     * If just finished reading core message header, set up for reading
  1465. X     * remaining header and body.
  1466. X     */
  1467. X    if (cp->rbuf.offset == MCHEADER_CORE) {
  1468. X        unsigned int
  1469. X            ver = mp->msg.version,
  1470. X            hlen = mp->msg.hlength,
  1471. X            len = ntohl(mp->msg.length);
  1472. X
  1473. X        if (ver != MCVERSION
  1474. X            || hlen < MCHEADER_MIN || hlen > MCHEADER_MAX
  1475. X            || len < hlen) {
  1476. X            Log(LOG_ERR, "fd %d: malformed message header --"
  1477. X                " dropping message", fd);
  1478. X            cp->rbuf.message = NULL;
  1479. X            cp->rbuf.offset = 0;
  1480. X            free(mp);
  1481. X            return;
  1482. X        }
  1483. X        mp = (Message *)realloc(mp, offsetof(Message, msg) + len);
  1484. X        if (mp == NULL) {
  1485. X            Log(LOG_ALERT, "fd %d: unable to allocate storage for"
  1486. X                " message body -- dropping connection!!", fd);
  1487. X            CloseConnection(fd);
  1488. X            return;
  1489. X        }
  1490. X        cp->rbuf.message = mp;
  1491. X        return;
  1492. X    }
  1493. X
  1494. X    /*
  1495. X     * Message complete -- disconnect from receive buffer and process.
  1496. X     */
  1497. X    cp->rbuf.offset = 0;
  1498. X    cp->rbuf.message = NULL;
  1499. X    Log(LOG_INFO, "fd %d: finished reading %d byte message", fd,
  1500. X        ntohl(mp->msg.length));
  1501. X
  1502. X    if (MCADDR_EQ(mp->msg.destination, MCD_MCADDR))
  1503. X        HandleClientRequest(fd, mp);
  1504. X    else
  1505. X        QueueClientMessage(fd, mp);
  1506. X}
  1507. X
  1508. X
  1509. void
  1510. HandleClientRequest(int fd, Message *mp)
  1511. X    /*
  1512. X     * Handle a request from the client to the server (us).  Message is
  1513. X     * freed after being processed.  Most of these reflect what would be the
  1514. X     * equivalents of ioctl's, bind's, and other local network and link
  1515. X     * layer operations if the multicast protocol were implemented with
  1516. X     * network and link layer multicast facilities instead of a server as
  1517. X     * it is in this implementation.
  1518. X     */
  1519. X{
  1520. X    Client *cp = clients[fd];
  1521. X    unsigned long len
  1522. X        = ntohl(mp->msg.length) - mp->msg.hlength;
  1523. X    mcd_message *sp
  1524. X        = (mcd_message *)(void *)((char *)&mp->msg + mp->msg.hlength);
  1525. X
  1526. X    if (len == 0) {
  1527. X        Log(LOG_WARNING, "fd %d: zero length server request --"
  1528. X            " dropping message", fd);
  1529. X        free(mp);
  1530. X        return;
  1531. X    }
  1532. X    while (len > 0) {
  1533. X        if (len < sizeof(mcd_message)) {
  1534. X            Log(LOG_ERR, "fd %d: incomplete server request --"
  1535. X                " dropping message", fd);
  1536. X            free(mp);
  1537. X            return;
  1538. X        }
  1539. X        switch (ntohl(sp->request)) {
  1540. X            default:
  1541. X            Log(LOG_ERR, "fd %d: bogus request %d -- dropping"
  1542. X                " request", fd, htonl(sp->request));
  1543. X            break;
  1544. X
  1545. X            case MCD_BIND:
  1546. X            if (MCADDR_EQ(sp->group, MCADDR_ANY)
  1547. X                && MCADDR_EQ(sp->local, MCADDR_ANY)
  1548. X                && MCADDR_EQ(sp->remote, MCADDR_ANY)) {
  1549. X                Log(LOG_INFO, "fd %d: unbinding addresses", fd);
  1550. X                if (cp->group)
  1551. X                    Unsubscribe(fd);
  1552. X                cp->local = MCADDR_ANY;
  1553. X                cp->remote = MCADDR_ANY;
  1554. X                cp->flags &= ~CLIENT_CONNECTED;
  1555. X                break;
  1556. X            }
  1557. X            if (MCADDR_EQ(sp->group, MCADDR_ANY)
  1558. X                || MCADDR_EQ(sp->group, MCADDR_BROADCAST)
  1559. X                || MCADDR_EQ(sp->local, MCADDR_ANY)
  1560. X                || MCADDR_EQ(sp->local, MCADDR_BROADCAST)
  1561. X                || MCADDR_EQ(sp->remote, MCADDR_ANY)) {
  1562. X                Log(LOG_ERR, "fd %d: bad MCD_BIND request"
  1563. X                    " -- dropping request", fd);
  1564. X                break;
  1565. X            }
  1566. X            Log(LOG_INFO, "fd %d: binding addresses", fd);
  1567. X            if (cp->group)
  1568. X                Unsubscribe(fd);
  1569. X            (void)Subscribe(fd, &sp->group);
  1570. X            cp->local = sp->local;;
  1571. X            cp->remote = sp->remote;
  1572. X            if (MCADDR_EQ(cp->remote, MCADDR_BROADCAST))
  1573. X                cp->flags &= ~CLIENT_CONNECTED;
  1574. X            else
  1575. X                cp->flags |= CLIENT_CONNECTED;
  1576. X            break;
  1577. X
  1578. X            case MCD_LOOPBACK:
  1579. X            if (ntohl(sp->option) != 0)
  1580. X                cp->flags |= CLIENT_LOOPBACK;
  1581. X            else
  1582. X                cp->flags &= ~CLIENT_LOOPBACK;
  1583. X            Log(LOG_INFO, "fd %d: set client loopback %s", fd,
  1584. X                ntohl(sp->option) ? "on" : "off");
  1585. X            break;
  1586. X
  1587. X            case MCD_SHUTDOWNRECV:
  1588. X            if (ntohl(sp->option) != 0)
  1589. X                cp->flags |= CLIENT_SHUTDOWNRECV;
  1590. X            else
  1591. X                cp->flags &= ~CLIENT_SHUTDOWNRECV;
  1592. X            Log(LOG_INFO, "fd %d: set client sendonly %s", fd,
  1593. X                ntohl(sp->option) ? "on" : "off");
  1594. X            break;
  1595. X        }
  1596. X        sp++;
  1597. X        len -= sizeof(mcd_message);
  1598. X    }
  1599. X    free(mp);
  1600. X}
  1601. X
  1602. X
  1603. void
  1604. QueueClientMessage(int fd, Message *mp)
  1605. X    /*
  1606. X     * Queue up a message from a client to its multicast group.  Under
  1607. X     * various cercumstances, the message may simply be freed.  Specific
  1608. X     * cases include bad message destination address and no clients
  1609. X     * qualified to receive the message.
  1610. X     */
  1611. X{
  1612. X    Group *gp = clients[fd]->group;
  1613. X    Queue *cqp;
  1614. X
  1615. X    if (gp == NULL) {
  1616. X        Log(LOG_ERR, "fd %d: unregistered client attempting to"
  1617. X            " transmit -- dropping message", fd);
  1618. X        free(mp);
  1619. X        return;
  1620. X    }
  1621. X    if (!MCADDR_EQ(mp->msg.group, gp->address)) {
  1622. X        Log(LOG_ERR, "fd %d: client sending message to wrong"
  1623. X            " group -- dropping message", fd);
  1624. X        free(mp);
  1625. X        return;
  1626. X    }
  1627. X
  1628. X    /* start by assuming that all clients will want this message */
  1629. X    mp->references = gp->nclients;
  1630. X
  1631. X    /* add message to end of group message queue */
  1632. X    mp->queue.next = &gp->messages;
  1633. X    mp->queue.prev = gp->messages.prev;
  1634. X    mp->queue.prev->next = &mp->queue;
  1635. X    mp->queue.next->prev = &mp->queue;
  1636. X
  1637. X    /* restart output on any idling clients that want the message */
  1638. X    for (cqp = gp->clients.next; cqp != &gp->clients; cqp = cqp->next) {
  1639. X        Client *pp = baseof(Client, peers, cqp);
  1640. X
  1641. X        if (pp->wbuf.message == NULL) {
  1642. X            /*
  1643. X             * DON'T PULL THIS OPTIMIZATION OUT!  If you do,
  1644. X             * you'll have to change the structure of
  1645. X             * WriteToClient since it assumes on entry that any
  1646. X             * message in its output buffer should be sent to the
  1647. X             * client.  Note that this would also lead to a
  1648. X             * probably undesirable scenario where the client
  1649. X             * sends a LOOPBACK request and gets copies of its own
  1650. X             * messages sent prior to the LOOPBACK request ...
  1651. X             *
  1652. X             * Also, checking for the condition here avoids waking
  1653. X             * up uselessly to deliver a message only to find out
  1654. X             * the client doesn't want it.  There are several
  1655. X             * common use patterns of multicast groups where this
  1656. X             * is a win.
  1657. X             */
  1658. X            if (ClientWantsMessage(pp, mp)) {
  1659. X                pp->wbuf.offset = 0;
  1660. X                pp->wbuf.message = mp;
  1661. X                FD_SET(pp->fd, &pending);
  1662. X                Log(LOG_DEBUG, "fd %d: restarted output on %d",
  1663. X                    fd, pp->fd);
  1664. X            } else {
  1665. X                /* this client will never see it */
  1666. X                mp->references--;
  1667. X                Assert(mp->references >= 0);
  1668. X                Log(LOG_DEBUG, "fd %d: not restarting output"
  1669. X                    " on %d", fd, pp->fd);
  1670. X            }
  1671. X        }
  1672. X    }
  1673. X    if (mp->references == 0) {
  1674. X        /* no one wanted the message -- dequeue and free message */
  1675. X        mp->queue.prev->next = mp->queue.next;
  1676. X        mp->queue.next->prev = mp->queue.prev;
  1677. X        free(mp);
  1678. X        Log(LOG_DEBUG, "fd %d: no clients wanted message -- message"
  1679. X            " dropped", fd);
  1680. X    }
  1681. X}
  1682. X
  1683. X
  1684. int
  1685. Subscribe(int fd, const mc_addr *addr)
  1686. X    /*
  1687. X     * Subscribe client to multicast group with multicast address "addr."
  1688. X     * If client is already subscribe to another group, it will be
  1689. X     * unsubscribed first.
  1690. X     *
  1691. X     * If successful, 1 will be returned and the client's group state will
  1692. X     * be initialized to be a client of the requested group.  State
  1693. X     * particular to the client will not be touched.  Thus, the client's
  1694. X     * address and flags state will remain unchanged.
  1695. X     *
  1696. X     * If unsuccessful, a 0 will be returned and no state will have been
  1697. X     * changed.  The only reason for a failing return is inability to
  1698. X     * allocate storage for a new group control block.  It's almost certain
  1699. X     * that any calling party will be forced to drop the client's
  1700. X     * connection because there just isn't much that can be done, but we'll
  1701. X     * let our caller decide what to do ...
  1702. X     */
  1703. X{
  1704. X    Client *cp = clients[fd];
  1705. X    Group *gp;
  1706. X    Queue *ghqp = ghash(addr);
  1707. X    Queue *gqp;
  1708. X
  1709. X    /*
  1710. X     * Find group with address addr.
  1711. X     */
  1712. X    for (gqp = ghqp->next; /* void */; gqp = gqp->next) {
  1713. X        if (gqp == ghqp) {
  1714. X            /*
  1715. X             * Reached end of hash chain, so this is the first
  1716. X             * client to request subscription to the group --
  1717. X             * instantiate the group.
  1718. X             */
  1719. X            gp = (Group *)malloc(sizeof(Group));
  1720. X            if (gp == NULL) {
  1721. X                Log(LOG_ALERT, "fd %d: unable to allocate"
  1722. X                    " storage for new group -- dropping"
  1723. X                    " connection!", fd);
  1724. X                return(0);
  1725. X            }
  1726. X            Log(LOG_INFO, "fd %d: created new group", fd);
  1727. X
  1728. X            /*
  1729. X             * Initialize new group state, we'll add client below.
  1730. X             */
  1731. X            gp->address = *addr;
  1732. X            gp->clients.next = &gp->clients;
  1733. X            gp->clients.prev = &gp->clients;
  1734. X            gp->nclients = 0;
  1735. X            gp->messages.next = &gp->messages;
  1736. X            gp->messages.prev = &gp->messages;
  1737. X
  1738. X            /* add new group to its hash chain */
  1739. X            gp->hash.next = ghqp;
  1740. X            gp->hash.prev = ghqp->prev;
  1741. X            gp->hash.prev->next = &gp->hash;
  1742. X            gp->hash.next->prev = &gp->hash;
  1743. X            break;
  1744. X        }
  1745. X        gp = baseof(Group, hash, gqp);
  1746. X        if (MCADDR_EQ(gp->address, *addr))
  1747. X            break;
  1748. X    }
  1749. X
  1750. X    /*
  1751. X     * Found the group -- it's okay to drop any old subscription.
  1752. X     */
  1753. X    if (cp->group != NULL) {
  1754. X        Log(LOG_NOTICE, "fd %d: subscribe called on already subscribe"
  1755. X            " client -- unsubscribing first ...", fd);
  1756. X        Unsubscribe(fd);
  1757. X    }
  1758. X
  1759. X    /*
  1760. X     * Add client to group.
  1761. X     */
  1762. X    cp->group = gp;
  1763. X    cp->peers.next = &gp->clients;
  1764. X    cp->peers.prev = gp->clients.prev;
  1765. X    cp->peers.prev->next = &cp->peers;
  1766. X    cp->peers.next->prev = &cp->peers;
  1767. X    gp->nclients++;
  1768. X
  1769. X    /*
  1770. X     * Note: we could start sending the client already queued messages,
  1771. X     * but there's really no point.
  1772. X     */
  1773. X    Assert(cp->rbuf.message == NULL);
  1774. X    Assert(cp->wbuf.message == NULL);
  1775. X
  1776. X    Log(LOG_INFO, "fd %d: subscribed to group", fd);
  1777. X    return(1);
  1778. X}
  1779. X
  1780. X
  1781. void
  1782. Unsubscribe(int fd)
  1783. X    /*
  1784. X     * Unsubscribe client from multicast group.  Clean up group state
  1785. X     * associated with client.  If no more clients exist for group, destroy
  1786. X     * the group.  On return, the client will be marked as not belonging to
  1787. X     * any group, its read and write buffers will have been cleared and any
  1788. X     * output interrupts turned off.  State particular to the client will
  1789. X     * not be touched.  Thus, the client's address and flags state will
  1790. X     * remain unchanged.
  1791. X     */
  1792. X{
  1793. X    Client *cp = clients[fd];
  1794. X    Group *gp = cp->group;
  1795. X
  1796. X    if (gp == NULL) {
  1797. X        Log(LOG_NOTICE, "fd %d: unsubscribe called on non-subscribed"
  1798. X            " client -- ignoring", fd);
  1799. X        return;
  1800. X    }
  1801. X
  1802. X    /* remove client from group client list */
  1803. X    cp->peers.next->prev = cp->peers.prev;
  1804. X    cp->peers.prev->next = cp->peers.next;
  1805. X
  1806. X    /* mark client as not belonging to any group */
  1807. X    cp->peers.next = &cp->peers;
  1808. X    cp->peers.prev = &cp->peers;
  1809. X    cp->group = NULL;
  1810. X
  1811. X    /* drop client's reference to all messages down the queue */
  1812. X    if (cp->wbuf.message != NULL) {
  1813. X        Message *mp = cp->wbuf.message;
  1814. X
  1815. X        FD_CLR(fd, &pending);
  1816. X        cp->wbuf.message = NULL;
  1817. X        cp->wbuf.offset = 0;
  1818. X        for (;;) {
  1819. X            Queue *mqp = mp->queue.next;
  1820. X
  1821. X            FreeMessage(mp);
  1822. X            if (mqp == &gp->messages)
  1823. X                break;
  1824. X            mp = baseof(Message, queue, mqp);
  1825. X        }
  1826. X    }
  1827. X
  1828. X    /* drop client's rbuf */
  1829. X    if (cp->rbuf.message != NULL)
  1830. X        free(cp->rbuf.message);
  1831. X
  1832. X    Log(LOG_INFO, "fd %d: unsubscribed from group", fd);
  1833. X
  1834. X    /* one fewer client in the group */
  1835. X    gp->nclients--;
  1836. X    Assert(gp->nclients >= 0);
  1837. X    /* destroy group if that was the last client */
  1838. X    if (gp->nclients == 0) {
  1839. X        Log(LOG_INFO, "fd %d: destroyed group", fd);
  1840. X
  1841. X        /* do a little bit of internal consistency checking */
  1842. X        Assert(gp->clients.next == &gp->clients);
  1843. X        Assert(gp->clients.prev == &gp->clients);
  1844. X        Assert(gp->messages.next == &gp->messages);
  1845. X        Assert(gp->messages.prev == &gp->messages);
  1846. X
  1847. X        /* remove the group from its hash chain */
  1848. X        gp->hash.next->prev = gp->hash.prev;
  1849. X        gp->hash.prev->next = gp->hash.next;
  1850. X        free(gp);
  1851. X    }
  1852. X}
  1853. X
  1854. X
  1855. int
  1856. ClientWantsMessage(const Client *cp, const Message *mp)
  1857. X    /*
  1858. X     * Return TRUE if the client wants to see the message.
  1859. X     *
  1860. X     * If the client is ``connected'' to another client and this message
  1861. X     * isn't from that other client, return FALSE.
  1862. X     *
  1863. X     * If the message is directed explicitly at this client (i.e. a
  1864. X     * ``unicast'' message) return TRUE.  Neither the CLIENT_LOOPBACK nor
  1865. X     * CLIENT_SHUTDOWNRECV flags will affect this semantic.
  1866. X     *
  1867. X     * Else, if the client has CLIENT_SHUTDOWNRECV set return FALSE.  The
  1868. X     * client is a ``write-only'' client.
  1869. X     *
  1870. X     * Else, if the message isn't directed at the multicast group, it must
  1871. X     * be a ``unicast'' message to another client, so return FALSE.
  1872. X     *
  1873. X     * Else, if the message isn't from this client, return TRUE.
  1874. X     *
  1875. X     * Else, if the client has requested loopback of multicast messages,
  1876. X     * return TRUE.  Else, return FALSE.
  1877. X     */
  1878. X{
  1879. X    if ((cp->flags & CLIENT_CONNECTED)
  1880. X        && !MCADDR_EQ(mp->msg.source, cp->remote)) {
  1881. X        /* client is connected; uninterested in dealing with others */
  1882. X        return(0);
  1883. X    }
  1884. X    if (MCADDR_EQ(mp->msg.destination, cp->local)) {
  1885. X        /* ``unicast'' to client */
  1886. X        return(1);
  1887. X    }
  1888. X    if (cp->flags & CLIENT_SHUTDOWNRECV) {
  1889. X        /* client is ``write-only'' */
  1890. X        return(0);
  1891. X    }
  1892. X    if (!MCADDR_EQ(mp->msg.destination, MCADDR_BROADCAST)) {
  1893. X        /* ``unicast'' to some other client */
  1894. X        return(0);
  1895. X    }
  1896. X    if (!MCADDR_EQ(mp->msg.source, cp->local)) {
  1897. X        /* multicast, but not originated by client */
  1898. X        return(1);
  1899. X    }
  1900. X    /* multicast originated by client */
  1901. X    return((cp->flags & CLIENT_LOOPBACK) != 0);
  1902. X}
  1903. X
  1904. void
  1905. XFreeMessage(Message *mp)
  1906. X    /*
  1907. X     * Drop a reference to a message in a message queue.  If that was the
  1908. X     * last reference, dequeue and free the message.
  1909. X     */
  1910. X{
  1911. X    mp->references--;
  1912. X    Assert(mp->references >= 0);
  1913. X    if (mp->references == 0) {
  1914. X        mp->queue.next->prev = mp->queue.prev;
  1915. X        mp->queue.prev->next = mp->queue.next;
  1916. X        free(mp);
  1917. X    }
  1918. X}
  1919. X
  1920. X
  1921. X/*VARARGS2*/
  1922. void
  1923. Log(int level, const char *fmt, ...)
  1924. X    /*
  1925. X     * Log a message.  If in debug mode all messages are printed to stderr.
  1926. X     * Otherwise they're sent to syslogd ...
  1927. X     */
  1928. X{
  1929. X    va_list args;
  1930. X    extern int vsyslog(int, const char *, va_list);
  1931. X
  1932. X    va_start(args, fmt);
  1933. X    if (debug) {
  1934. X        if (level <= loglevel) {
  1935. X            (void)fprintf(stderr, "%s: ", myname);
  1936. X            (void)vfprintf(stderr, fmt, args);
  1937. X            (void)fputc('\n', stderr);
  1938. X        }
  1939. X    } else
  1940. X        (void)vsyslog(level, fmt, args);
  1941. X    va_end(args);
  1942. X}
  1943. X
  1944. X
  1945. X#ifdef _AIX
  1946. int
  1947. vsyslog(int level, const char *fmt, va_list ap)
  1948. X    /*
  1949. X     * Log a message to the syslog facility using a varargs agument list.
  1950. X     * This is a simple minded replacement for vsyslog(3) on systems which
  1951. X     * don't supply it.
  1952. X     */
  1953. X{
  1954. X    char buf[BUFSIZ], str[BUFSIZ];
  1955. X    register char *bp, *sp;
  1956. X
  1957. X    (void)vsprintf(buf, fmt, ap);
  1958. X    /*
  1959. X     * Go back and double any percent signs (%) to prevent syslog
  1960. X     * from barfing.
  1961. X     */
  1962. X    bp = buf;
  1963. X    sp = str;
  1964. X    for (;;) {
  1965. X        register char c;
  1966. X
  1967. X        *sp++ = c = *bp++;
  1968. X        if (c == '\0')
  1969. X            break;
  1970. X        if (c == '%')
  1971. X            *sp++ = '%';
  1972. X    }
  1973. X    return(syslog(level, str));
  1974. X}
  1975. X#endif /* NEED VSYSLOG(3) */
  1976. X
  1977. X
  1978. X/*ARGSUSED*/
  1979. void
  1980. SigDeadClient(int sig)
  1981. X    /*
  1982. X     * Dead client signal handler.  Indicates that a connection died and we
  1983. X     * got got a SIGPIPE while trying to write on it.  WriteToClient sets
  1984. X     * this trap and is prepared to take an error return through a long
  1985. X     * jump to dead_client.
  1986. X     */
  1987. X{
  1988. X    siglongjmp(dead_client, 1);
  1989. X    /*NOTREACHED*/
  1990. X}
  1991. X
  1992. X
  1993. X/*ARGSUSED*/
  1994. void
  1995. SigReset(int sig)
  1996. X    /*
  1997. X     * Reset signal handler.  Indicates a reset request from someone with
  1998. X     * privileges to send us a reset signal (SIGHUP).  Shutdown and close
  1999. X     * all connections and restart operations.
  2000. X     */
  2001. X{
  2002. X    Log(LOG_EMERG, "reset: received reset signal (%d)"
  2003. X        " -- resetting immediately", sig);
  2004. X    ShutdownServer();
  2005. X    siglongjmp(server_reset, 1);
  2006. X    /*NOTREACHED*/
  2007. X}
  2008. X
  2009. X
  2010. X/*ARGSUSED*/
  2011. void
  2012. SigShutdown(int sig)
  2013. X    /*
  2014. X     * Shutdown signal handler.  Indicates a shutdown request either from
  2015. X     * init or someone else with privileges to send us a shutdown signal
  2016. X     * (SIGTERM).  Shutdown and close all connections and exit.
  2017. X     */
  2018. X{
  2019. X    Log(LOG_EMERG, "exit: received shutdown signal (%d)"
  2020. X        " -- shutting down immediately");
  2021. X    ShutdownServer();
  2022. X    exit(SIGTERM);
  2023. X    /*NOTREACHED*/
  2024. X}
  2025. X
  2026. X
  2027. int
  2028. XFD_FFS(const fd_set *fds)
  2029. X    /*
  2030. X     * Return the file descriptor of the first (lowest) file descriptor bit
  2031. X     * set in the file descriptor mask.  If no bits are set, return -1.
  2032. X     */
  2033. X{
  2034. X    int m, b, fd;
  2035. X    const fd_mask *p;
  2036. X
  2037. X    p = fds->fds_bits;
  2038. X    for (m = 0; m < howmany(FD_SETSIZE, NFDBITS); m++)
  2039. X        if (p[m])
  2040. X            for (b = 1, fd = 0; fd < NFDBITS; b <<= 1, fd++)
  2041. X                if (p[m] & b)
  2042. X                    return(m*NFDBITS + fd);
  2043. X    return(-1);
  2044. X}
  2045. END_OF_FILE
  2046. if test 48856 -ne `wc -c <'mcastd/mcastd.c'`; then
  2047.     echo shar: \"'mcastd/mcastd.c'\" unpacked with wrong size!
  2048. fi
  2049. # end of 'mcastd/mcastd.c'
  2050. fi
  2051. echo shar: End of archive 4 \(of 4\).
  2052. cp /dev/null ark4isdone
  2053. MISSING=""
  2054. for I in 1 2 3 4 ; do
  2055.     if test ! -f ark${I}isdone ; then
  2056.     MISSING="${MISSING} ${I}"
  2057.     fi
  2058. done
  2059. if test "${MISSING}" = "" ; then
  2060.     echo You have unpacked all 4 archives.
  2061.     rm -f ark[1-9]isdone
  2062. else
  2063.     echo You still need to unpack the following archives:
  2064.     echo "        " ${MISSING}
  2065. fi
  2066. ##  End of shell archive.
  2067. exit 0
  2068.