Hi there,
I'd like to contribute the following modules to Pike:
. A non-blocking SMTP/LMTP server API. I borrowed some code from an SMTP
server for Roxen, copyright Mike Knott, David Hedbor and On The Verge;
these parts are licensed under the GPL version 2.
I'd like to know whether the Pike team and the original authors agree
to the addition.
So far I have not really tested the SMTP server against the real Internet.
However, I have been using the LMTP server (which inherits the SMTP server)
for some months now on a production server behind a Postfix server.
. A non-blocking process starter with the features of
Process.create_process, without the need to use the wait() of
Process.spawn (it works with callback functions).
--
David Gourdelier
// LMTP variant of the SMTP connection handler.  All behaviour comes from
// the inherited SMTP connection; only the list of accepted commands is
// replaced here.
class connection {
  inherit .SMTP.connection;

  // Commands accepted on an LMTP connection: "lhlo" is the only greeting
  // offered, the remaining verbs are the usual SMTP ones.
  array(string) commands = ({
    "lhlo", "mail", "rcpt", "data",
    "rset", "vrfy", "quit", "noop"
  });
};
// LMTP server: listens on a TCP port and creates one `connection` object
// for every client that connects.
class server {
  // the listening port
  private object fdport;
  // domains handed to every connection
  private array(string) domains;
  // user callbacks handed to every connection
  private function cb_mailfrom;
  private function cb_rcptto;
  private function cb_data;

  // Called by the port whenever a client is waiting to be accepted.
  // Throws if the accept itself fails.
  private void accept_callback()
  {
    object fd = fdport->accept();
    if(!fd)
      error("Can't accept connections from socket\n");
    // The connection object is not stored here; it is given the accepted
    // file and the local wrapper is destructed right afterwards.
    connection(fd, domains, cb_mailfrom, cb_rcptto, cb_data);
    destruct(fd);
  }

  // _domains     : domains passed on to every connection
  // port         : port to listen on, 26 when omitted or 0
  // ip           : local address to bind to, any address when omitted
  // _cb_mailfrom : callback for the "mail from" command
  // _cb_rcptto   : callback for the "rcpt to" command
  // _cb_data     : callback for a complete message
  // Throws an error when the port cannot be bound.
  void create(array(string) _domains, void|int port, void|string ip, function _cb_mailfrom,
              function _cb_rcptto, function _cb_data)
  {
    domains = _domains;
    cb_mailfrom = _cb_mailfrom;
    cb_rcptto = _cb_rcptto;
    cb_data = _cb_data;
    if(!port)
      port = 26;
    // A freshly created Stdio.Port object is always true, so testing the
    // object itself can never detect a failed bind.  bind() reports the
    // outcome through its return value instead.
    fdport = Stdio.Port();
    if(!fdport->bind(port, accept_callback, ip))
    {
      error("Cannot bind to socket, already bound ?\n");
    }
  }
};
/*
A SMTP server
19 September 2003 <vida(a)caudium.net> David Gourdelier
Some parts of this code © Mike Knott, David Hedbor and On The Verge.
Protocols.SMTP.server is quite easy to use and allows you
to design custom functions to process mail. This module does not
handle mail storage and has not been tested extensively; please test it
before using it and send me your feedback.
First call the module with
Protocols.SMTP.server(array(string) domains, int port, string listenip,
function cb_mailfrom, function cb_rcptto, function cb_data);
domains : the domain names this server relays for; you need to provide at
least one domain (the first one will be used for the MAILER-DAEMON address).
If you want to relay everything you can put a '*' after this
first domain.
port : the port this server listens on
listenip : the IP address on which the server listens
cb_mailfrom : mailfrom callback function; this function will be called
when a client sends a "mail from" command. This function must take a string
(corresponding to the sender's email) and return an SMTP code to output to
the client.
cb_rcptto : same as cb_mailfrom but called when a client sends a "rcpt to".
cb_data : this function is called each time a client sends message data.
It must have the following synopsis:
int cb_data(object mime, string sender, array(string) recipients)
(when the givedata setting is on, the raw message text is passed as an
extra fourth string argument)
object mime : the mime data object
sender : sender of the mail (from the mailfrom command)
recipients : one or more recipients given by the rcpt to command
return : SMTP code to output to the client
Here is an example of a silly program that does nothing except output
information to stdout.
int cb_mailfrom(string mail)
{
return 250;
}
int cb_rcptto(string email)
{
// check the user's mailbox here
return 250;
}
int cb_data(object mime, string sender, array(string) recipients)
{
write(sprintf("smtpd: mailfrom=%s, to=%s, headers=%O\ndata=%s\n",
sender, recipients * ", ", mime->headers, mime->getdata()));
// check the data and deliver the mail here
if(mime->body_parts)
{
foreach(mime->body_parts, object mpart)
write(sprintf("smtpd: mpart data = %O\n", mpart->getdata()));
}
return 250;
}
int main(int argc, array(string) argv)
{
Protocols.SMTP.server(({ "ece.fr" }), 2500, "127.0.0.1", cb_mailfrom, cb_rcptto, cb_data);
return -1;
}
*/
/* Implements:
- RFC 2821
- RFC 2822
- RFC 1854
*/
// Settings shared by every SMTP connection (the connection class
// inherits this class to get them).
class configuration {
  // Upper bound for the size of one message; larger DATA content is
  // refused.  Also announced to clients in the SIZE feature.
  int maxsize = 100240000;

