#!/usr/bin/perl -w # This program creates a server session and an infinitude of clients # that connect to it, all in the same process. It's mainly used to # test for memory leaks, but it's also something of a benchmark. # It is possible to split this program into two separate processes: # Change $server_addr to something appropriate. # Make a second copy of this program. # In the "server" copy, comment out the call to &pool_create(); # In the "client" copy, comment out th ecall to &server_create(); use strict; use lib '../lib'; use Socket; #sub POE::Kernel::ASSERT_DEFAULT () { 1 } use POE qw(Wheel::ListenAccept Wheel::ReadWrite Driver::SysRW Filter::Line Wheel::SocketFactory ); sub MAX_SIMULTANEOUS_CLIENTS () { 5 } # make 1 to enable output sub DEBUG () { 0 } # address and port the server binds to my $server_addr = '127.0.0.1'; my $server_port = 32100; ############################################################################### # This is a single client session. It uses two separator wheels: a # SocketFactory to establish a connection, and a ReadWrite to process # data once the connection is made #------------------------------------------------------------------------------ # This is regular Perl sub that helps create new clients. It's not an # event handler. sub client_create { my $serial_number = shift; # create the session POE::Session->create( inline_states => { _start => \&client_start, _stop => \&client_stop, receive => \&client_receive, error => \&client_error, connected => \&client_connected, signals => \&client_signals, _parent => sub {}, }, # ARG0 args => [ $serial_number ] ); } #------------------------------------------------------------------------------ # Accept POE's standard _start event, and create a non-blocking client # socket. sub client_start { my ($kernel, $heap, $serial) = @_[KERNEL, HEAP, ARG0]; DEBUG && print "Client $serial is starting.\n"; # remember this client's serial number $heap->{'serial'} = $serial; # watch for SIGINT $kernel->sig('INT', 'signals'); # create a socket factory $heap->{'wheel'} = POE::Wheel::SocketFactory->new( RemoteAddress => $server_addr, # connecting to address $server_addr RemotePort => $server_port, # connecting to port $server_port SuccessEvent => 'connected', # generating this event when connected FailureEvent => 'error', # generating this event upon an error ); } #------------------------------------------------------------------------------ # Accept POE's standard _stop event. This normally would clean up the # session, but this program doesn't keep anything in the heap that # needs to be cleaned up. sub client_stop { my $heap = $_[HEAP]; DEBUG && print "Client $heap->{'serial'} has stopped.\n"; } #------------------------------------------------------------------------------ # This event handler/state is invoked when a connection has been # established successfully. It replaces the SocketFactory wheel with # a ReadWrite wheel. The new wheel generates different events. sub client_connected { my ($heap, $socket) = @_[HEAP, ARG0]; die "possible filehandle leak" if fileno($socket) > 63; DEBUG && print "Client $heap->{'serial'} is connected.\n"; # switch to read/write behavior $heap->{'wheel'} = POE::Wheel::ReadWrite->new( Handle => $socket, # read and write on this socket Driver => POE::Driver::SysRW->new, # using sysread and syswrite Filter => POE::Filter::Line->new, # and parsing I/O as lines InputEvent => 'receive', # generating this event on input ErrorEvent => 'error', # generating this event on error ); shutdown($socket, 1); } #------------------------------------------------------------------------------ # This state is invoked by the ReadWrite wheel to process complete # chunks of input. sub client_receive { my ($heap, $line) = @_[HEAP, ARG0]; DEBUG && print "Client $heap->{'serial'} received: $line\n"; } #------------------------------------------------------------------------------ # This state is invoked by both the SocketFactory and the ReadWrite # wheels when an error occurs. sub client_error { my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2]; if (DEBUG) { if ($errnum) { print( "Client $heap->{'serial'} encountered ", "$operation error $errnum: $errstr\n" ); } else { print "Client $heap->{'serial'} the server closed the connection.\n"; } } # removing the wheel stops the session delete $heap->{'wheel'}; } #------------------------------------------------------------------------------ # Catch and log signals. Never handle them. sub client_signals { my ($heap, $signal_name) = @_[HEAP, ARG0]; DEBUG && print "Client $heap->{'serial'} caught SIG$signal_name\n"; # doesn't handle SIGINT, so it can stop return 0; } ############################################################################### # This is a client pool session. It ensures that at least five # clients are interacting with the server at any given time. # Actually, there are brief periods where only four clients are # connected. #------------------------------------------------------------------------------ # This is a regular Perl sub that helps create new client pools. It's # not an event handler. sub pool_create { # create the server POE::Session->create( inline_states => { _start => \&pool_start, _stop => \&pool_stop, signals => \&pool_signals, _child => \&pool_child, _parent => sub {}, }, ); } #------------------------------------------------------------------------------ # Accept POE's standard _start event. Initialize benchmark # accumulators, and start the first five clients. sub pool_start { my ($kernel, $heap) = @_[KERNEL, HEAP]; DEBUG && print "Pool starting.\n"; # watch for SIGINT $kernel->sig('INT', 'signals'); # keep track of children $heap->{'children'} = 0; $heap->{'client serial'} = 0; $heap->{'state'} = 'running'; # benchmark accumulators $heap->{'bench start'} = time(); $heap->{'bench count'} = 0; # Start five clients. NOTE: This would not work if clients used # IO::Socket to connect to the server, because IO::Socket's connect # blocks. It would wait for the server to accept a connection # before continuing, which would never happen since this loop is # holding up the event queue. The program can only get away with # this loop because SocketFactory connections do not block. for (my $i = 0; $i < MAX_SIMULTANEOUS_CLIENTS; $i++) { &client_create(++$heap->{'client serial'}); } } #------------------------------------------------------------------------------ # Accept POE's standard stop event. Also stop the server. sub pool_stop { my $kernel = $_[KERNEL]; # send SIGQUIT to the server $kernel->signal('server', 'QUIT'); DEBUG && print "Pool has stopped.\n"; } #------------------------------------------------------------------------------ # Catch and log signals, but never handle them. sub pool_signals { my ($heap, $signal_name) = @_[HEAP, ARG0]; DEBUG && print "Pool caught SIG$signal_name\n"; # doesn't handle SIGINT, so it can stop return 0; } #------------------------------------------------------------------------------ # Keep track of child sessions, starting new ones to replace old ones # that are being lost. If debugging, and a time limit has been # reached, stop creating new clients. my %english = ( create => 'created', lose => 'lost', gain => 'gained' ); sub pool_child { my ($heap, $direction, $child) = @_[HEAP, ARG0, ARG1]; # lost a client if ($direction eq 'lose') { $heap->{'children'}--; # create a new one if still running if ($heap->{'state'} eq 'running') { &client_create(++$heap->{'client serial'}); } } # gained a client; keep track of it else { $heap->{'children'}++; $heap->{'bench count'}++; } DEBUG && print( "Pool $english{$direction} a child session ", "(now has $heap->{'children'}).\n" ); # track clients/second for benchmark my $elapsed = time() - $heap->{'bench start'}; if ($elapsed >= 10) { print "bench: ", $heap->{'bench count'}, ' / ', $elapsed, ' = ', $heap->{'bench count'} / $elapsed, "\n"; $heap->{'bench count'} = 0; $heap->{'bench start'} = time(); # limit run to 60 seconds if debugging if (DEBUG && (time() - $^T >= 60.0)) { $heap->{'state'} = 'quitting'; } } } ############################################################################### # This is a single server session. It is spawned by the daytime # server to handle incoming connections. #------------------------------------------------------------------------------ # This is a regular Perl sub that helps create new sessions. It's not # an event handler. sub session_create { my ($handle, $peer_host, $peer_port) = @_; # create the session POE::Session->create( inline_states => { _start => \&session_start, _stop => \&session_stop, receive => \&session_receive, flushed => \&session_flushed, error => \&session_error, signals => \&session_signals, _child => sub {}, _parent => sub {}, }, # ARG0, ARG1, ARG2 args => [ $handle, $peer_host, $peer_port ] ); } #------------------------------------------------------------------------------ # Accept POE's standard _start event, and start transacting with the # client. sub session_start { my ($kernel, $heap, $handle, $peer_host, $peer_port) = @_[KERNEL, HEAP, ARG0, ARG1, ARG2]; # make the address printable $peer_host = inet_ntoa($peer_host); DEBUG && print "Session with $peer_host $peer_port is starting.\n"; # watch for SIGINT $kernel->sig('INT', 'signals'); # record the client info for later $heap->{'host'} = $peer_host; $heap->{'port'} = $peer_port; # start reading and writing $heap->{'wheel'} = POE::Wheel::ReadWrite->new( Handle => $handle, # on the client's socket Driver => POE::Driver::SysRW->new, # using sysread and syswrite Filter => POE::Filter::Line->new, # and parsing I/O as lines InputEvent => 'receive', # generating this event on input ErrorEvent => 'error', # generating this event on error FlushedEvent => 'flushed', # generating this event on flush ); # give the client the time of day $heap->{'wheel'}->put( "Hi, $peer_host $peer_port! The time is: " . gmtime() . " GMT" ); } #------------------------------------------------------------------------------ # Accept POE's standard _stop event. This normally would clean up the # session, but this program doesn't keep anything in the heap that # needs to be cleaned up. sub session_stop { my $heap = $_[HEAP]; DEBUG && print "Session with $heap->{'host'} $heap->{'port'} has stopped.\n"; } #------------------------------------------------------------------------------ # This state is invoked by the ReadWrite wheel whenever a complete # request has been received. sub session_receive { my ($heap, $line) = @_[HEAP, ARG0]; DEBUG && print "Received from $heap->{'host'} $heap->{'port'}: $line\n"; } #------------------------------------------------------------------------------ # This state is invoked when the ReadWrite wheel encounters an error. sub session_error { my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2]; DEBUG && print( "Session with $heap->{'host'} $heap->{'port'} ", "encountered $operation error $errnum: $errstr\n" ); delete $heap->{'wheel'}; } #------------------------------------------------------------------------------ # This state is invoked when the ReadWrite wheel's output buffer # becomes empty. For a daytime server session, a flushed buffer means # it's okay to close the connection. sub session_flushed { my $heap = $_[HEAP]; DEBUG && print "Output to $heap->{'host'} $heap->{'port'} has flushed.\n"; # removing the wheel stops the session delete $heap->{'wheel'}; } #------------------------------------------------------------------------------ # Catch and log signals, but never handle them. sub session_signals { my ($heap, $signal_name) = @_[HEAP, ARG0]; DEBUG && print( "Session with $heap->{'host'} $heap->{'port'} ", "has received a SIG$signal_name\n" ); # doesn't handle SIGINT, so it can stop return 0; } ############################################################################### # This is a generic daytime server. Its only purpose is to listen on # a socket, accept connections, and spawn daytime sessions to handle # the connections. #------------------------------------------------------------------------------ # This is a regular Perl sub that helps create new servers. It's not # an event handler. sub server_create { # create the server POE::Session->create( inline_states => { _start => \&server_start, _stop => \&server_stop, accept_success => \&server_accept, accept_error => \&server_error, signals => \&server_signals, _child => sub {}, _parent => sub {}, } ); } #------------------------------------------------------------------------------ # Accept POE's standard _start event. Create a non-blocking server. sub server_start { my ($kernel, $heap) = @_[KERNEL, HEAP]; DEBUG && print "Daytime server is starting.\n"; # set an alias so pool_stop can signal $kernel->alias_set('server'); # watch for SIGINT and SIGQUIT $kernel->sig('INT', 'signals'); $kernel->sig('QUIT', 'signals'); # create a socket factory $heap->{'wheel'} = POE::Wheel::SocketFactory->new( BindAddress => $server_addr, # bind the listener to this address BindPort => $server_port, # bind the listener to this port Reuse => 'yes', # and reuse the socket right away SuccessEvent => 'accept_success', # generate this event for connections FailureEvent => 'accept_error', # generate this event for errors ); } #------------------------------------------------------------------------------ # Accept POE's standard _stop event. This normally would clean up the # session, but this program doesn't keep anything in the heap that # needs to be cleaned up. sub server_stop { my $heap = $_[HEAP]; DEBUG && print "Daytime server has stopped.\n"; } #------------------------------------------------------------------------------ # This state is invoked by the SocketFactory when an error occurs. sub server_error { my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2]; DEBUG && print "Daytime server encountered $operation error $errnum: $errstr\n"; } #------------------------------------------------------------------------------ # The SocketFactory invokes this state when a new client connection # has been accepted. The parameters include the client socket, # address and port. sub server_accept { my ($handle, $host, $port) = @_[ARG0, ARG1, ARG2]; # spawn a server session die "possible filehandle leak" if fileno($handle) > 63; &session_create($handle, $host, $port); } #------------------------------------------------------------------------------ # Catch and log signals, but never handle them. sub server_signals { my $signal_name = $_[ARG0]; DEBUG && print "Daytime server caught SIG$signal_name\n"; # doesn't handle SIGINT, so it can stop return 0; } ############################################################################### # Start the daytime server and a pool of clients to transact with it. &server_create(); &pool_create(); $poe_kernel->run(); exit;