diff options
author | Eric Wong <normalperson@yhbt.net> | 2012-01-11 21:46:04 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2012-01-11 21:46:04 +0000 |
commit | 301b41b6f1350806a750794d615e3468735757a6 (patch) | |
tree | 54deb2b4cb0060a54746e3635746d0f338c294f5 | |
download | cmogstored-301b41b6f1350806a750794d615e3468735757a6.tar.gz |
Nuked old history since it was missing copyright/GPLv3 notices.
-rw-r--r-- | .gitignore | 37 | ||||
-rw-r--r-- | AUTHORS | 1 | ||||
-rw-r--r-- | COPYING | 674 | ||||
-rw-r--r-- | HACKING | 25 | ||||
-rw-r--r-- | INSTALL | 396 | ||||
-rw-r--r-- | Makefile.am | 107 | ||||
-rw-r--r-- | README | 77 | ||||
-rw-r--r-- | accept.c | 15 | ||||
-rw-r--r-- | accept_loop.c | 77 | ||||
-rw-r--r-- | activeq.c | 44 | ||||
-rw-r--r-- | addrinfo.c | 20 | ||||
-rw-r--r-- | alloc.c | 191 | ||||
-rwxr-xr-x | autogen.sh | 7 | ||||
-rw-r--r-- | bind_listen.c | 89 | ||||
-rw-r--r-- | bsd/README | 7 | ||||
-rw-r--r-- | bsd/queue_safe.h | 43 | ||||
-rw-r--r-- | canonpath.c | 27 | ||||
-rw-r--r-- | cfg.c | 177 | ||||
-rw-r--r-- | cfg.h | 24 | ||||
-rw-r--r-- | cfg_parser.rl | 129 | ||||
-rw-r--r-- | cfg_validate.c | 131 | ||||
-rw-r--r-- | check.h | 24 | ||||
-rw-r--r-- | cloexec_detect.c | 41 | ||||
-rw-r--r-- | cloexec_from.c | 17 | ||||
-rw-r--r-- | close.c | 20 | ||||
-rw-r--r-- | cmogstored.c | 267 | ||||
-rw-r--r-- | cmogstored.h | 352 | ||||
-rw-r--r-- | compat_accept.h | 57 | ||||
-rw-r--r-- | configure.ac | 40 | ||||
-rw-r--r-- | coverage.mk | 34 | ||||
-rw-r--r-- | defaults.h | 6 | ||||
-rw-r--r-- | dev.c | 103 | ||||
-rw-r--r-- | die.c | 15 | ||||
-rw-r--r-- | digmd5.c | 56 | ||||
-rw-r--r-- | digmd5.h | 7 | ||||
-rw-r--r-- | doc/design.txt | 28 | ||||
-rw-r--r-- | fdmap.c | 111 | ||||
-rw-r--r-- | fdmap.h | 26 | ||||
-rw-r--r-- | file.c | 59 | ||||
-rw-r--r-- | fs.c | 159 | ||||
-rw-r--r-- | gcc.h | 15 | ||||
-rw-r--r-- | iostat.c | 118 | ||||
-rw-r--r-- | iostat.h | 28 | ||||
-rw-r--r-- | iostat_parser.rl | 87 | ||||
-rw-r--r-- | iostat_process.c | 161 | ||||
-rw-r--r-- | iov_str.h | 14 | ||||
-rw-r--r-- | listen_parser.h | 5 | ||||
-rw-r--r-- | listen_parser.rl | 50 | ||||
-rw-r--r-- | listen_parser_common.rl | 18 | ||||
-rw-r--r-- | listen_parser_internal.c | 42 | ||||
-rw-r--r-- | m4/.gitignore | 132 | ||||
-rw-r--r-- | m4/ax_pthread.m4 | 309 | ||||
-rw-r--r-- | m4/gnulib-cache.m4 | 59 | ||||
-rw-r--r-- | maxconns.c | 64 | ||||
-rw-r--r-- | mgmt.c | 224 | ||||
-rw-r--r-- | mgmt.h | 22 | ||||
-rw-r--r-- | mgmt_fn.c | 156 | ||||
-rw-r--r-- | mgmt_parser.rl | 101 | ||||
-rw-r--r-- | mnt.c | 182 | ||||
-rw-r--r-- | mnt.h | 10 | ||||
-rw-r--r-- | notify.c | 112 | ||||
-rw-r--r-- | pidfile.c | 88 | ||||
-rw-r--r-- | queue_common.h | 27 | ||||
-rw-r--r-- | queue_epoll.c | 109 | ||||
-rw-r--r-- | queue_epoll.h | 12 | ||||
-rw-r--r-- | queue_loop.c | 83 | ||||
-rw-r--r-- | queue_step.c | 16 | ||||
-rw-r--r-- | sig.c | 25 | ||||
-rw-r--r-- | svc.c | 112 | ||||
-rw-r--r-- | svc_dev.c | 140 | ||||
-rw-r--r-- | test/cfg-parser-1.c | 108 | ||||
-rwxr-xr-x | test/cmogstored-cfg.rb | 80 | ||||
-rw-r--r-- | test/fdmap-1.c | 27 | ||||
-rwxr-xr-x | test/iostat-mock.rb | 69 | ||||
-rwxr-xr-x | test/mgmt-iostat.rb | 112 | ||||
-rwxr-xr-x | test/mgmt-usage.rb | 79 | ||||
-rwxr-xr-x | test/mgmt.rb | 215 | ||||
-rw-r--r-- | test/queue-1.c | 45 | ||||
-rw-r--r-- | test/queue-epoll-1.c | 72 | ||||
-rwxr-xr-x | test/ruby-parallel.mk | 8 | ||||
-rwxr-xr-x | test/ruby-parallel.sh | 19 | ||||
-rw-r--r-- | test/thrpool-1.c | 60 | ||||
-rw-r--r-- | test/trywrite-1.c | 148 | ||||
-rw-r--r-- | test/valid-path-1.c | 19 | ||||
-rw-r--r-- | thrpool.c | 46 | ||||
-rw-r--r-- | trywrite.c | 119 | ||||
-rw-r--r-- | util.h | 49 | ||||
-rw-r--r-- | valid_path.rl | 39 | ||||
-rw-r--r-- | warn.c | 13 |
89 files changed, 7568 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6f021d2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +*-1 +*-t +*.gc?? +*.gz +*.log +.deps +.dirstamp +/*.cache +/*.in +/_build +/aclocal.m4 +/cfg_parser.c +/cmogstored +/config.guess +/config.h +/config.rpath +/config.status +/config.sub +/configure +/depcomp +/install-sh +/iostat_parser.c +/lib +/listen_parser.c +/mgmt_parser.c +/missing +/snippet +/stamp-h1 +/valid_path.c +Makefile +Makefile.in +TAGS +cmogstored-*.tar.gz +confdefs.h +conftest* +cover_db +tags @@ -0,0 +1 @@ +* Eric Wong <normalperson@yhbt.net> @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU General Public License is a free, copyleft license for +software and other kinds of works. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +the GNU General Public License is intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. We, the Free Software Foundation, use the +GNU General Public License for most of our software; it applies also to +any other work released this way by its authors. You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. 
+ + To protect your rights, we need to prevent others from denying you +these rights or asking you to surrender the rights. Therefore, you have +certain responsibilities if you distribute copies of the software, or if +you modify it: responsibilities to respect the freedom of others. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must pass on to the recipients the same +freedoms that you received. You must make sure that they, too, receive +or can get the source code. And you must show them these terms so they +know their rights. + + Developers that use the GNU GPL protect your rights with two steps: +(1) assert copyright on the software, and (2) offer you this License +giving you legal permission to copy, distribute and/or modify it. + + For the developers' and authors' protection, the GPL clearly explains +that there is no warranty for this free software. For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. 
+States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. 
+ + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. 
However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. 
+ + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. 
+ + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. 
+ + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. 
+ + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. 
+ + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. 
If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. 
If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. 
+ + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. 
For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. 
+ + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. 
You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. 
The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. 
THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. 
+ + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + +Also add information on how to contact you by electronic and paper mail. + + If the program does terminal interaction, make it output a short +notice like this when it starts in an interactive mode: + + <program> Copyright (C) <year> <name of author> + This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, your program's commands +might be different; for a GUI interface, you would use an "about box". + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU GPL, see +<http://www.gnu.org/licenses/>. + + The GNU General Public License does not permit incorporating your program +into proprietary programs. If your program is a subroutine library, you +may consider it more useful to permit linking proprietary applications with +the library. 
If this is what you want to do, use the GNU Lesser General +Public License instead of this License. But first, please read +<http://www.gnu.org/philosophy/why-not-lgpl.html>. @@ -0,0 +1,25 @@ +Bootstrapping from the git repo requires GNU autotools, Gnulib and +Ragel. You will need "autoreconf", "gnulib-tool" and "ragel" in your +$PATH. Some of the tests require "ruby" (any 1.8 or later version will +do). + +* autoconf - https://www.gnu.org/software/autoconf/ +* automake - https://www.gnu.org/software/automake/ +* Gnulib - https://www.gnu.org/software/gnulib/ +* Ragel - http://www.comp-lang.org/ragel/ (parser) +* ruby - http://www.ruby-lang.org/ +* git - http://www.git-scm.com/ + +$ git clone git://bogomips.org/cmogstored +$ cd cmogstored && ./autogen.sh + +Generally, the versions of these tools bundled with the latest +stable release of Debian GNU/Linux will work. + +For Gnulib, we will use the latest git checkouts from: + git://git.savannah.gnu.org/gnulib.git + +Hack away! + +Email patches (git format-patch + git send-email) and pull requests to +Eric Wong <normalperson@yhbt.net> @@ -0,0 +1,396 @@ +Standard autotools installation +******************************* + + ./configure && make && make install + +Full instructions (as written by the FSF at the bottom) + +Runtime Requirements +==================== + +A modern GNU/Linux system with NPTL and epoll support is _required_. + +* Linux kernel 2.6.18 or later +* GNU C Library (glibc 2.5 or later) +* iostat ("sysstat" in most package managers) + +The "stable" release of Debian GNU/Linux (currently 6.0) is the +recommended runtime environment, but CentOS 5.x (or later) shall +be fully supported. + +We will only support running on 100% Free Software platforms. + +FreeBSD 8+ support (via kqueue) is planned. + +Build Requirements +================== + +* gcc 4.1 or later - we currently depend on GCC-specific features + +See the "HACKING" document for required development tools. 
+ + +Installation Instructions +************************* + +Copyright (C) 1994, 1995, 1996, 1999, 2000, 2001, 2002, 2004, 2005, +2006, 2007, 2008, 2009 Free Software Foundation, Inc. + + Copying and distribution of this file, with or without modification, +are permitted in any medium without royalty provided the copyright +notice and this notice are preserved. This file is offered as-is, +without warranty of any kind. + +Basic Installation +================== + + Briefly, the shell commands `./configure; make; make install' should +configure, build, and install this package. The following +more-detailed instructions are generic; see the `README' file for +instructions specific to this package. Some packages provide this +`INSTALL' file but do not implement all of the features documented +below. The lack of an optional feature in a given package is not +necessarily a bug. More recommendations for GNU packages can be found +in *note Makefile Conventions: (standards)Makefile Conventions. + + The `configure' shell script attempts to guess correct values for +various system-dependent variables used during compilation. It uses +those values to create a `Makefile' in each directory of the package. +It may also create one or more `.h' files containing system-dependent +definitions. Finally, it creates a shell script `config.status' that +you can run in the future to recreate the current configuration, and a +file `config.log' containing compiler output (useful mainly for +debugging `configure'). + + It can also use an optional file (typically called `config.cache' +and enabled with `--cache-file=config.cache' or simply `-C') that saves +the results of its tests to speed up reconfiguring. Caching is +disabled by default to prevent problems with accidental use of stale +cache files. 
+ + If you need to do unusual things to compile the package, please try +to figure out how `configure' could check whether to do them, and mail +diffs or instructions to the address given in the `README' so they can +be considered for the next release. If you are using the cache, and at +some point `config.cache' contains results you don't want to keep, you +may remove or edit it. + + The file `configure.ac' (or `configure.in') is used to create +`configure' by a program called `autoconf'. You need `configure.ac' if +you want to change it or regenerate `configure' using a newer version +of `autoconf'. + + The simplest way to compile this package is: + + 1. `cd' to the directory containing the package's source code and type + `./configure' to configure the package for your system. + + Running `configure' might take a while. While running, it prints + some messages telling which features it is checking for. + + 2. Type `make' to compile the package. + + 3. Optionally, type `make check' to run any self-tests that come with + the package, generally using the just-built uninstalled binaries. + + 4. Type `make install' to install the programs and any data files and + documentation. When installing into a prefix owned by root, it is + recommended that the package be configured and built as a regular + user, and only the `make install' phase executed with root + privileges. + + 5. Optionally, type `make installcheck' to repeat any self-tests, but + this time using the binaries in their final installed location. + This target does not install anything. Running this target as a + regular user, particularly if the prior `make install' required + root privileges, verifies that the installation completed + correctly. + + 6. You can remove the program binaries and object files from the + source code directory by typing `make clean'. To also remove the + files that `configure' created (so you can compile the package for + a different kind of computer), type `make distclean'. 
There is + also a `make maintainer-clean' target, but that is intended mainly + for the package's developers. If you use it, you may have to get + all sorts of other programs in order to regenerate files that came + with the distribution. + + 7. Often, you can also type `make uninstall' to remove the installed + files again. In practice, not all packages have tested that + uninstallation works correctly, even though it is required by the + GNU Coding Standards. + + 8. Some packages, particularly those that use Automake, provide `make + distcheck', which can be used by developers to test that all other + targets like `make install' and `make uninstall' work correctly. + This target is generally not run by end users. + +Compilers and Options +===================== + + Some systems require unusual options for compilation or linking that +the `configure' script does not know about. Run `./configure --help' +for details on some of the pertinent environment variables. + + You can give `configure' initial values for configuration parameters +by setting variables in the command line or in the environment. Here +is an example: + + ./configure CC=c99 CFLAGS=-g LIBS=-lposix + + *Note Defining Variables::, for more details. + +Compiling For Multiple Architectures +==================================== + + You can compile the package for more than one kind of computer at the +same time, by placing the object files for each architecture in their +own directory. To do this, you can use GNU `make'. `cd' to the +directory where you want the object files and executables to go and run +the `configure' script. `configure' automatically checks for the +source code in the directory that `configure' is in and in `..'. This +is known as a "VPATH" build. + + With a non-GNU `make', it is safer to compile the package for one +architecture at a time in the source code directory. 
After you have +installed the package for one architecture, use `make distclean' before +reconfiguring for another architecture. + + On MacOS X 10.5 and later systems, you can create libraries and +executables that work on multiple system types--known as "fat" or +"universal" binaries--by specifying multiple `-arch' options to the +compiler but only a single `-arch' option to the preprocessor. Like +this: + + ./configure CC="gcc -arch i386 -arch x86_64 -arch ppc -arch ppc64" \ + CXX="g++ -arch i386 -arch x86_64 -arch ppc -arch ppc64" \ + CPP="gcc -E" CXXCPP="g++ -E" + + This is not guaranteed to produce working output in all cases; you +may have to build one architecture at a time and combine the results +using the `lipo' tool if you have problems. + +Installation Names +================== + + By default, `make install' installs the package's commands under +`/usr/local/bin', include files under `/usr/local/include', etc. You +can specify an installation prefix other than `/usr/local' by giving +`configure' the option `--prefix=PREFIX', where PREFIX must be an +absolute file name. + + You can specify separate installation prefixes for +architecture-specific files and architecture-independent files. If you +pass the option `--exec-prefix=PREFIX' to `configure', the package uses +PREFIX as the prefix for installing programs and libraries. +Documentation and other data files still use the regular prefix. + + In addition, if you use an unusual directory layout you can give +options like `--bindir=DIR' to specify different values for particular +kinds of files. Run `configure --help' for a list of the directories +you can set and what kinds of files go in them. In general, the +default for these options is expressed in terms of `${prefix}', so that +specifying just `--prefix' will affect all of the other directory +specifications that were not explicitly provided. 
+ + The most portable way to affect installation locations is to pass the +correct locations to `configure'; however, many packages provide one or +both of the following shortcuts of passing variable assignments to the +`make install' command line to change installation locations without +having to reconfigure or recompile. + + The first method involves providing an override variable for each +affected directory. For example, `make install +prefix=/alternate/directory' will choose an alternate location for all +directory configuration variables that were expressed in terms of +`${prefix}'. Any directories that were specified during `configure', +but not in terms of `${prefix}', must each be overridden at install +time for the entire installation to be relocated. The approach of +makefile variable overrides for each directory variable is required by +the GNU Coding Standards, and ideally causes no recompilation. +However, some platforms have known limitations with the semantics of +shared libraries that end up requiring recompilation when using this +method, particularly noticeable in packages that use GNU Libtool. + + The second method involves providing the `DESTDIR' variable. For +example, `make install DESTDIR=/alternate/directory' will prepend +`/alternate/directory' before all installation names. The approach of +`DESTDIR' overrides is not required by the GNU Coding Standards, and +does not work on platforms that have drive letters. On the other hand, +it does better at avoiding recompilation issues, and works well even +when some directory options were not specified in terms of `${prefix}' +at `configure' time. + +Optional Features +================= + + If the package supports it, you can cause programs to be installed +with an extra prefix or suffix on their names by giving `configure' the +option `--program-prefix=PREFIX' or `--program-suffix=SUFFIX'. 
+ + Some packages pay attention to `--enable-FEATURE' options to +`configure', where FEATURE indicates an optional part of the package. +They may also pay attention to `--with-PACKAGE' options, where PACKAGE +is something like `gnu-as' or `x' (for the X Window System). The +`README' should mention any `--enable-' and `--with-' options that the +package recognizes. + + For packages that use the X Window System, `configure' can usually +find the X include and library files automatically, but if it doesn't, +you can use the `configure' options `--x-includes=DIR' and +`--x-libraries=DIR' to specify their locations. + + Some packages offer the ability to configure how verbose the +execution of `make' will be. For these packages, running `./configure +--enable-silent-rules' sets the default to minimal output, which can be +overridden with `make V=1'; while running `./configure +--disable-silent-rules' sets the default to verbose, which can be +overridden with `make V=0'. + +Particular systems +================== + + On HP-UX, the default C compiler is not ANSI C compatible. If GNU +CC is not installed, it is recommended to use the following options in +order to use an ANSI C compiler: + + ./configure CC="cc -Ae -D_XOPEN_SOURCE=500" + +and if that doesn't work, install pre-built binaries of GCC for HP-UX. + + On OSF/1 a.k.a. Tru64, some versions of the default C compiler cannot +parse its `<wchar.h>' header file. The option `-nodtk' can be used as +a workaround. If GNU CC is not installed, it is therefore recommended +to try + + ./configure CC="cc" + +and if that doesn't work, try + + ./configure CC="cc -nodtk" + + On Solaris, don't put `/usr/ucb' early in your `PATH'. This +directory contains several dysfunctional programs; working variants of +these programs are available in `/usr/bin'. So, if you need `/usr/ucb' +in your `PATH', put it _after_ `/usr/bin'. + + On Haiku, software installed for all users goes in `/boot/common', +not `/usr/local'. 
It is recommended to use the following options: + + ./configure --prefix=/boot/common + +Specifying the System Type +========================== + + There may be some features `configure' cannot figure out +automatically, but needs to determine by the type of machine the package +will run on. Usually, assuming the package is built to be run on the +_same_ architectures, `configure' can figure that out, but if it prints +a message saying it cannot guess the machine type, give it the +`--build=TYPE' option. TYPE can either be a short name for the system +type, such as `sun4', or a canonical name which has the form: + + CPU-COMPANY-SYSTEM + +where SYSTEM can have one of these forms: + + OS + KERNEL-OS + + See the file `config.sub' for the possible values of each field. If +`config.sub' isn't included in this package, then this package doesn't +need to know the machine type. + + If you are _building_ compiler tools for cross-compiling, you should +use the option `--target=TYPE' to select the type of system they will +produce code for. + + If you want to _use_ a cross compiler, that generates code for a +platform different from the build platform, you should specify the +"host" platform (i.e., that on which the generated programs will +eventually be run) with `--host=TYPE'. + +Sharing Defaults +================ + + If you want to set default values for `configure' scripts to share, +you can create a site shell script called `config.site' that gives +default values for variables like `CC', `cache_file', and `prefix'. +`configure' looks for `PREFIX/share/config.site' if it exists, then +`PREFIX/etc/config.site' if it exists. Or, you can set the +`CONFIG_SITE' environment variable to the location of the site script. +A warning: not all `configure' scripts look for a site script. + +Defining Variables +================== + + Variables not defined in a site shell script can be set in the +environment passed to `configure'. 
However, some packages may run +configure again during the build, and the customized values of these +variables may be lost. In order to avoid this problem, you should set +them in the `configure' command line, using `VAR=value'. For example: + + ./configure CC=/usr/local2/bin/gcc + +causes the specified `gcc' to be used as the C compiler (unless it is +overridden in the site shell script). + +Unfortunately, this technique does not work for `CONFIG_SHELL' due to +an Autoconf bug. Until the bug is fixed you can use this workaround: + + CONFIG_SHELL=/bin/bash /bin/bash ./configure CONFIG_SHELL=/bin/bash + +`configure' Invocation +====================== + + `configure' recognizes the following options to control how it +operates. + +`--help' +`-h' + Print a summary of all of the options to `configure', and exit. + +`--help=short' +`--help=recursive' + Print a summary of the options unique to this package's + `configure', and exit. The `short' variant lists options used + only in the top level, while the `recursive' variant lists options + also present in any nested packages. + +`--version' +`-V' + Print the version of Autoconf used to generate the `configure' + script, and exit. + +`--cache-file=FILE' + Enable the cache: use and save the results of the tests in FILE, + traditionally `config.cache'. FILE defaults to `/dev/null' to + disable caching. + +`--config-cache' +`-C' + Alias for `--cache-file=config.cache'. + +`--quiet' +`--silent' +`-q' + Do not print messages saying which checks are being made. To + suppress all normal output, redirect it to `/dev/null' (any error + messages will still be shown). + +`--srcdir=DIR' + Look for the package's source code in directory DIR. Usually + `configure' can determine that directory automatically. + +`--prefix=DIR' + Use DIR as the installation prefix. *note Installation Names:: + for more details, including other options available for fine-tuning + the installation locations. 
+ +`--no-create' +`-n' + Run the configure checks, but stop before creating any output + files. + +`configure' also accepts some other, not widely useful, options. Run +`configure --help' for more details. diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 0000000..5bff850 --- /dev/null +++ b/Makefile.am @@ -0,0 +1,107 @@ +ACLOCAL_AMFLAGS = -I m4 +AM_CPPFLAGS = -I$(top_builddir)/lib +AM_CFLAGS = $(WARN_CFLAGS) $(PTHREAD_CFLAGS) +SUBDIRS = lib + +mog_src = +mog_src += accept.c +mog_src += accept_loop.c +mog_src += activeq.c +mog_src += addrinfo.c +mog_src += alloc.c +mog_src += bind_listen.c +mog_src += bsd/queue_safe.h +mog_src += canonpath.c +mog_src += cfg.c +mog_src += cfg.h +mog_src += cfg_validate.c +mog_src += cloexec_detect.c +mog_src += cloexec_from.c +mog_src += close.c +mog_src += cmogstored.h +mog_src += compat_accept.h +mog_src += defaults.h +mog_src += dev.c +mog_src += die.c +mog_src += digmd5.c +mog_src += digmd5.h +mog_src += fdmap.c +mog_src += fdmap.h +mog_src += file.c +mog_src += fs.c +mog_src += gcc.h +mog_src += iostat.c +mog_src += iostat.h +mog_src += iostat_process.c +mog_src += iov_str.h +mog_src += listen_parser.h +mog_src += listen_parser_internal.c +mog_src += maxconns.c +mog_src += mgmt.c +mog_src += mgmt.h +mog_src += mgmt_fn.c +mog_src += mnt.c +mog_src += mnt.h +mog_src += notify.c +mog_src += pidfile.c +mog_src += queue_common.h +mog_src += queue_epoll.c +mog_src += queue_epoll.h +mog_src += queue_loop.c +mog_src += queue_step.c +mog_src += sig.c +mog_src += svc.c +mog_src += svc_dev.c +mog_src += thrpool.c +mog_src += trywrite.c +mog_src += util.h +mog_src += warn.c + +LDADD = $(LIBINTL) $(top_builddir)/lib/libgnu.a + +RLFLAGS = -G2 +RAGEL = ragel +RL_MAIN = cfg_parser.rl iostat_parser.rl listen_parser.rl mgmt_parser.rl \ + valid_path.rl +RL_CGEN = cfg_parser.c iostat_parser.c listen_parser.c mgmt_parser.c \ + valid_path.c +RL_ALL = listen_parser_common.rl $(RL_MAIN) + +cfg_parser.c: cfg_parser.rl 
listen_parser_common.rl +listen_parser.c: listen_parser.rl listen_parser_common.rl + +%.c: %.rl + $(AM_V_GEN)$(RAGEL) $< -C $(RLFLAGS) -o $@ + +bin_PROGRAMS = cmogstored +cmogstored_SOURCES = $(mog_src) $(RL_CGEN) cmogstored.c + +TEST_EXTENSIONS = .rb +RB_LOG_COMPILER = $(top_srcdir)/test/ruby-parallel.sh +RB_TESTS = test/mgmt-usage.rb test/mgmt.rb test/mgmt-iostat.rb \ + test/cmogstored-cfg.rb + +check_PROGRAMS = test/valid-path-1 test/trywrite-1 \ + test/cfg-parser-1 test/fdmap-1 test/thrpool-1 test/queue-1 \ + test/queue-epoll-1 + +TESTS = $(RB_TESTS) $(check_PROGRAMS) + +test_COMMON = $(mog_src) $(RL_CGEN) check.h + +test_valid_path_1_SOURCES = test/valid-path-1.c $(test_COMMON) +test_trywrite_1_SOURCES = test/trywrite-1.c $(test_COMMON) +test_cfg_parser_1_SOURCES = test/cfg-parser-1.c $(test_COMMON) +test_fdmap_1_SOURCES = test/fdmap-1.c $(test_COMMON) +test_thrpool_1_SOURCES = test/thrpool-1.c $(test_COMMON) +test_queue_1_SOURCES = test/queue-1.c $(test_COMMON) +test_queue_epoll_1_SOURCES = test/queue-epoll-1.c $(test_COMMON) + +EXTRA_DIST = $(RB_TESTS) m4 $(RL_ALL) autogen.sh HACKING \ + test/ruby-parallel.mk test/ruby-parallel.sh \ + test/iostat-superfast.sh + +TESTS_ENVIRONMENT = PATH=$(top_builddir):$$PATH + +CLEANFILES = *.gcov *.gcda *.gcno +include $(top_srcdir)/coverage.mk @@ -0,0 +1,77 @@ +cmogstored - alternative mogstored implementation for MogileFS +-------------------------------------------------------------- + +cmogstored is an _experimental_, alternative implementation of the +"mogstored" storage component of MogileFS. Unlike MogileFS itself, +cmogstored is not considered stable nor sane for production use. +cmogstored is implemented in C and does not use Perl at runtime. + +Read more about MogileFS here: http://mogilefs.org/ +cmogstored is not directly affiliated with the MogileFS project +nor any commercial interests behind MogileFS. + + +*** WARNING! *** WARNING! *** WARNING! *** WARNING! *** +*** WARNING! *** WARNING! *** WARNING! 
*** WARNING! *** + +cmogstored is a _completely_ new code base and likely contains fatal +bugs. There are many assertions in the code and you're likely to trip +some. + +*** WARNING! *** WARNING! *** WARNING! *** WARNING! *** +*** WARNING! *** WARNING! *** WARNING! *** WARNING! *** + + +Getting Started +=============== + +See the INSTALL document for installation and runtime requirements. + +cmogstored aims to be mostly command-line and configuration-file +compatible with the Perl mogstored. + +cmogstored does not support spawning lighttpd/Apache instances like its +Perl counterpart. + +In other cases, you can simply replace "mogstored" with "cmogstored" +in your init scripts. + + +Contact +======= + +For any and all questions, bug reports, patches, pull requests, +send plain-text email to: Eric Wong <normalperson@yhbt.net> +We may also piggy-back onto the public MogileFS mailing list: +<mogile@googlegroups.com> for public discussions. + +Downloads +========= + +Source tarballs suitable for distribution are housed here: + +* http://bogomips.org/cmogstored/files/ + +Source code +=========== + +* git clone git://bogomips.org/cmogstored.git +* cgit :: http://bogomips.org/cmogstored.git +* gitweb :: http://repo.or.cz/w/cmogstored.git + +License +======= + +cmogstored is licensed under the GNU General Public License, v3 (or later). +Distribution tarballs include LGPL code from the Gnulib project. +The bsd/*.h compatibility headers are BSD-licensed. +Contributors retain their copyrights to their respective contributions. + +This project is not affiliated with the GNU project nor FSF even though +we use their tools and licenses. 
+ +Homepage +======== + +Eric Wong hates pretty things, so this README file is also the homepage: +http://bogomips.org/cmogstored/README diff --git a/accept.c b/accept.c new file mode 100644 index 0000000..d44f82d --- /dev/null +++ b/accept.c @@ -0,0 +1,15 @@ +#include "cmogstored.h" + +struct mog_accept * +mog_accept_init(int fd, struct mog_svc *svc, post_accept_fn fn) +{ + struct mog_fd *mfd = mog_fd_get(fd); + struct mog_accept *ac = &mfd->as.accept; + + mfd->fd = fd; + ac->post_accept_fn = fn; + ac->svc = svc; + memset(&ac->thrpool, 0, sizeof(struct mog_thrpool)); + + return ac; +} diff --git a/accept_loop.c b/accept_loop.c new file mode 100644 index 0000000..f7539ec --- /dev/null +++ b/accept_loop.c @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "compat_accept.h" + +MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) +{ + int fd; + + switch (errno) { + case ECONNABORTED: + case EINTR: + return; /* common errors, nothing we can do about it */ + case EBADF: + /* can happen due to unavoidable race at shutdown */ + case ENOTSOCK: + case EOPNOTSUPP: + return pthread_exit(NULL); + case_EAGAIN: + fd = mog_fd_of(ac)->fd; + if (set_nonblocking_flag(fd, false) != 0) { + if (errno == EBADF) + return pthread_exit(NULL); + syslog(LOG_ERR, + "failed to make fd=%d blocking: %m", fd); + } + syslog(LOG_DEBUG, "made fd=%d blocking", fd); + return; + case EMFILE: + case ENFILE: + case ENOBUFS: + case ENOMEM: + syslog(LOG_ERR, "accept4() failed with: %m"); + return; + default: + syslog(LOG_ERR, "accept4() failed with: %m"); + } +} + +static void post_accept_run(struct mog_accept *ac, int client_fd) +{ + struct mog_fd *mfd = mog_fd_get(client_fd); + + mfd->fd = client_fd; + mfd->queue_state = MOG_QUEUE_STATE_NEW; + ac->post_accept_fn(mfd, ac->svc); +} + +/* + * passed as the start_routine argument to pthread_create. 
+ * This function may run concurrently in multiple threads. + * The design of cmogstored assumes "wake-one" behavior for blocking + * accept()/accept4() callers. We will force accept_fd into blocking + * state if O_NONBLOCK is ever set (e.g. listen socket was inherited). + */ +void *mog_accept_loop(void *arg) +{ + struct mog_accept *ac = arg; + int accept_fd = mog_fd_of(ac)->fd; + + for (;;) { + /* pthread cancellation point */ + int client_fd = mog_accept(accept_fd, NULL, NULL); + + if (client_fd >= 0) { + mog_cancel_disable(); + post_accept_run(ac, client_fd); + mog_cancel_enable(); + } else { + accept_error_check(ac); + } + } + + return NULL; +} diff --git a/activeq.c b/activeq.c new file mode 100644 index 0000000..900c51a --- /dev/null +++ b/activeq.c @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* the portable parts of mog_queue, epoll/kqueue-agnostic */ +#include "cmogstored.h" + +/* + * activeq is _only_ used from mog_queue_loop. mog_accept_loop + * never touches this, it can only safely push to the regular + * (epoll/kqueue) queue. 
+ */
+void mog_activeq_push(struct mog_queue *q, struct mog_fd *mfd)
+{
+	if (mfd->queue_state == MOG_QUEUE_STATE_NEW) {
+		/*
+		 * accept loop can't push into active queue without
+		 * potential deadlock (or extra syscalls), so just push
+		 * into the idle queue to transition mfd->queue_state
+		 */
+		mog_idleq_push(q, mfd, MOG_QEV_RW);
+	} else {
+		mog_fd_check_in(mfd);
+		CHECK(int, 0, pthread_mutex_lock(&q->activeq_lock));
+		SIMPLEQ_INSERT_TAIL(&q->activeq_head, mfd, active_fd);
+		CHECK(int, 0, pthread_mutex_unlock(&q->activeq_lock));
+	}
+}
+
+struct mog_fd *mog_activeq_trytake(struct mog_queue *q)
+{
+	struct mog_fd *mfd;
+
+	CHECK(int, 0, pthread_mutex_lock(&q->activeq_lock));
+	mfd = SIMPLEQ_FIRST(&q->activeq_head);
+	if (mfd)
+		SIMPLEQ_REMOVE_HEAD(&q->activeq_head, active_fd);
+	CHECK(int, 0, pthread_mutex_unlock(&q->activeq_lock));
+
+	if (mfd)
+		mog_fd_check_out(mfd);
+
+	return mfd;
+}
diff --git a/addrinfo.c b/addrinfo.c
new file mode 100644
index 0000000..58e2c97
--- /dev/null
+++ b/addrinfo.c
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+
+/*
+ * mog_listen_parse and mog_cfg_parse generate mog_addrinfo objects internally
+ */
+void mog_addrinfo_free(struct mog_addrinfo **aptr)
+{
+	struct mog_addrinfo *a = *aptr;
+
+	if (!a) return;
+
+	*aptr = NULL;
+	mog_free(a->orig);
+	freeaddrinfo(a->addr);
+	free(a);
+}
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+#define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
+static size_t l1_cache_line_size = L1_CACHE_LINE_MAX;
+static pthread_key_t mog_rbuf_key; /* for small socket/pipe reads (headers) */
+static pthread_key_t mog_fsbuf_key; /* large for filesystem I/O */
+static size_t FSBUF_SIZE = BUFSIZ; /* trust stdio, BUFSIZ=8192 on glibc */
+
+#define MOG_MASK(align) (~((size_t)(align) - 1)) /* align: power of two */
+#define MOG_ALIGN(align,val) (((val) + (size_t)(align) - 1) & MOG_MASK(align))
+
+/*
+ * Just in case the system page size is larger than BUFSIZ,
+ * crank up FSBUF_SIZE to match since mmap() will internally
+ * round up the allocation anyways
+ */
+#ifdef _SC_PAGE_SIZE
+static void fsbuf_size_page_align(void)
+{
+	long page_size = sysconf(_SC_PAGE_SIZE);
+
+	if (page_size > 0) { /* positive, so the size_t casts below are safe */
+		if ((size_t)page_size > FSBUF_SIZE)
+			FSBUF_SIZE = (size_t)page_size;
+		else
+			FSBUF_SIZE = MOG_ALIGN(page_size, FSBUF_SIZE);
+	}
+}
+#else
+# define fsbuf_size_page_align() do {} while (0)
+#endif /* _SC_PAGE_SIZE */
+
+
+#ifdef _SC_LEVEL1_DCACHE_LINESIZE
+static size_t l1_cache_line_size_detect(void)
+{
+	long tmp = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
+
+	if (tmp > 0 && tmp <= L1_CACHE_LINE_MAX) return (size_t)tmp;
+
+	return L1_CACHE_LINE_MAX;
+}
+#else
+# define l1_cache_line_size_detect() (L1_CACHE_LINE_MAX)
+#endif /* _SC_LEVEL1_DCACHE_LINESIZE */
+
+
+static void fsbuf_free(void *);
+
+__attribute__((constructor)) static /* FIXME for non-gcc */
+void mog_alloc_init(void)
+{
+	fsbuf_size_page_align();
+	l1_cache_line_size = l1_cache_line_size_detect();
+	CHECK(int, 0, pthread_key_create(&mog_rbuf_key, free));
+	CHECK(int, 0, pthread_key_create(&mog_fsbuf_key, fsbuf_free));
+}
+
+
+void mog_free_and_null(void *ptrptr)
+{
+	void **tmp = ptrptr;
+
+	free(*tmp);
+	*tmp = NULL;
+}
+
+_Noreturn void mog_oom(void)
+{
+	write(STDERR_FILENO, "OOM\n", 4);
+	syslog(LOG_CRIT, "Out of memory, aborting");
+	abort();
+}
+
+
+/*
+ * Cache alignment is important for sub-pagesized allocations
+ * that can be bounced between threads. 
We round up the + * allocation to the cache size + */ +void *mog_cachealign(size_t size) +{ + void *ptr; + int err = posix_memalign(&ptr, l1_cache_line_size, size); + + switch (err) { + case 0: return ptr; + case ENOMEM: mog_oom(); + } + die("posix_memalign failed: %s\n", strerror(err)); +} + + +/* allocates a new mog_rbuf of +size+ bytes */ +struct mog_rbuf *mog_rbuf_new(size_t size) +{ + struct mog_rbuf *rbuf = mog_cachealign(size + sizeof(struct mog_rbuf)); + + assert(size > 0 && "tried to allocate a zero-byte mog_rbuf"); + rbuf->rsize = size; + + return rbuf; +} + +MOG_NOINLINE static struct mog_rbuf * +rbuf_replace(struct mog_rbuf *rbuf, size_t size) +{ + free(rbuf); /* free(NULL) works on modern systems */ + rbuf = mog_rbuf_new(size); + CHECK(int, 0, pthread_setspecific(mog_rbuf_key, rbuf)); + + return rbuf; +} + +/* + * retrieves the per-thread rbuf belonging to the current thread, + * ensuring it is at least capable of storing the specified size + */ +struct mog_rbuf *mog_rbuf_get(size_t size) +{ + struct mog_rbuf *rbuf = pthread_getspecific(mog_rbuf_key); + + if (rbuf && rbuf->rsize >= size) return rbuf; + + return rbuf_replace(rbuf, size); +} + +/* ensures a given rbuf is no longer associated with the current thread */ +struct mog_rbuf *mog_rbuf_defer(struct mog_rbuf *rbuf) +{ + struct mog_rbuf *cur = pthread_getspecific(mog_rbuf_key); + + if (cur == rbuf) + CHECK(int, 0, pthread_setspecific(mog_rbuf_key, NULL)); + + return rbuf; +} + +void mog_rbuf_free(struct mog_rbuf *rbuf) +{ + assert(((rbuf == NULL) || + (pthread_getspecific(mog_rbuf_key) != rbuf)) && + "trying to free undeferred rbuf"); + free(rbuf); +} + +void mog_rbuf_free_and_null(struct mog_rbuf **ptrptr) +{ + mog_rbuf_free(*ptrptr); + *ptrptr = NULL; +} + + +#ifndef MAP_ANONYMOUS +# define MAP_ANONYMOUS MAP_ANON +#endif + +MOG_NOINLINE static void * fsbuf_new(void) +{ + int prot = PROT_READ | PROT_WRITE; + int flags = MAP_ANONYMOUS | MAP_PRIVATE; + int fd = -1; + off_t offset = 0; + void *ptr = 
mmap(NULL, FSBUF_SIZE, prot, flags, fd, offset); + + if (ptr == MAP_FAILED) mog_oom(); + + CHECK(int, 0, pthread_setspecific(mog_fsbuf_key, ptr)); + + return ptr; +} + +/* retrieves the per-thread fsbuf and sets size to the value of FSBUF_SIZE */ +void *mog_fsbuf_get(size_t *size) +{ + void *ptr = pthread_getspecific(mog_fsbuf_key); + + *size = FSBUF_SIZE; + if (ptr) return ptr; + + return fsbuf_new(); +} + +/* destructor called at thread exit */ +static void fsbuf_free(void *ptr) +{ + if (ptr) CHECK(int, 0, munmap(ptr, FSBUF_SIZE)); +} diff --git a/autogen.sh b/autogen.sh new file mode 100755 index 0000000..f2d71ea --- /dev/null +++ b/autogen.sh @@ -0,0 +1,7 @@ +#!/bin/sh +if gnulib-tool --update && autoreconf -i +then + exit 0 +fi +cat HACKING +exit 1 diff --git a/bind_listen.c b/bind_listen.c new file mode 100644 index 0000000..fad6116 --- /dev/null +++ b/bind_listen.c @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* + * TODO + * - support accept filters in FreeBSD + * - configurable socket buffer sizes (where to put config?) + * - configurable listen() backlog (where to put config?) 
+ */ +#ifdef TCP_DEFER_ACCEPT +static int set_accept_filter(int fd, const char *accept_filter) +{ + int val = 1; + socklen_t len = sizeof(int); + + return setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len); +} +#else /* !TCP_DEFER_ACCEPT */ +static int set_accept_filter(int fd, const char *accept_filter) +{ + /* TODO: FreeBSD accept filter */ + return 0; +} +#endif /* !TCP_DEFER_ACCEPT */ + +static int set_tcp_opts(int fd, const char *accept_filter) +{ + int val; + socklen_t len = sizeof(int); + int rc; + + val = 1; + rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, len); + if (rc < 0) return rc; + + val = 1; + rc = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, len); + if (rc < 0) return rc; + + val = 1; + rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, len); + if (rc < 0) return rc; + + if (accept_filter) { + rc = set_accept_filter(fd, accept_filter); + if (rc < 0) return rc; + + /* mgmt doesn't need large buffers */ + if (strcmp("dataready", accept_filter) == 0) { + val = 2048; + setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, len); + val = 2048; + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, len); + } + } + + return rc; +} + +int mog_bind_listen(struct addrinfo *r, const char *accept_filter) +{ + int fd = -1; + + for (; r; r = r->ai_next) { + fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol); + if (fd < 0) + continue; + + /* + * We'll need to unset FD_CLOEXEC in the child for upgrades + * Leave FD_CLOEXEC set because we fork+exec iostat(1) + * frequently. We can't guarantee SOCK_CLOEXEC works + * everywhere yet (in 2012). + */ + if (set_cloexec_flag(fd, true) == 0 && + set_tcp_opts(fd, accept_filter) == 0 && + bind(fd, r->ai_addr, r->ai_addrlen) == 0 && + listen(fd, 1024) == 0) + break; + + PRESERVE_ERRNO( close(fd) ); + fd = -1; + } + + return fd; +} diff --git a/bsd/README b/bsd/README new file mode 100644 index 0000000..f7823a4 --- /dev/null +++ b/bsd/README @@ -0,0 +1,7 @@ +We depend on headers common to *BSDs but not standardized in POSIX. 
+GNU/Linux distributions sometimes package these headers, but they +are sometimes out-of-date and missing macros we rely on. + +All headers in this (bsd/) directory are under their respective +(3-clause BSD) license and not covered by the GPL that applies +to the rest of cmogstored. diff --git a/bsd/queue_safe.h b/bsd/queue_safe.h new file mode 100644 index 0000000..7b5cbfd --- /dev/null +++ b/bsd/queue_safe.h @@ -0,0 +1,43 @@ +/* + * this file adds *_SAFE macros that may be missing from the stock + * sys/queue.h header. The original (BSD) license terms are + * retained (see below). + */ +#include <sys/queue.h> + +/*- + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#ifndef LIST_FOREACH_SAFE +#define LIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = LIST_FIRST((head)); \ + (var) && ((tvar) = LIST_NEXT((var), field), 1); \ + (var) = (tvar)) +#endif /* LIST_FOREACH_SAFE */ + +/* add more as we need them */ diff --git a/canonpath.c b/canonpath.c new file mode 100644 index 0000000..83fc569 --- /dev/null +++ b/canonpath.c @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode) +{ + char *p = canonicalize_filename_mode(path, canon_mode); + + if (!p && errno == ENOMEM) + mog_oom(); + + return p; /* may be null if errors */ +} + +char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode) +{ + char *p = mog_canonpath(path, canon_mode); + + if (p) return p; + + if (errno) + die("`%s' failed to resolve: %s\n", path, strerror(errno)); + else + die("`%s' failed to resolve\n", path); +} @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "cfg.h" + +static Hash_table *all_cfg; /* we support multiple configs -> svcs */ +struct mog_cfg mog_cli; +bool mog_cfg_multi; + +static void cfg_free_internal(struct mog_cfg *cfg) +{ + mog_free(cfg->docroot); + mog_free(cfg->pidfile); + 
mog_free(cfg->configfile); + mog_free(cfg->server); + mog_addrinfo_free(&cfg->mgmtlisten); + mog_addrinfo_free(&cfg->httplisten); + mog_addrinfo_free(&cfg->httpgetlisten); + /* let svc.c deal with cfg->svc for now */ +} + +static void cfg_free(void *ptr) +{ + struct mog_cfg *cfg = ptr; + cfg_free_internal(cfg); + free(cfg); +} + +static size_t cfg_hash(const void *x, size_t tablesize) +{ + const struct mog_cfg *cfg = x; + + return hash_string(cfg->configfile, tablesize); +} + +static bool cfg_cmp(const void *a, const void *b) +{ + const struct mog_cfg *cfg_a = a; + const struct mog_cfg *cfg_b = b; + + return strcmp(cfg_a->configfile, cfg_b->configfile) == 0; +} + +static void cfg_atexit(void) +{ + hash_free(all_cfg); + cfg_free_internal(&mog_cli); +} + +__attribute__((constructor)) static +void mog_cfg_init(void) +{ + all_cfg = hash_initialize(7, NULL, cfg_hash, cfg_cmp, cfg_free); + if (!all_cfg) + mog_oom(); + + atexit(cfg_atexit); +} + +struct mog_cfg * mog_cfg_new(const char *configfile) +{ + struct mog_cfg *cfg = xzalloc(sizeof(struct mog_cfg)); + + cfg->configfile = mog_canonpath_die(configfile, CAN_EXISTING); + + switch (hash_insert_if_absent(all_cfg, cfg, NULL)) { + case 0: + cfg_free(cfg); + cfg = NULL; + case 1: break; + default: mog_oom(); + } + + return cfg; +} + +int mog_cfg_load(struct mog_cfg *cfg) +{ + struct stat sb; + char *buf = NULL; + ssize_t r; + int rc = -1; + int fd = open(cfg->configfile, O_RDONLY); + + if (fd < 0) goto out; + if (fstat(fd, &sb) < 0) goto out; + + buf = xmalloc(sb.st_size + strlen("\n")); + + errno = 0; + r = read(fd, buf, sb.st_size); + if (r != sb.st_size) + die("read(..., %ld) failed on %s: %s\n", + (long)sb.st_size, cfg->configfile, + errno ? 
strerror(errno) : "EOF");
+
+	buf[r] = '\n'; /* parser _needs_ a trailing newline */
+	rc = mog_cfg_parse(cfg, buf, r + 1);
+out:
+	PRESERVE_ERRNO(do {
+		if (buf) free(buf);
+		if (fd >= 0) close(fd);
+	} while(0));
+
+	return rc;
+}
+
+static size_t nr_config(void)
+{
+	return all_cfg ? hash_get_n_entries(all_cfg) : 0;
+}
+
+static const char multi_cfg_err[] =
+"--multi must be set if using multiple --config/-c switches";
+
+void mog_cfg_validate_or_die(struct mog_cfg *cli)
+{
+	switch (nr_config()) {
+	case 0:
+		mog_cfg_merge_defaults(cli);
+		assert(cli->configfile == NULL &&
+		       "BUG: --config was set but not detected");
+		break; /* CLI-only */
+	case 1:
+		hash_do_for_each(all_cfg, mog_cfg_validate_one, cli);
+		mog_cfg_merge_defaults(cli);
+		break;
+	default: /* multiple config files */
+		mog_cfg_die_if_cli_set(cli);
+		hash_do_for_each(all_cfg, mog_cfg_validate_multi, cli);
+		if (!mog_cfg_multi)
+			die("%s\n", multi_cfg_err); /* never a bare format */
+	}
+	mog_set_maxconns(cli->maxconns);
+}
+
+static int bind_or_die(struct mog_addrinfo *a, const char *accept_filter)
+{
+	int fd;
+
+	if (a == NULL) return -1;
+	fd = mog_bind_listen(a->addr, accept_filter);
+	if (fd >= 0) return fd;
+
+	die("addr=%s failed to bind+listen: %s\n", a->orig, strerror(errno));
+}
+
+static bool svc_from_cfg(void *cfg_ptr, void *ignored)
+{
+	struct mog_cfg *cfg = cfg_ptr;
+	struct mog_svc *svc;
+
+	assert(cfg->docroot && "no docroot specified");
+	svc = mog_svc_new(cfg->docroot);
+	if (!svc)
+		die("failed to load svc from docroot=%s\n", cfg->docroot);
+
+	svc->mgmt_fd = bind_or_die(cfg->mgmtlisten, "dataready");
+	svc->http_fd = bind_or_die(cfg->httplisten, "httpready");
+	svc->httpget_fd = bind_or_die(cfg->httpgetlisten, "httpready");
+
+	return true;
+}
+
+void mog_cfg_svc_start_or_die(struct mog_cfg *cli)
+{
+	switch (nr_config()) {
+	case 0:
+	case 1:
+		svc_from_cfg(cli, NULL);
+		break;
+	default:
+		hash_do_for_each(all_cfg, svc_from_cfg, NULL);
+	}
+}
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2012, Eric Wong 
<normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +struct mog_svc; +struct mog_cfg { + const char *docroot; + int daemonize; + unsigned long maxconns; + const char *pidfile; + const char *configfile; + const char *server; + struct mog_addrinfo *httplisten; + struct mog_addrinfo *mgmtlisten; + struct mog_addrinfo *httpgetlisten; /* unique to cmogstored */ + struct mog_svc *svc; +}; + +void mog_cfg_validate_or_die(struct mog_cfg *cli); +bool mog_cfg_validate_one(void *ent, void *cli); +bool mog_cfg_validate_multi(void *ent, void *cli); +void mog_cfg_die_if_cli_set(struct mog_cfg *); +void mog_cfg_merge_defaults(struct mog_cfg *); +void mog_cfg_check_server(struct mog_cfg *); diff --git a/cfg_parser.rl b/cfg_parser.rl new file mode 100644 index 0000000..843a216 --- /dev/null +++ b/cfg_parser.rl @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* + * parses config files used by the original (Perl) mogstored + */ +#include "cmogstored.h" +#include "cfg.h" +#include "listen_parser.h" + +static char *mystrdup(const char *key, char *mark_beg, const char *p) +{ + size_t mark_len = p - mark_beg; + mark_beg[mark_len] = 0; + if (strlen(mark_beg) != mark_len) { + syslog(LOG_ERR, "NUL character in `%s' value", key); + return NULL; + } + + return xstrdup(mark_beg); +} + +%%{ + machine cfg_parser; + include listen_parser_common "listen_parser_common.rl"; + + eor = '\n'; + ignored_line := ( (any-'\n')* eor ) @ { fgoto main; }; + lws = (space-'\n'); + comment = lws* ("#"(any-'\n')*); + comment_line = comment eor; + sep = lws* "=" lws*; + + path = ((any - space)+) > { mark_beg = fpc; }; + + mgmtlisten = lws* "mgmtlisten" sep listen comment* (eor) > { + a = mog_listen_parse_internal(mark_beg, mark_len, + port_beg, port_len); + if (!a) return -1; + cfg->mgmtlisten = a; + }; + + httplisten = lws* "httplisten" sep listen comment* eor > { + a = 
mog_listen_parse_internal(mark_beg, mark_len, + port_beg, port_len); + if (!a) return -1; + cfg->httplisten = a; + }; + httpgetlisten = lws* "httpgetlisten" sep listen comment* eor > { + a = mog_listen_parse_internal(mark_beg, mark_len, + port_beg, port_len); + if (!a) return -1; + cfg->httpgetlisten = a; + }; + docroot = lws* "docroot" sep path (comment* eor) > { + /* delay realpath(3) until svc init, symlinks may change */ + cfg->docroot = mystrdup("docroot", mark_beg, fpc); + if (!cfg->docroot) return -1; + }; + pidfile = lws* "pidfile" sep path (comment* eor) > { + /* delay realpath(3) until svc init, symlinks may change */ + cfg->pidfile = mystrdup("pidfile", mark_beg, fpc); + if (!cfg->pidfile) return -1; + }; + daemonize = lws* "daemonize" comment* eor > { cfg->daemonize = 1; }; + maxconns = lws* "maxconns" sep + (digit+) > { mark_beg = fpc; } + (comment* eor) > { + mark_len = fpc - mark_beg; + mark_beg[mark_len] = 0; + errno = 0; + cfg->maxconns = strtoul(mark_beg, NULL, 10); + if (errno) { + syslog(LOG_ERR, + "failed to parse: maxconns = %s - %m", + mark_beg); + return -1; + } + }; + server = lws* "server" sep + (alpha+) > { mark_beg = fpc; } + (comment* eor) > { + cfg->server = mystrdup("server", mark_beg, fpc); + if (!cfg->server) return -1; + mog_cfg_check_server(cfg); + }; + serverbin = lws* "serverbin" sep path + (comment* eor) > { + char *tmp = mystrdup("serverbin", mark_beg, fpc); + if (!tmp) return -1; + warn("W: serverbin = %s ignored\n", tmp); + free(tmp); + }; + main := (mgmtlisten | httplisten | httpgetlisten | + pidfile | docroot | daemonize | maxconns | + server | serverbin ) + + $! 
{ + fhold; + fgoto ignored_line; + }; +}%% + +%% write data; + +/* this is a one-shot parser, no need to stream local config files */ +int mog_cfg_parse(struct mog_cfg *cfg, char *buf, size_t len) +{ + char *p, *pe, *eof = NULL; + char *mark_beg = NULL; + char *port_beg = NULL; + size_t mark_len = 0; + size_t port_len = 0; + struct mog_addrinfo *a; + int cs; + + %% write init; + + p = buf; + pe = buf + len; + + %% write exec; + + if (cs == cfg_parser_error) + return -1; + + assert(p <= pe && "buffer overflow after cfg parse"); + return 0; +} diff --git a/cfg_validate.c b/cfg_validate.c new file mode 100644 index 0000000..7c65c6e --- /dev/null +++ b/cfg_validate.c @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "cfg.h" + +static void +paths_eql_or_die( + const char *param, const char *a_orig, const char *b_orig, + enum canonicalize_mode_t canon_mode) +{ + char *a = mog_canonpath_die(a_orig, canon_mode); + char *b = mog_canonpath_die(b_orig, canon_mode); + int ok = (strcmp(a, b) == 0); + + free(a); + free(b); + + if (ok) return; + + die("conflicting path values for %s: `%s' != `%s'\n", + param, a_orig, b_orig); +} + +static void merge_addr(struct mog_addrinfo **dst, struct mog_addrinfo *src) +{ + if (!*dst && src) *dst = mog_listen_parse(src->orig); +} + +static void merge_str(const char **dst, const char *src) +{ + if (!*dst && src) *dst = xstrdup(src); +} + +static void validate_merge_common(struct mog_cfg *ent, struct mog_cfg *cli) +{ + merge_addr(&cli->mgmtlisten, ent->mgmtlisten); + merge_addr(&cli->httplisten, ent->httplisten); + merge_addr(&cli->httpgetlisten, ent->httpgetlisten); + + /* multiple config files can all specify the same pidfile */ + if (cli->pidfile && ent->pidfile) + paths_eql_or_die("pidfile", cli->pidfile, ent->pidfile, + CAN_ALL_BUT_LAST); + merge_str(&cli->pidfile, ent->pidfile); + + cli->maxconns += ent->maxconns; + 
cli->daemonize |= ent->daemonize; +} + +bool mog_cfg_validate_one(void *ent_ptr, void *cli_ptr) +{ + struct mog_cfg *ent = ent_ptr; + struct mog_cfg *cli = cli_ptr; + + /* + * in the mixed single config file + CLI usage case, ensure docroot + * is the same (or only specified in one + */ + if (cli->docroot && ent->docroot) + paths_eql_or_die("docroot", cli->docroot, ent->docroot, + CAN_EXISTING); + merge_str(&cli->docroot, ent->docroot); + + validate_merge_common(ent, cli); + + return true; +} + +bool mog_cfg_validate_multi(void *ent_ptr, void *cli_ptr) +{ + struct mog_cfg *ent = ent_ptr; + struct mog_cfg *cli = cli_ptr; + + if (!ent->configfile) + die("BUG: no config path"); + if (!ent->httplisten && !ent->mgmtlisten && !ent->httpgetlisten) + die("no listeners in --config=%s\n", ent->configfile); + if (!ent->docroot) + die("no docroot in --config=%s\n", ent->configfile); + + validate_merge_common(ent, cli); + + return true; +} + +static void die_if_set(const void *a, const char *sw) +{ + if (!a) return; + die("--%s may not be used with multiple --config files\n", sw); +} + +/* + * some settings we can't make sense of when supporting multiple + * config files + */ +void mog_cfg_die_if_cli_set(struct mog_cfg *cli) +{ + die_if_set(cli->docroot, "docroot"); + die_if_set(cli->httplisten, "httplisten"); + die_if_set(cli->mgmtlisten, "mgmtlisten"); + + /* we don't actually support --httpgetlisten on the CLI ... 
*/ + die_if_set(cli->httpgetlisten, "httpgetlisten"); +} + +void mog_cfg_merge_defaults(struct mog_cfg *cli) +{ + if (!cli->docroot) + cli->docroot = xstrdup(MOG_DEFAULT_DOCROOT); + + /* default listeners */ + if (!cli->httplisten && !cli->httpgetlisten && !cli->mgmtlisten) { + cli->httplisten = mog_listen_parse(MOG_DEFAULT_HTTPLISTEN); + cli->mgmtlisten = mog_listen_parse(MOG_DEFAULT_MGMTLISTEN); + } +} + +void mog_cfg_check_server(struct mog_cfg *cfg) +{ + const char *s = cfg->server; + + if (!s) return; + + if (strcmp(s, "none") == 0) return; + if (strcmp(s, "perlbal") == 0) + warn("W: using internal HTTP for 'server = perlbal' instead\n"); + else + die("E: 'server = %s' not understood by cmogstored\n", s); +} @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* This header is only used by C test programs */ +#ifdef NDEBUG +# undef NDEBUG +#endif +#include "cmogstored.h" +#include <sys/ioctl.h> + +static inline void pipe_or_die(int *fds) +{ + int rc = pipe(fds); + + assert(rc == 0 && "pipe failed"); +} + +static inline void close_pipe(int *fds) +{ + assert(0 == close(fds[0]) && "close(fd[0]) failed"); + assert(0 == close(fds[1]) && "close(fd[1]) failed"); +} diff --git a/cloexec_detect.c b/cloexec_detect.c new file mode 100644 index 0000000..e9ca8f9 --- /dev/null +++ b/cloexec_detect.c @@ -0,0 +1,41 @@ +#include "cmogstored.h" +bool mog_cloexec_atomic; + +/* + * The presence of O_CLOEXEC in headers doesn't mean the kernel supports it + */ +#if defined(O_CLOEXEC) && (O_CLOEXEC != 0) && defined(SOCK_CLOEXEC) +__attribute__((constructor)) static +void mog_cloexec_detect(void) +{ + int flags; + int fd = open("/dev/null", O_RDONLY|O_CLOEXEC); + + if (fd < 0) { + if (errno == EINVAL) goto out; + die("open(/dev/null) failed: %s", strerror(errno)); + } + + flags = fcntl(fd, F_GETFD); + if (flags != -1) + mog_cloexec_atomic = ((flags & FD_CLOEXEC) == FD_CLOEXEC); + mog_close(fd); + 
+ if (! mog_cloexec_atomic) goto out; + + /* try to ensure sockets are sane, too */ + fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (fd < 0) { + if (errno == EINVAL) goto out; + die("socket(AF_INET, ...) failed: %s", strerror(errno)); + } + flags = fcntl(fd, F_GETFD); + if (flags != -1) + mog_cloexec_atomic = ((flags & FD_CLOEXEC) == FD_CLOEXEC); + mog_close(fd); + +out: + if (!mog_cloexec_atomic) + warn("close-on-exec is NOT atomic\n"); +} +#endif /* no O_CLOEXEC at all */ diff --git a/cloexec_from.c b/cloexec_from.c new file mode 100644 index 0000000..efa6e30 --- /dev/null +++ b/cloexec_from.c @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +void mog_cloexec_from(int lowfd) +{ + int fd; + int last_good = lowfd; + + for (fd = lowfd; fd < INT_MAX; fd++) { + if (fcntl(fd, F_SETFD, FD_CLOEXEC) == 0) + last_good = fd; + if ((last_good + 1024) < fd) + break; + } +} @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +void mog_close(int fd) +{ + if (close(fd) == 0) + return; + + switch (errno) { + case EINTR: return; /* nothing we can do */ + case EBADF: + /* EBADF would be a disaster since we use threads */ + syslog(LOG_CRIT, "BUG: attempted to close(fd=%d)", fd); + assert(0 && fd && "won't attempt to continue on bad close()"); + default: + syslog(LOG_ERR, "close(fd=%d) failed: %m", fd); + } +} diff --git a/cmogstored.c b/cmogstored.c new file mode 100644 index 0000000..92aa308 --- /dev/null +++ b/cmogstored.c @@ -0,0 +1,267 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "cfg.h" +#define THIS "cmogstored" +static char summary[] = THIS " -- (unofficial) MogileFS storage daemon"; +const char *argp_program_bug_address = 
PACKAGE_BUGREPORT;
+const char *argp_program_version = THIS" "PACKAGE_VERSION;
+static volatile sig_atomic_t sigchld_nr; /* written by signal handler */
+static volatile sig_atomic_t do_exit; /* written by signal handler */
+
+#define CFG_KEY(f) (-((int)offsetof(struct mog_cfg,f) + 1))
+static struct argp_option options[] = {
+	{ .name = "daemonize", .key = 'd',
+	  .doc = "Daemonize" },
+	{ .name = "config", .key = CFG_KEY(configfile),
+	  .arg = "<file>",
+	  .doc = "Set config file (default is "MOG_DEFAULT_CONFIGFILE")" },
+	{ .name = "httplisten", .key = CFG_KEY(httplisten),
+	  .arg = "<ip:port>",
+	  .doc = "IP/Port HTTP server listens on" },
+	{ .name = "mgmtlisten", .key = CFG_KEY(mgmtlisten),
+	  .arg = "<ip:port>",
+	  .doc = "IP/Port management/sidechannel listens on" },
+	{ .name = "docroot", .key = CFG_KEY(docroot),
+	  .arg = "<path>",
+	  .doc = "Docroot above device mount points. "
+	         "Defaults to "MOG_DEFAULT_DOCROOT
+	},
+	{ .name = "maxconns", .key = CFG_KEY(maxconns),
+	  .arg = "<number>",
+	  .doc = "Number of simultaneous clients to serve. "
+	         "Default " MOG_STR(MOG_DEFAULT_MAXCONNS) },
+	{ .name = "pidfile", .key = CFG_KEY(pidfile),
+	  .arg = "<path>",
+	  .doc = "path to PID file" },
+	{ .name = "server", .key = CFG_KEY(server), .flags = OPTION_HIDDEN },
+	{
+		/* hidden for now, don't break compat with Perl mogstored */
+		.name = "multi", .key = 'M', .flags = OPTION_HIDDEN
+	},
+	{ NULL }
+};
+
+static void new_cfg_or_die(const char *config)
+{
+	struct mog_cfg *cfg = mog_cfg_new(config);
+
+	if (!cfg) die("invalid (or duplicate) config=%s\n", config);
+	if (mog_cfg_load(cfg) == 0) return;
+
+	die("failed to load config=%s (%s)\n",
+	    config, errno ? 
strerror(errno) : "parser error");
+}
+
+/*
+ * parses the --maxconns=<number> command-line value into cfg->maxconns,
+ * dying if strtoul reports an error, consumes no digits, or stops early
+ */
+static void maxconns_or_die(struct mog_cfg *cfg, const char *s)
+{
+	char *end;
+
+	errno = 0;
+	cfg->maxconns = strtoul(s, &end, 10);
+	if (errno)
+		die("failed to parse maxconns=%s (%s)\n", s, strerror(errno));
+	if (end == s) /* nothing consumed, e.g. an empty --maxconns= */
+		die("failed to parse maxconns=%s (no digits)\n", s);
+	if (*end)
+		die("failed to parse maxconns=%s (invalid character: %c)\n",
+		    s, *end);
+}
+
+static void addr_or_die(struct mog_addrinfo **dst, const char *key, char *s)
+{
+	*dst = mog_listen_parse(s);
+	if (!*dst)
+		die("failed to parse %s=%s\n", key, s);
+}
+
+static error_t parse_opt(int key, char *arg, struct argp_state *state)
+{
+	struct mog_cfg *cfg = state->input;
+	int rv = 0;
+
+	switch (key) {
+	case 'd': cfg->daemonize = 1; break;
+	case CFG_KEY(docroot): cfg->docroot = xstrdup(arg); break;
+	case CFG_KEY(pidfile): cfg->pidfile = xstrdup(arg); break;
+	case CFG_KEY(configfile): new_cfg_or_die(arg); break;
+	case CFG_KEY(maxconns): maxconns_or_die(cfg, arg); break;
+	case CFG_KEY(httplisten):
+		addr_or_die(&cfg->httplisten, "httplisten", arg);
+		break;
+	case CFG_KEY(mgmtlisten):
+		addr_or_die(&cfg->mgmtlisten, "mgmtlisten", arg);
+		break;
+	case CFG_KEY(server):
+		cfg->server = xstrdup(arg);
+		mog_cfg_check_server(cfg);
+		break;
+	case 'M': mog_cfg_multi = true; break;
+	case ARGP_KEY_ARG:
+		argp_usage(state);
+	case ARGP_KEY_END:
+		break;
+	default:
+		rv = ARGP_ERR_UNKNOWN;
+	}
+
+	return rv;
+}
+
+static int open_pidfile(void)
+{
+	pid_t cur_pid = -1;
+	int pid_fd = mog_pidfile_open(mog_cli.pidfile, &cur_pid);
+
+	if (pid_fd >= 0)
+		return pid_fd;
+	if (errno == EAGAIN)
+		die("already running on PID: %d\n", (int)cur_pid);
+	else
+		die("mog_pidfile_open failed: %s\n", strerror(errno));
+	return -1;
+}
+
+MOG_NOINLINE static void setup(int argc, char *argv[])
+{
+	int pid_fd = -1;
+	static struct argp argp = { options, parse_opt, NULL, summary };
+	int mask = 0;
+
+	argp_parse(&argp, argc, argv, 0, NULL, &mog_cli);
+	mog_cfg_validate_or_die(&mog_cli);
+	mog_cfg_svc_start_or_die(&mog_cli);
+
+	if 
(mog_cli.pidfile) pid_fd = open_pidfile();
+
+	/* TODO: make logging configurable */
+	{
+		openlog(THIS, LOG_ODELAY|LOG_PID, LOG_DAEMON);
+		mask |= LOG_MASK(LOG_EMERG);
+		mask |= LOG_MASK(LOG_ALERT);
+		mask |= LOG_MASK(LOG_CRIT);
+		mask |= LOG_MASK(LOG_ERR);
+		mask |= LOG_MASK(LOG_WARNING);
+		mask |= LOG_MASK(LOG_NOTICE);
+		mask |= LOG_MASK(LOG_INFO);
+		/* mask |= LOG_MASK(LOG_DEBUG); */
+		setlogmask(mask);
+	}
+
+	/* don't silently stay in the foreground if daemonizing fails */
+	if (mog_cli.daemonize && daemon(0, 0) < 0)
+		die("daemon() failed: %s\n", strerror(errno));
+
+	if (pid_fd >= 0 && mog_pidfile_commit(pid_fd) < 0)
+		syslog(LOG_ERR,
+		       "failed to write pidfile(%s): %m. continuing...",
+		       mog_cli.pidfile);
+}
+
+/* Hash iterator function */
+static bool svc_start_each(void *svcptr, void *qptr)
+{
+	struct mog_svc *svc = svcptr;
+	struct mog_queue *q = qptr;
+	struct mog_accept *ac;
+
+	mog_svc_scandev(svc, mog_dev_mkusage);
+	svc->queue = q;
+
+	if (svc->mgmt_fd >= 0) {
+		ac = mog_accept_init(svc->mgmt_fd, svc, mog_mgmt_post_accept);
+		mog_thrpool_start(&ac->thrpool, 8, mog_accept_loop, ac);
+	}
+
+	return true;
+}
+
+static bool svc_quit_each(void *svcptr, void *ignored)
+{
+	struct mog_svc *svc = svcptr;
+	struct mog_fd *mfd;
+	struct mog_accept *ac;
+
+	if (svc->mgmt_fd >= 0) {
+		mfd = mog_fd_get(svc->mgmt_fd);
+		ac = &mfd->as.accept;
+		mog_thrpool_quit(&ac->thrpool);
+	}
+
+	return true;
+}
+
+_Noreturn static void cmogstored_exit(void)
+{
+	/* call atexit() handlers and make valgrind happy */
+	mog_svc_each(svc_quit_each, NULL);
+	mog_fdmap_stop_queues();
+	exit(EXIT_SUCCESS);
+}
+
+static void cmogstored_wakeup_handler(int signum)
+{
+	switch (signum) {
+	case SIGCHLD: ++sigchld_nr; break;
+	case SIGQUIT:
+	case SIGTERM:
+	case SIGINT:
+		do_exit = signum;
+	}
+	mog_notify(MOG_NOTIFY_SIGNAL);
+}
+
+static void cmogstored_siginit(void)
+{
+	struct sigaction sa;
+
+	memset(&sa, 0, sizeof(struct sigaction));
+	CHECK(int, 0, sigemptyset(&sa.sa_mask) );
+
+	sa.sa_handler = SIG_IGN;
+	CHECK(int, 0, sigaction(SIGPIPE, &sa, NULL));
+
+	sa.sa_handler = cmogstored_wakeup_handler;
+	
CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + CHECK(int, 0, sigaction(SIGINT, &sa, NULL)); + + /* TODO: graceful shutdown, upgrade, reload */ + CHECK(int, 0, sigaction(SIGQUIT, &sa, NULL)); + CHECK(int, 0, sigaction(SIGHUP, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR2, &sa, NULL)); + + sa.sa_flags = SA_NOCLDSTOP; + CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL)); +} + +static void main_loop(struct mog_queue *q) +{ + for (;;) { + mog_notify_wait(); + if (do_exit) + cmogstored_exit(); + if (sigchld_nr > 0) { + mog_mnt_refresh(); + mog_iostat_spawn(q); + sigchld_nr = 0; + } + } +} + +int main(int argc, char *argv[]) +{ + struct mog_queue *q; + + mog_notify_init(); + mog_mnt_refresh(); + setup(argc, argv); + + q = mog_queue_new(); + mog_intr_disable(); + cmogstored_siginit(); + mog_thrpool_start(&q->thrpool, 8, mog_queue_loop, q); + mog_svc_each(svc_start_each, q); + mog_iostat_spawn(q); + main_loop(q); + + return 0; +} diff --git a/cmogstored.h b/cmogstored.h new file mode 100644 index 0000000..b9f280b --- /dev/null +++ b/cmogstored.h @@ -0,0 +1,352 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* + * common headers, macros, and static inline functions for the entire project + * + * Internal APIs are very much in flux and subject to change frequently + */ +#define _GNU_SOURCE +#define _XOPEN_SOURCE 700 +#define _POSIX_C_SOURCE 200809L +#include "config.h" +#include <pthread.h> +#include <sys/uio.h> +#include <sys/mman.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/socket.h> +#include <signal.h> +#include <stddef.h> +#include <stdint.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include <assert.h> +#include <errno.h> +#include <limits.h> +#include <dirent.h> +#include <syslog.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include 
<sys/statvfs.h> +#include <time.h> +#include "bsd/queue_safe.h" +#include "lib/argp.h" +#include "lib/hash.h" +#include "lib/xalloc.h" +#include "lib/xvasprintf.h" +#include "lib/nonblocking.h" +#include "lib/canonicalize.h" +#include "lib/verify.h" +#include "lib/cloexec.h" +#include "lib/mountlist.h" +#include "gcc.h" +#include "util.h" +#include "defaults.h" +#include "iostat.h" +#include "mnt.h" + +#define MOG_WR_ERROR ((void *)-1) +#define MOG_IOSTAT (MAP_FAILED) +#define MOG_FD_MAX (INT_MAX-1) + +enum mog_write_state { + MOG_WRSTATE_ERR = -1, + MOG_WRSTATE_DONE = 0, + MOG_WRSTATE_BUSY = 1 +}; + +enum mog_parser_state { + MOG_PARSER_ERROR = -1, + MOG_PARSER_DONE = 0, + MOG_PARSER_CONTINUE = 1 +}; + +enum mog_next { + MOG_NEXT_CLOSE = 0, + MOG_NEXT_ACTIVE, + MOG_NEXT_WAIT_RD, + MOG_NEXT_WAIT_WR, + MOG_NEXT_IGNORE /* for iostat */ +}; + +struct mog_wbuf; +struct mog_dev { + dev_t st_dev; + uint32_t devid; + char prefix[FLEXIBLE_ARRAY_MEMBER]; +}; + +struct mog_rbuf { + size_t rsize; + char rptr[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * overhead based on the malloc implementation found in glibc, + * it's unlikely another memalign implentation would have _more_ overhead... 
+ */ +#define MOG_MEMALIGN_OVERHEAD (sizeof(size_t) * 2) +#define MOG_RBUF_OVERHEAD (sizeof(struct mog_rbuf) + MOG_MEMALIGN_OVERHEAD) + +struct mog_mgmt; +struct mog_mgmt { + int cs; + struct mog_fd *forward; + size_t offset; + size_t mark[2]; + struct mog_rbuf *rbuf; + struct mog_wbuf *wbuf; /* uncommonly needed */ + struct mog_svc *svc; + LIST_ENTRY(mog_mgmt) subscribed; +}; + +struct mog_queue; +struct mog_svc { + int dirfd; + const char *docroot; + + /* private */ + DIR *dir; + Hash_table *by_st_dev; + pthread_mutex_t devstats_lock; + struct iovec devstats; + struct mog_queue *queue; + LIST_HEAD(mgmt_head, mog_mgmt) devstats_subscribers; + int http_fd; + int httpget_fd; + int mgmt_fd; +}; + +struct mog_http { + int cs; + struct mog_fd *forward; + size_t offset; + size_t mark[2]; + struct mog_rbuf *rbuf; + struct mog_wbuf *wbuf; /* uncommonly needed */ +}; + +struct mog_thrpool { + size_t n_threads; + pthread_t *threads; +}; + +struct mog_fd; + +/* + * this is a two-part queue: epoll or kqueue for the idle blocking part, + * and SIMPLEQ is the non-blocking part. 
+ * mog_queue objects can be shared by any number of mog_svcs + */ +struct mog_queue { + int queue_fd; /* epoll or kqueue */ + SIMPLEQ_HEAD(active_fd, mog_fd) activeq_head; + pthread_mutex_t activeq_lock; /* may be pointer for non-NPTL */ + struct mog_thrpool thrpool; +}; + +/* accept.c */ +typedef void (*post_accept_fn)(struct mog_fd *, struct mog_svc *); +struct mog_accept { + struct mog_svc *svc; + post_accept_fn post_accept_fn; + struct mog_thrpool thrpool; +}; +struct mog_accept * mog_accept_init(int fd, struct mog_svc *, post_accept_fn); +void * mog_accept_loop(void *ac); + +struct md5_ctx; +struct mog_file { + off_t fsize; + off_t foff; + char *path; + size_t pathlen; + char *tmppath; + void *mmptr; + struct mog_svc *svc; + struct md5_ctx *md5; +}; + +enum mog_queue_state { + MOG_QUEUE_STATE_NEW = 0, + MOG_QUEUE_STATE_OLD = 1 +}; + +#include "queue_epoll.h" + +enum mog_fd_type { + MOG_FD_TYPE_UNUSED = 0, + MOG_FD_TYPE_HTTP, + MOG_FD_TYPE_MGMT, + MOG_FD_TYPE_ACCEPT, + MOG_FD_TYPE_FILE, + MOG_FD_TYPE_QUEUE, + MOG_FD_TYPE_IOSTAT, + MOG_FD_TYPE_SVC /* for docroot dirfd */ +}; + +/* fdmap.c */ +struct mog_fd { + enum mog_fd_type fd_type; + enum mog_queue_state queue_state; + int fd; /* redundant ... 
*/ + int in_queue; + SIMPLEQ_ENTRY(mog_fd) active_fd; + union { + struct mog_accept accept; + struct mog_mgmt mgmt; + struct mog_http http; + struct mog_file file; + struct mog_queue queue; + struct mog_iostat iostat; + struct mog_svc *svc; + } as; +}; +struct mog_fd *mog_fd_get(int fd); +void mog_fd_put(struct mog_fd *, int fd); +void mog_fdmap_stop_queues(void); +#include "fdmap.h" + +/* alloc.c */ +void mog_free_and_null(void *ptrptr); +_Noreturn void mog_oom(void); +void *mog_cachealign(size_t size); +struct mog_rbuf *mog_rbuf_new(size_t size); +struct mog_rbuf *mog_rbuf_get(size_t size); +struct mog_rbuf *mog_rbuf_defer(struct mog_rbuf *rbuf); +void mog_rbuf_free(struct mog_rbuf *); +void mog_rbuf_free_and_null(struct mog_rbuf **); +void *mog_fsbuf_get(size_t *size); + +/* die.c */ +_Noreturn void die(const char *fmt, ...) MOG_PRINTF; + +/* warn.c */ +void warn(const char *fmt, ...) MOG_PRINTF; + +/* maxconns.c */ +void mog_set_maxconns(unsigned long); + +/* svc.c */ +struct mog_svc *mog_svc_new(const char *docroot); +typedef int (*mog_scandev_cb)(const struct mog_dev *, struct mog_svc *); +int mog_svc_scandev(struct mog_svc *, mog_scandev_cb); +size_t mog_svc_each(Hash_processor processor, void *data); + +/* dev.c */ +struct mog_dev * mog_dev_new(struct mog_svc *, uint32_t mog_devid); +int mog_dev_mkusage(const struct mog_dev *, struct mog_svc *); + +/* valid_path.rl */ +int mog_valid_path(const char *buf, size_t len); + +/* trywrite.c */ +void * mog_trywritev(int fd, struct iovec *iov, int iovcnt); +enum mog_write_state mog_tryflush(int fd, struct mog_wbuf **); + +/* fs.c */ +extern int (*mog_stat)(struct mog_svc *, const char *path, struct stat *sb); +extern int (*mog_unlink)(struct mog_svc *, const char *path); +extern int (*mog_open_read)(struct mog_svc *, const char *path); +extern int (*mog_open_excl)(struct mog_svc *, const char *path); +extern int (*mog_rename)(struct mog_svc *, const char *old, const char *new); + +/* pidfile.c */ +int 
mog_pidfile_open(const char *path, pid_t *cur); +int mog_pidfile_commit(int fd); + +/* svc_dev.c */ +bool mog_svc_devstats_broadcast(void *svc, void *ignored); +void mog_svc_devstats_subscribe(struct mog_mgmt *); + +/* cloexec_detect.c */ +extern bool mog_cloexec_atomic; + +/* cloexec_from.c */ +void mog_cloexec_from(int lowfd); + +/* iostat_process.c */ +struct mog_iostat * mog_iostat_spawn(struct mog_queue *); +struct mog_iostat * mog_iostat_get(struct mog_queue *); + +/* cfg_parser.rl */ +struct mog_cfg; +int mog_cfg_parse(struct mog_cfg *, char *buf, size_t len); + +/* cfg.c */ +struct mog_cfg * mog_cfg_new(const char *configfile); +int mog_cfg_load(struct mog_cfg *); +void mog_cfg_svc_start_or_die(struct mog_cfg *cli); +extern struct mog_cfg mog_cli; +extern bool mog_cfg_multi; + +/* listen_parser.rl */ +struct mog_addrinfo *mog_listen_parse(const char *host_with_port); + +/* canonpath.c */ +char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode); +char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); + +/* thrpool.c */ +void mog_thrpool_start(struct mog_thrpool *, size_t n, + void *(*start_fn)(void *), void *arg); +void mog_thrpool_quit(struct mog_thrpool *); + +/* mgmt.c */ +void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt); +void mog_mgmt_post_accept(struct mog_fd *, struct mog_svc *); +void mog_mgmt_queue_step(struct mog_fd *); + +/* queue_epoll.c */ +struct mog_queue * mog_queue_new(void); +void mog_idleq_push(struct mog_queue *, struct mog_fd *, enum mog_qev); +struct mog_fd * mog_idleq_wait(struct mog_queue *, int dontwait); + +/* activeq.c */ +void mog_activeq_push(struct mog_queue *, struct mog_fd *mfd); +struct mog_fd *mog_activeq_trytake(struct mog_queue *); + +/* addrinfo.c */ +struct mog_addrinfo { + const char *orig; + struct addrinfo *addr; +}; +void mog_addrinfo_free(struct mog_addrinfo **); + +/* bind_listen.c */ +int mog_bind_listen(struct addrinfo *, const char *accept_filter); + 
+/* close.c */ +void mog_close(int fd); + +/* mog_queue_loop.c */ +void * mog_queue_loop(void *arg); + +/* queue_step.c */ +void mog_queue_step(struct mog_fd *mfd); + +/* sig.c */ +void mog_intr_disable(void); +void mog_intr_enable(void); + +/* file.c */ +struct mog_fd * mog_file_open_read(struct mog_svc *, char *path); +int mog_file_fstat(struct mog_fd *, struct stat *); +void mog_file_close(struct mog_fd *); +size_t mog_file_pathlen(struct mog_fd *); + +/* notify.c */ +enum mog_notification { + MOG_NOTIFY_SIGNAL = -1, + MOG_NOTIFY_DEVICE_REFRESH = 0, + MOG_NOTIFY_MAX +}; +void mog_notify_init(void); +void mog_notify(enum mog_notification); +void mog_notify_wait(void); diff --git a/compat_accept.h b/compat_accept.h new file mode 100644 index 0000000..ee24e02 --- /dev/null +++ b/compat_accept.h @@ -0,0 +1,57 @@ +/* + * accept()/accept4() wrappers + * + * It's tricky for gnulib to work with SOCK_CLOEXEC/SOCK_NONBLOCK + * in a forward-compatible way, so we'll handle accept4()-compatibility + * for ourselves. + * + * Copyright (C) 2012 Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* + * we always define this because accept4() may fail with ENOSYS + * if it was compiled/built on a newer system and run on an older one + */ +MOG_NOINLINE static int +mog_accept_old(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + int fd = accept(sockfd, addr, addrlen); + + if (fd >= 0) { + /* + * don't bother with the nicer gnulib functions, we know + * these flags we want aren't set so save two fcntl() + * syscalls in a frequently-called function. 
+ */ + CHECK(int, 0, fcntl(fd, F_SETFL, O_NONBLOCK|O_RDWR)); + CHECK(int, 0, fcntl(fd, F_SETFD, O_CLOEXEC)); + } + return fd; +} + +#if defined(HAVE_ACCEPT4) +static int accept_fallback(int, struct sockaddr *, socklen_t *); +static int +mog_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + int fd = accept4(sockfd, addr, addrlen, SOCK_CLOEXEC|SOCK_NONBLOCK); + + return fd >= 0 ? fd : accept_fallback(sockfd, addr, addrlen); +} + +static int (*mog_accept)(int, struct sockaddr *, socklen_t *) = mog_accept4; + +MOG_NOINLINE static int +accept_fallback(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + if (errno != ENOSYS) return -1; + + mog_accept = mog_accept_old; + syslog(LOG_WARNING, "accept4() failed, falling back"); + + return mog_accept_old(sockfd, addr, addrlen); +} +#else /* HAVE_ACCEPT4 */ +static int (*mog_accept)(int, struct sockaddr *, socklen_t *) = mog_accept_old; +#endif /* HAVE_ACCEPT4 */ diff --git a/configure.ac b/configure.ac new file mode 100644 index 0000000..d0cf395 --- /dev/null +++ b/configure.ac @@ -0,0 +1,40 @@ +AC_INIT([cmogstored], [0.0.0], [normalperson@yhbt.net]) +AM_INIT_AUTOMAKE([foreign silent-rules parallel-tests subdir-objects]) +AM_SILENT_RULES([yes]) +AC_PREREQ(2.59) + +AC_CONFIG_HEADER(config.h) +AC_CONFIG_MACRO_DIR([m4]) +AC_CONFIG_SRCDIR([cmogstored.c]) + +dnl we use C99 struct initializers +AC_PROG_CC_C99 + +gl_EARLY +gl_INIT +gl_WARN_ADD([-Wall]) +gl_WARN_ADD([-Wcast-qual]) +gl_WARN_ADD([-Wstrict-prototypes]) +gl_WARN_ADD([-Wredundant-decls]) +gl_WARN_ADD([-Wshadow]) +gl_WARN_ADD([-Werror=write-strings]) +gl_WARN_ADD([-Werror=aggregate-return]) +gl_WARN_ADD([-Werror=char-subscripts]) +gl_WARN_ADD([-Werror=pointer-arith]) +gl_WARN_ADD([-Werror-implicit-function-declaration]) +gl_WARN_ADD([-Werror=declaration-after-statement]) dnl I hate C99 for this :P + +AX_PTHREAD(true) +AC_SYS_LARGEFILE +AC_C_FLEXIBLE_ARRAY_MEMBER + +dnl gnulib *at functions aren't thread-safe, ask for the real thing 
+AC_CHECK_FUNCS([openat renameat mkdirat fstatat unlinkat]) + +AC_CHECK_FUNCS([epoll_wait]) + +dnl gnulib doesn't actually define SOCK_NONBLOCK/SOCK_CLOEXEC.. +AC_CHECK_FUNCS([accept4]) + +AC_CONFIG_FILES([Makefile lib/Makefile]) +AC_OUTPUT diff --git a/coverage.mk b/coverage.mk new file mode 100644 index 0000000..7651359 --- /dev/null +++ b/coverage.mk @@ -0,0 +1,34 @@ +cov_cflags = $(CFLAGS) -O0 -ggdb -ftest-coverage -fprofile-arcs +cov_ldflags = $(LDFLAGS) -lgcov +GCOV = gcov +gcovflags = --preserve-paths --branch-probabilities --all-blocks \ + --function-summaries +.PHONY: coverage-clean coverage-build coverage-show coverage-gen +cov_src = cmogstored.c $(mog_src) $(RL_CGEN) + +# this doesn't work in out-of-tree builds, yet... +cover_db = $(top_srcdir)/cover_db +coverage-clean: + $(MAKE) clean + $(RM) -r $(cover_db) + +COVERAGE_CHECK = check +coverage-build: coverage-clean + $(MAKE) CFLAGS="$(cov_cflags)" LDFLAGS="$(cov_ldflags)" + $(MAKE) CFLAGS="$(cov_cflags)" LDFLAGS="$(cov_ldflags)" \ + JOBS=1 -j1 $(COVERAGE_CHECK) + $(MAKE) coverage-gen + +COVER_IGNORE = $(addprefix -ignore ,$(RL_CGEN)) -ignore_re ^lib/ +coverage-show: + @cover $(COVER_IGNORE) -summary -report text $(cover_db) + +coverage-gen: + $(GCOV) $(gcovflags) --object-directory=$(top_builddir) \ + $(addprefix $(top_srcdir)/,$(cov_src)) + $(AM_V_GEN)gcov2perl -db $(cover_db) *.gcov >/dev/null 2>&1 + @echo Run '"'make coverage-show'"' to view summary + +coverage: coverage-build + $(MAKE) coverage-show + @echo Run '"'make coverage-show'"' to view summary diff --git a/defaults.h b/defaults.h new file mode 100644 index 0000000..d32b2f9 --- /dev/null +++ b/defaults.h @@ -0,0 +1,6 @@ +#define MOG_DEFAULT_MAXCONNS 10000 +#define MOG_STR(s) #s +#define MOG_DEFAULT_HTTPLISTEN "0.0.0.0:7500" +#define MOG_DEFAULT_MGMTLISTEN "0.0.0.0:7501" +#define MOG_DEFAULT_DOCROOT "/var/mogdata" +#define MOG_DEFAULT_CONFIGFILE "/etc/mogilefs/mogstored.conf" @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2012, Eric Wong 
<normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +struct mog_dev * mog_dev_new(struct mog_svc *svc, uint32_t mog_devid) +{ + struct mog_dev *dev; + struct stat sb; + char *devprefix = xasprintf("/dev%u/", mog_devid); + size_t len = strlen(devprefix); + + if (mog_stat(svc, devprefix, &sb) < 0) { + PRESERVE_ERRNO( free(devprefix) ); + return NULL; + } + + dev = xmalloc(sizeof(struct mog_dev) + len); + + assert(devprefix[len - 1] == '/' && "not trailing slash"); + devprefix[len - 1] = '\0'; + memcpy(dev->prefix, devprefix, len); + free(devprefix); + + dev->devid = mog_devid; + dev->st_dev = sb.st_dev; + + return dev; +} + +static int +emit_usage( +const struct mog_dev *dev, struct mog_svc *svc, int fd, struct statvfs *v) +{ + int rc = -1; + unsigned long long available = v->f_bavail; + unsigned long long total = v->f_blocks - (v->f_bfree - v->f_bavail); + unsigned long long used = v->f_blocks - v->f_bfree; + unsigned use = (used * 100) / total + !!((used * 100) % total); + const struct mount_entry *me = mog_mnt_acquire(dev->st_dev); + + if (me) { + static const char usage_fmt[] = + "available: %llu\n" + "device: %s\n" + "disk: %s%s\n" + "time: %lld\n" + "total: %llu\n" + "use: %u%%\n" + "used: %llu\n"; + rc = dprintf(fd, usage_fmt, + available * (v->f_frsize / 1024), + me->me_devname, + svc->docroot, dev->prefix, + (long long)time(NULL), + total * (v->f_frsize / 1024), + use > 100 ? 
100 : use, + used * (v->f_frsize / 1024)); + + PRESERVE_ERRNO( mog_mnt_release(me) ); + } else { + syslog(LOG_ERR, "mount entry not found for %s%s", + svc->docroot, dev->prefix); + errno = ENODEV; + } + + return rc; +} + +int mog_dev_mkusage(const struct mog_dev *dev, struct mog_svc *svc) +{ + struct statvfs v; + char *usage_path = xasprintf("%s/usage", dev->prefix); + char *tmp_path = xasprintf("%s.%x", usage_path, (unsigned)getpid()); + int fd = -1; + + if (mog_unlink(svc, tmp_path) < 0 && errno != ENOENT) goto out; + + errno = 0; + fd = mog_open_excl(svc, tmp_path); + if (fd < 0) goto out; + if (fstatvfs(fd, &v) < 0) goto out; + if (emit_usage(dev, svc, fd, &v) < 0) goto out; + + /* skip rename on EIO or EBADF if close() fails */ + fd = close(fd); + if (fd < 0 && errno != EINTR) goto out; + fd = -1; + errno = 0; + + if (mog_rename(svc, tmp_path, usage_path) < 0) goto out; +out: + PRESERVE_ERRNO(do { + if (errno) + (void)mog_unlink(svc, tmp_path); + if (fd >= 0) + (void)close(fd); + free(tmp_path); + free(usage_path); + } while (0)); + return errno ? -1 : 0; +} @@ -0,0 +1,15 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +_Noreturn void die(const char *fmt, ...) 
+{ + va_list ap; + + va_start(ap, fmt); + vdprintf(STDERR_FILENO, fmt, ap); + va_end(ap); + exit(EXIT_FAILURE); +} diff --git a/digmd5.c b/digmd5.c new file mode 100644 index 0000000..41dc8c8 --- /dev/null +++ b/digmd5.c @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "digmd5.h" +#include "lib/md5.h" + +struct md5_ctx * mog_md5_new(void) +{ + struct md5_ctx *ctx = mog_cachealign(sizeof(struct md5_ctx)); + + md5_init_ctx(ctx); + + return ctx; +} + +ssize_t mog_md5_read(struct md5_ctx *ctx, int fd) +{ + size_t len; + void *buf = mog_fsbuf_get(&len); + ssize_t r; + +retry: + r = read(fd, buf, len); + if (r > 0) { /* most likely */ + md5_process_bytes(buf, r, ctx); + } else if (r == 0) { + /* wait for user to call mog_md5_hex() */ + } else { + assert(r < 0 && errno && "buggy read(2)?"); + if (errno == EINTR) goto retry; /* may happen on crazy FSes */ + /* bail on EAGAIN, too, not possible on regular files */ + } + + return r; +} + +void mog_md5_hex(struct md5_ctx *ctx, char *buf, size_t len) +{ + static const char hex[] = "0123456789abcdef"; + unsigned char result[16]; + unsigned char *b = result; + char *out = buf; + int i; + + md5_finish_ctx(ctx, result); + + assert(len >= (sizeof(result) * 2) && "hex buffer too small"); + + for (i = 0; i < sizeof(result); i++) { + *out++ = hex[*b >> 4]; + *out++ = hex[*b & 0x0f]; + b++; + } +} diff --git a/digmd5.h b/digmd5.h new file mode 100644 index 0000000..434db2a --- /dev/null +++ b/digmd5.h @@ -0,0 +1,7 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +struct md5_ctx * mog_md5_new(void); +ssize_t mog_md5_read(struct md5_ctx *, int fd); +void mog_md5_hex(struct md5_ctx *, char *buf, size_t len); diff --git a/doc/design.txt b/doc/design.txt new file mode 100644 index 0000000..f873834 --- /dev/null +++ b/doc/design.txt @@ -0,0 +1,28 
@@ + +object relationships +-------------------- + +1:1 relationship between mog_cfg and mog_svc, we'll support multiple +mog_svc if they don't conflict. + +There's only one mog_queue instance for now, shared by any number of +mog_svc instances. Theoretically, there can be any number of mog_queue +objects, but having one means the fairest distribution in the worst-case +scenarios (at the cost of optimal performance in the best-case scenario) + + + +mog_cfg[0] -- mog_svc[0] --- mog_mgmt[N] + | \-- mog_http[N] + | + | + / + / ___ mog_accept[N] + mog_queue[0]--------<___ mog_accept[N] + \ + \ + | + | + | +mog_cfg[1] -- mog_svc[1] --- mog_mgmt[M] + \-- mog_http[M] @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#define FD_PAD_SIZE ((size_t)128) +verify(sizeof(struct mog_fd) <= FD_PAD_SIZE); +static int max_fd; +static size_t fd_heaps; +static const size_t FD_PER_HEAP = 256; +static unsigned char **fd_map; +static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER; + +static inline struct mog_fd *aref(size_t fd) +{ + unsigned char *base = fd_map[fd / FD_PER_HEAP]; + + return (struct mog_fd *)(base + (fd % FD_PER_HEAP) * FD_PAD_SIZE); +} + +static void fd_map_atexit(void) +{ + while (fd_heaps-- > 0) + free(fd_map[fd_heaps]); + free(fd_map); +} + +static void *my_memalign(size_t size) +{ + void *ptr; + int err = posix_memalign(&ptr, FD_PAD_SIZE, size); + + switch (err) { + case 0: return ptr; + case ENOMEM: mog_oom(); + } + + die("posix_memalign() failed: %s\n", strerror(err)); + return NULL; +} + +static void fd_map_init(void) +{ + long open_max = sysconf(_SC_OPEN_MAX); + size_t slots = open_max / FD_PER_HEAP + 1; + size_t size = slots * sizeof(void *); + + assert(fd_map == NULL && "fd_map reinitialized?"); + fd_map = my_memalign(size); + atexit(fd_map_atexit); +} + +MOG_NOINLINE static struct mog_fd * grow_ref(size_t fd) +{ + assert(fd < INT_MAX && "fd 
too large"); + CHECK(int, 0, pthread_mutex_lock(&fd_lock)); + + if (!fd_map) fd_map_init(); + while (fd >= (size_t)mog_sync_fetch(&max_fd)) { + unsigned char *base = my_memalign(FD_PAD_SIZE * FD_PER_HEAP); + struct mog_fd *tmp; + size_t i; + + for (i = 0; i < FD_PER_HEAP; i++) { + tmp = (struct mog_fd *)(base + (i * FD_PAD_SIZE)); + tmp->fd_type = MOG_FD_TYPE_UNUSED; + } + + fd_map[fd_heaps++] = base; + mog_sync_add_and_fetch(&max_fd, FD_PER_HEAP); + } + + CHECK(int, 0, pthread_mutex_unlock(&fd_lock)); + + return aref(fd); +} + +struct mog_fd *mog_fd_get(int fd) +{ + assert(fd >= 0 && "FD is negative"); + if (MOG_LIKELY(fd < mog_sync_fetch(&max_fd))) + return aref((size_t)fd); + + return grow_ref(fd); +} + +/* optional... */ +void mog_fd_put(struct mog_fd *validate, int fd) +{ + struct mog_fd *tmp; + assert(fd >= 0 && "FD is negative"); + assert(fd < mog_sync_fetch(&max_fd) && "FD too small"); + tmp = aref(fd); + + assert(tmp == validate && "tried to put incorrect mog_fd back in"); + validate->fd_type = MOG_FD_TYPE_UNUSED; +} + +void mog_fdmap_stop_queues(void) +{ + int i; + + for (i = 0; i < FD_PER_HEAP; ++i) { + struct mog_fd *mfd = mog_fd_get(i); + + if (mfd->fd_type != MOG_FD_TYPE_QUEUE) + continue; + mog_thrpool_quit(&mfd->as.queue.thrpool); + mog_close(mfd->fd); + } +} @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +static inline struct mog_fd * mog_fd_of(void *as_obj) +{ + uintptr_t as_addr = (uintptr_t)as_obj; + + return (struct mog_fd *)(as_addr - offsetof(struct mog_fd, as)); +} + + +/* used to validate a mog_fd is never in two queues at once */ +static inline void mog_fd_check_in(struct mog_fd *mfd) +{ + assert(mog_sync_add_and_fetch(&mfd->in_queue, 1) == 1 && + "in_queue counter off (check in)"); +} + +/* used to validate a mog_fd is never in two queues at once */ +static inline void mog_fd_check_out(struct mog_fd *mfd) +{ + 
assert(mog_sync_sub_and_fetch(&mfd->in_queue, 1) == 0 && + "in_queue counter off (check out)"); +} @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* path must be a free()-able pointer */ +struct mog_fd * +mog_file_open_read(struct mog_svc *svc, char *path) +{ + struct mog_fd *mfd; + struct mog_file *mfile; + int fd = mog_open_read(svc, path); + + if (fd < 0) return NULL; + + mfd = mog_fd_get(fd); + mfd->fd = fd; + mfd->fd_type = MOG_FD_TYPE_FILE; + mfd->queue_state = MOG_QUEUE_STATE_NEW; + + mfile = &mfd->as.file; + memset(mfile, 0, sizeof(struct mog_file)); + mfile->fsize = -1; + mfile->svc = svc; + mfile->path = path; + + return mfd; +} + +int mog_file_fstat(struct mog_fd *mfd, struct stat *sb) +{ + assert(mfd->fd_type == MOG_FD_TYPE_FILE && "mog_fd is not a file"); + + return fstat(mfd->fd, sb); +} + +void mog_file_close(struct mog_fd *mfd) +{ + struct mog_file *mfile = &mfd->as.file; + + assert(mfd->fd_type == MOG_FD_TYPE_FILE && "mog_fd is not a file"); + + mog_close(mfd->fd); + assert(mfile->path && "mog_file->path unset!"); + free(mfile->path); + free(mfile->tmppath); /* may be NULL */ + free(mfile->md5); +} + +size_t mog_file_pathlen(struct mog_fd *mfd) +{ + struct mog_file *mfile = &mfd->as.file; + + if (mfile->pathlen == 0) + mfile->pathlen = strlen(mfile->path); + + return mfile->pathlen; +} @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* all path operations we support are relative to mog_svc.docroot */ +#include "cmogstored.h" +#define MY_PATHMAX 80 +#ifdef O_CLOEXEC +# define MY_CLOEXEC O_CLOEXEC +#else +# define MY_CLOEXEC 0 +#endif + +static int noatime_flags = O_RDONLY | O_NOATIME | MY_CLOEXEC; +static const int excl_flags = O_RDWR | O_EXCL | O_CREAT; + +static inline int force_cloexec(int fd) +{ + if (MY_CLOEXEC == 0 && fd >= 0) + 
set_cloexec_flag(fd, true); + return fd; +} + +/* we only use real *at syscalls, Gnulib workalikes aren't thread-safe */ + +#define GET_FSPATH(DST,SRC) do { \ + int rc = snprintf((DST), sizeof(DST), "%s%s", svc->docroot, (SRC)); \ + if (rc <= 0 || rc >= sizeof(DST)) { \ + errno = ENAMETOOLONG; \ + return -1; \ + } \ +} while (0) + +static int mog_statpath(struct mog_svc *svc, const char *path, struct stat *sb) +{ + char fspath[MY_PATHMAX]; + + GET_FSPATH(fspath, path); + return stat(fspath, sb); +} + +static int mog_openpath_read(struct mog_svc *svc, const char *path) +{ + int fd; + char fspath[MY_PATHMAX]; + + GET_FSPATH(fspath, path); + fd = open(fspath, noatime_flags, 0666); + + if (fd < 0 && errno != ENOENT && (noatime_flags & O_NOATIME)) { + noatime_flags = O_RDONLY | MY_CLOEXEC; + fd = open(fspath, noatime_flags, 0666); + } + return force_cloexec(fd); +} + +static int mog_openpath_excl(struct mog_svc *svc, const char *path) +{ + int fd; + char fspath[MY_PATHMAX]; + + GET_FSPATH(fspath, path); + fd = open(fspath, excl_flags, 0666); + + return force_cloexec(fd); +} + +#ifdef HAVE_FSTATAT +static int mog_statat(struct mog_svc *svc, const char *path, struct stat *sb) +{ + int rc = fstatat(svc->dirfd, path + 1, sb, 0); + + if (rc < 0 && errno == ENOSYS) + return mog_statpath(svc, path, sb); + return rc; +} +#else +# define mog_statat mog_statpath +#endif + +#ifdef HAVE_OPENAT +static int mog_openat_read(struct mog_svc *svc, const char *path) +{ + int rc = openat(svc->dirfd, path + 1, noatime_flags, 0666); + + if (rc < 0 && errno == ENOSYS) + return mog_openpath_read(svc, path); + + if (rc < 0 && errno != ENOENT && (noatime_flags & O_NOATIME)) { + noatime_flags = O_RDONLY | MY_CLOEXEC; + rc = openat(svc->dirfd, path + 1, noatime_flags, 0666); + } + + return force_cloexec(rc); +} + +static int mog_openat_excl(struct mog_svc *svc, const char *path) +{ + int rc = openat(svc->dirfd, path + 1, excl_flags, 0666); + + if (rc < 0 && errno == ENOSYS) + return 
mog_openpath_excl(svc, path); + + return force_cloexec(rc); +} + +#else +# define mog_openat_read mog_openpath_read +# define mog_openat_excl mog_openpath_excl +#endif + +static int mog_unlinkpath(struct mog_svc *svc, const char *path) +{ + char fspath[MY_PATHMAX]; + + GET_FSPATH(fspath,path); + return unlink(fspath); +} + +#ifdef HAVE_UNLINKAT +static int mog_unlinkat(struct mog_svc *svc, const char *path) +{ + int rc = unlinkat(svc->dirfd, path + 1, 0); + + if (rc < 0 && errno == ENOSYS) + return mog_unlinkpath(svc, path); + return rc; +} +#else +# define mog_unlinkat mog_unlinkpath +#endif + +static int mog_renamepath(struct mog_svc *svc, const char *old, const char *new) +{ + char fsnew[MY_PATHMAX]; + char fsold[MY_PATHMAX]; + + GET_FSPATH(fsold, old); + GET_FSPATH(fsnew, new); + return rename(fsold, fsnew); +} +#ifdef HAVE_RENAMEAT +static int mog_renameat(struct mog_svc *svc, const char *old, const char *new) +{ + int rc = renameat(svc->dirfd, old + 1, svc->dirfd, new + 1); + + if (rc < 0 && errno == ENOSYS) + return mog_renamepath(svc, old, new); + return rc; +} +#else +# define mog_renameat mog_renamepath +#endif + +int (*mog_stat)(struct mog_svc *, const char *, struct stat *) = mog_statat; +int (*mog_open_read)(struct mog_svc *, const char *) = mog_openat_read; +int (*mog_open_excl)(struct mog_svc *, const char *) = mog_openat_excl; +int (*mog_unlink)(struct mog_svc *, const char *) = mog_unlinkat; +int (*mog_rename)(struct mog_svc *, const char *, const char *) = mog_renameat; @@ -0,0 +1,15 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#define MOG_PRINTF __attribute__((format(printf,1,2))) +#define MOG_LIKELY(x) (__builtin_expect((x), 1)) +#define MOG_UNLIKELY(x) (__builtin_expect((x), 0)) +#define MOG_NOINLINE __attribute__((noinline)) +#define mog_sync_add_and_fetch(dst,val) __sync_add_and_fetch((dst),(val)) +#define mog_sync_sub_and_fetch(dst,val) 
__sync_sub_and_fetch((dst),(val)) +#define mog_sync_lock_test_and_set(dst,val) \ + __sync_lock_test_and_set((dst),(val)) + +/* need the synchronization, right? */ +#define mog_sync_fetch(dst) mog_sync_add_and_fetch((dst),0) diff --git a/iostat.c b/iostat.c new file mode 100644 index 0000000..4410046 --- /dev/null +++ b/iostat.c @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* + * iostat(1) -> global mountlist -> each mog_dev in each mog_svc + */ +#include "cmogstored.h" +#define MOG_IOSTAT_RBUF_SIZE (1024 - MOG_RBUF_OVERHEAD) + +/* called after a stats line for a single device is complete */ +void mog_iostat_line_done(struct mog_iostat *iostat, char *buf, size_t len) +{ + char *dev = buf + iostat->dev_tip; + char util[8]; + + assert(iostat->util_tip > iostat->dev_tip && "BUG: util_tip > dev_tip"); + assert((iostat->dev_len + iostat->dev_tip) < len && "dev_len overflow"); + assert((iostat->util_len + iostat->util_tip) <= len && + "util_len overflow"); + + dev[iostat->dev_len] = 0; + if (iostat->util_len >= 7) { + util[0] = '-'; + util[1] = 0; + } else { + memcpy(util, buf + iostat->util_tip, iostat->util_len); + util[iostat->util_len] = 0; + } + + mog_mnt_update_util(dev, util, sizeof(util)); +} + +/* called every second, after stats for each device line are out */ +void mog_iostat_commit(void) +{ + mog_svc_each(mog_svc_devstats_broadcast, NULL); +} + +static void iostat_advance(struct mog_iostat *iostat, char *buf, size_t len) +{ + size_t end = iostat->line_end + 1; + + if (iostat->line_end == 0) { + syslog(LOG_ERR, "BUG: iostat line too long"); + assert(0 && "BUG: iostat line too long"); + abort(); + } + + assert((buf[iostat->line_end] == '\n' || + buf[iostat->line_end] == '\0') && + "iostat line_end is not LF or NUL"); + assert(iostat->offset > iostat->line_end && + "iostat offset past line_end"); + assert(iostat->line_end < len && "iostat line_end farther than buf"); 
+ + iostat->offset -= end; + memmove(buf, buf + end, len - end); + + if (iostat->dev_tip) iostat->dev_tip -= end; + if (iostat->util_tip) iostat->util_tip -= end; + + iostat->line_end = 0; +} + +static void iostat_close(struct mog_fd *mfd) +{ + assert(mfd->fd_type == MOG_FD_TYPE_IOSTAT && "bad fd_type"); + mog_close(mfd->fd); +} + +void mog_iostat_queue_step(struct mog_fd *mfd) +{ + static const size_t capa = MOG_IOSTAT_RBUF_SIZE; + struct mog_rbuf *rbuf = mog_rbuf_get(capa); + ssize_t r; + off_t off; + struct mog_iostat *iostat = &mfd->as.iostat; + size_t rbuf_used; + + assert(mfd->fd >= 0 && mfd->fd_type == MOG_FD_TYPE_IOSTAT && + "bad iostat mfd"); +retry: + rbuf_used = 0; + off = iostat->offset; + r = read(mfd->fd, rbuf->rptr + off, capa - off); + if (r > 0) { + rbuf_used = r + off; + switch (mog_iostat_parse(iostat, rbuf->rptr, rbuf_used)) { + case MOG_IOSTAT_PARSER_ERROR: + syslog(LOG_ERR, "iostat parser error"); + iostat_close(mfd); + return; + case MOG_IOSTAT_PARSER_DONE: + assert(rbuf_used == iostat->offset && + "iostat parser didn't finish"); + mog_iostat_init(iostat); + goto retry; + case MOG_IOSTAT_PARSER_WANTREAD: + if (capa == iostat->offset) + iostat_advance(iostat, rbuf->rptr, capa); + goto retry; + } + } else if (r == 0) { + /* iostat exits every 30s by default */ + iostat_close(mfd); + } else { + switch (errno) { + case_EAGAIN: + mog_idleq_push(iostat->queue, mfd, MOG_QEV_RD); + return; + case EINTR: goto retry; + } + syslog(LOG_ERR, "iostat read() failed: %m"); + iostat_close(mfd); + } +} diff --git a/iostat.h b/iostat.h new file mode 100644 index 0000000..de09e78 --- /dev/null +++ b/iostat.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +struct mog_queue; +struct mog_fd; +struct mog_iostat { + int cs; + size_t offset; + size_t line_end; + size_t dev_tip; + size_t dev_len; + size_t util_tip; + size_t util_len; + struct mog_queue *queue; +}; + +enum 
mog_iostat_parser_state { + MOG_IOSTAT_PARSER_ERROR = -1, + MOG_IOSTAT_PARSER_DONE = 0, + MOG_IOSTAT_PARSER_WANTREAD = 1 +}; + +void mog_iostat_init(struct mog_iostat *); +int mog_iostat_parse(struct mog_iostat *, char *buf, size_t len); +void mog_iostat_commit(void); +void mog_iostat_line_done(struct mog_iostat *, char *buf, size_t len); +void mog_iostat_queue_step(struct mog_fd *); diff --git a/iostat_parser.rl b/iostat_parser.rl new file mode 100644 index 0000000..f19b85b --- /dev/null +++ b/iostat_parser.rl @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +%%{ + machine iostat_parser; + eor = '\n'> { iostat->line_end = fpc - buf; }; + ignored_line := ( (any-'\n')* eor ) @ { fgoto main; }; + device = "Device:" (any-'\n')* eor; + stats = space* + ( + ((any - space)+) > { + iostat->dev_tip = fpc - buf; + iostat->dev_len = 0; + iostat->util_tip = 0; + iostat->util_len = 0; + } + ((space - '\n')+) > { + iostat->dev_len = fpc - buf - iostat->dev_tip; + } + ) + + # Skip the middle section for now, some folks may use + # await/svctm here. Not sure how standardized those + # fields are on non-Linux platforms... + (any - '\n')* + (space - '\n')+ + + ((any - space)+) > { iostat->util_tip = fpc - buf; } + eor > { + iostat->util_len = fpc - buf - iostat->util_tip; + mog_iostat_line_done(iostat, buf, len); + }; + blank = (space*)eor; + dev_stats = (device stats+ blank)+ @ { mog_iostat_commit(); }; + main := dev_stats $! 
{ fhold; fgoto ignored_line; }; +}%% + +%% write data; + +void mog_iostat_init(struct mog_iostat *iostat) +{ + int cs; + struct mog_queue *queue = iostat->queue; + + memset(iostat, 0, sizeof(struct mog_iostat)); + %% write init; + iostat->cs = cs; + iostat->queue = queue; +} + +enum mog_iostat_parser_state +mog_iostat_parse(struct mog_iostat *iostat, char *buf, size_t len) +{ + char *p, *pe, *eof = NULL; + int cs = iostat->cs; + size_t off = iostat->offset; + + assert(off <= len && "iostat offset past end of buffer"); + + if (cs == iostat_parser_first_final) + return MOG_IOSTAT_PARSER_DONE; + + p = buf + off; + pe = buf + len; + + assert((void *)(pe - p) == (void *)(len - off) && + "pointers aren't same distance"); + + %% write exec; + + iostat->cs = cs; + iostat->offset = p - buf; + + if (cs == iostat_parser_error) + return MOG_IOSTAT_PARSER_ERROR; + + assert(p <= pe && "buffer overflow after iostat parse"); + assert(iostat->offset <= len && "offset longer than len"); + + if (cs == iostat_parser_first_final) + return MOG_IOSTAT_PARSER_DONE; + + return MOG_IOSTAT_PARSER_WANTREAD; +} diff --git a/iostat_process.c b/iostat_process.c new file mode 100644 index 0000000..cb63ef7 --- /dev/null +++ b/iostat_process.c @@ -0,0 +1,161 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* + * process management for iostat(1) + * Since iostat(1) watches the entire system, we only spawn it once + * regardless of the number of mog_svc objects we have. 
+ */ +#include "cmogstored.h" +#include <sys/wait.h> + +static pid_t iostat_pid; +static time_t iostat_last_fail; +static struct mog_iostat *iostat; +static time_t iostat_fail_timeout = 10; + +static void iostat_atexit(void) +{ + if (iostat_pid > 0) + kill(iostat_pid, SIGTERM); +} + +/* + * Non-blocking reap of the previously spawned iostat child. + * An abnormal exit is logged and timestamped in iostat_last_fail, + * which exec_cmd() uses to delay the next exec. + */ +static void iostat_wait(void) +{ + int status; + pid_t tmp; + + if (iostat_pid <= 0) return; + + tmp = waitpid(iostat_pid, &status, WNOHANG); + if (tmp < 0) { + syslog(LOG_WARNING, "waitpid(pid=%d) failed: %m", + (int)iostat_pid); + return; + } + + /* + * With WNOHANG, 0 means the child has not changed state yet: + * "status" was never written, so it must not be inspected. + */ + if (tmp == 0) return; + + if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { + /* syslog(LOG_DEBUG, "iostat done, restarting"); */ + } else { + iostat_last_fail = time(NULL); + syslog(LOG_WARNING, + "iostat done (pid=%d, status=%d), will retry in %ds", + (int)iostat_pid, status, (int)iostat_fail_timeout); + } + iostat_pid = 0; +} + +static int iostat_pipe_init(int *fds) +{ + if (pipe2(fds, O_CLOEXEC) < 0) { + PRESERVE_ERRNO( syslog(LOG_ERR, "pipe2() failed: %m") ); + return -1; + } + + CHECK(int, 0, set_nonblocking_flag(fds[0], true)); + /* fds[1] (write end) stays _blocking_ */ + + return 0; +} + +/* only called in the child process */ +static const char * exec_cmd(const char *cmd) +{ + time_t last_fail = time(NULL) - iostat_last_fail; + time_t delay = iostat_fail_timeout - last_fail; + + if (delay <= 0) + return xasprintf("exec %s", cmd); + + syslog(LOG_DEBUG, + "delaying exec of `%s' for %ds due to previous failure", + cmd, (int)delay); + return xasprintf("sleep %d; exec %s", (int)delay, cmd); +} + +static void dup2_or_die(int oldfd, int newfd, const char *errdesc) +{ + int rc; + + do + rc = dup2(oldfd, newfd); + while (rc < 0 && (errno == EINTR || errno == EBUSY)); + + if (rc < 0) { + syslog(LOG_CRIT, "dup2(%s) failed: %m", errdesc); + abort(); + } +} + +static void preexec_redirect(int out_fd) +{ + int null_fd; + + dup2_or_die(out_fd, STDOUT_FILENO, "iostat_pipe[1],STDOUT"); + mog_close(out_fd); + + null_fd = open("/dev/null", O_RDONLY); + if (null_fd < 0) { +
syslog(LOG_CRIT, "open(/dev/null) failed: %m"); + abort(); + } + dup2_or_die(null_fd, STDIN_FILENO, "/dev/null,STDIN"); + mog_close(null_fd); + + /* don't touch stderr */ +} + +static pid_t iostat_fork_exec(int out_fd) +{ + iostat_pid = fork(); + if (iostat_pid < 0) { + syslog(LOG_ERR, "fork() for iostat failed: %m"); + } else if (iostat_pid > 0) { + mog_close(out_fd); + } else { + /* rely on /bin/sh to parse iostat command-line args */ + const char *cmd = getenv("MOG_IOSTAT_CMD"); + if (!cmd) cmd = "iostat -dx 1 30"; + + preexec_redirect(out_fd); + if (! mog_cloexec_atomic) + mog_cloexec_from(STDERR_FILENO + 1); + + cmd = exec_cmd(cmd); + mog_intr_enable(); + execl("/bin/sh", "sh", "-c", cmd, (char *)NULL); + syslog(LOG_CRIT, "execl(%s) failed: %m", cmd); + abort(); + } + return iostat_pid; +} + +/* + * Spawns iostat with its stdout on a pipe and pushes the read end + * onto "queue" for reading. Returns the parser state stored in the + * mog_fd of the read end, or NULL if the pipe or fork() failed. + */ +struct mog_iostat *mog_iostat_spawn(struct mog_queue *queue) +{ + int fds[2]; + struct mog_fd *mfd; + + iostat_wait(); + if (iostat_pipe_init(fds) < 0) + return NULL; /* EMFILE || ENFILE */ + if (iostat_fork_exec(fds[1]) < 0) { + /* fork() failure: iostat_fork_exec closed nothing, drop both ends */ + mog_close(fds[0]); + mog_close(fds[1]); + return NULL; + } + + assert(fds[0] >= 0 && "invalid FD"); + + mfd = mog_fd_get(fds[0]); + + if (iostat == NULL) atexit(iostat_atexit); + iostat = &mfd->as.iostat; + mfd->fd = fds[0]; + iostat->queue = queue; + mog_iostat_init(iostat); + + mfd->fd_type = MOG_FD_TYPE_IOSTAT; + assert((mfd->in_queue = 0) == 0 && "in_queue check"); + mfd->queue_state = MOG_QUEUE_STATE_NEW; + mog_idleq_push(queue, mfd, MOG_QEV_RD); + + return iostat; +} diff --git a/iov_str.h b/iov_str.h new file mode 100644 index 0000000..79a18d9 --- /dev/null +++ b/iov_str.h @@ -0,0 +1,14 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +static void iov_str(struct iovec *iov, const char *str, size_t len) +{ + union { const char *in; char *out; } deconst; + + deconst.in = str; + iov->iov_base = deconst.out; + iov->iov_len = len; +} + +#define IOV_STR(iov,str) iov_str((iov),(str),sizeof(str)-1) 
diff --git a/listen_parser.h b/listen_parser.h new file mode 100644 index 0000000..9b169ae --- /dev/null +++ b/listen_parser.h @@ -0,0 +1,5 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +struct mog_addrinfo * mog_listen_parse_internal(char *, size_t, char *, size_t); diff --git a/listen_parser.rl b/listen_parser.rl new file mode 100644 index 0000000..751bbc2 --- /dev/null +++ b/listen_parser.rl @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "listen_parser.h" +%%{ + machine listen_parser; + include listen_parser_common "listen_parser_common.rl"; + + main := listen '\0'> { + a = mog_listen_parse_internal(mark_beg, mark_len, + port_beg, port_len); + }; +}%% + +%% write data; + +static struct mog_addrinfo *listen_parse(char *str) +{ + char *p, *pe, *eof = NULL; + char *mark_beg = NULL; + char *port_beg = NULL; + size_t mark_len = 0; + size_t port_len = 0; + struct mog_addrinfo *a = NULL; + int cs; + + %% write init; + + p = str; + pe = str + strlen(str) + 1; + + %% write exec; + + if ((cs == listen_parser_error) && a) + mog_addrinfo_free(&a); + + assert(p <= pe && "buffer overflow after listen parse"); + return a; +} + +struct mog_addrinfo *mog_listen_parse(const char *str) +{ + char *tmp = xstrdup(str); + struct mog_addrinfo *rv = listen_parse(tmp); + free(tmp); + + return rv; +} diff --git a/listen_parser_common.rl b/listen_parser_common.rl new file mode 100644 index 0000000..6f56118 --- /dev/null +++ b/listen_parser_common.rl @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +%%{ + machine listen_parser_common; + + ipv4 = (digit+ '.' digit+ '.' digit+ '.' 
digit+) + > { mark_beg = fpc; } + @ { mark_len = fpc - mark_beg + 1; }; + port = (digit+) + > { port_beg = fpc; } + @ { port_len = fpc - port_beg + 1; }; + + listen = (((ipv4)? ':')? port ) $! { + syslog(LOG_ERR, "bad character in IPv4 address: %c", fc); + }; +}%% diff --git a/listen_parser_internal.c b/listen_parser_internal.c new file mode 100644 index 0000000..0582280 --- /dev/null +++ b/listen_parser_internal.c @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "listen_parser.h" + +struct mog_addrinfo * +mog_listen_parse_internal( + char *mark_beg, size_t mark_len, char *port_beg, size_t port_len) +{ + const char *node = NULL; + struct addrinfo hints; + struct addrinfo *result = NULL; + struct mog_addrinfo *mog_addr = NULL; + int s; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = IPPROTO_TCP; + + if (mark_len) { + mark_beg[mark_len] = 0; + node = mark_beg; + } + port_beg[port_len] = 0; + s = getaddrinfo(node, port_beg, &hints, &result); + if (s != 0) + syslog(LOG_ERR, "failed to resolve %s:%s - %s", + node ? 
node : "(nil)", port_beg, gai_strerror(s)); + + if (result) { + mog_addr = xmalloc(sizeof(struct mog_addrinfo)); + mog_addr->addr = result; + if (!node) node = "0.0.0.0"; + mog_addr->orig = xasprintf("%s:%s", node, port_beg); + } + + return mog_addr; +} diff --git a/m4/.gitignore b/m4/.gitignore new file mode 100644 index 0000000..8ed3316 --- /dev/null +++ b/m4/.gitignore @@ -0,0 +1,132 @@ +/00gnulib.m4 +/alloca.m4 +/chdir-long.m4 +/close.m4 +/dirent_h.m4 +/dirfd.m4 +/dirname.m4 +/double-slash-root.m4 +/dprintf.m4 +/dup2.m4 +/errno_h.m4 +/error.m4 +/exponentd.m4 +/extensions.m4 +/fchdir.m4 +/fcntl-o.m4 +/fcntl.m4 +/fcntl_h.m4 +/filenamecat.m4 +/float_h.m4 +/fstat.m4 +/fstatat.m4 +/getcwd.m4 +/getdtablesize.m4 +/gnulib-common.m4 +/gnulib-comp.m4 +/gnulib-tool.m4 +/include_next.m4 +/intmax_t.m4 +/inttypes_h.m4 +/largefile.m4 +/longlong.m4 +/lstat.m4 +/malloc.m4 +/math_h.m4 +/md5.m4 +/memchr.m4 +/memmem.m4 +/mempcpy.m4 +/memrchr.m4 +/mmap-anon.m4 +/mode_t.m4 +/msvc-inval.m4 +/msvc-nothrow.m4 +/multiarch.m4 +/open.m4 +/openat.m4 +/pathmax.m4 +/printf.m4 +/raise.m4 +/realloc.m4 +/safe-read.m4 +/safe-write.m4 +/save-cwd.m4 +/signal_h.m4 +/size_max.m4 +/ssize_t.m4 +/stat.m4 +/stdalign.m4 +/stdbool.m4 +/stddef_h.m4 +/stdint.m4 +/stdint_h.m4 +/stdio_h.m4 +/stdlib_h.m4 +/strdup.m4 +/strerror.m4 +/string_h.m4 +/sys_socket_h.m4 +/sys_stat_h.m4 +/time_h.m4 +/unistd-safer.m4 +/unistd_h.m4 +/vasnprintf.m4 +/warn-on-use.m4 +/warnings.m4 +/wchar_h.m4 +/wchar_t.m4 +/wint_t.m4 +/write.m4 +/xsize.m4 +/inline.m4 +/stdarg.m4 +/vasprintf.m4 +/xalloc.m4 +/xvasprintf.m4 +/fsusage.m4 +/read.m4 +/fstypename.m4 +/ls-mntd-fs.m4 +/mountlist.m4 +/strstr.m4 +/argp.m4 +/getopt.m4 +/nocrash.m4 +/rawmemchr.m4 +/sleep.m4 +/strcase.m4 +/strchrnul.m4 +/strings_h.m4 +/strndup.m4 +/strnlen.m4 +/sys_types_h.m4 +/sysexits.m4 +/vsnprintf.m4 +/pipe2.m4 +/asm-underscore.m4 +/ioctl.m4 +/nonblocking.m4 +/socklen.m4 +/sockpfaf.m4 +/sys_ioctl_h.m4 +/sys_uio_h.m4 +/canonicalize.m4 +/eealloc.m4 +/malloca.m4 
+/readlink.m4 +/closedir.m4 +/d-ino.m4 +/dup.m4 +/fdopendir.m4 +/getcwd-abort-bug.m4 +/getcwd-path-max.m4 +/opendir.m4 +/readdir.m4 +/rewinddir.m4 +/same.m4 +/xgetcwd.m4 +/xstrndup.m4 +/accept4.m4 +/onceonly.m4 +/socketlib.m4 diff --git a/m4/ax_pthread.m4 b/m4/ax_pthread.m4 new file mode 100644 index 0000000..e20a388 --- /dev/null +++ b/m4/ax_pthread.m4 @@ -0,0 +1,309 @@ +# =========================================================================== +# http://www.gnu.org/software/autoconf-archive/ax_pthread.html +# =========================================================================== +# +# SYNOPSIS +# +# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]]) +# +# DESCRIPTION +# +# This macro figures out how to build C programs using POSIX threads. It +# sets the PTHREAD_LIBS output variable to the threads library and linker +# flags, and the PTHREAD_CFLAGS output variable to any special C compiler +# flags that are needed. (The user can also force certain compiler +# flags/libs to be tested by setting these environment variables.) +# +# Also sets PTHREAD_CC to any special C compiler that is needed for +# multi-threaded programs (defaults to the value of CC otherwise). (This +# is necessary on AIX to use the special cc_r compiler alias.) +# +# NOTE: You are assumed to not only compile your program with these flags, +# but also link it with them as well. e.g. you should link with +# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS $LIBS +# +# If you are only building threads programs, you may wish to use these +# variables in your default LIBS, CFLAGS, and CC: +# +# LIBS="$PTHREAD_LIBS $LIBS" +# CFLAGS="$CFLAGS $PTHREAD_CFLAGS" +# CC="$PTHREAD_CC" +# +# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant +# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name +# (e.g. PTHREAD_CREATE_UNDETACHED on AIX). 
+# +# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the +# PTHREAD_PRIO_INHERIT symbol is defined when compiling with +# PTHREAD_CFLAGS. +# +# ACTION-IF-FOUND is a list of shell commands to run if a threads library +# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it +# is not found. If ACTION-IF-FOUND is not specified, the default action +# will define HAVE_PTHREAD. +# +# Please let the authors know if this macro fails on any platform, or if +# you have any other suggestions or comments. This macro was based on work +# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help +# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by +# Alejandro Forero Cuervo to the autoconf macro repository. We are also +# grateful for the helpful feedback of numerous users. +# +# Updated for Autoconf 2.68 by Daniel Richard G. +# +# LICENSE +# +# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu> +# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG> +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the respective Autoconf Macro's copyright owner +# gives unlimited permission to copy, distribute and modify the configure +# scripts that are the output of Autoconf when processing the Macro. 
You +# need not follow the terms of the GNU General Public License when using +# or distributing such scripts, even though portions of the text of the +# Macro appear in them. The GNU General Public License (GPL) does govern +# all other use of the material that constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the Autoconf +# Macro released by the Autoconf Archive. When you make and distribute a +# modified version of the Autoconf Macro, you may extend this special +# exception to the GPL to apply to your modified version as well. + +#serial 17 + +AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD]) +AC_DEFUN([AX_PTHREAD], [ +AC_REQUIRE([AC_CANONICAL_HOST]) +AC_LANG_PUSH([C]) +ax_pthread_ok=no + +# We used to check for pthread.h first, but this fails if pthread.h +# requires special compiler flags (e.g. on True64 or Sequent). +# It gets checked for in the link test anyway. + +# First of all, check if the user has set any of the PTHREAD_LIBS, +# etcetera environment variables, and if threads linking works using +# them: +if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS]) + AC_TRY_LINK_FUNC(pthread_join, ax_pthread_ok=yes) + AC_MSG_RESULT($ax_pthread_ok) + if test x"$ax_pthread_ok" = xno; then + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" + fi + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" +fi + +# We must check for the threads library under a number of different +# names; the ordering is very important because some systems +# (e.g. DEC) have both -lpthread and -lpthreads, where one of the +# libraries is broken (non-POSIX). + +# Create a list of thread flags to try. 
Items starting with a "-" are +# C compiler flags, and other items are library names, except for "none" +# which indicates that we try without any flags at all, and "pthread-config" +# which is a program returning the flags for the Pth emulation library. + +ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config" + +# The ordering *is* (sometimes) important. Some notes on the +# individual items follow: + +# pthreads: AIX (must check this before -lpthread) +# none: in case threads are in libc; should be tried before -Kthread and +# other compiler flags to prevent continual compiler warnings +# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h) +# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able) +# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread) +# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads) +# -pthreads: Solaris/gcc +# -mthreads: Mingw32/gcc, Lynx/gcc +# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it +# doesn't hurt to check since this sometimes defines pthreads too; +# also defines -D_REENTRANT) +# ... -mt is also the pthreads flag for HP/aCC +# pthread: Linux, etcetera +# --thread-safe: KAI C++ +# pthread-config: use pthread-config program (for GNU Pth library) + +case "${host_cpu}-${host_os}" in + *solaris*) + + # On Solaris (at least, for some versions), libc contains stubbed + # (non-functional) versions of the pthreads routines, so link-based + # tests will erroneously succeed. (We need to link with -pthreads/-mt/ + # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather + # a function called by this macro, so we could check for that, but + # who knows whether they'll stub that too in a future libc.) 
So, + # we'll just look for -pthreads and -lpthread first: + + ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags" + ;; + + *-darwin*) + ax_pthread_flags="-pthread $ax_pthread_flags" + ;; +esac + +if test x"$ax_pthread_ok" = xno; then +for flag in $ax_pthread_flags; do + + case $flag in + none) + AC_MSG_CHECKING([whether pthreads work without any flags]) + ;; + + -*) + AC_MSG_CHECKING([whether pthreads work with $flag]) + PTHREAD_CFLAGS="$flag" + ;; + + pthread-config) + AC_CHECK_PROG(ax_pthread_config, pthread-config, yes, no) + if test x"$ax_pthread_config" = xno; then continue; fi + PTHREAD_CFLAGS="`pthread-config --cflags`" + PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`" + ;; + + *) + AC_MSG_CHECKING([for the pthreads library -l$flag]) + PTHREAD_LIBS="-l$flag" + ;; + esac + + save_LIBS="$LIBS" + save_CFLAGS="$CFLAGS" + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Check for various functions. We must include pthread.h, + # since some functions may be macros. (On the Sequent, we + # need a special flag -Kthread to make this header compile.) + # We check for pthread_join because it is in -lpthread on IRIX + # while pthread_create is in libc. We check for pthread_attr_init + # due to DEC craziness with -lpthreads. We check for + # pthread_cleanup_push because it is one of the few pthread + # functions on Solaris that doesn't have a non-functional libc stub. + # We try pthread_create on general principles. 
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h> + static void routine(void *a) { a = 0; } + static void *start_routine(void *a) { return a; }], + [pthread_t th; pthread_attr_t attr; + pthread_create(&th, 0, start_routine, 0); + pthread_join(th, 0); + pthread_attr_init(&attr); + pthread_cleanup_push(routine, 0); + pthread_cleanup_pop(0) /* ; */])], + [ax_pthread_ok=yes], + []) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + AC_MSG_RESULT($ax_pthread_ok) + if test "x$ax_pthread_ok" = xyes; then + break; + fi + + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" +done +fi + +# Various other checks: +if test "x$ax_pthread_ok" = xyes; then + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Detect AIX lossage: JOINABLE attribute is called UNDETACHED. + AC_MSG_CHECKING([for joinable pthread attribute]) + attr_name=unknown + for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do + AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>], + [int attr = $attr; return attr /* ; */])], + [attr_name=$attr; break], + []) + done + AC_MSG_RESULT($attr_name) + if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then + AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name, + [Define to necessary symbol if this constant + uses a non-standard name on your system.]) + fi + + AC_MSG_CHECKING([if more special flags are required for pthreads]) + flag=no + case "${host_cpu}-${host_os}" in + *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";; + *-osf* | *-hpux*) flag="-D_REENTRANT";; + *solaris*) + if test "$GCC" = "yes"; then + flag="-D_REENTRANT" + else + flag="-mt -D_REENTRANT" + fi + ;; + esac + AC_MSG_RESULT(${flag}) + if test "x$flag" != xno; then + PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS" + fi + + AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT], + ax_cv_PTHREAD_PRIO_INHERIT, [ + AC_LINK_IFELSE([ + AC_LANG_PROGRAM([[#include <pthread.h>]], [[int i = PTHREAD_PRIO_INHERIT;]])], + [ax_cv_PTHREAD_PRIO_INHERIT=yes], + 
[ax_cv_PTHREAD_PRIO_INHERIT=no]) + ]) + AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"], + AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], 1, [Have PTHREAD_PRIO_INHERIT.])) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + # More AIX lossage: must compile with xlc_r or cc_r + if test x"$GCC" != xyes; then + AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC}) + else + PTHREAD_CC=$CC + fi +else + PTHREAD_CC="$CC" +fi + +AC_SUBST(PTHREAD_LIBS) +AC_SUBST(PTHREAD_CFLAGS) +AC_SUBST(PTHREAD_CC) + +# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND: +if test x"$ax_pthread_ok" = xyes; then + ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1]) + : +else + ax_pthread_ok=no + $2 +fi +AC_LANG_POP +])dnl AX_PTHREAD diff --git a/m4/gnulib-cache.m4 b/m4/gnulib-cache.m4 new file mode 100644 index 0000000..0b14421 --- /dev/null +++ b/m4/gnulib-cache.m4 @@ -0,0 +1,59 @@ +# Copyright (C) 2002-2012 Free Software Foundation, Inc. +# +# This file is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This file is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this file. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception to the GNU General Public License, +# this file may be distributed as part of a program that +# contains a configuration script generated by Autoconf, under +# the same distribution terms as the rest of that program. +# +# Generated by gnulib-tool. +# +# This file represents the specification of how gnulib-tool is used. 
+# It acts as a cache: It is written and read by gnulib-tool. +# In projects that use version control, this file is meant to be put under +# version control, like the configure.ac and various Makefile.am files. + + +# Specification in the form of a command-line invocation: +# gnulib-tool --import --dir=. --lib=libgnu --source-base=lib --m4-base=m4 --doc-base=doc --tests-base=tests --aux-dir=. --avoid=fstatat --avoid=ioctl --avoid=openat --no-conditional-dependencies --no-libtool --macro-prefix=gl argp canonicalize cloexec crypto/md5 dprintf hash mempcpy mountlist nonblocking pipe2 verify warnings xvasprintf + +# Specification in the form of a few gnulib-tool.m4 macro invocations: +gl_LOCAL_DIR([]) +gl_MODULES([ + argp + canonicalize + cloexec + crypto/md5 + dprintf + hash + mempcpy + mountlist + nonblocking + pipe2 + verify + warnings + xvasprintf +]) +gl_AVOID([fstatat ioctl openat]) +gl_SOURCE_BASE([lib]) +gl_M4_BASE([m4]) +gl_PO_BASE([]) +gl_DOC_BASE([doc]) +gl_TESTS_BASE([tests]) +gl_LIB([libgnu]) +gl_MAKEFILE_NAME([]) +gl_MACRO_PREFIX([gl]) +gl_PO_DOMAIN([]) +gl_WITNESS_C_DOMAIN([]) diff --git a/maxconns.c b/maxconns.c new file mode 100644 index 0000000..692a72b --- /dev/null +++ b/maxconns.c @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include <sys/time.h> +#include <sys/resource.h> + +#ifndef RLIM_INFINITY +# define RLIM_INFINITY ((rlim_t)(-1)) +#endif + +void mog_set_maxconns(unsigned long maxconns) +{ + struct rlimit r; + rlim_t want; + struct rlimit orig; + + if (getrlimit(RLIMIT_NOFILE, &r) != 0) + die("getrlimit(RLIMIT_NOFILE) failed: %s\n", strerror(errno)); + + memcpy(&orig, &r, sizeof(struct rlimit)); + + if (!maxconns) maxconns = MOG_DEFAULT_MAXCONNS; + want = maxconns * 2; /* open files, listeners, iostat pipe */ + + if (want < 0 || want > MOG_FD_MAX) + want = MOG_FD_MAX; /* LOL :D */ + if (r.rlim_cur >= want) + return; + + if 
(r.rlim_max == RLIM_INFINITY || r.rlim_cur == RLIM_INFINITY) { + /* insane? maybe... */ + r.rlim_max = r.rlim_cur = want; + } else if (r.rlim_max == 0) { + warn("RLIMIT_NOFILE max=0, trying %ld anyways\n", (long)want); + r.rlim_max = r.rlim_cur = want; + } else if (r.rlim_max < want) { + warn("RLIMIT_NOFILE max=%ld less than wanted value=%ld\n", + (long)r.rlim_max, (long)want); + r.rlim_cur = r.rlim_max; + } else { + r.rlim_max = r.rlim_cur = want; + } + + if (setrlimit(RLIMIT_NOFILE, &r) == 0) return; + + warn("failed to set RLIMIT_NOFILE max=%ld cur=%ld (maxconns=%lu)\n", + (long)r.rlim_max, (long)r.rlim_cur, maxconns); + + while ((want -= 64) >= maxconns) { + r.rlim_max = r.rlim_cur = want; + if (setrlimit(RLIMIT_NOFILE, &r) == 0) + goto eventual_success; + } + + warn("RLIMIT_NOFILE stuck at max=%ld cur=%ld (maxconns=%lu)\n", + (long)orig.rlim_max, (long)orig.rlim_cur, maxconns); + return; + +eventual_success: + warn("set RLIMIT_NOFILE max=%ld cur=%ld (maxconns=%lu)\n", + (long)r.rlim_max, (long)r.rlim_cur, maxconns); +} @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "mgmt.h" +#include "digmd5.h" + +#define MOG_MGMT_RBUF_SIZE (512 - MOG_RBUF_OVERHEAD) + +static void mgmt_md5_step(struct mog_mgmt *mgmt) +{ + struct mog_fd *mfd = mgmt->forward; + ssize_t r = mog_md5_read(mfd->as.file.md5, mfd->fd); + + assert(mgmt->wbuf == NULL && "wbuf should be NULL here"); + if (r > 0) return; + + if (r == 0) /* EOF */ + mog_mgmt_fn_md5_emit(mgmt); + else + mog_mgmt_fn_md5_err(mgmt); + + mog_file_close(mgmt->forward); + mgmt->forward = NULL; +} + +static enum mog_next mgmt_md5_in_progress(struct mog_mgmt *mgmt) +{ + assert(mgmt->forward && mgmt->forward != MOG_IOSTAT && "bad forward"); + mgmt_md5_step(mgmt); + + if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE; + if (mgmt->wbuf) return MOG_NEXT_WAIT_WR; + + /* + * we can error on the MD5 but 
continue if we didn't + * have a socket error (from wbuf == MOG_WR_ERROR) + */ + return MOG_NEXT_ACTIVE; +} + +MOG_NOINLINE static void mgmt_close(struct mog_fd *mfd) +{ + struct mog_mgmt *mgmt = &mfd->as.mgmt; + + mog_rbuf_free(mgmt->rbuf); + assert((mgmt->wbuf == NULL || mgmt->wbuf == MOG_WR_ERROR) && + "would leak mgmt->wbuf on close"); + mog_close(mfd->fd); +} + +void mog_mgmt_writev(struct mog_mgmt *mgmt, struct iovec *iov, int iovcnt) +{ + struct mog_fd *mfd = mog_fd_of(mgmt); + + assert(mgmt->wbuf == NULL && "tried to write while busy"); + mgmt->wbuf = mog_trywritev(mfd->fd, iov, iovcnt); +} + +static void mgmt_iostat_forever(struct mog_mgmt *mgmt) +{ + mog_rbuf_free_and_null(&mgmt->rbuf); /* no coming back from this */ + mog_notify(MOG_NOTIFY_DEVICE_REFRESH); + mog_svc_devstats_subscribe(mgmt); +} + +/* returns true if we can continue queue step, false if not */ +static enum mog_next mgmt_wbuf_in_progress(struct mog_mgmt *mgmt) +{ + assert(mgmt->wbuf != MOG_WR_ERROR && "still active after write error"); + switch (mog_tryflush(mog_fd_of(mgmt)->fd, &mgmt->wbuf)) { + case MOG_WRSTATE_ERR: return MOG_NEXT_CLOSE; + case MOG_WRSTATE_DONE: + return mgmt->forward == MOG_IOSTAT ? 
+ MOG_NEXT_IGNORE : MOG_NEXT_ACTIVE; + case MOG_WRSTATE_BUSY: + /* unlikely, we never put anything big in wbuf */ + return MOG_NEXT_WAIT_WR; + } + assert(0 && "compiler bug?"); + return MOG_NEXT_CLOSE; +} + +/* stash any pipelined data for the next round */ +static void +mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len) +{ + struct mog_rbuf *old = mgmt->rbuf; + size_t defer_bytes = buf_len - mgmt->offset; + char *src = rbuf->rptr + mgmt->offset; + + assert(mgmt->offset >= 0 && "mgmt->offset negative"); + assert(defer_bytes <= MOG_MGMT_RBUF_SIZE && "defer bytes overflow"); + + if (defer_bytes == 0) { + mog_rbuf_free_and_null(&mgmt->rbuf); + } else if (old) { /* no allocation needed, reuse existing */ + assert(old == rbuf && "mgmt->rbuf not reused properly"); + memmove(old->rptr, src, defer_bytes); + old->rsize = defer_bytes; + } else { + mgmt->rbuf = mog_rbuf_new(MOG_MGMT_RBUF_SIZE); + memcpy(mgmt->rbuf->rptr, src, defer_bytes); + + /* DANGER: we'll set this back to MGMT_MGMT_RBUF_SIZE later: */ + mgmt->rbuf->rsize = defer_bytes; + } + mgmt->offset = 0; +} + +/* + * this is the main event callback and called whenever mgmt + * is pulled out of a queue (either idle or active) + */ +static enum mog_next mgmt_queue_step(struct mog_fd *mfd) +{ + static const size_t capa = MOG_MGMT_RBUF_SIZE; + struct mog_mgmt *mgmt = &mfd->as.mgmt; + struct mog_rbuf *rbuf; + char *buf; + ssize_t r; + off_t off; + size_t buf_len = 0; + enum mog_parser_state state; + + assert(mfd->fd >= 0 && "mgmt fd is invalid"); + + if (mgmt->wbuf) return mgmt_wbuf_in_progress(mgmt); + if (mgmt->forward) return mgmt_md5_in_progress(mgmt); + + /* we may have pipelined data in mgmt->rbuf */ + rbuf = mgmt->rbuf ? 
mgmt->rbuf : mog_rbuf_get(capa); + buf = rbuf->rptr; + off = mgmt->offset; + assert(off >= 0 && "offset is negative"); + assert(off < capa && "offset is too big"); + if (mgmt->rbuf && off == 0) { + /* request got "pipelined", resuming now */ + buf_len = mgmt->rbuf->rsize; + mgmt->rbuf->rsize = capa; /* DANGER see mgmt_defer_rbuf */ + goto parse; + } +reread: + r = read(mfd->fd, buf + off, capa - off); + if (r > 0) { + buf_len = r + off; +parse: + state = mog_mgmt_parse(mgmt, buf, buf_len); + if (mgmt->wbuf == MOG_WR_ERROR) return MOG_NEXT_CLOSE; + + switch (state) { + case MOG_PARSER_ERROR: + syslog(LOG_ERR, "mgmt parser error"); + return MOG_NEXT_CLOSE; + case MOG_PARSER_CONTINUE: + assert(mgmt->wbuf == NULL && + "tried to write (and failed) with partial req"); + if (mgmt->offset == capa) { + assert(buf_len == capa && "bad rbuf"); + syslog(LOG_ERR, "mgmt request too large"); + return MOG_NEXT_CLOSE; + } + off = mgmt->offset; + goto reread; + case MOG_PARSER_DONE: + if (mgmt->forward == MOG_IOSTAT) + return MOG_NEXT_IGNORE; + + /* stash unread portion in a new buffer */ + mgmt_defer_rbuf(mgmt, rbuf, buf_len); + mog_mgmt_reset_parser(mgmt); + assert(mgmt->wbuf != MOG_WR_ERROR); + return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE; + } + } else if (r == 0) { /* client shut down */ + return MOG_NEXT_CLOSE; + } else { + switch (errno) { + case_EAGAIN: + if ((buf_len > 0) && (mgmt->rbuf == NULL)) + mgmt->rbuf = mog_rbuf_defer(rbuf); + return MOG_NEXT_WAIT_RD; + case EINTR: goto reread; + default: + syslog(LOG_NOTICE, "mgmt client died: %m"); + return MOG_NEXT_CLOSE; + } + } + + assert(0 && "compiler bug?"); + return MOG_NEXT_CLOSE; +} + +/* + * this function is called whenever a mgmt client is pulled out of + * _any_ queue (listen/idle/active). Our queueing model should be + * designed to prevent this function from executing concurrently + * for any fd. 
+ */ +void mog_mgmt_queue_step(struct mog_fd *mfd) +{ + struct mog_queue *q = mfd->as.mgmt.svc->queue; + + /* centralize all queue transitions here: */ + switch (mgmt_queue_step(mfd)) { + case MOG_NEXT_CLOSE: mgmt_close(mfd); return; + case MOG_NEXT_ACTIVE: mog_activeq_push(q, mfd); return; + case MOG_NEXT_WAIT_RD: mog_idleq_push(q, mfd, MOG_QEV_RD); return; + case MOG_NEXT_WAIT_WR: mog_idleq_push(q, mfd, MOG_QEV_WR); return; + case MOG_NEXT_IGNORE: mgmt_iostat_forever(&mfd->as.mgmt); return; + } +} + +/* called immediately after accept(), this initializes the mfd (once) */ +void mog_mgmt_post_accept(struct mog_fd *mfd, struct mog_svc *svc) +{ + struct mog_mgmt *mgmt = &mfd->as.mgmt; + + mfd->fd_type = MOG_FD_TYPE_MGMT; + assert((mfd->in_queue = 0) == 0 && "in_queue check"); + mog_mgmt_init(mgmt, svc); + mog_mgmt_queue_step(mfd); +} @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +struct mog_svc; +struct mog_wbuf; +struct mog_mgmt; + +/* mgmt_parser.rl */ +void mog_mgmt_init(struct mog_mgmt *, struct mog_svc *); +enum mog_parser_state mog_mgmt_parse(struct mog_mgmt *, char *buf, size_t len); +void mog_mgmt_reset_parser(struct mog_mgmt *); + +/* mgmt_fn.c */ +void mog_mgmt_fn_md5(struct mog_mgmt *, char *buf, size_t len); +void mog_mgmt_fn_size(struct mog_mgmt *, char *buf); +void mog_mgmt_fn_blank(struct mog_mgmt *); +void mog_mgmt_fn_unknown(struct mog_mgmt *, char *buf); +void mog_mgmt_fn_watch_err(struct mog_mgmt *); +void mog_mgmt_fn_md5_emit(struct mog_mgmt *); +void mog_mgmt_fn_md5_err(struct mog_mgmt *); +void mog_mgmt_md5_close(struct mog_mgmt *); diff --git a/mgmt_fn.c b/mgmt_fn.c new file mode 100644 index 0000000..a584359 --- /dev/null +++ b/mgmt_fn.c @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "mgmt.h" +#include "iov_str.h" +#include 
"digmd5.h" + +static char * +get_path(struct iovec *dst, struct mog_mgmt *mgmt, char *buf, bool sdup) +{ + dst->iov_base = buf + mgmt->mark[0]; + dst->iov_len = mgmt->mark[1] - mgmt->mark[0]; + + if (mog_valid_path(dst->iov_base, dst->iov_len)) { + char *path; + + if (sdup) { + path = mog_cachealign(dst->iov_len + 1); + memcpy(path, dst->iov_base, dst->iov_len); + } else { + path = dst->iov_base; + } + + path[dst->iov_len] = '\0'; + return path; + } else { + struct iovec iov; + + IOV_STR(&iov, "ERROR: uri invalid (contains ..)\r\n"); + mog_mgmt_writev(mgmt, &iov, 1); + return NULL; + } +} + +/* starts the MD5 request */ +void mog_mgmt_fn_md5(struct mog_mgmt *mgmt, char *buf, size_t len) +{ + struct iovec iov[2]; + char *path = get_path(iov, mgmt, buf, true); + + if (!path) return; + + mgmt->forward = mog_file_open_read(mgmt->svc, path); + if (mgmt->forward) { + struct mog_file *file = &mgmt->forward->as.file; + + file->md5 = mog_md5_new(); + file->pathlen = iov[0].iov_len; + assert(file->path == path && "path not set"); + } else { + IOV_STR(&iov[1], " MD5=-1\r\n"); + mog_mgmt_writev(mgmt, iov, 2); + free(path); + } +} + +/* finishes the MD5 request */ +#define CLEN(s) (sizeof(s)-1) +#define PFX " MD5=" + +void mog_mgmt_fn_md5_err(struct mog_mgmt *mgmt) +{ + struct iovec iov[3]; + struct mog_fd *mfd = mgmt->forward; + struct mog_file *file = &mfd->as.file; + long long offset = (long long)lseek(mfd->fd, 0, SEEK_CUR); + char buf[sizeof(" at 18446744073709551615 failed\r\n") - 1]; + + /* offset could be -1 here, but there ain't much we can do */ + + IOV_STR(iov, "ERR read "); + iov[1].iov_base = file->path; + iov[1].iov_len = file->pathlen; + iov[2].iov_base = buf; + iov[2].iov_len = snprintf(buf, sizeof(buf), + " at %lld failed\r\n", offset); + mog_mgmt_writev(mgmt, iov, 3); +} + +/* output: "/$PATH MD5=hex\r\n" */ +void mog_mgmt_fn_md5_emit(struct mog_mgmt *mgmt) +{ + struct iovec iov[2]; + char buf[CLEN(PFX) + 32 + CLEN("\r\n")]; + char *b; + struct mog_fd *mfd = 
mgmt->forward; + struct mog_file *file = &mfd->as.file; + + iov[0].iov_base = file->path; + iov[0].iov_len = file->pathlen; + + iov[1].iov_base = buf; + iov[1].iov_len = sizeof(buf); + + b = mempcpy(buf, PFX, CLEN(PFX)); + mog_md5_hex(file->md5, b, 32); + b[32] = '\r'; + b[33] = '\n'; + mog_mgmt_writev(mgmt, iov, 2); +} + +/* + * writes to mgmt fd: + * "URI $SIZE\r\n" on success + * "URI -1\r\n" on failure + * "ERROR: uri invalid (contains ..)\r\n" on invalid paths + */ +void mog_mgmt_fn_size(struct mog_mgmt *mgmt, char *buf) +{ + struct stat sb; + struct iovec iov[2]; + char tmp[sizeof(" 18446744073709551615\r\n") - 1]; + char *path = get_path(iov, mgmt, buf, false); + + if (!path) return; + + if (mog_stat(mgmt->svc, path, &sb) == 0) { + long long size = (long long)sb.st_size; + + iov[1].iov_base = tmp; + iov[1].iov_len = snprintf(tmp, sizeof(tmp), " %lld\r\n", size); + } else { + IOV_STR(&iov[1], " -1\r\n"); + } + + mog_mgmt_writev(mgmt, iov, 2); +} + +void mog_mgmt_fn_blank(struct mog_mgmt *mgmt) +{ + struct iovec iov; + + IOV_STR(&iov, "\r\n"); + mog_mgmt_writev(mgmt, &iov, 1); +} + +void mog_mgmt_fn_unknown(struct mog_mgmt *mgmt, char *buf) +{ + struct iovec iov[3]; + + IOV_STR(&iov[0], "ERROR: unknown command: "); + iov[1].iov_base = mgmt->mark[0] + buf; + iov[1].iov_len = mgmt->mark[1] - mgmt->mark[0]; + IOV_STR(&iov[2], "\r\n"); + mog_mgmt_writev(mgmt, iov, 3); +} + +void mog_mgmt_fn_watch_err(struct mog_mgmt *mgmt) +{ + struct iovec iov; + + IOV_STR(&iov, "ERR iostat unavailable\r\n"); + mog_mgmt_writev(mgmt, &iov, 1); +} diff --git a/mgmt_parser.rl b/mgmt_parser.rl new file mode 100644 index 0000000..ddbe6f1 --- /dev/null +++ b/mgmt_parser.rl @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +#include "mgmt.h" +%%{ + machine mgmt_parser; + + eor = '\r'?'\n'; + path = "/"[a-zA-Z0-9/\.\-]*; + invalid_line := ( + [ \t]* + ([^ \t\r]+) > { 
mgmt->mark[0] = fpc - buf; } + (any-'\n')* > { mgmt->mark[1] = fpc - buf; } + '\n' + ) @ { + mog_mgmt_fn_unknown(mgmt, buf); + really_done = 1; + fbreak; + }; + size = ( + "size "(path) > { mgmt->mark[0] = fpc - buf; } + eor > { mgmt->mark[1] = fpc - buf; } + @ { mog_mgmt_fn_size(mgmt, buf); fbreak; } + ); + md5 = ( + "MD5 "(path) > { mgmt->mark[0] = fpc - buf; } + eor > { mgmt->mark[1] = fpc - buf; } + @ { mog_mgmt_fn_md5(mgmt, buf, len); fbreak; } + ); + watch = "watch" eor @ { + static int have_iostat = 1; + + if (have_iostat) + mgmt->forward = MOG_IOSTAT; + else + mog_mgmt_fn_watch_err(mgmt); + fbreak; + }; + blank = [ \t]* eor @ { mog_mgmt_fn_blank(mgmt); fbreak; }; + + command = (md5|size|watch|blank); + main := command $! { + p = buf; + fhold; + fgoto invalid_line; + }; +}%% + +%% write data; + +void mog_mgmt_reset_parser(struct mog_mgmt *mgmt) +{ + int cs; + %% write init; + mgmt->cs = cs; + mgmt->mark[0] = mgmt->mark[1] = 0; +} + +void mog_mgmt_init(struct mog_mgmt *mgmt, struct mog_svc *svc) +{ + memset(mgmt, 0, sizeof(struct mog_mgmt)); + mog_mgmt_reset_parser(mgmt); + mgmt->svc = svc; +} + +enum mog_parser_state +mog_mgmt_parse(struct mog_mgmt *mgmt, char *buf, size_t len) +{ + char *p, *pe, *eof = NULL; + int cs = mgmt->cs; + int really_done = 0; + size_t off = mgmt->offset; + + assert(mgmt->wbuf == NULL && "unwritten data in buffer"); + assert(off <= len && "mgmt offset past end of buffer"); + + p = buf + off; + pe = buf + len; + + assert((void *)(pe - p) == (void *)(len - off) && + "pointers aren't same distance"); + + %% write exec; + + if (really_done) + cs = mgmt_parser_first_final; + + mgmt->cs = cs; + mgmt->offset = p - buf; + + if (cs == mgmt_parser_error) + return MOG_PARSER_ERROR; + + assert(p <= pe && "buffer overflow after mgmt parse"); + assert(mgmt->offset <= len && "offset longer than len"); + + if (mgmt->cs == mgmt_parser_first_final) return MOG_PARSER_DONE; + return MOG_PARSER_CONTINUE; +} @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2012, 
Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* + * Uses the mountlist library in gnulib to map system device IDs and + * system device names to mount entries. + */ +#include "cmogstored.h" + +static pthread_mutex_t mnt_lock = PTHREAD_MUTEX_INITIALIZER; +/* global for the process */ +static Hash_table *by_dev; /* maps (system) device IDs to a mount_entry */ + +struct mog_mntent { + struct mount_entry *me; + char util[8]; +}; + +static void me_free(void *entry) +{ + struct mog_mntent *mntent = entry; + struct mount_entry *me = mntent->me; + + free(me->me_devname); + free(me->me_mountdir); + if (me->me_type_malloced) + free(me->me_type); + free(me); + free(mntent); +} + +static size_t me_dev_hash(const void *entry, size_t tablesize) +{ + const struct mog_mntent *mntent = entry; + + return mntent->me->me_dev % tablesize; +} + +static bool me_dev_cmp(const void *a, const void *b) +{ + const struct mog_mntent *mntent_a = a; + const struct mog_mntent *mntent_b = b; + + return mntent_a->me->me_dev == mntent_b->me->me_dev; +} + +static void mnt_atexit(void) +{ + hash_free(by_dev); +} + +static void mnt_once(void) +{ + by_dev = hash_initialize(7, NULL, me_dev_hash, me_dev_cmp, me_free); + if (!by_dev) + mog_oom(); + atexit(mnt_atexit); +} + +void mog_mnt_refresh(void) +{ + struct mount_entry *head = read_file_system_list(false); + struct mog_mntent *mntent; + struct stat sb; + + CHECK(int, 0, pthread_mutex_lock(&mnt_lock) ); + + if (by_dev) + hash_clear(by_dev); + else + mnt_once(); + + while (head) { + struct mount_entry *next = head->me_next; + + mntent = xmalloc(sizeof(struct mog_mntent)); + mntent->me = head; + mntent->util[0] = '-'; + mntent->util[1] = 0; + + /* the device number may not have been populated, do it */ + if ((head->me_dev == (dev_t)-1) && + (stat(head->me_mountdir, &sb) == 0)) + head->me_dev = sb.st_dev; + + switch (hash_insert_if_absent(by_dev, mntent, NULL)) { + case 0: me_free(mntent); + case 1: break; 
+ default: mog_oom(); + } + head = next; + } + + CHECK(int, 0, pthread_mutex_unlock(&mnt_lock) ); +} + +/* + * Looks up a mount_entry by st_dev, returns NULL if nothing was found + * Users may only acquire one mount entry at a time and MUST release it + */ +const struct mount_entry * mog_mnt_acquire(dev_t st_dev) +{ + struct mount_entry me = { .me_dev = st_dev }; + struct mog_mntent mntent = { .me = &me }; + struct mog_mntent *rv; + + CHECK(int, 0, pthread_mutex_lock(&mnt_lock) ); + rv = hash_lookup(by_dev, &mntent); + + /* user must release this via mog_mnt_release if non-NULL */ + if (rv) return rv->me; + + CHECK(int, 0, pthread_mutex_unlock(&mnt_lock) ); + return NULL; +} + +/* releases the mount entry, allowing mog_mnt_acquire to be called again */ +void mog_mnt_release(const struct mount_entry *me) +{ + struct mog_mntent mntent; + const struct mog_mntent *check_me; + union { const void *in; void *out; } deconst = { .in = me }; + + mntent.me = deconst.out; + check_me = hash_lookup(by_dev, &mntent); + + assert(check_me->me == me && "did not release acquired mount_entry"); + CHECK(int, 0, pthread_mutex_unlock(&mnt_lock) ); +} + +struct mnt_update { + const char *prefix; + size_t prefixlen; + const char *util; +}; + +static bool update_util_each(void *ent, void *upd) +{ + struct mog_mntent *mntent = ent; + struct mnt_update *update = upd; + const char *devname = mntent->me->me_devname; + size_t devnamelen = strlen(devname); + + if (devnamelen >= update->prefixlen && + memcmp(update->prefix, devname, update->prefixlen) == 0) + memcpy(mntent->util, update->util, 8); + return true; /* continue */ +} + +/* this is O(mountpoints) for now */ +void mog_mnt_update_util(const char *devsuffix, const char *util, size_t len) +{ + struct mnt_update update; + + assert(len >= 8 && "len too small"); + update.prefix = xasprintf("/dev/%s", devsuffix); + update.prefixlen = strlen(update.prefix); + update.util = util; + + CHECK(int, 0, pthread_mutex_lock(&mnt_lock) ); + 
(void)hash_do_for_each(by_dev, update_util_each, &update); + CHECK(int, 0, pthread_mutex_unlock(&mnt_lock) ); + mog_free(update.prefix); +} + +char * mog_mnt_fetch_util(dev_t st_dev, char *dst, size_t len) +{ + struct mount_entry me = { .me_dev = st_dev }; + struct mog_mntent mntent = { .me = &me }; + struct mog_mntent *tmp; + + assert(len >= 8 && "len too small for dst"); + + CHECK(int, 0, pthread_mutex_lock(&mnt_lock) ); + tmp = hash_lookup(by_dev, &mntent); + if (tmp) + memcpy(dst, tmp->util, 8); + CHECK(int, 0, pthread_mutex_unlock(&mnt_lock) ); + + return tmp ? dst : NULL; +} @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "lib/mountlist.h" +void mog_mnt_refresh(void); +const struct mount_entry * mog_mnt_acquire(dev_t); +void mog_mnt_release(const struct mount_entry *); +void mog_mnt_update_util(const char *devname, const char *util, size_t len); +char *mog_mnt_fetch_util(dev_t st_dev, char *dst, size_t len); diff --git a/notify.c b/notify.c new file mode 100644 index 0000000..cb64780 --- /dev/null +++ b/notify.c @@ -0,0 +1,112 @@ +#include "cmogstored.h" + +/* poll() is broken on some *BSDs for pipe */ +#include <sys/select.h> + +static int self_pipe[2]; +static int notes[MOG_NOTIFY_MAX]; +static time_t usage_file_updated_at; +static time_t usage_file_interval = 10; + +void mog_notify_init(void) +{ + const char *interval = getenv("MOG_DISK_USAGE_FILE_INTERVAL"); + + if (pipe2(self_pipe, O_NONBLOCK | O_CLOEXEC) < 0) + die("failed to init self-pipe: %s\n", strerror(errno)); + + if (interval) { + int i = atoi(interval); + + if (i > 0) + usage_file_interval = (time_t)i; + } +} + +static bool svc_mkusage_each(void *svc, void *cb) +{ + mog_svc_scandev((struct mog_svc *)svc, cb); + + return true; +} + +static void global_mkusage(void) +{ + mog_svc_each(svc_mkusage_each, mog_dev_mkusage); + usage_file_updated_at = time(NULL); +} + +static void device_refresh(void) +{ + 
mog_svc_each(svc_mkusage_each, NULL); +} + +static void drain_pipe(void) +{ + char buf[128]; + ssize_t r; + + do + r = read(self_pipe[0], buf, sizeof(buf)); + while (r > 0 || (r < 0 && errno == EINTR)); + assert(r < 0 && errno == EAGAIN && "self-pipe read failed"); +} + +/* this is the main loop of cmogstored */ +void mog_notify_wait(void) +{ + int rc; + struct timeval tv; + fd_set rfds; + time_t next = usage_file_updated_at + usage_file_interval; + time_t now = time(NULL); + time_t timeout = next - now; + + rc = mog_sync_lock_test_and_set(&notes[MOG_NOTIFY_DEVICE_REFRESH], 0); + if (next <= now) global_mkusage(); + if (rc) device_refresh(); + if (timeout <= 0) return; + + tv.tv_sec = timeout; + tv.tv_usec = 0; + + FD_ZERO(&rfds); + FD_SET(self_pipe[0], &rfds); + + mog_intr_enable(); + rc = select(self_pipe[0] + 1, &rfds, NULL, NULL, &tv); + mog_intr_disable(); + if (rc > 0) + drain_pipe(); +} + +/* this is async-signal safe */ +void mog_notify(enum mog_notification note) +{ + ssize_t w; + + switch (note) { + case MOG_NOTIFY_DEVICE_REFRESH: + mog_sync_add_and_fetch(&notes[note], 1); + break; + /* support changing thread counts later... */ + case MOG_NOTIFY_SIGNAL: break; + default: assert(0 && "bad note passed"); + } + +retry: + w = write(self_pipe[1], "^", 1); + if (w >= 0) return; + + switch (errno) { + case_EAGAIN: return; /* somebody already woke this up */ + case EINTR: goto retry; /* just in case... */ + } + + /* + * we're screwed anyways, at least try this even though syslog() + * isn't safe inside a signal handler. 
+ */ + syslog(LOG_CRIT, "mog_notify write() to pipe failed: %m"); + abort(); +} diff --git a/pidfile.c b/pidfile.c new file mode 100644 index 0000000..697d0d0 --- /dev/null +++ b/pidfile.c @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +static const char *pidfile; + +static void pidfile_atexit(void) +{ + unlink(pidfile); + mog_free(pidfile); +} + +/* + * opens a pid file and returns a file descriptor for it + * mog_pidfile_commit() should be used on the fd returned by + * this function (often in a separate process) + * returns < 0 if there is an error and sets errno=EAGAIN + * if a pid already exists + * + * + * Example: (error checking is left as an exercise to the reader) + * + * pid_t cur_pid; + * int fd = mog_pidfile_open("/path/to/pid", &cur_pid); + * daemon(0, 0); + * mog_pidfile_commit(fd); + */ +int mog_pidfile_open(const char *path, pid_t *cur) +{ + int fd = open(path, O_RDWR|O_CREAT, 0666); + struct stat sb; + char buf[sizeof(pid_t) * 8 / 3 + 1]; + ssize_t r; + + if (fd < 0) return fd; + if (fstat(fd, &sb) < 0) goto err; + + r = pread(fd, buf, sizeof(buf), 0); + if (r < 0) goto err; + if (r > 0) { + char *end; + long tmp = strtol(buf, &end, 10); + + if (*end == '\n' && tmp > 0 && tmp < INT_MAX) { + *cur = tmp; + if (kill(*cur, 0) < 0 && errno == ESRCH) { + *cur = -1; + return fd; + } + errno = EAGAIN; + goto err; + } + } /* else r == 0 => nothing to kill */ + + assert(pidfile == NULL && "already opened pidfile for process"); + pidfile = canonicalize_filename_mode(path, CAN_ALL_BUT_LAST); + if (!pidfile) + goto err; + + return fd; +err: + PRESERVE_ERRNO( close(fd) ); + return -1; +} + +/* + * commits the pidfile pointed to by the given fd + * and closes the given fd on success. 
+ * returns -1 on error and sets errno + * fd should be the return value of mog_pidfile_open(); + */ +int mog_pidfile_commit(int fd) +{ + assert(lseek(fd, 0, SEEK_CUR) == 0 && "pidfile offset != 0"); + assert(pidfile && "mog_pidfile_open not called (or unsuccessful)"); + + if (dprintf(fd, "%d\n", (int)getpid()) <= 1) { + PRESERVE_ERRNO( close(fd) ); + return -1; + } + if (close(fd) < 0 && errno != EINTR) + return -1; + + atexit(pidfile_atexit); + + return 0; +} diff --git a/queue_common.h b/queue_common.h new file mode 100644 index 0000000..77cea5f --- /dev/null +++ b/queue_common.h @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +/* + * used only by queue_epoll.c, but should be usable with queue_kqueue.c + * in the future + */ +MOG_NOINLINE static struct mog_queue *mog_queue_init(int queue_fd) +{ + struct mog_fd *mfd; + struct mog_queue *q; + + CHECK(int, 0, set_cloexec_flag(queue_fd, true)); + + mfd = mog_fd_get(queue_fd); + mfd->fd_type = MOG_FD_TYPE_QUEUE; + mfd->fd = queue_fd; + q = &mfd->as.queue; + q->queue_fd = queue_fd; + SIMPLEQ_INIT(&q->activeq_head); + CHECK(int, 0, pthread_mutex_init(&q->activeq_lock, NULL)); + memset(&q->thrpool, 0, sizeof(struct mog_thrpool)); + + return q; +} diff --git a/queue_epoll.c b/queue_epoll.c new file mode 100644 index 0000000..4f3d03e --- /dev/null +++ b/queue_epoll.c @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +/* epoll-specific parts see queue_common.h and activeq.c for the rest */ +#include "queue_common.h" +/* + * a kqueue implementation should be possible, + * + * a poll/select/libev/libevent-based implementation would have a hard time + * migrating clients between threads + */ +#ifdef HAVE_EPOLL_WAIT +struct mog_queue * mog_queue_new(void) +{ + int size_hint = 666; /* hint, ignored in new kernels */ + int 
epoll_fd = epoll_create(size_hint); + if (epoll_fd < 0) die("epoll_create failed: %s\n", strerror(errno)); + + return mog_queue_init(epoll_fd); +} + +/* + * grabs one active event off the event queue + * epoll_wait() has "wake-one" behavior (like accept()) + * to avoid thundering herd since 2007 + */ +struct mog_fd * mog_idleq_wait(struct mog_queue *q, int dontwait) +{ + int rc; + struct epoll_event event; + struct mog_fd *mfd; + +retry: + /* epoll_wait is a cancellation point since glibc 2.4 */ + rc = epoll_wait(q->queue_fd, &event, 1, dontwait ? 0 : -1); + switch (rc) { + case 1: + mfd = event.data.ptr; + mog_fd_check_out(mfd); + return mfd; + case 0: + if (dontwait) + return NULL; + goto retry; + } + + switch (errno) { + case EINTR: + goto retry; + case EBADF: + case EINVAL: /* epfd can be hit */ + return NULL; + } + + die("epoll_wait failed with (%d): %s\n", rc, strerror(errno)); + return NULL; +} + +MOG_NOINLINE static void +epoll_ctl_error(struct mog_queue *q, struct mog_fd *mfd) +{ + switch (errno) { + case EPERM: + case EBADF: + /* TODO: check for shutdown races */ + syslog(LOG_ERR, "bad file descriptor for epoll_ctl()"); + return; + case ENOSPC: + syslog(LOG_ERR, + "epoll out-of-space, falling back to active queue"); + mog_activeq_push(q, mfd); + return; + case ENOMEM: mog_oom(); + case ENOENT: + case EEXIST: + case EINVAL: + syslog(LOG_ERR, "unhandled epoll_ctl() error: %m"); + assert(0 && "BUG in our usage of epoll"); + default: + syslog(LOG_ERR, "unhandled epoll_ctl() error: %m"); + } +} + +/* + * Pushes in one mog_fd for epoll to watch. + * + * Only call this from the mog_accept_loop *or* + * if EAGAIN/EWOULDBLOCK is encountered in mog_queue_loop. + */ +void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev) +{ + struct epoll_event event; + int op = mfd->queue_state == MOG_QUEUE_STATE_OLD ? 
+ EPOLL_CTL_MOD : EPOLL_CTL_ADD; + + mfd->queue_state = MOG_QUEUE_STATE_OLD; + event.data.ptr = mfd; + event.events = (int)ev; + + mog_fd_check_in(mfd); + if (epoll_ctl(q->queue_fd, op, mfd->fd, &event) != 0) { + mog_fd_check_out(mfd); + epoll_ctl_error(q, mfd); + } +} +#else /* ! HAVE_EPOLL_WAIT */ +typedef int avoid_empty_file; +#endif /* ! HAVE_EPOLL_WAIT */ diff --git a/queue_epoll.h b/queue_epoll.h new file mode 100644 index 0000000..b14a2a7 --- /dev/null +++ b/queue_epoll.h @@ -0,0 +1,12 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#ifdef HAVE_EPOLL_WAIT +#include <sys/epoll.h> +enum mog_qev { + MOG_QEV_RD = EPOLLIN | EPOLLONESHOT | EPOLLET, + MOG_QEV_WR = EPOLLOUT | EPOLLONESHOT | EPOLLET, + MOG_QEV_RW = EPOLLIN | EPOLLOUT | EPOLLONESHOT | EPOLLET +}; +#endif /* HAVE_EPOLL_WAIT */ diff --git a/queue_loop.c b/queue_loop.c new file mode 100644 index 0000000..e6dc36e --- /dev/null +++ b/queue_loop.c @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +static void queue_loop_cleanup(void *arg) +{ + struct mog_queue *q = arg; + unsigned long self = (unsigned long)pthread_self(); + struct mog_fd *mfd; + + syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self); + + /* + * we need to drain the active queue to prevent starvation + * if we are reducing worker threads (and this thread is + * getting cut). The reason we normally avoid deadlocks + * in other cases is that the thread that pushed into + * active queue is always capable of taking the same mfd + * from the active queue later. 
+ */ + while ((mfd = mog_activeq_trytake(q))) + mog_queue_step(mfd); + + syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self); +} + +static struct mog_fd *cancellable_queue_wait(struct mog_queue *q, int dontwait) +{ + struct mog_fd *mfd; + + mog_cancel_enable(); + mfd = mog_idleq_wait(q, dontwait); + mog_cancel_disable(); + + return mfd; +} + +/* passed as a start_routine to pthread_create */ +void * mog_queue_loop(void *arg) +{ + struct mog_queue *q = arg; + + pthread_cleanup_push(queue_loop_cleanup, arg); + mog_cancel_disable(); + syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready", + (unsigned long)pthread_self()); + + for (;;) { + struct mog_fd *mfd; + + /* + * idle, just-ready clients are the most important + * handle them as much as possible and bounce them + * into the active queue... + */ + while ((mfd = cancellable_queue_wait(q, 1))) + mog_queue_step(mfd); + + /* + * see if there's any already-active clients to work on + * busy servers should loop into here pretty frequently: + */ + if ((mfd = mog_activeq_trytake(q))) { + mog_queue_step(mfd); + } else { + /* + * We'll get here if there's nothing to do. + * Sleep until there's an event. mog_accept_loop + * will push into epoll/kqueue to wake us up here. 
+ */ + mfd = cancellable_queue_wait(q, 0); + if (mfd == NULL) /* queue shutdown */ + break; + mog_queue_step(mfd); + } + } + + pthread_cleanup_pop(1); + + return NULL; +} diff --git a/queue_step.c b/queue_step.c new file mode 100644 index 0000000..5b4f450 --- /dev/null +++ b/queue_step.c @@ -0,0 +1,16 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +void mog_queue_step(struct mog_fd *mfd) +{ + switch (mfd->fd_type) { + case MOG_FD_TYPE_MGMT: mog_mgmt_queue_step(mfd); return; + case MOG_FD_TYPE_HTTP: return; + case MOG_FD_TYPE_IOSTAT: mog_iostat_queue_step(mfd); return; + default: + assert(0 && "BUG: bad fd_type"); + } +} @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* + * we block signals in pool threads, only the main thread receives signals + */ + +void mog_intr_disable(void) +{ + sigset_t set; + + CHECK(int, 0, sigfillset(&set)); + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &set, NULL)); +} + +void mog_intr_enable(void) +{ + sigset_t set; + + CHECK(int, 0, sigemptyset(&set)); + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &set, NULL)); +} @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* same default as MogileFS upstream */ +static pthread_mutex_t svc_lock = PTHREAD_MUTEX_INITIALIZER; +static Hash_table *by_docroot; /* enforce one mog_svc per docroot: */ + +static void svc_free(void *ptr) +{ + struct mog_svc *svc = ptr; + + if (closedir(svc->dir) < 0) + syslog(LOG_ERR, "closedir(%s) failed with: %m", svc->docroot); + CHECK(int, 0, pthread_mutex_destroy(&svc->devstats_lock)); + mog_free(svc->docroot); + if (svc->by_st_dev) + hash_free(svc->by_st_dev); + free(svc->devstats.iov_base); + free(svc); +} + +static size_t 
svc_hash(const void *x, size_t tablesize) +{ + const struct mog_svc *svc = x; + + return hash_string(svc->docroot, tablesize); +} + +static bool svc_cmp(const void *a, const void *b) +{ + const struct mog_svc *svc_a = a; + const struct mog_svc *svc_b = b; + + return strcmp(svc_a->docroot, svc_b->docroot) == 0; +} + +static void svc_atexit(void) /* called atexit */ +{ + hash_free(by_docroot); +} + +static void svc_once(void) +{ + by_docroot = hash_initialize(7, NULL, svc_hash, svc_cmp, svc_free); + if (!by_docroot) + mog_oom(); + + atexit(svc_atexit); +} + +struct mog_svc * mog_svc_new(const char *docroot) +{ + struct mog_svc *svc; + DIR *dir; + int fd; + + if (!docroot) docroot = MOG_DEFAULT_DOCROOT; + + docroot = mog_canonpath_die(docroot, CAN_EXISTING); + + dir = opendir(docroot); + if (dir == NULL) { + syslog(LOG_ERR, "opendir(%s) failed with: %m", docroot); + mog_free(docroot); + return NULL; + } + + fd = dirfd(dir); + if (fd < 0) { + syslog(LOG_ERR, "dirfd(%s) failed with: %m", docroot); + mog_free(docroot); + return NULL; + } + + CHECK(int, 0, pthread_mutex_lock(&svc_lock)); + + if (!by_docroot) + svc_once(); + + svc = xzalloc(sizeof(struct mog_svc)); + svc->http_fd = svc->httpget_fd = svc->mgmt_fd = -1; + svc->docroot = docroot; + svc->dirfd = fd; + svc->dir = dir; + CHECK(int, 0, pthread_mutex_init(&svc->devstats_lock, NULL)); + + switch (hash_insert_if_absent(by_docroot, svc, NULL)) { + case 0: + svc_free(svc); + svc = NULL; + case 1: break; + default: mog_oom(); + } + + CHECK(int, 0, pthread_mutex_unlock(&svc_lock)); + + return svc; +} + +size_t mog_svc_each(Hash_processor processor, void *data) +{ + size_t rv; + + CHECK(int, 0, pthread_mutex_lock(&svc_lock)); + rv = hash_do_for_each(by_docroot, processor, data); + CHECK(int, 0, pthread_mutex_unlock(&svc_lock)); + + return rv; +} diff --git a/svc_dev.c b/svc_dev.c new file mode 100644 index 0000000..1632e3d --- /dev/null +++ b/svc_dev.c @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2012, Eric Wong 
<normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +static size_t dev_hash(const void *x, size_t tablesize) +{ + const struct mog_dev *dev = x; + + return dev->st_dev % tablesize; +} + +static bool dev_cmp(const void *a, const void *b) +{ + const struct mog_dev *dev_a = a; + const struct mog_dev *dev_b = b; + + return dev_a->st_dev == dev_b->st_dev; +} + +static void svc_init_dev_hash(struct mog_svc *svc) +{ + if (svc->by_st_dev) { + hash_clear(svc->by_st_dev); + return; + } + svc->by_st_dev = hash_initialize(7, NULL, dev_hash, dev_cmp, free); + if (!svc->by_st_dev) mog_oom(); +} + +int mog_svc_scandev(struct mog_svc *svc, mog_scandev_cb cb) +{ + struct dirent *ent; + int rc = 0; + + CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock)); + svc_init_dev_hash(svc); + rewinddir(svc->dir); + while ((ent = readdir(svc->dir))) { + unsigned long mog_devid; + char *end; + size_t len = strlen(ent->d_name); + struct mog_dev *dev; + + if (len <= 3) continue; + if (memcmp("dev", ent->d_name, 3) != 0) continue; + + mog_devid = strtoul(ent->d_name + 3, &end, 10); + if (*end != 0) continue; + if (mog_devid > 0xffffff) continue; /* MEDIUMINT in DB */ + + dev = mog_dev_new(svc, (uint32_t)mog_devid); + if (!dev) continue; + + if (cb) rc |= cb(dev, svc); /* mog_dev_mkusage */ + switch (hash_insert_if_absent(svc->by_st_dev, dev, NULL)) { + case 0: free(dev); + case 1: break; + default: mog_oom(); /* -1 */ + } + } + CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock)); + + return rc; +} + +static bool write_devstats(void *entry, void *file) +{ + struct mog_dev *dev = entry; + FILE *fp = file; + char util[8]; + char *found = mog_mnt_fetch_util(dev->st_dev, util, sizeof(util)); + + if (found) + fprintf(fp, "%u\t%s\n", dev->devid, found); + else + fprintf(fp, "%u\t-\n", dev->devid); + return true; +} + +/* updates per-svc device stats from the global mount list */ +static void devstats_update(struct mog_svc *svc) +{ + struct 
iovec *tmp = &svc->devstats; + FILE *fp; + + assert(svc->by_st_dev && "need to scan devices first"); + free(tmp->iov_base); + + fp = open_memstream((char **)&tmp->iov_base, &tmp->iov_len); + if (!fp) mog_oom(); + + hash_do_for_each(svc->by_st_dev, write_devstats, fp); + fprintf(fp, ".\n"); + + CHECK(int, 0, fclose(fp)); +} + +void mog_svc_devstats_subscribe(struct mog_mgmt *mgmt) +{ + struct mog_svc *svc = mgmt->svc; + + CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock)); + LIST_INSERT_HEAD(&svc->devstats_subscribers, mgmt, subscribed); + CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock)); +} + +/* called while iterating through all mog_svc objects */ +bool mog_svc_devstats_broadcast(void *ent, void *ignored) +{ + struct mog_svc *svc = ent; + struct mog_mgmt *mgmt, *tmp; + struct iovec iov; + struct mog_fd *mfd; + + CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock)); + + devstats_update(svc); + LIST_FOREACH_SAFE(mgmt, &svc->devstats_subscribers, subscribed, tmp) { + assert(mgmt->wbuf == NULL && "wbuf not null"); + iov = svc->devstats; + mog_mgmt_writev(mgmt, &iov, 1); + + if (mgmt->wbuf == NULL) continue; /* success */ + + LIST_REMOVE(mgmt, subscribed); + mfd = mog_fd_of(mgmt); + if (mgmt->wbuf == MOG_WR_ERROR) { + assert(mgmt->rbuf == NULL && "would leak rbuf"); + mog_close(mfd->fd); + } else { /* blocked on write */ + mog_idleq_push(mgmt->svc->queue, mfd, MOG_QEV_WR); + } + } + + CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock)); + + return true; +} diff --git a/test/cfg-parser-1.c b/test/cfg-parser-1.c new file mode 100644 index 0000000..201447e --- /dev/null +++ b/test/cfg-parser-1.c @@ -0,0 +1,108 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" +#include "cfg.h" + +int main(void) +{ + struct mog_cfg cfg; + int rc; + + openlog("mog-test-cfg-parser-1", LOG_PID, LOG_USER); + + { + char *s = xstrdup("127.0.0.1:666"); + struct mog_addrinfo *tmp = mog_listen_parse(s); + 
assert(tmp && "failed to parse"); + mog_addrinfo_free(&tmp); + assert(tmp == NULL && "not nulled"); + free(s); + } + + { + char *s = xstrdup("127-0.0.1:666"); + struct mog_addrinfo *tmp = mog_listen_parse(s); + assert(tmp == NULL && "not null"); + free(s); + } + + { + char buf[] = "httplisten = 127.0.0.1:7500 # foo bar\n"; + memset(&cfg, 0, sizeof(struct mog_cfg)); + rc = mog_cfg_parse(&cfg, buf, sizeof(buf) - 1); + + assert(cfg.httplisten && "httplisten unset"); + assert(! cfg.mgmtlisten && "mgmtlisten set"); + assert(! cfg.httpgetlisten && "httpgetlisten set"); + assert(! cfg.daemonize && "daemonize set"); + assert(! cfg.maxconns && "maxconns set"); + assert(rc == 0 && "parser failed"); + + mog_addrinfo_free(&cfg.httplisten); + } + + { + char buf[] = "httplisten = 127.0.0.1:7500\n" + "mgmtlisten = 127.6.6.6:7501\n" + "httpgetlisten = 127.6.6.6:7502\n" + "\n" + "docroot = /var/mogdata\n" + "daemonize\n" + "pidfile = /tmp/.cmogstored.pid\n" + "# hello \n" + "maxconns = 666666\n"; + + memset(&cfg, 0, sizeof(struct mog_cfg)); + rc = mog_cfg_parse(&cfg, buf, sizeof(buf) - 1); + + assert(rc == 0 && "parser failed"); + assert(cfg.httplisten && "httplisten unset"); + assert(cfg.mgmtlisten && "mgmtlisten set"); + assert(cfg.httpgetlisten && "httpgetlisten set"); + assert(cfg.daemonize == 1 && "daemonize set"); + assert(cfg.maxconns == 666666 && "maxconns set"); + assert(cfg.docroot && "docroot set"); + assert(strcmp(cfg.docroot, "/var/mogdata") == 0 && + "docroot mismatch"); + assert(strcmp(cfg.pidfile, "/tmp/.cmogstored.pid") == 0 && + "pidfile mismatch"); + mog_addrinfo_free(&cfg.httplisten); + mog_addrinfo_free(&cfg.mgmtlisten); + mog_addrinfo_free(&cfg.httpgetlisten); + mog_free(cfg.docroot); + mog_free(cfg.pidfile); + } + + { + char buf[] = "httplisten = 666.0.0.1:7500\n"; + + memset(&cfg, 0, sizeof(struct mog_cfg)); + rc = mog_cfg_parse(&cfg, buf, sizeof(buf) - 1); + + assert(rc == -1 && "parser should fail"); + assert(!cfg.httplisten && "nothing should've been 
allocated"); + } + + { + char buf[] = "pidfile = /foo\0bar\n"; + + memset(&cfg, 0, sizeof(struct mog_cfg)); + rc = mog_cfg_parse(&cfg, buf, sizeof(buf) - 1); + + assert(rc == -1 && "parser should fail"); + assert(!cfg.pidfile && "nothing should've been allocated"); + } + + { + char buf[] = "mgmtlisten = 666.0.0-1:7500\n"; + + memset(&cfg, 0, sizeof(struct mog_cfg)); + rc = mog_cfg_parse(&cfg, buf, sizeof(buf) - 1); + + assert(!cfg.mgmtlisten && "nothing should've been allocated"); + } + + return 0; +} diff --git a/test/cmogstored-cfg.rb b/test/cmogstored-cfg.rb new file mode 100755 index 0000000..cd65607 --- /dev/null +++ b/test/cmogstored-cfg.rb @@ -0,0 +1,80 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +require 'test/unit' +require 'tmpdir' +require 'socket' +require 'tempfile' +$stderr.sync = $stdout.sync = Thread.abort_on_exception = true + +class TestCmogstoredConfig < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-cfg-test') + @host = "127.#{rand(256)}.#{rand(256)}.#{rand(256)}" + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + cmd = [ "cmogstored" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @cmd = cmd + @pid = nil + @to_close = [] + end + + def teardown + @to_close.each { |io| io.close unless io.closed? 
} + FileUtils.rm_rf(@tmpdir) + end + + def get_client(tries = 300) + begin + c = TCPSocket.new(@host, @port) + @to_close << c + return c + rescue + sleep 0.05 + end while (tries-=1) > 0 + end + + def pre_kill + # need to ensure connections are accepted before we can safely kill + c = get_client + c.write "HI\n" + assert_kind_of String, c.gets + end + + def test_config_file + tmp = Tempfile.new("cmogstored-cfg-test") + tmp.write("mgmtlisten = #@host:#@port\n") + tmp.write("maxconns = 50\n") + tmp.write("docroot = #@tmpdir") + tmp.flush + @cmd << "--config=#{tmp.path}" + @pid = fork { exec(*@cmd) } + pre_kill + Process.kill(:QUIT, @pid) + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + + def test_multi_config_file_fail + Dir.mkdir("#@tmpdir/a") + Dir.mkdir("#@tmpdir/b") + tmp = Tempfile.new("cmogstored-cfg-test") + tmp.write("mgmtlisten = #@host:#@port\n") + tmp.write("maxconns = 50\n") + tmp.write("docroot = #@tmpdir/a") + tmp.flush + @cmd << "--config=#{tmp.path}" + + tmp = Tempfile.new("cmogstored-cfg-test") + tmp.write("mgmtlisten = #@host:#@port\n") + tmp.write("maxconns = 50\n") + tmp.write("docroot = #@tmpdir/b") + tmp.flush + @cmd << "--config=#{tmp.path}" + cmd = @cmd.join(' ') + msg = `#{cmd} 2>&1` + assert ! 
$?.success?, $?.inspect + assert_match(/--multi/, msg) + end +end diff --git a/test/fdmap-1.c b/test/fdmap-1.c new file mode 100644 index 0000000..5c1b8ec --- /dev/null +++ b/test/fdmap-1.c @@ -0,0 +1,27 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" + +int main(void) +{ + struct mog_fd *mfd; + int open_max = (int)sysconf(_SC_OPEN_MAX); + int i; + + mfd = mog_fd_get(0); + { + struct mog_mgmt *mgmt = &mfd->as.mgmt; + + assert(mog_fd_of(mgmt) == mfd); + } + + /* watch this in valgrind, I suck at math :P */ + for (i = 0; i < open_max; i++) { + mfd = mog_fd_get(i); + mog_fd_put(mfd, i); + } + + return 0; +} diff --git a/test/iostat-mock.rb b/test/iostat-mock.rb new file mode 100755 index 0000000..a0366a9 --- /dev/null +++ b/test/iostat-mock.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +usage = "Usage: #$0 PIDFILE (fast|slow)" +$stdout.binmode +$stdout.sync = $stderr.sync = true +$-w = true +require 'stringio' + +output = <<EOF +Linux 6.6.6 (faster) 01/01/1970 _x86_64_ (666 CPU) + +Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await svctm %util +sda 0.02 71.67 1.05 5.94 31.95 630.78 94.85 0.82 116.28 0.65 0.45 +sdb 0.18 11.17 0.30 0.34 76.56 92.09 263.80 0.03 40.68 3.14 0.20 +sdc 0.19 0.28 0.32 0.27 125.83 34.07 275.40 0.06 108.49 35.36 2.05 +sdd 0.26 0.01 0.42 0.10 5.44 24.15 56.47 0.72 1373.24 1.88 0.10 +sde 0.61 13.63 1.14 0.38 213.65 112.08 214.80 0.06 42.54 3.28 0.50 + +EOF + +pidfile = ARGV.shift or abort usage +File.open(pidfile, "wb") { |fp| fp.write("#$$\n") } +n = 666666 +begin + case ARGV.shift + when "fast" + n.times { $stdout.write(output * 2) } + when "slow" + n.times do + io = StringIO.new(output) + while buf = io.read(rand(25) + 1) + $stdout.write(buf) + sleep 0.01 + end + sleep 0.1 + end + when "bursty1" + n.times do + io = 
StringIO.new(output.dup) + add = io.gets + add << io.gets + $stdout.write(output + add) + sleep 2 + while buf = io.read(rand(666) + 1) + $stdout.write(buf) + end + end + when "bursty2" + n.times do + io = StringIO.new(output.dup) + add = io.read(rand(66) + 6) + $stdout.write(output + add) + sleep 2 + while buf = io.read(rand(666) + 1) + $stdout.write(buf) + sleep 0.01 + end + sleep 1 + end + else + abort usage + end +rescue Errno::EPIPE + exit 0 +rescue => e + abort("#{e.message} (#{e.class})") +end diff --git a/test/mgmt-iostat.rb b/test/mgmt-iostat.rb new file mode 100755 index 0000000..61f1b6a --- /dev/null +++ b/test/mgmt-iostat.rb @@ -0,0 +1,112 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/unit' +require 'tmpdir' +require 'socket' +require 'tempfile' +$stderr.sync = $stdout.sync = Thread.abort_on_exception = true + +class TestMgmtIostat < Test::Unit::TestCase + TEST_PATH = File.dirname(__FILE__) + ":#{ENV['PATH']}" + + def setup + @iostat_pid = Tempfile.new('testt-iostat-pid') + @tmpdir = Dir.mktmpdir('cmogstored-mgmt-iostat-test') + @host = "127.#{rand(256)}.#{rand(256)}.#{rand(256)}" + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + cmd = [ "cmogstored", "--docroot=#@tmpdir", "--mgmtlisten=#@host:#@port", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @cmd = cmd + @pid = nil + @to_close = [] + end + + def get_client(tries = 300) + begin + c = TCPSocket.new(@host, @port) + @to_close << c + return c + rescue + sleep 0.05 + end while (tries-=1) > 0 + end + + def teardown + if @pid + Process.kill(:QUIT, @pid) rescue nil + _, status = Process.waitpid2(@pid) + assert status.success?, status.inspect + end + @to_close.each { |io| io.close unless io.closed? 
} + FileUtils.rm_rf(@tmpdir) + end + + def test_iostat_fast + Dir.mkdir "#@tmpdir/dev666" + @pid = fork do + ENV["PATH"] = TEST_PATH + ENV["MOG_IOSTAT_CMD"] = "test/iostat-mock.rb #{@iostat_pid.path} fast" + exec(*@cmd) + end + + og = get_client + og.write "watch\n" + threads = [] + 400.times do + threads << Thread.new do + c = get_client(0) + assert_nothing_raised do + c.write "watch\n" + 100.times { c.gets } + end + c + end + end + + sleep 1 + threads.each { |th| th.value.close } + assert og.readpartial(16384) + sleep 1 + assert og.read_nonblock(512) + sleep 1 + assert og.read_nonblock(512) + iostat_pid = @iostat_pid.read.to_i + if iostat_pid > 0 + Process.kill(:TERM, iostat_pid) + end + end + + def test_iostat_bursty1 + iostat_edge_case("bursty1") + end + + def test_iostat_bursty2 + iostat_edge_case("bursty2") + end + + def test_iostat_slow + iostat_edge_case("slow") + end + + def iostat_edge_case(type) + Dir.mkdir "#@tmpdir/dev666" + @pid = fork do + ENV["PATH"] = TEST_PATH + ENV["MOG_IOSTAT_CMD"] = "test/iostat-mock.rb #{@iostat_pid.path} #{type}" + exec(*@cmd) + end + + og = get_client + og.write "watch\n" + 5.times do + assert_match(/\n$/, x = og.gets) + p(x) if $DEBUG + end + og.close + end +end diff --git a/test/mgmt-usage.rb b/test/mgmt-usage.rb new file mode 100755 index 0000000..57318ec --- /dev/null +++ b/test/mgmt-usage.rb @@ -0,0 +1,79 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/unit' +require 'tmpdir' +require 'socket' +$stderr.sync = $stdout.sync = Thread.abort_on_exception = true + +class TestCmogstoredUsage < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-usage-test') + @host = "127.#{rand(256)}.#{rand(256)}.#{rand(256)}" + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + @cmd = [ 'cmogstored', '--maxconns=50', '--daemonize', + "--mgmtlisten=#@host:#@port", 
"--docroot=#@tmpdir" ] + end + + def teardown + FileUtils.rm_rf(@tmpdir) + end + + def test_usage_daemon + devdir = "#@tmpdir/dev666" + pidf = "#@tmpdir/pid" + Dir.mkdir(devdir) + usage = "#{devdir}/usage" + assert ! File.exist?(usage) + @cmd << "--pidfile=#{pidf}" + assert(system(*@cmd), proc { $?.to_s }) + sleep(0.05) until File.exist?(usage) + before = check_usage_file(usage, devdir) + sleep 1.2 + after = check_usage_file(usage, devdir) + assert(before[3].split(/ /)[1].to_i <= after[3].split(/ /)[1].to_i) + pid = File.read(pidf).to_i + assert(pid > 0 && pid != $$, pid.inspect) + Process.kill(:TERM, pid) + begin + Process.kill(0, pid) + sleep 0.01 + rescue Errno::ESRCH + break + end while true + assert ! File.exist?(pidf), "pidfile=#{pidf} exists" + end + + def check_usage_file(usage, devdir) + lines = File.readlines(usage) + assert_match(/^available: \d+$/, lines[0]) + assert_match(%r{^device: /\S+$}, lines[1]) + assert_match(/^disk: #{Regexp.escape(devdir)}$/, lines[2]) + assert_match(/^time: \d+\n$/, lines[3]) + assert_match(/^total: \d+\n$/, lines[4]) + assert_match(/^use: \d+%$/, lines[5]) + assert_match(/^used: \d+$/, lines[6]) + lines + end + + def test_usage_daemon_already_running + old_err = $stderr.dup + devdir = "#@tmpdir/dev666" + pidf = "#@tmpdir/pid" + Dir.mkdir(devdir) + usage = "#{devdir}/usage" + File.open(pidf, "w") { |fp| fp.write("#$$\n") } + @cmd << "--pidfile=#{pidf}" + $stderr.reopen("#@tmpdir/err", "ab") + assert(! system(*@cmd)) + assert_equal "#$$\n", File.read(pidf) + assert_equal "already running on PID: #$$\n", File.read("#@tmpdir/err") + assert ! 
File.exist?(usage) + ensure + $stderr.reopen(old_err) + old_err.close + end +end diff --git a/test/mgmt.rb b/test/mgmt.rb new file mode 100755 index 0000000..7d15545 --- /dev/null +++ b/test/mgmt.rb @@ -0,0 +1,215 @@ +#!/usr/bin/env ruby +# -*- encoding: binary -*- +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +require 'test/unit' +require 'tmpdir' +require 'tempfile' +require 'fileutils' +require 'digest/md5' +require 'socket' +$stderr.sync = $stdout.sync = Thread.abort_on_exception = true + +class TestMgmt < Test::Unit::TestCase + def setup + @tmpdir = Dir.mktmpdir('cmogstored-mgmt-test') + @to_close = [] + @host = "127.#{rand(256)}.#{rand(256)}.#{rand(256)}" + srv = TCPServer.new(@host, 0) + @port = srv.addr[1] + srv.close + @err = Tempfile.new("stderr") + cmd = [ "cmogstored", "--docroot=#@tmpdir", "--mgmtlisten=#@host:#@port", + "--maxconns=500" ] + vg = ENV["VALGRIND"] and cmd = vg.split(/\s+/).concat(cmd) + @pid = fork { + $stderr.reopen(@err) + @err.close + exec(*cmd) + } + tries = 300 + begin + sleep 0.05 + @client = TCPSocket.new(@host, @port) + @to_close << @client + break + rescue + end while (tries-=1) > 0 + end + + def teardown + Process.kill(:QUIT, @pid) rescue nil + _, status = Process.waitpid2(@pid) + @to_close.each { |io| io.close unless io.closed? 
} + FileUtils.rm_rf(@tmpdir) + @err.rewind + $stderr.write(@err.read) + assert status.success?, status.inspect + end + + def t(expect_out, input) + expect_out += "\r\n" + input += "\r\n" + + @client.write(input) + output = @client.gets + assert_equal expect_out, output + end + + def test_size + File.open("#@tmpdir/foo", "wb") { |fp| fp.write(' ' * 3) } + t("/foo 3", "size /foo") + t("/missing -1", "size /missing") + + Dir.mkdir("#@tmpdir/dev666") + File.open("#@tmpdir/dev666/usage", "wb").close + t("/dev666/usage 0", "size /dev666/usage") + end + + def test_unknown_command + t("ERROR: unknown command: hello", " hello ") + t("ERROR: unknown command: hello", "hello") + t("ERROR: unknown command: size", "size bad") + end + + def test_invalid_uri + t("ERROR: uri invalid (contains ..)", "size /..") + t("ERROR: uri invalid (contains ..)", "size /..") + t("ERROR: uri invalid (contains ..)", "size /a/..") + t("ERROR: uri invalid (contains ..)", "size /a/../b") + longest = "/dev16777215/0/000/000/0123456789.fid" + t("#{longest} -1", "size #{longest}") + + # non-sensical error, but whatever... 
+ too_long = longest + "-" + t("ERROR: uri invalid (contains ..)", "size #{too_long}") + end + + def test_no_command + t("", ""); + t("", " "); + end + + def test_md5 + buf = ' ' * 3 + expect = "/foo MD5=#{Digest::MD5.hexdigest(buf)}" + File.open("#@tmpdir/foo", "wb") { |fp| fp.write(buf) } + t(expect, "MD5 /foo") + t("/missing MD5=-1", "MD5 /missing") + Dir.mkdir("#@tmpdir/dev666") + t("ERR read /dev666 at 0 failed", "MD5 /dev666") + end + + def test_continuous_feed + n = 0 + File.open("#@tmpdir/-1.fid", "wb").close + + 200.times do |i| + File.rename("#@tmpdir/#{i-1}.fid", "#@tmpdir/#{i}.fid") + n += @client.write("size /#{i}.fid\r\n") + x = @client.gets + assert_equal "/#{i}.fid 0\r\n", x + end + + @client.write "bad command\r\n" + assert_equal "ERROR: unknown command: bad\r\n", @client.gets + @client.write " bad command\r\n" + assert_equal "ERROR: unknown command: bad\r\n", @client.gets + + @client.write "size /foo bar\r\n" + assert_equal "ERROR: unknown command: size\r\n", @client.gets + + @client.write "size /foo\r\n" + assert_equal "/foo -1\r\n", @client.gets + end + + def test_continuous_feed_mt + n = 0 + nr = 200 + pfx = rand.to_s + delay = 0.01 + thr = Thread.new do + nr.times do |i| + sleep delay + x = @client.gets + assert_equal "/#{pfx}-#{i} -1\r\n", x + end + end + + nr.times do |i| + n += @client.write("size /#{pfx}-#{i}\r\n") + end + delay = 0 + thr.join + end + + def test_monster_line + assert_raises(Errno::ECONNRESET) { + @client.write("size /#{'0' * (400 * 1024)}\r\n") + @client.gets + } + + @client = TCPSocket.new(@host, @port) + @to_close << @client + t("/missing -1", "size /missing") + end + + def test_trickle + "size /missing\r\n".split(//).each do |c| + @client.write c + sleep 0.01 + end + assert_equal "/missing -1\r\n", @client.gets + end + + def test_client_eof + t("/missing -1", "size /missing") + @client.close + @client = TCPSocket.new(@host, @port) + @to_close << @client + t("/missing -1", "size /missing") + end + + def test_md5_feed + 
buf = ' ' * 3 + 50.times do |i| + pfx = (rand * i).to_s[0..rand(30)] + "a" + expect = "/#{pfx} MD5=#{Digest::MD5.hexdigest(buf)}\r\n" + File.open("#@tmpdir/#{pfx}", "wb") { |fp| fp.write(buf) } + nr = 100 + th = Thread.new { @client.write("MD5 /#{pfx}\r\n" * nr) } + nr.times do |j| + assert_equal expect, @client.gets, "j=#{j}" # pfx=#{pfx} i=#{i}" + end + th.join + end + end + + def test_size_huge + Dir.mkdir("#@tmpdir/dev666") + big = 2 * 1024 * 1024 * 1024 * 1020 # 2TB + File.open("#@tmpdir/dev666/sparse-file.fid", "w") do |fp| + begin + fp.seek(big - 1) + rescue Errno::EINVAL + big /= 2 + warn "trying large file size: #{big}" + retry + end + fp.write('.') + end + t("/dev666/sparse-file.fid #{big}", "size /dev666/sparse-file.fid") + rescue Errno::ENOSPC + end + + def test_iostat_watch + Dir.mkdir("#@tmpdir/dev666") + @client.write "watch\n" + + # wait for iostat to catch up + 2.times { assert_kind_of String, @client.gets } + + assert_match(/^666\t\d+?\.\d\d\n/, @client.gets) + assert_equal ".\n", @client.gets + end if `which iostat 2>/dev/null`.chomp.size != 0 +end diff --git a/test/queue-1.c b/test/queue-1.c new file mode 100644 index 0000000..3218dad --- /dev/null +++ b/test/queue-1.c @@ -0,0 +1,45 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" + +int main(void) +{ + struct mog_queue *q = mog_queue_new(); + struct mog_fd *in = mog_fd_get(STDIN_FILENO); + struct mog_fd *out = mog_fd_get(STDOUT_FILENO); + struct mog_fd *err = mog_fd_get(STDERR_FILENO); + struct mog_fd *took; + + assert(q && in && "not initialized"); + + in->queue_state = MOG_QUEUE_STATE_OLD; + out->queue_state = MOG_QUEUE_STATE_OLD; + err->queue_state = MOG_QUEUE_STATE_OLD; + + mog_activeq_push(q, in); + + took = mog_activeq_trytake(q); + assert(in == took && "took what we pushed in"); + took = mog_activeq_trytake(q); + assert(NULL == took && "empty queue"); + + mog_activeq_push(q, in); + mog_activeq_push(q, out); + 
mog_activeq_push(q, err); + + took = mog_activeq_trytake(q); + assert(in == took && "stdin in-order"); + + took = mog_activeq_trytake(q); + assert(out == took && "stdout in-order"); + + took = mog_activeq_trytake(q); + assert(err == took && "stderr in-order"); + + took = mog_activeq_trytake(q); + assert(NULL == took && "empty queue"); + + return 0; +} diff --git a/test/queue-epoll-1.c b/test/queue-epoll-1.c new file mode 100644 index 0000000..a7f40b4 --- /dev/null +++ b/test/queue-epoll-1.c @@ -0,0 +1,72 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" +static int fds[2]; +static char buf[128]; +static struct mog_queue *q; +static struct mog_fd *mfd; + +static void setup(void) +{ + q = mog_queue_new(); + pipe_or_die(fds); + mfd = mog_fd_get(fds[0]); + mfd->fd = fds[0]; + mfd->queue_state = MOG_QUEUE_STATE_NEW; + + set_nonblocking_flag(fds[0], true); + assert(read(fds[0], buf, sizeof(buf)) == -1 && + errno == EAGAIN && "read() should EAGAIN"); +} + +static void teardown(void) +{ + close_pipe(fds); +} + +static void test_nonblocking(void) +{ + setup(); + + mog_idleq_push(q, mfd, MOG_QEV_RD); + assert(NULL == mog_idleq_wait(q, 1) && "q wait should return NULL"); + assert(1 == write(fds[1], ".", 1) && "couldn't write"); + assert(mfd == mog_idleq_wait(q, 1) && "q wait should return mfd"); + + teardown(); +} + +static void * wait_then_write(void *arg) +{ + sleep(1); + assert(1 == write(fds[1], "B", 1) && "couldn't write"); + + return NULL; +} + +static void test_blocking(void) +{ + pthread_t thr; + + setup(); + + mog_idleq_push(q, mfd, MOG_QEV_RD); + CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL)); + printf("start wait: %d\n", (int)time(NULL)); + assert(mfd == mog_idleq_wait(q, 0)); + printf(" end wait: %d\n", (int)time(NULL)); + assert(1 == read(fds[0], buf, 1) && "read failed"); + assert(buf[0] == 'B' && "didn't read expected 'B'"); + + teardown(); +} + +int main(void) +{ + 
test_nonblocking(); + test_blocking(); + + return 0; +} diff --git a/test/ruby-parallel.mk b/test/ruby-parallel.mk new file mode 100755 index 0000000..0ee1bed --- /dev/null +++ b/test/ruby-parallel.mk @@ -0,0 +1,8 @@ +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) +RUBY = ruby +TESTS := $(shell awk '/def test_/{print FILENAME"--"$$2".n"}' $(FILE)) +all:: $(TESTS) +%.n: arg = $(subst .n,,$(subst --, -n ,$@)) +%.n: + @$(RUBY) -w $(arg) -v diff --git a/test/ruby-parallel.sh b/test/ruby-parallel.sh new file mode 100755 index 0000000..406f5e1 --- /dev/null +++ b/test/ruby-parallel.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (see COPYING for details) + +# unfortunately I haven't figured out how to make this play nice with +# the GNU make jobserver, so we'll outright ignore it... +unset MAKEFLAGS MAKEFILES MAKEOVERRIDES MAKELEVEL MFLAGS +if test -z "$JOBS" +then + JOBS=$(nproc 2>/dev/null) # nproc is in newer GNU coreutils + if test -z "$JOBS" + then + JOBS=1 + else + JOBS=$(($JOBS * 2)) + fi +fi + +exec ${MAKE-make} -j$JOBS -f $(dirname $0)/ruby-parallel.mk FILE="$1" diff --git a/test/thrpool-1.c b/test/thrpool-1.c new file mode 100644 index 0000000..0812401 --- /dev/null +++ b/test/thrpool-1.c @@ -0,0 +1,60 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +/* ensure we can start and stop thread pools properly */ +#include "check.h" +#include <sys/time.h> + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +static struct timeval tv; + +void *fn(void *xarg) +{ + const char *s = xarg; + assert(strcmp("whazzup", s) == 0 && "arg passed wrong"); + + for (;;) { + struct timespec t; + t.tv_nsec = tv.tv_usec * 1000; + t.tv_sec = tv.tv_sec + 1; + if (0 && t.tv_nsec >= 1000000000) { + t.tv_nsec -= 1000000000; + t.tv_sec++; + } + + 
mog_cancel_disable(); + CHECK(int, 0, pthread_mutex_lock(&lock)); + pthread_cond_timedwait(&cond, &lock, &t); + CHECK(int, 0, pthread_mutex_unlock(&lock)); + mog_cancel_enable(); + pthread_testcancel(); + } + assert(strcmp("whazzup", s) == 0 && "arg changed"); + + return NULL; +} + +int main(void) +{ + static struct mog_thrpool tp; + char *tmp = xstrdup("whazzup"); + struct timespec t; + + CHECK(int, 0, gettimeofday(&tv, NULL)); + t.tv_nsec = tv.tv_usec * 1000; + t.tv_sec = tv.tv_sec + 1; + + mog_thrpool_start(&tp, 6, fn, (void *)tmp); + + CHECK(int, 0, pthread_mutex_lock(&lock)); + CHECK(int, ETIMEDOUT, pthread_cond_timedwait(&cond, &lock, &t)); + CHECK(int, 0, pthread_mutex_unlock(&lock)); + + mog_thrpool_quit(&tp); + + free(tmp); + + return 0; +} diff --git a/test/trywrite-1.c b/test/trywrite-1.c new file mode 100644 index 0000000..6709815 --- /dev/null +++ b/test/trywrite-1.c @@ -0,0 +1,148 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" +#include "iov_str.h" + +static void writev_simple(void) +{ + struct iovec iov[1]; + int fds[2]; + + pipe_or_die(fds); + + IOV_STR(iov, "HELLO"); + { + void *x = mog_trywritev(fds[1], iov, 1); + + assert(x == NULL && "couldn't write 5 bytes to pipe"); + } + + close_pipe(fds); +} + +static void writev_wrong(void) +{ + struct iovec iov[1]; + int fds[2]; + + pipe_or_die(fds); + + IOV_STR(iov, "HELLO"); + { + void *x = mog_trywritev(fds[0], iov, 1); + assert(x == MOG_WR_ERROR && "did not return error"); + } + + close_pipe(fds); + + { + void *x = mog_trywritev(fds[1], iov, 1); + + assert(x == MOG_WR_ERROR && "did not return error"); + } +} + +static void writev_buffer(void) +{ + struct iovec iov[1]; + int fds[2]; + ssize_t total = 0; + void *x; + struct mog_wbuf *wbuf = NULL; + int nread; + enum mog_write_state state; + + pipe_or_die(fds); + + IOV_STR(iov, "HELLO"); + + set_nonblocking_flag(fds[1], true); + + for (;;) { + x = mog_trywritev(fds[1], iov, 1); + + 
if (x == NULL) { + total += iov[0].iov_len = 5; + } else if (x == MOG_WR_ERROR) { + assert(0 && "unexpected MOG_WR_ERROR"); + } else { + assert(x && "wtf"); + wbuf = x; + break; + } + } + + assert(wbuf && "wbuf not initialized, how did we break from loop?"); + + /* pipe is blocked */ + { + ioctl(fds[0], FIONREAD, &nread); + assert(nread == total && "nread != total"); + } + + /* pipe should be busy */ + { + state = mog_tryflush(fds[1], &wbuf); + assert(MOG_WRSTATE_BUSY == state && "not busy?"); + assert(wbuf && "wbuf got nulled"); + } + + /* drain some */ + { + char * tmp = xmalloc(nread); + ssize_t r = read(fds[0], tmp, nread); + + assert(r == nread && "couldn't drain all"); + + free(tmp); + } + + /* flush (succeed) */ + { + state = mog_tryflush(fds[1], &wbuf); + assert(MOG_WRSTATE_DONE == state && "didn't finish"); + assert(wbuf == NULL && "didn't null"); + } + + close_pipe(fds); +} + +static void writev_big(size_t len) +{ + struct iovec iov[3]; + void *ptrv[3]; + int fds[2]; + void *x; + int i; + + pipe_or_die(fds); + + for (i = 0; i < 3; i++) { + ptrv[i] = iov[i].iov_base = xmalloc(len); + iov[i].iov_len = len; + memset(ptrv[i], 'a' + i, len); + } + + set_nonblocking_flag(fds[1], true); + x = mog_trywritev(fds[1], iov, 3); + assert(x != MOG_WR_ERROR && x != NULL && "did not buffer on busy"); + free(x); + + close_pipe(fds); + + for (i = 0; i < 3; i++) + free(ptrv[i]); +} + +int main(void) +{ + writev_simple(); + writev_wrong(); + writev_buffer(); + writev_big(166331); + writev_big(65536); + + return 0; +} diff --git a/test/valid-path-1.c b/test/valid-path-1.c new file mode 100644 index 0000000..6687d59 --- /dev/null +++ b/test/valid-path-1.c @@ -0,0 +1,19 @@ +/* + * Copyright (C) Eric Wong, 2012 + * License: GPLv3 or later (see COPYING for details) + */ +#include "check.h" +#define VP(path) mog_valid_path((path), sizeof(path) - 1) + +int main(void) +{ + assert(0 == VP("hello/..")); + assert(1 == VP("hello/")); + assert(1 == VP("hello.fid")); + assert(0 == 
VP("../hello.fid")); + assert(0 == VP("/../hello.fid")); + assert(0 == VP("/hello.fid/..")); + assert(0 == VP("/hello/../fid")); + + return 0; +} diff --git a/thrpool.c b/thrpool.c new file mode 100644 index 0000000..fcd5a04 --- /dev/null +++ b/thrpool.c @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* we can lower this if we can test with lower values, but NPTL is 16K */ +#define MOG_THR_STACK_SIZE ((16 * 1024)) + +#if defined(PTHREAD_STACK_MIN) && (PTHREAD_STACK_MIN > MOG_THR_STACK_SIZE) +# undef MOG_THR_STACK_SIZE +# define MOG_THR_STACK_SIZE PTHREAD_STACK_MIN +#endif + +static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE; + +void +mog_thrpool_start(struct mog_thrpool *tp, size_t n, + void *(*start_fn)(void *), void *arg) +{ + size_t i; + pthread_t *thr; + + tp->threads = thr = xmalloc(sizeof(pthread_t) * n); + tp->n_threads = n; + + for (i = 0; i < n; i++, thr++) { + pthread_attr_t attr; + + CHECK(int, 0, pthread_attr_init(&attr)); + CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize)); + CHECK(int, 0, pthread_create(thr, &attr, start_fn, arg)); + CHECK(int, 0, pthread_attr_destroy(&attr)); + } +} + +void mog_thrpool_quit(struct mog_thrpool *tp) +{ + pthread_t *thr = tp->threads; + + for (; tp->n_threads--; thr++) { + CHECK(int, 0, pthread_cancel(*thr)); + CHECK(int, 0, pthread_join(*thr, NULL)); + } + mog_free_and_null(&tp->threads); +} diff --git a/trywrite.c b/trywrite.c new file mode 100644 index 0000000..2e0fca7 --- /dev/null +++ b/trywrite.c @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +struct mog_wbuf { + size_t len; + size_t off; + unsigned char buf[FLEXIBLE_ARRAY_MEMBER]; +}; + +static void * wbuf_new(size_t total, struct iovec *iov, int iovcnt) +{ + struct mog_wbuf *wbuf = mog_cachealign(sizeof(struct 
mog_wbuf) + total); + void *dst = wbuf->buf; + int i; + + wbuf->len = total; + wbuf->off = 0; + + for (i = 0; i < iovcnt; i++) + dst = mempcpy(dst, iov[i].iov_base, iov[i].iov_len); + + return wbuf; +} + +enum mog_write_state mog_tryflush(int fd, struct mog_wbuf **x) +{ + struct mog_wbuf *wbuf = *x; + unsigned char *ptr = wbuf->buf + wbuf->off; + size_t len = wbuf->len - wbuf->off; + + for (;;) { + ssize_t w = write(fd, ptr, len); + + if (w == len) { + mog_free_and_null(x); + return MOG_WRSTATE_DONE; + } + if (w >= 0) { + wbuf->off += w; + ptr += w; + len -= w; + + continue; + } + + assert(w < 0 && "no error from write(2)"); + + switch (errno) { + case_EAGAIN: return MOG_WRSTATE_BUSY; + case EINTR: continue; + } + + mog_free_and_null(x); + return MOG_WRSTATE_ERR; + } +} + +/* + * returns + * - NULL on full write + * - MOG_WR_ERROR on error (and sets errno) + * - address to a new mog_wbuf with unbuffered contents on partial write + */ +void * mog_trywritev(int fd, struct iovec *iov, int iovcnt) +{ + ssize_t total = 0; + ssize_t w; + int i; + + for (i = 0; i < iovcnt; i++) + total += iov[i].iov_len; + + if (total == 0) + return NULL; +retry: + w = writev(fd, iov, iovcnt); + + if (w == total) { + return NULL; + } else if (w < 0) { + switch (errno) { + case_EAGAIN: return wbuf_new(total, iov, iovcnt); + case EINTR: goto retry; + } + return MOG_WR_ERROR; + } else { + struct iovec *new_iov = iov; + + total -= w; + + /* skip over iovecs we've already written completely */ + for (i = 0; i < iovcnt; i++, new_iov++) { + if (w == 0) + break; + /* + * partially written iovec, + * modify and retry with current iovec in front + */ + if (new_iov->iov_len > (size_t)w) { + unsigned char *base = new_iov->iov_base; + + new_iov->iov_len -= w; + base += w; + new_iov->iov_base = (void *)base; + break; + } + + w -= new_iov->iov_len; + } + + /* retry without the already-written iovecs */ + iovcnt -= i; + iov = new_iov; + goto retry; + } +} @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2012, Eric 
Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ + +/* + * some systems define EWOULDBLOCK to a different value than EAGAIN, + * but POSIX allows them to be identical. + */ +#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN) +# define case_EAGAIN case EAGAIN: case EWOULDBLOCK +#else +# define case_EAGAIN case EAGAIN +#endif + +/* free(3) causes compiler warnings on const, so we de-const here */ +static inline void mog_free(const void *ptr) +{ + union { const void *in; void *out; } deconst = { .in = ptr }; + + free(deconst.out); +} + +/* runs `code`, then restores errno to the value it held before `code` ran */ #define PRESERVE_ERRNO(code) do { \ + int save_err = errno; \ + code; \ + errno = save_err; \ +} while (0) + +/* evaluates expr once into a `type` temporary outside assert(), so expr still runs when assert() is compiled out; asserts the result equals expect */ # define CHECK(type, expect, expr) do { \ + type checkvar = (expr); \ + assert(checkvar==(expect)&& "BUG" && __FILE__ && __LINE__); \ + } while (0) + +/* sets the cancel state to PTHREAD_CANCEL_ENABLE and asserts the previous state was PTHREAD_CANCEL_DISABLE (i.e. calls must be paired with mog_cancel_disable) */ static inline void mog_cancel_enable(void) +{ + int old; + + CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old)); + assert(old == PTHREAD_CANCEL_DISABLE && "redundant cancel enable"); +} + +/* sets the cancel state to PTHREAD_CANCEL_DISABLE and asserts the previous state was PTHREAD_CANCEL_ENABLE (i.e. disabling twice in a row trips the assert) */ static inline void mog_cancel_disable(void) +{ + int old; + + CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old)); + assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable"); +} diff --git a/valid_path.rl b/valid_path.rl new file mode 100644 index 0000000..a5019e6 --- /dev/null +++ b/valid_path.rl @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" + +/* + * we could just use strstr(), but it's buggy on some glibc and + * we can expand this later (to tighten down to non-FIDs, for example) + */ +%%{ + machine path_traversal; + main := any* ("..") @ { found = true; fbreak; } any*; +}%% + +%% write data; + +/* runs the path_traversal machine over the len bytes starting at buf; returns true once ".." has been matched, false if the input ends without a match */ static bool path_traversal_found(const char *buf, size_t len) +{ + const char *p, *pe; + bool found = false; + int cs; + %% write init; + + p = buf; + pe = buf + len; + %% write exec; + + return found; +} 
+ +/* returns 0 when len is longer than the literal path below or when path_traversal_found() reports ".." in buf; returns 1 otherwise */ int mog_valid_path(const char *buf, size_t len) +{ + /* TODO: update if MogileFS supports FIDs >= 10,000,000,000 */ + /* sizeof counts the literal's trailing NUL, so a path exactly as long as the literal is still accepted */ if (len >= (sizeof("/dev16777215/0/000/000/0123456789.fid"))) + return 0; + + return ! path_traversal_found(buf, len); +} @@ -0,0 +1,13 @@ +/* + * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net> + * License: GPLv3 or later (see COPYING for details) + */ +#include "cmogstored.h" +/* printf-style: formats fmt and its variadic arguments directly to STDERR_FILENO via vdprintf(); the vdprintf() return value is not checked */ void warn(const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + vdprintf(STDERR_FILENO, fmt, ap); + va_end(ap); +} |