  // Upper bound for the number of recipients of one message.
  int maxrcpt = 1000;

  // When non-zero, the sender's domain must have an MX record.
  int checkdns = 0;

  // When non-zero, do a crude validity check on e-mail addresses.
  int checkemail = 1;

  // When non-zero, cb_data receives the raw message text in addition to
  // the MIME object, so it has to accept an extra string argument.
  int givedata = 1;
};
// One SMTP session.  The object reads the client's commands from the
// socket, answers them, and hands sender, recipients and the finished
// message over to the three callbacks given to create().
class connection {
  inherit configuration;

  // the commands this module supports; launch_functions() calls the
  // method that has the same name as the command
  array(string) commands = ({ "ehlo", "helo", "mail", "rcpt", "data", "rset", "vrfy", "quit", "noop" });
  // the fd of the socket
  private object fd = Stdio.File();
  // the domains we relay for, in lower case
  private array(string) mydomains = ({ });
  // input received from the client but not processed yet
  private string inputbuffer = "";
  // the from address of the current transaction
  private string mailfrom = "";
  // the recipient address(es) of the current transaction
  private array(string) mailto = ({ });
  // the ident we get from ehlo/helo
  private string ident = "";
  // addresses of both ends of the socket
  private string remoteaddr, localaddr;
  private int localport;
  // my name
  private string localhost = gethostname();
  // the steps the client has completed so far ("helo"/"ehlo",
  // "mail from", "rcpt to")
  private array(string) sequence = ({ });
  // the callback functions used to guess if user is ok or not
  private function cb_rcptto;
  private function cb_data;
  private function cb_mailfrom;
  // whether you are in data mode or not...
  int datamode = 0;

  constant reply_codes =
  ([ 211:"System status, or system help reply",
     214:"Help message",
     220:"<host> Service ready",
     221:"<host> Service closing transmission channel",
     250:"Requested mail action okay, completed",
     252:"Cannot VRFY user, but will accept message and attempt delivery",
     251:"User not local; will forward to <forward-path>",
     354:"Start mail input; end with <CRLF>.<CRLF>",
     421:"<host> Service not available, closing transmission channel "
         "[This may be a reply to any command if the service knows it "
         "must shut down]",
     450:"Requested mail action not taken: mailbox unavailable "
         "[E.g., mailbox busy]",
     451:"Requested action aborted: local error in processing",
     452:"Requested action not taken: insufficient system storage",
     500:"Syntax error, command unrecognized "
         "[This may include errors such as command line too long]",
     501:"Syntax error in parameters or arguments",
     502:"Command not implemented",
     503:"Bad sequence of commands",
     504:"Command parameter not implemented",
     550:"Requested action not taken: mailbox unavailable "
         "[E.g., mailbox not found, no access]",
     551:"User not local; please try <forward-path>",
     552:"Requested mail action aborted: exceeded storage allocation",
     553:"Requested action not taken: mailbox name not allowed "
         "[E.g., mailbox syntax incorrect or relaying denied]",
     554:"Transaction failed" ]);

  // extensions announced in the answer to ehlo
  array(string) features = ({ "PIPELINING", "8BITMIME", "SIZE " + maxsize });

  // (Re)arm the inactivity timer: the connection is dropped when no
  // command arrives within 300 seconds.
  private void restart_timeout()
  {
    remove_call_out(handle_timeout);
    call_out(handle_timeout, 300);
  }

  // Forget sender and recipients of the current transaction.  The
  // greeting stays in `sequence`, so the client does not have to send
  // helo/ehlo again.
  private void reset_transaction()
  {
    mailfrom = "";
    mailto = ({ });
    sequence -= ({ "mail from", "rcpt to" });
  }

  // Inactivity timer expired: tell the client (best effort) and close.
  private void handle_timeout()
  {
    catch(fd->write("421 Error: timeout exceeded\r\n"));
    close_cb();
  }

  // Send the reply line for an SMTP code to the client.  "<host>" in the
  // canned text is replaced by our host name.  Codes without an entry in
  // reply_codes (a callback may return any code) get a generic text
  // instead of making sprintf() fail on a missing string.
  private void outcode(int code)
  {
    string text = reply_codes[code];
    if(!text)
      text = (code < 400) ? "OK" : "Error";
    string line = sprintf("%d %s\r\n", code, replace(text, "<host>", localhost));
    fd->write(line);
#ifdef DEBUG
    log(line);
#endif
  }

  // Write a diagnostic line to stderr.
  private void log(string what)
  {
    werror("Pike SMTP server : " + what + "\n");
  }

  /* begin copyright GPL code Mike Knott, David Hedbor and On The Verge */
  // make the received header
  private string received(int messageid)
  {
    string remotehost =
      Protocols.DNS.client()->gethostbyaddr(remoteaddr)[0]
      || remoteaddr;
    string rec;
    rec=sprintf("from %s (%s [%s]) "
                "by %s (PSS) with %s id %d ; %s",
                ident, remotehost, remoteaddr,
                localhost, "ESMTP", messageid,
                ctime(time())- "\n");
    return rec;
  }
  /* end copyrighted code */

  // helo <name>: remember the client's name and greet it; 501 when the
  // name is missing.
  void helo(string argument)
  {
    restart_timeout();
    if(sizeof(argument) > 0)
    {
      fd->write(sprintf("250 %s\r\n", localhost));
      ident = argument;
#ifdef DEBUG
      log(sprintf("helo from %s", ident));
#endif
      sequence += ({ "helo" });
    }
    else
      outcode(501);
  }

  // ehlo <name>: like helo, but the reply also lists `features`, one per
  // line, the last line using "250 " instead of "250-".
  void ehlo(string argument)
  {
    restart_timeout();
    if(sizeof(argument) > 0)
    {
      string out = "250-" + localhost + "\r\n";
      int i = 0;
      for(; i < sizeof(features) - 1; i++)
      {
        out += "250-" + features[i] + "\r\n";
      }
      out += "250 " + features[i] + "\r\n";
      fd->write(out);
      sequence += ({ "ehlo" });
      ident = argument;
#ifdef DEBUG
      log(sprintf("ehlo from %s", ident));
#endif
    }
    else
      outcode(501);
  }

  // lhlo behaves exactly like ehlo (it is recorded as "ehlo" in
  // `sequence`).
  void lhlo(string argument)
  {
    ehlo(argument);
  }

  // fetch the email address from the mail from: or rcpt to: commands
  // content: the command argument, like from:<toto(a)caudium.net>
  // what: the expected keyword, either "from" or "to"
  // returns: the address as a string, or an SMTP error code
  //   500 keyword is not `what`
  //   501 nothing follows the keyword
  //   553 address rejected (malformed, no MX, or relaying denied)
  int|string parse_email(string content, string what)
  {
    array(string) parts = content / ":";
    if(lower_case(parts[0]) != what)
      return 500;
    // "mail from" without ":<address>" used to raise an index error here
    if(sizeof(parts) < 2)
      return 501;
    // rejoin on ":" so that an address containing a colon is not cut off
    string address = String.trim_all_whites(parts[1..] * ":");
    string validating_mail;
    if(!sscanf(address, "<%s>", validating_mail))
      validating_mail = address;
    // the empty address <> stands for the mailer daemon of our first domain
    if(validating_mail == "")
      validating_mail = "MAILER-DAEMON@" + (sizeof(mydomains) ? mydomains[0] : localhost);
    array(string) emailparts = validating_mail / "@";
    // validate before the domain part is indexed
    if(checkemail && sizeof(emailparts) != 2)
    {
      log(sprintf("invalid mail address '%O', command=%O\n", emailparts, what));
      return 553;
    }
    // with checkemail off an address may have no "@" at all; it then has
    // no domain instead of causing an index error
    string host = sizeof(emailparts) > 1 ? lower_case(emailparts[-1]) : "";
    array(string) labels = host / ".";
    // only the last two labels of the host name are compared
    string domain = labels[max(sizeof(labels) - 2, 0)..] * ".";
    if(checkdns)
    {
#ifdef DEBUG
      log("checking dns");
#endif
      if(what == "from" && !Protocols.DNS.client()->get_primary_mx(domain))
      {
        log(sprintf("check dns failed, command=%O, domain=%O\n", what, domain));
        return 553;
      }
    }
    if(what == "to" && search(mydomains, domain) == -1 && search(mydomains, "*") == -1)
    {
      log(sprintf("relaying denied, command=%O, mydomains=%O, domain=%O\n", what, mydomains, domain));
      return 553;
    }
    return validating_mail;
  }

  // mail from:<address>: start a new transaction.  Needs a previous
  // helo/ehlo/lhlo (503 otherwise).  The code returned by cb_mailfrom is
  // sent to the client; 451 when the callback fails or returns 0.
  void mail(string argument)
  {
    restart_timeout();
    if(!sizeof(sequence & ({ "ehlo", "helo", "lhlo" })))
    {
      outcode(503);
      return;
    }
    mixed email = parse_email(argument, "from");
    if(intp(email))
    {
      outcode(email);
      return;
    }
    int check;
    mixed err = catch(check = cb_mailfrom(email));
    if(err || !check)
    {
      outcode(451);
      // there is only a backtrace when the callback actually threw
      if(err)
        log(describe_backtrace(err));
      return;
    }
    if(check/100 == 2)
    {
      mailfrom = email;
      mailto = ({ });
      // A new "mail from" discards the recipients accepted before it, so
      // "rcpt to" has to leave the sequence too.  Otherwise a client
      // could send data after a second mail from whose only rcpt to was
      // refused.
      sequence = (sequence - ({ "rcpt to", "mail from" })) + ({ "mail from" });
    }
    outcode(check);
  }

  // rcpt to:<address>: add a recipient.  Needs an accepted "mail from"
  // (503 otherwise); 552 once maxrcpt recipients have been accepted.  The
  // code returned by cb_rcptto is sent to the client; 451 when the
  // callback fails or returns 0.
  void rcpt(string argument)
  {
    restart_timeout();
    if(search(sequence, "mail from") == -1)
    {
      outcode(503);
      return;
    }
    if(sizeof(mailto) >= maxrcpt)
    {
      outcode(552);
      return;
    }
    mixed email = parse_email(argument, "to");
    if(intp(email))
    {
      outcode(email);
      return;
    }
    int check;
    mixed err = catch(check = cb_rcptto(email));
    if(err || !check)
    {
      outcode(451);
      if(err)
        log(describe_backtrace(err));
      return;
    }
    if(check/100 == 2)
    {
      mailto += ({ email });
      sequence += ({ "rcpt to" });
    }
    outcode(check);
  }

  // data: switch to data mode so that read_cb() collects the message.
  // Needs at least one accepted recipient (503 otherwise).
  void data(string argument)
  {
    restart_timeout();
    if(search(sequence, "rcpt to") == -1)
    {
      outcode(503);
      return;
    }
    datamode = 1;
    outcode(354);
  }

  // Called with the complete message text once the terminating
  // "<CRLF>.<CRLF>" has been seen.  Adds Received and Message-ID headers
  // and passes the message to cb_data, whose return code is sent to the
  // client.  552 when the message exceeds maxsize, 554 on any failure.
  void message(string content)
  {
    datamode = 0;
    // The transaction is over whatever happens below.  Forgetting it now
    // keeps a later "data" from re-using this sender and recipient list.
    string sender = mailfrom;
    array(string) recipients = mailto;
    reset_transaction();
    if(sizeof(content) > maxsize)
    {
      outcode(552);
      return;
    }
    object mime;
    int messageid = hash(content);
    /* begin copyright GPL code Mike Knott, David Hedbor and On The Verge */
    mixed err = catch (mime = MIME.Message(content));
    if(err)
    {
      outcode(554);
      log(describe_backtrace(err));
      log(sprintf("content is %O\n", content));
      return;
    }
    err = catch {
      if(!sizeof(mime->headers))
      {
        // Data without headers...
        mime=MIME.Message(content,([ "to": "Undisclosed.recipients",
                                     "from": sender,
                                     "received":received(messageid)]));
      }
      else if(mime->headers->received && sizeof(mime->headers->received))
      {
        mime->headers->received=received(messageid)+"\0"+mime->headers->received;
      }
      else
      {
        mime->headers->received=received(messageid);
      }
      if(!mime->headers["message-id"])
      {
        mime->headers["message-id"]=sprintf("<%d@%s>",messageid, localhost);
      }
      /* end copyrighted code */
    };
    if(err)
    {
      outcode(554);
      log(describe_backtrace(err));
      return;
    }
    int check;
    // with givedata set the callback also gets the raw text
    if(givedata)
      err = catch(check = cb_data(mime, sender, recipients, content));
    else
      err = catch(check = cb_data(mime, sender, recipients));
    if(err || !check)
    {
      outcode(554);
      if(err)
        log(describe_backtrace(err));
      return;
    }
    outcode(check);
  }

  // noop: just answer 250.
  void noop()
  {
    restart_timeout();
    outcode(250);
  }

  // rset: abort the current transaction.  Sender, recipients and their
  // entries in `sequence` are dropped so that "data" is refused until a
  // new mail from/rcpt to has been accepted.  The input buffer is left
  // alone: it may hold pipelined commands that follow the rset.
  void rset()
  {
    restart_timeout();
    reset_transaction();
    outcode(250);
  }

  // vrfy: always answers 252.
  void vrfy()
  {
    restart_timeout();
    outcode(252);
  }

  // quit: say goodbye and close the socket.
  void quit()
  {
    outcode(221);
    close_cb();
  }

  // Run the method belonging to one command line.  Unknown commands are
  // answered with 500; an error thrown by the method is logged and
  // answered with 554.  Always returns 0.
  private int launch_functions(string line)
  {
    array(string) command = line / " ";
    if(!sizeof(command))
      return 0;
    string _command = lower_case(command[0]);
    if(search(commands, _command) == -1)
    {
      log(sprintf("command %O not recognized", _command));
      outcode(500);
      return 0;
    }
    mixed err = catch
    {
#ifdef DEBUG
      log(sprintf("calling %O\n", _command));
#endif
      function fun = this_object()[_command];
      fun(command[1..] * " ");
    };
    if(err)
    {
      log(sprintf("error while executing command %O", _command));
      log(describe_backtrace(err));
      outcode(554);
    }
    return 0;
  }

  // Socket read callback: append the new data to the input buffer and
  // process every complete command line or, in data mode, every complete
  // message it contains.  Incomplete input stays in the buffer until
  // more data arrives.
  private void read_cb(mixed id, string data)
  {
    // A CRLF may be split between two reads.  Put a trailing CR back in
    // front of the new data so that the pair is normalised below.
    if(sizeof(inputbuffer) && inputbuffer[-1] == '\r')
    {
      inputbuffer = inputbuffer[..sizeof(inputbuffer) - 2];
      data = "\r" + data;
    }
    // The old buffer holds no complete terminator, so one that the new
    // data completes starts at most two characters before the old end
    // (the longest terminator is "\n.\n").  This avoids rescanning a
    // large, partly received message on every read.
    int searchpos = max(sizeof(inputbuffer) - 2, 0);
    inputbuffer += replace(data, "\r\n", "\n");
    while(1)
    {
      // the mode can change with every command, so pick the terminator anew
      string pattern = datamode ? "\n.\n" : "\n";
      int pos = search(inputbuffer, pattern, searchpos);
      if(pos == -1)
        break;
      searchpos = 0;
      if(datamode)
      {
        // the message includes the newline that ends its last line
        string content = inputbuffer[..pos];
        inputbuffer = inputbuffer[pos + sizeof(pattern)..];
        message(content);
      }
      else
      {
        string line = inputbuffer[..pos - 1];
        inputbuffer = inputbuffer[pos + 1..];
        launch_functions(line);
        if(lower_case(line) == "quit")
        {
          destruct(this_object());
          return;
        }
      }
    }
  }

  // Socket write callback: send the greeting once, then stop listening
  // for write events.
  private void write_cb()
  {
    outcode(220);
    fd->set_write_callback(0);
  }

  // Close the socket (ignoring errors) and stop the inactivity timer.
  private void close_cb()
  {
    catch (fd->close());
    remove_call_out(handle_timeout);
  }

  // _fd          : the accepted socket; it is assigned to our own file object
  // _domains     : domains we relay for; the first one is used for the
  //                MAILER-DAEMON address, "*" allows any domain
  // _cb_mailfrom : called with the sender address, returns an SMTP code
  // _cb_rcptto   : called with a recipient address, returns an SMTP code
  // _cb_data     : called with the finished message, returns an SMTP code
  void create(object _fd, array(string) _domains, function _cb_mailfrom, function _cb_rcptto, function _cb_data)
  {
    mydomains += map(_domains, lower_case);
    cb_mailfrom = _cb_mailfrom;
    cb_rcptto = _cb_rcptto;
    cb_data = _cb_data;
    fd->assign(_fd);
    catch(remoteaddr=((fd->query_address()||"")/" ")[0]);
    catch(localaddr=((fd->query_address(1)||"")/" ")[0]);
    catch(localport=(int)((fd->query_address(1)||"")/" ")[1]);
    // refuse the connection when either address could not be set
    if(!remoteaddr || !localaddr)
    {
      outcode(421);
      close_cb();
      return;
    }
    //log(sprintf("connection from %s to %s:%d", remoteaddr, localaddr, localport));
    fd->set_nonblocking(read_cb, write_cb, close_cb);
    call_out(handle_timeout, 300);
  }
};
// SMTP server: listens on a TCP port and creates one `connection` object
// for every client that connects.
class server {
  // the listening port
  private object fdport;
  // domains handed to every connection
  private array(string) domains;
  // user callbacks handed to every connection
  private function cb_mailfrom;
  private function cb_rcptto;
  private function cb_data;

  // Called by the port whenever a client is waiting to be accepted.
  // Throws if the accept itself fails.
  private void accept_callback()
  {
    object fd = fdport->accept();
    if(!fd)
      error("Can't accept connections from socket\n");
    // The connection object is not stored here; it is given the accepted
    // file and the local wrapper is destructed right afterwards.
    connection(fd, domains, cb_mailfrom, cb_rcptto, cb_data);
    destruct(fd);
  }

  // _domains     : domains passed on to every connection
  // port         : port to listen on, 25 when omitted or 0
  // ip           : local address to bind to, any address when omitted
  // _cb_mailfrom : callback for the "mail from" command
  // _cb_rcptto   : callback for the "rcpt to" command
  // _cb_data     : callback for a complete message
  // Throws an error when the port cannot be bound.
  void create(array(string) _domains, void|int port, void|string ip, function _cb_mailfrom,
              function _cb_rcptto, function _cb_data)
  {
    domains = _domains;
    cb_mailfrom = _cb_mailfrom;
    cb_rcptto = _cb_rcptto;
    cb_data = _cb_data;
    if(!port)
      port = 25;
    // A freshly created Stdio.Port object is always true, so testing the
    // object itself can never detect a failed bind.  bind() reports the
    // outcome through its return value instead.
    fdport = Stdio.Port();
    if(!fdport->bind(port, accept_callback, ip))
    {
      error("Cannot bind to socket, already bound ?\n");
    }
  }
};
/*
A non blocking Process starter for Pike
19 September 2003 <vida(a)caudium.net> David Gourdelier
Example of use:
void normal_finish(string res)
{
write("process finished with res %s\n", res);
}
void timeout()
{
write("process timeout\n");
exit(1);
}
int main(int argc, array(string) argv)
{
create_nbprocess(({ argv[1] }), 3, normal_finish, timeout);
return -1;
}
*/
// Starts a process without blocking the caller.  The process is polled
// from the backend: when it has finished, the callback is called with
// everything the process wrote to stdout; when it is still running after
// `timeout` seconds it is killed and the timeout callback is called.
class create_nbprocess {
  inherit Process.create_process;

  // our end of the pipe connected to the child's stdout
  private object file3;
  // what the child has written to stdout so far
  private string output = "";
  // called with the output when the process has finished
  private function cb;
  // called when the process had to be killed
  private function timeout_cb;

  // Read callback for the child's stdout.  Emptying the pipe while the
  // child runs keeps a child with a lot of output from blocking on a
  // full pipe until the timeout kills it.
  private void collect_output(mixed id, string data)
  {
    output += data;
  }

  // start a process. It inherits Process.create_process
  // command_args and modifiers are the same as in create_process
  // (the "stdin" and "stdout" modifiers are always replaced by our pipes)
  // timeout is the timeout in seconds after which we will kill the process
  // and call back the timeout_callback function
  // callback is called with the process' output when it has finished
  void create(array(string) command_args, int timeout,
              void|function(string:void) callback, void|function(void:void) timeout_callback,
              void|mapping modifiers)
  {
    object file1 = Stdio.File();
    object file2 = file1->pipe();
    file3 = Stdio.File();
    object file4 = file3->pipe();
    cb = callback;
    timeout_cb = timeout_callback;
    // modifiers is optional; fall back to an empty mapping so that the
    // addition also works when 0 is passed explicitly.  The caller's
    // mapping is not modified.
    ::create(command_args,
             (modifiers || ([ ])) + ([ "stdin": file2, "stdout": file4 ]));
    // The child has its own copies of the pipe ends.  Closing ours gives
    // the child an empty stdin and lets us see end of file on its stdout.
    file1->close();
    file2->close();
    file4->close();
    file3->set_nonblocking(collect_output, 0, 0);
    call_out(watcher, 0.1);
    call_out(killer, timeout);
  }

  // Stop polling and release the pipe when the object goes away.
  void destroy()
  {
    remove_call_out(watcher);
    remove_call_out(killer);
    if(file3)
    {
      catch(file3->close());
      destruct(file3);
    }
  }

  // Poll the process every 0.1 seconds until its status is no longer 0,
  // then deliver the output.
  private void watcher()
  {
    if(::status() == 0)
    {
      // still running
      call_out(watcher, 0.1);
      return;
    }
    /* process has finished */
    remove_call_out(killer);
    // pick up whatever the read callback has not collected yet
    file3->set_blocking();
    string rest = file3->read();
    if(rest)
      output += rest;
    file3->close();
    if(cb)
      cb(output);
  }

  // Timeout reached: stop polling, kill the process where kill() is
  // available and notify the caller.
  private void killer()
  {
    remove_call_out(killer);
    remove_call_out(watcher);
    catch(file3->close());
#if constant(kill)
    ::kill(signum("SIGKILL"));
#endif
    if(timeout_cb)
      timeout_cb();
  }
};